文章

《Operating Systems: Three Easy Pieces》学习笔记(二十二) 锁

《Operating Systems: Three Easy Pieces》学习笔记(二十二) 锁

锁的基本思想

lock()和 unlock()函数的语义很简单。调用 lock() 尝试获取锁,如果没有其他线程持有锁(即它是可用的),该线程会获得锁,进入临界区

当持有锁的线程在临界区时,其他线程就无法进入临界区。

Pthread 库

POSIX 库将锁称为互斥量(mutex),因为它被用来提供线程之间的互斥。即当一个线程在临界区,它能够阻止其他线程进入直到本线程离开临界区。

1
2
3
4
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock); // wrapper for pthread_mutex_lock()
balance = balance + 1;
pthread_mutex_unlock(&lock);

评价锁的维度

互斥(mutual exclusion)
能够阻止多个线程 进入临界区
公平性(fairness)
每一个竞争线程有公平的机会抢到锁。是否有竞争锁的线程会饿死(starve),一直无法获得锁
性能(performance)
使用锁之后增加的时间开销

控制中断

最早提供的互斥解决方案之一,就是在临界区关闭中断:

1
2
3
4
5
6
void lock() {
    DisableInterrupts();
}
void unlock() {
    EnableInterrupts();
}

假设我们运行在这样一个单处理器系统上。通过在进入临界区之前关闭中断(使用特殊的硬件指令),可以保证临界区的代码不会被中断,从而原子地执行。

缺点:

  1. 要求我们允许所有调用线程执行特权操作(打开关闭中断),即信任这种机制不会被滥用。关中断后操作系统无法获得控制权
  2. 这种方案不支持多处理器。如果多个线程运行在不同的 CPU 上,每个线程都试图进入同一个临界区,关闭中断也没有作用。线程可以运行在其他处理器上,因此能够进入临界区
  3. 关闭中断导致中断丢失,可能会导致严重的系统问题。假如磁盘设备完成了读取请求,正常应该会给个中断,但 CPU 错失了这一中断,那么,操作系统如何知道去唤醒等待读取的进程

硬件支持:测试并设置指令

锁的实现需要硬件支持,最简单的硬件支持测试并设置指令(test-and-set instruction),也叫作原子交换(atomic exchange)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct lock_t { int flag; } lock_t;

void init(lock_t *mutex) {
    // 0 -> lock is available, 1 -> held
    mutex->flag = 0;
}

void lock(lock_t *mutex) {
    while (mutex->flag == 1) // TEST the flag
        ; // spin-wait (do nothing)
    mutex->flag = 1; // now SET it!
}

void unlock(lock_t *mutex) {
    mutex->flag = 0;
}

上述代码中,每次 lock 会判断 flag 的值,也就是测试并设置指令(test-and-set instruction)中的test,然后判断成功才set

但如果没有硬件辅助,也就是让测试并设置作为一个原子操作,会导致两个线程有可能同时进入临界区。

注意自旋等待spin-wait会影响性能

自旋锁(spin lock)

一直自旋(判断某个条件成立),直到锁可用。

实现可用的自旋锁

在 SPARC 上,需要的指令叫 ldstub(load/store unsigned byte,加载/保存无符号字节);在 x86 上,是 xchg(atomic exchange,原子交换)指令。但它们基本上在不同的平台上做同样的事,通常称为测试并设置指令(test-and-set)。

1
2
3
4
5
int TestAndSet(int *old_ptr, int new) {
    int old = *old_ptr; // fetch old value at old_ptr
    *old_ptr = new; // store 'new' into old_ptr
    return old; // return the old value
}

测试并设置作为原子操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct lock_t {
    int flag;
} lock_t;

void init(lock_t *lock) {
    // 0 indicates that lock is available, 1 that it is ld
    lock->flag = 0;
}

void lock(lock_t *lock) {
    while (TestAndSet(&lock->flag, 1) == 1)
        ; // spin-wait (do nothing)
}

void unlock(lock_t *lock) {
    lock->flag = 0;
}

评价自旋锁

正确性(correctness): 自旋锁一次只允许一个线程进入临界区。因此,这是正确的锁

公平性(fairness): 自旋锁不提供任何公平性保证

性能(performance): 在单 CPU 的情况下,性能开销相当大。其他线程都在竞争锁,都会在放弃 CPU 之前,自旋一个时间片,浪费 CPU 周期。在多 CPU 上,自旋锁性能不错(如果线程数大致等于 CPU 数)

硬件支持:比较并交换指令

