《Operating Systems: Three Easy Pieces》学习笔记(二十二) 锁
锁的基本思想
lock()和 unlock()函数的语义很简单。调用 lock()
尝试获取锁,如果没有其他线程持有锁(即它是可用的
),该线程会获得锁
,进入临界区
当持有锁的线程在临界区时,其他线程就无法进入临界区。
Pthread 库
POSIX 库将锁称为互斥量(mutex),因为它被用来提供线程之间的互斥。即当一个线程在临界区,它能够阻止其他线程进入直到本线程离开临界区。
1
2
3
4
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock); // wrapper for pthread_mutex_lock()
balance = balance + 1;
pthread_mutex_unlock(&lock);
评价锁的维度
- 互斥(mutual exclusion)
- 能够阻止多个线程 进入临界区
- 公平性(fairness)
- 每一个竞争线程有公平的机会抢到锁。是否有竞争锁的线程会饿死(starve),一直无法获得锁
- 性能(performance)
- 使用锁之后增加的时间开销
控制中断
最早提供的互斥
解决方案之一,就是在临界区关闭中断:
1
2
3
4
5
6
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}
假设我们运行在这样一个单处理器系统上。通过在进入临界区
之前关闭中断
(使用特殊的硬件指令),可以保证临界区的代码不会被中断,从而原子
地执行。
缺点:
- 要求我们允许所有调用线程执行
特权操作
(打开关闭中断),即信任这种机制不会被滥用
。关中断后操作系统无法获得控制权
- 这种方案不支持
多处理器
。如果多个线程运行在不同
的 CPU 上,每个线程都试图进入同一个
临界区,关闭中断也没有作用。线程可以运行在其他处理器上,因此能够进入临界区
- 关闭中断导致
中断丢失
,可能会导致严重的系统问题。假如磁盘设备完成了读取请求,正常应该会给个中断,但 CPU 错失了这一中断,那么,操作系统如何知道去唤醒等待读取的进程
硬件支持:测试并设置指令
锁的实现需要硬件支持,最简单的硬件支持
是测试并设置指令
(test-and-set instruction),也叫作原子交换
(atomic exchange)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct lock_t { int flag; } lock_t;
void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;
}
void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag
; // spin-wait (do nothing)
mutex->flag = 1; // now SET it!
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
}
上述代码中,每次 lock 会判断 flag 的值,也就是测试并设置指令(test-and-set instruction)
中的test
,然后判断成功才set
但如果没有硬件辅助,也就是让测试并设置
作为一个原子操作,会导致两个线程有可能同时进入
临界区。
注意自旋等待spin-wait
会影响性能
自旋锁(spin lock)
一直自旋(判断某个条件成立),直到锁可用。
实现可用的自旋锁
在 SPARC 上,需要的指令叫 ldstub(load/store unsigned byte,加载/保存无符号字节);在 x86 上,是 xchg(atomic exchange,原子交换)指令。但它们基本上在不同的平台上做同样的事,通常称为测试并设置指令
(test-and-set)。
1
2
3
4
5
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store 'new' into old_ptr
return old; // return the old value
}
将测试并设置
作为原子操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
// 0 indicates that lock is available, 1 that it is ld
lock->flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
评价自旋锁
正确性(correctness): 自旋锁一次只允许一个线程进入临界区。因此,这是正确
的锁
公平性(fairness): 自旋锁不提供
任何公平
性保证
性能(performance): 在单 CPU
的情况下,性能开销相当大。其他线程都在竞争锁,都会在放弃 CPU 之前,自旋一个时间片,浪费 CPU 周期。在多 CPU
上,自旋锁性能不错(如果线程数大致等于 CPU 数)
硬件支持:比较并交换指令
比较并交换指令
(SPARC 系统中是 compare-and-swap,x86 系统是 compare-and-exchange)
1
2
3
4
5
6
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected)
*ptr = new;
return actual;
}
比较并交换的基本思路是检测 ptr 指向的值是否和 expected 相等
;如果是
,更新
ptr 所指的值为新值。否则,什么也不做。不论哪种情况,都返回该内存地址的实际值,让调用者能够知道执行是否成功。
在自旋锁中可以替换测试并设置指令
1
2
3
4
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin
}
C 中实现(x86):
1
2
3
4
5
6
7
8
9
10
11
12
13
char CompareAndSwap(int *ptr, int old, int new) {
unsigned char ret;
// Note that sete sets a 'byte' not the word
__asm__ __volatile__ (
" lock\n"
" cmpxchgl %2,%1\n"
" sete %0\n"
: "=q" (ret), "=m" (*ptr)
: "r" (new), "m" (*ptr), "a" (old)
: "memory");
return ret;
}
链接的加载和条件式存储指令
一些平台提供了实现临界区的一对指令
链接的加载
(load-linked
)和条件式存储
(store-conditional
)
1
2
3
4
5
6
7
8
9
10
11
12
int LoadLinked(int *ptr) {
return *ptr;
}
int StoreConditional(int *ptr, int value) {
if (no one has updated *ptr since the LoadLinked to this address) {
*ptr = value;
return 1; // success!
} else {
return 0; // failed to update
}
}
条件式存储
(store-conditional)指令,只有上一次执行LoadLinked
的地址在期间都没有更新
时, 才会成功,同时更新了该地址的值
先通过 LoadLinked
尝试获取锁值,如果判断到锁被释放了,就执行StoreConditional
判断在执行完LoadLinked
到StoreConditional执行前
ptr 有没有被更新
,没有
:说明没有
其他线程来抢
,可以进临界区,有
:说明已经被其他线程抢走了
,继续重复本段落所述内容循环:
1
2
3
4
5
6
7
8
9
10
11
12
13
void lock(lock_t *lock) {
while (1) {
while (LoadLinked(&lock->flag) == 1)
; // spin until it's zero
if (StoreConditional(&lock->flag, 1) == 1)
return; // if set-it-to-1 was a success: all done
// otherwise: try it all over again
}
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
硬件支持:获取并增加指令
获取并增加
(fetch-and-add)指令能原子
地返回
特定地址的旧值
,并且让该值自增一
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
typedef struct lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
// 开始挂号,获取当前号码。然后号码加1,保证每个人号码都不同
int myturn = FetchAndAdd(&lock->ticket);
// 等待被叫号
while (lock->turn != myturn)
; // spin
}
void unlock(lock_t *lock) {
// 只需要加1功能,不用返回旧值
FetchAndAdd(&lock->turn);
}
原理是每次进入 lock,就获取当前ticket值
,相当于挂号
,然后全局 ticket 本身会自增一
,后续线程都会获得属于自己的唯一ticket值
,lock->turn
表示当前叫号值
,叫到号的运行,unlock 时递增lock->turn
更新叫号值
就行。
这种方法保证了公平性,FIFO
自旋过多:怎么办
一个线程会一直自旋检查一个不会改变
的值,浪费
掉整个时间片!如果有 N 个
线程去竞争
一个锁,情况会更糟糕。同样的场景下,会浪费 N−1 个时间片
,只是自旋并等待一个线程释放该锁。
如何让锁不会不必要地自旋
,浪费 CPU 时间?
使用队列:休眠替代自旋
需要一个队列来保存等待锁的线程。
Solaris 中 park()能够让调用线程休眠,unpark(threadID)则会唤醒 threadID 标识的线程。
两阶段锁
两阶段锁
(two-phase lock),如果第一个自旋阶段
没有获得锁,第二阶段
调用者会睡眠
,直到锁可用。上文的 Linux 锁就是这种锁,不过只自旋一次;更常见的方式是在循环中自旋固定的次数(希望
这段时间内能获取到锁
),然后使用 futex 睡眠。