文章

《Operating Systems: Three Easy Pieces》学习笔记(二十五) 信号量

条件变量两篇文章可知条件变量必须和锁配合使用,那为什么不直接封装在一起呢?于是就有个信号量。

信号量只是将锁和单值条件的条件变量封装在一起,所以它不是一个全新的概念,它能实现的事锁加条件变量都能实现。对于比较复杂情况下的条件判断无法使用信号量解决,因为其只内置了一个简单的整型的 value 条件。

信号量的定义

信号量是有一个整数值的对象,可以用两个函数来操作它。在 POSIX 标准中,是sem_wait()sem_post()

1
2
3
#include <semaphore.h>
sem_t s;
sem_init(&s, 0, 1);

sem_init 用于初始化信号量。

1
2
3
4
5
6
7
8
9
int sem_wait(sem_t *s) {
    decrement the value of semaphore s by one
    wait if value of semaphore s is negative
}

int sem_post(sem_t *s) {
    increment the value of semaphore s by one
    if there are one or more threads waiting, wake one
}
  • sem_wait()要么立刻返回(调用 sem_wait()时,信号量的值大于等于 1),同时信号量减1,要么会让调用线程挂起,直到之后的一个 post 操作。
  • sem_post()并没有等待某些条件满足。它直接增加信号量的值,如果有等待线程,唤醒其中一个。
  • 当信号量的值为负数时,这个值就是等待线程的个数

二值信号量(锁)

用信号量作为锁

1
2
3
4
5
6
7
sem_t m;
// 初值要为1
sem_init(&m, 0, 1);

sem_wait(&m);
// critical section here
sem_post(&m);

信号量用作条件变量

假设一个线程创建另外一线程,并且等待它结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sem_t s;

void *
child(void *arg)
{
    printf("child\n");
    sem_post(&s); // signal here: child is done
    return NULL;
}

int main(int argc, char *argv[])
{
    // X初始值应是0
    sem_init(&s, 0, X); // what should X be?
    printf("parent: begin\n");
    pthread_t c;
    Pthread_create(c, NULL, child, NULL);
    sem_wait(&s); // wait here for child
    printf("parent: end\n");
    return 0;
}

生产者/消费者(有界缓冲区)问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
sem_t empty;
sem_t full;
sem_t mutex;

void *producer(void *arg)
{
    int i;
    for (i = 0; i < loops; i++)
    {
        // empty/full是一个条件变量,用于唤醒和等待
        // 不过可以自动形成等待队列,比条件变量方便
        sem_wait(&empty); // line p1
        // mutex是一个锁,用于商品队列的操作的保护
        // 锁的作用域很重要,尽可能缩小临界区来避免死锁和提高性能
        sem_wait(&mutex); // line p1.5 (MOVED MUTEX HERE...)
        put(i);           // line p2
        sem_post(&mutex); // line p2.5 (... AND HERE)
        sem_post(&full);  // line p3
    }
}

void *consumer(void *arg)
{
    int i;
    for (i = 0; i < loops; i++)
    {
        sem_wait(&full);  // line c1
        sem_wait(&mutex); // line c1.5 (MOVED MUTEX HERE...)
        int tmp = get();  // line c2
        sem_post(&mutex); // line c2.5 (... AND HERE)
        sem_post(&empty); // line c3
        printf("%d\n", tmp);
    }
}

int main(int argc, char *argv[])
{
    // ...
    sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
    sem_init(&full, 0, 0);    // ... and 0 are full
    sem_init(&mutex, 0, 1);   // mutex=1 because it is a lock
    // ...
}

读者—写者锁

读者之间不互斥,写者之间互斥,读者和写者互斥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
typedef struct _rwlock_t
{
    sem_t lock;      // binary semaphore (basic lock)
    sem_t writelock; // used to allow ONE writer or MANY readers
    int readers;     // count of readers reading in critical section
} rwlock_t;

void rwlock_init(rwlock_t *rw)
{
    rw->readers = 0;
    sem_init(&rw->lock, 0, 1);
    sem_init(&rw->writelock, 0, 1);
}

