27章罗列了线程的api,所以就略过了
28章主要介绍了Lock 锁,上一章说到了,处理并发下的数据竞争最好的方式,就是使用锁来使得那段临界代码互斥,所以这一章主要介绍了锁。
锁的模型
锁的话,简单的模型其实也就2个功能:加锁、解锁
锁的评估
一般我们评估锁,从3个维度:
- 基本功能,是否提供了互斥
- 公平性,是否能提供对于争抢锁之间线程的公平性,也就是防止饿死
- 性能,锁本身对于多线程环境执行下的性能增加开销
锁的实现
最简单的实现,关闭中断
前文说到了,数据竞争的原因就是多线程之间不可知的调度顺序,所以当一个线程获取锁的时候,关闭掉所有的中断响应,释放锁的时候,恢复中断响应,这样在持有锁时就可以保证一定是自己执行完, 其他线程无法通过操作系统的中断被调度,保证了基本功能。
但是问题也很明显,首先,关闭中断响应需要进入内核态。如果关闭中断响应以后,如果线程不主动释放,那么连操作系统都无法夺回对整个系统的控制权了。
这种实现的唯一优点就是简单,但是风险过高,所以不可能如此实现,只是一种思路。
通过共享变量
我们可以通过一个共享变量来实现,使用一个flag,获取锁时修改为1,其他线程判断为1时则自旋一直判断等待为0,在释放锁时则将flag修改为0标识为已释放。
问题也很明显,修改和判断flag时,依旧可能中断,这个逻辑本身也是一段临界代码。
别急,在这种方式下,我们要解决问题的规模就变小了,只要保证修改flag的原子性就可以。
基本原语保证原子性
基于共享变量的方式,CPU提供了一些硬件支持来支持原子性操作,方便共享变量的方式来实现锁。
Test And Set
TAS的效果很简单,对一个变量(内存地址)修改值后,返回其原来的值。通过TAS,我们可以实现一个简单的锁:
typedef struct __lock_t{
int flag;
} lock_t
void init(lock_t*lock){
// 0:lock is available,1:lock is held
lock->flag = 0;
}
void lock(lock_t*1ock){
while(TestAndSet(&lock->flag,1) == 1);
// spin-wait (do nothing)
}
void unlock (lock_t *lock){
lock->flag = 0;
}
原理和逻辑也比较简单,在将flag修改为1后,返回原来的值,只有在原来的值为0时,才认为自己获取到了这个锁。
Compare-And-Swap
CAS是很多人都知道的操作了,给予2个值,只有在地址的原来值与给定的期望值相等时,才将地址的值修改为目标值,并且返回修改后的值。实现锁的思路和TAS基本一致。
void lock(lock_t*lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin4
}
在实现锁上,CAS和TAS并没有什么区别,但是CAS在实现无锁数据结构时,会是非常强大的工具。
Load-Linked and Store-Conditional
这个很难翻译,我个人认为翻译成监听以及条件保存适当一些。
这两个原语的作用是,首先通过LL监听一个地址后,再使用SC条件保存去更新这个地址的内容,在SC更新一个LL的地址时,如果LL的地址从监听以后被修改过,那么返回false并且更新失败,如果没有被修改过,那么返回true并且更新成功。锁的实现如下:
void lock(lock_t*lock) {
while (1) {
while (LoadLinked(&lock->flag) == 1)
; // spin until it’s zero
if (StoreConditional(&lock->flag, 1) == 1)
return; // if set-it-to-1 was a success: all done
// otherwise: try it all over again
}
}
void unlock(lock_t*lock) {
lock->flag = 0;
}
Fetch-And-Add
最后一个硬件原语是获取和添加指令,它自动增加一个值,同时返回特定地址的旧值。使用这种原语实现锁的方式截然不同,类似于令牌:
- 我们的锁需要2个变量,turn和ticket。
- 在获取锁时,将ticket标记为1获得。此时线程自己的myturn==turn。
- 其他线程进入,ticket标记为获得了,进入等待,此时使用这个原语进行turn的原子+1,获得的myturn==turn+1,并且进入自旋判断turn是否等于自己的myturn了。
- 释放锁时,原子增加turn,把ticket修改为0,释放。
- 此时那些在自旋的某个线程的myturn就等于被释放时+1的turn了,该线程获取锁并进入临界代码。
自旋的性能问题
前面简单的锁,都存在自旋的性能问题。
通过无脑自旋,会浪费大量的CPU,比如100个线程在等待一个锁,其中99个线程在分配到CPU时间片时只是在做无意义的自旋浪费CPU。并且这种浪费反而会影响实际正在执行的线程对CPU的实际使用,进一步减慢锁的释放。
让出线程(yield)
简而言之就是一个功能,使得线程可以主动退出调度,由操作系统来决定什么时候恢复调度它,是最简单的方式,比较简陋。
使用队列并且sleep
sleep的区别是,sleep可以由其他线程主动唤醒(对应park和unpark,java的unsafe也有)。
使用sleep加队列,我们可以实现一个初具人形的低自旋的锁了。
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}
void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; // acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0;
park();
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; // acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock (for next thread!)
m->guard = 0;
}
有一点需要额外补充下方便理解:这段代码中,guard实际用来操作控制队列的原子性,flag用来标记锁的状态。
这种实现也会有个问题,当调度发生时
queue_add(m->q, gettid());
m->guard = 0;
park();
这里在加入队列后,m->guard = 0;会释放掉队列的“锁”->在park执行之前,如果另外的线程释放了锁从队列中取出来这个线程刚加到队列里的自己进行unpark->此时unpark必然会失败(因为park还没执行)->此时再park,就永远park下去永远不会被唤醒了(队列里没他,所以不会被取出来unpark,但是自己又park进去了)。
队列的方式可以保证锁的公平性(FIFO)。
其实到这就想到一个点:队列、park、unpark?你要找的是不是juc.aqs?(手动狗头)
Linux的锁
Linux提供的原语是futex_wait和futex_wake,是根据地址的值变化来阻塞的。
void mutex_lock(int *mutex) {
int v;
// 尝试快速路径获取锁
if (atomic_bit_test_set(mutex, 31) == 0)
return;
// 等待值+1
atomic_increment(mutex);
while (1) {
if (atomic_bit_test_set(mutex, 31) == 0) {
atomic_decrement(mutex);
return;
}
v = *mutex;
if (v >= 0)
continue;
// 使用 futex 等待锁释放
futex_wait(mutex, v);
}
}
void mutex_unlock(int *mutex) {
// 尝试快速路径释放锁
if (atomic_add_zero(mutex, 0x80000000))
return;
// 如果有其他线程在等待锁,唤醒其中一个
futex_wake(mutex);
}
这里有一个点比较难理解,因为是伪代码,只能通过整体去猜测:
if (atomic_add_zero(mutex, 0x80000000))
这个函数是将mutex-0x80000000,剩下的值就是等待中的线程数量(atomic_increment(mutex)),此时如果减掉后为0,说明没有线程在等待,那么直接返回,否则的话,futex_wake(mutex);唤醒等待这个值的所有线程去简单自旋一下继续争夺。详细过程大致如下:
初始化状态
初始时,
mutex
的值为0
。加锁 (
mutex_lock
)
- 快速路径获取锁成功:
- 如果
atomic_bit_test_set(mutex, 31) == 0
成功,则mutex
的第 31 位被设置为1
,表示锁已被持有。mutex
的值从0
变为0x80000000
(即1000 0000 0000 0000 0000 0000 0000 0000
)。- 快速路径获取锁失败:
- 如果
atomic_bit_test_set(mutex, 31)
失败,说明锁已经被其他线程持有。- 执行
atomic_increment(mutex)
,将mutex
的值增加1
。mutex
的值从0x80000000
变为0x80000001
(即1000 0000 0000 0000 0000 0000 0000 0001
),表示有一个线程在等待锁。- 如果其他线程也尝试获取锁,则
mutex
的值会进一步增加,例如有两个线程在等待时,mutex
的值为0x80000002
(即1000 0000 0000 0000 0000 0000 0000 0010
)。- 等待锁释放:
- 当线程检测到
mutex
的值为负数时,进入futex_wait
。futex_wait
挂起当前线程,等待锁的释放。解锁 (
mutex_unlock
)
- 快速路径释放锁:
- 如果
atomic_add_zero(mutex, 0x80000000)
返回true
,表示没有其他线程在等待锁。mutex
的值从0x80000000
变为0
,锁被释放。- 有线程在等待锁:
- 如果
atomic_add_zero(mutex, 0x80000000)
返回false
,表示有其他线程在等待锁。mutex
的值从0x80000001
变为0x1
(即0000 0000 0000 0000 0000 0000 0000 0001
),表示有一个线程在等待。- 调用
futex_wake(mutex)
唤醒一个等待的线程。线程等待时
- 线程等待锁时
:
- 当一个线程进入等待状态时,
mutex
的值会变为负数(即高位为1
)。mutex
的值会增加,例如有两个线程在等待锁时,mutex
的值为0x80000002
。
两阶段锁
这个就是为了锁的性能考虑,和jvm中synchronized关键字分轻量级以及重量级一样,实际现在的锁实现出于性能考虑,都会先进入自旋,自旋一定次数后,再进入阻塞等待,这么设计的原因就是因为大部分锁的释放都很快,自旋可以大大降低线程上下文切换的开销。
总结
其实锁的本质就是CPU的原子操作原语(TAS,CAS)加一定数据结构和逻辑,实现了2个关键功能:加锁、解锁。
锁的主要等待方式包括两种:自旋或阻塞(让出CPU进入休眠)。
锁的关键组成:CPU硬件原语(保障互斥)->自旋和阻塞(优化性能)->队列(保障公平性,提供阻塞)->两阶段(性能)
结合Java的话,你会发现,轻量级重量级锁,就考虑了自旋和阻塞的性能优缺点,并且实现了两阶段锁。
AQS则是实现了JVM用户级别层面的一个锁队列模型,这块为什么不直接用linux提供而是要自己实现呢?我猜测是因为java作为跨平台的语言,不能保证所有操作系统都实现了阻塞队列这种功能的锁实现,所以只利用park和unpark还有cas这种原语来自己实现。