比较并交换指令(SPARC 系统中是 compare-and-swap,x86 系统是 compare-and-exchange)

1
2
3
4
5
6
int CompareAndSwap(int *ptr, int expected, int new) {
    int actual = *ptr;
    if (actual == expected)
        *ptr = new;
    return actual;
}

比较并交换的基本思路是检测 ptr 指向的值是否和 expected 相等;如果更新 ptr 所指的值为新值。否则,什么也不做。不论哪种情况,都返回该内存地址的实际值,让调用者能够知道执行是否成功。

在自旋锁中可以替换测试并设置指令

1
2
3
4
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
    ; // spin
}

C 中实现(x86):

1
2
3
4
5
6
7
8
9
10
11
12
13
char CompareAndSwap(int *ptr, int old, int new) {
    unsigned char ret;

    // Note that sete sets a 'byte' not the word
    __asm__ __volatile__ (
        " lock\n"
        " cmpxchgl %2,%1\n"
        " sete %0\n"
        : "=q" (ret), "=m" (*ptr)
        : "r" (new), "m" (*ptr), "a" (old)
        : "memory");
    return ret;
}

链接的加载和条件式存储指令

一些平台提供了实现临界区的一对指令

链接的加载load-linked)和条件式存储store-conditional

1
2
3
4
5
6
7
8
9
10
11
12
int LoadLinked(int *ptr) {
    return *ptr;
}

int StoreConditional(int *ptr, int value) {
    if (no one has updated *ptr since the LoadLinked to this address) {
        *ptr = value;
        return 1; // success!
    } else {
        return 0; // failed to update
    }
}

条件式存储(store-conditional)指令,只有上一次执行LoadLinked的地址在期间都没有更新时, 才会成功,同时更新了该地址的值

先通过 LoadLinked 尝试获取锁值,如果判断到锁被释放了,就执行StoreConditional判断在执行完LoadLinkedStoreConditional执行前ptr 有没有被更新没有:说明没有其他线程来,可以进临界区,:说明已经被其他线程抢走了,继续重复本段落所述内容循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
void lock(lock_t *lock) {
    while (1) {
        while (LoadLinked(&lock->flag) == 1)
            ; // spin until it's zero
        if (StoreConditional(&lock->flag, 1) == 1)
            return; // if set-it-to-1 was a success: all done
                // otherwise: try it all over again
    }
}

void unlock(lock_t *lock) {
    lock->flag = 0;
}

硬件支持:获取并增加指令

获取并增加(fetch-and-add)指令能原子返回特定地址的旧值,并且让该值自增一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int FetchAndAdd(int *ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old;
}
typedef struct lock_t {
    int ticket;
    int turn;
} lock_t;

void lock_init(lock_t *lock) {
    lock->ticket = 0;
    lock->turn = 0;
}

void lock(lock_t *lock) {
    // 开始挂号,获取当前号码。然后号码加1,保证每个人号码都不同
    int myturn = FetchAndAdd(&lock->ticket);
    // 等待被叫号
    while (lock->turn != myturn)
        ; // spin
}

void unlock(lock_t *lock) {
    // 只需要加1功能,不用返回旧值
    FetchAndAdd(&lock->turn);
}

原理是每次进入 lock,就获取当前ticket值,相当于挂号,然后全局 ticket 本身会自增一,后续线程都会获得属于自己的唯一ticket值lock->turn表示当前叫号值,叫到号的运行,unlock 时递增lock->turn更新叫号值就行。

这种方法保证了公平性,FIFO

自旋过多:怎么办

一个线程会一直自旋检查一个不会改变的值,浪费掉整个时间片!如果有 N 个线程去竞争一个锁,情况会更糟糕。同样的场景下,会浪费 N−1 个时间片,只是自旋并等待一个线程释放该锁。

如何让锁不会不必要地自旋,浪费 CPU 时间?

使用队列:休眠替代自旋

需要一个队列来保存等待锁的线程。

Solaris 中 park()能够让调用线程休眠,unpark(threadID)则会唤醒 threadID 标识的线程。

两阶段锁

两阶段锁(two-phase lock),如果第一个自旋阶段没有获得锁,第二阶段调用者会睡眠,直到锁可用。上文的 Linux 锁就是这种锁,不过只自旋一次;更常见的方式是在循环中自旋固定的次数(希望这段时间内能获取到锁),然后使用 futex 睡眠。

参考

本文由作者按照 CC BY 4.0 进行授权

© Kai. 保留部分权利。

浙ICP备20006745号-2,本站由 Jekyll 生成,采用 Chirpy 主题。