void rwlock_acquire_readlock(rwlock_t *rw)
{
    // lock锁保护readers计数器
    sem_wait(&rw->lock);
    rw->readers++;
    // 第一个读者需要获取写锁,来与写者互斥
    // 不停有新读者进来会导致写者饿死
    if (rw->readers == 1)
        sem_wait(&rw->writelock); // first reader acquires writelock
    sem_post(&rw->lock);
}

void rwlock_release_readlock(rwlock_t *rw)
{
    sem_wait(&rw->lock);
    rw->readers--;
    // 最后一个读者需要释放写锁
    if (rw->readers == 0)
        sem_post(&rw->writelock); // last reader releases writelock
    sem_post(&rw->lock);
}

void rwlock_acquire_writelock(rwlock_t *rw)
{
    // 写者之间互斥,且与读者互斥,用写锁管理
    sem_wait(&rw->writelock);
}

void rwlock_release_writelock(rwlock_t *rw)
{
    sem_post(&rw->writelock);
}

哲学家就餐问题

F31.10

假定有 5 位“哲学家”围着一个圆桌。每两位哲学家之间有一把餐叉(一共 5 把)。哲学家有时要思考一会,不需要餐叉;有时又要就餐。而一位哲学家只有同时拿到了左手边和右手边的两把餐叉,才能吃到东西。

1
2
3
4
5
6
while (1) {
    think();
    getforks();
    eat();
    putforks();
}

关键的挑战就是如何实现 getforks()putforks()函数,保证没有死锁,没有哲学家饿死,并且并发度更高(尽可能让更多哲学家同时吃东西)。

查找餐叉的函数:

1
2
int left(int p) { return p; }
int right(int p) { return (p + 1) % 5; }

为每个餐叉分配信号量:

1
sem_t forks[5]

尝试加锁:

1
2
3
4
5
6
7
8
9
void getforks() {
    sem_wait(forks[left(p)]);
    sem_wait(forks[right(p)]);
}

void putforks() {
    sem_post(forks[left(p)]);
    sem_post(forks[right(p)]);
}

使用这个方案会形成死锁,每个人都拿着左手边的叉子就无法继续了(循环等待)

优化方案:

选第 4 个人改为先获取右手的叉子,打破循环等待

1
2
3
4
5
6
7
8
9
10
11
12
13
void getforks()
{
    if (p == 4)
    {
        sem_wait(forks[right(p)]);
        sem_wait(forks[left(p)]);
    }
    else
    {
        sem_wait(forks[left(p)]);
        sem_wait(forks[right(p)]);
    }
}

还有其他一些类似的“著名”问题,比如吸烟者问题(cigarette smoker’s problem),理发师问题(sleeping barber problem)。

如何实现信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
typedef struct _Zem_t
{
    int value;
    pthread_cond_t cond;
    // 对value的修改锁
    pthread_mutex_t lock;
} Zem_t;

// only one thread can call this
void Zem_init(Zem_t *s, int value)
{
    s->value = value;
    Cond_init(&s->cond);
    Mutex_init(&s->lock);
}

void Zem_wait(Zem_t *s)
{
    Mutex_lock(&s->lock);
    while (s->value <= 0)
        Cond_wait(&s->cond, &s->lock);
    // 在这里递减value,value值就不会小于0了。
    s->value--;
    Mutex_unlock(&s->lock);
}

void Zem_post(Zem_t *s)
{
    Mutex_lock(&s->lock);
    s->value++;
    Cond_signal(&s->cond);
    Mutex_unlock(&s->lock);
}

小结

信号量是编写并发程序的强大而灵活的原语。有程序员会因为简单实用,只用信号量,不用锁和条件变量

信号量基于锁和条件变量,可以实现两者的功能

作为锁时,可以自动管理等待队列

作为条件变量时,免去了 while 判断是否需要等待的操作,因为内部包含了一个 value 值用于判断

参考

本文由作者按照 CC BY 4.0 进行授权

© Kai. 保留部分权利。

浙ICP备20006745号-2,本站由 Jekyll 生成,采用 Chirpy 主题。