Ostep 31 Semaphores
本文最后更新于 182 天前,如有失效请评论区留言。

这章主要介绍了信号量的使用,如何使用信号量来实现并发编程。

信号量的定义

sem_init(&s, 0, 1);

初始化语句,第二个参数0表示是线程间的(信号量也存在进程间的),1表示信号量的值value

int sem_wait(sem_t *s) {
// Decrement the value of semaphore s by one
// Wait if the value of semaphore s is negative
}

信号量的等待语句,将信号量的值减1,如果减1后的值为负数,那么就进入wait等待

int sem_post(sem_t *s) {
// Increment the value of semaphore s by one
// If there are one or more threads waiting, wake one
}

可以认为信号量的唤醒语句,将信号量的值加1,如果此时有正在等待的线程,那么唤醒其中一个线程

注意这里的wait注释的顺序和实际的逻辑可能会有些差异,实际上并不是每次都会减一,在value值已经是负数(或许是0)时,

会直接进入阻塞。在多个线程阻塞wait时,一次post唤醒以后加1,再由被唤醒的线程-1,然后再post,然后一直在0和1之间来回循环,直到所有线程被唤醒执行完毕(可以主要看生产以及消费者队列)

信号量实现并发

信号量实现锁

只需要将信号量的value设置为1,就可以实现一个锁,执行过程如下图:

  1. 第一个线程执行wait时,value修改为了0,但是此时是非负数,所以继续执行
  2. 当第二个线程再次去wait时,value从0修改为了-1,然后进入了阻塞等待
  3. 第一个线程执行post,将值+1,然后此时唤醒等待中的线程

信号量实现两个线程按顺序执行

sem_t s;

void* child(void* arg) {
    printf("child\n");
    sem_post(&s); // signal here: child is done
    return NULL;
}

int main(int argc, char* argv[]) {
    sem_init(&s, 0, X); // X should be 0
    printf("parent: begin\n");
    pthread_t c;
    pthread_create(&c, NULL, child, NULL);
    sem_wait(&s); // wait here for child
    printf("parent: end\n");
    sem_destroy(&s); // Clean up semaphore
    return 0;
}

这里需要理解一个点,父子线程的执行顺序可能乱序,但是信号量通过他的语义依旧能保证按顺序执行。

父线程的wait先执行: 此时value=-1,然后子线程执行post,value回到0并且唤醒父线程

子线程的post先执行:此时value变成1(0+1),然后父线程执行wait,将value-1,减1后value还是0,不需要阻塞,直接继续执行,依旧保证了顺序。

消费者生产者(限界队列)问题

这里分几个步骤,才能讲清并发下会出现的问题

基本实现

int buffer[MAX];
int fill = 0;
int use = 0;

void put(int value) {
    buffer[fill] = value;        // Line F1
    fill = (fill + 1) % MAX;     // Line F2
}

int get() {
    int tmp = buffer[use];       // Line G1
    use = (use + 1) % MAX;       // Line G2
    return tmp;
}

sem_t empty;
sem_t full;

void* producer(void* arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&empty);        // Line P1
        put(i);                  // Line P2
        sem_post(&full);         // Line P3
    }
    return NULL;
}

void* consumer(void* arg) {
    int tmp = 0;
    while (tmp != -1) {
        sem_wait(&full);         // Line C1
        tmp = get();             // Line C2
        sem_post(&empty);        // Line C3
        printf("%d\n", tmp);
    }
    return NULL;
}

int main(int argc, char* argv[]) {

    sem_init(&empty, 0, MAX);    // MAX are empty
    sem_init(&full, 0, 0);       // 0 are full
    //more code
    return 0;
}

​ 基本实现通过empty和full两个信号量,其中empty的值是value,生产者每次将empty减1(wait),并且对full加1(post),当生产的足够多时,如果生产速度大于消费速度,empty就从max变为负数,此时生产者进入等待。

​ 消费者每次对full进行-1,同理,消费速度大于生产速度时,那么消费者就进入阻塞等待。

这种实现在单消费者和单生产者时可以正常运作,但是在多消费者和多生产者时,get和put方法本身会出现并发问题,这点稍微理解下就知道,这两个操作不是原子操作

优化,但是会死锁

void* producer(void* arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&mutex);        // Line P0 (NEW LINE)
        sem_wait(&empty);        // Line P1
        put(i);                  // Line P2
        sem_post(&full);         // Line P3
        sem_post(&mutex);        // Line P4 (NEW LINE)
    }
    return NULL;
}

void* consumer(void* arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&mutex);        // Line C0 (NEW LINE)
        sem_wait(&full);         // Line C1
        int tmp = get();         // Line C2
        sem_post(&empty);        // Line C3
        sem_post(&mutex);        // Line C4 (NEW LINE)
        printf("%d\n", tmp);
    }
    return NULL;
}

临界问题?那简单,我加个互斥的锁不就好了?上面代码对get和put中加了锁,但是会导致新的问题:死锁。

这里会出现一个很简单的死锁,当消费者先拿到锁后,并进入full的信号量等待,此时生产者永远拿不到锁,永远无法进入put去生产,两个线程一个永久在等待锁(生产者),一个永久在等待生产者生产内容(消费者),两个线程死锁了。

最终解决,更换一下锁和等待的顺序

void* producer(void* arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&empty);       // Line P1
        sem_wait(&mutex);       // Line P1.5 (MUTEX HERE)
        put(i);                 // Line P2
        sem_post(&mutex);       // Line P2.5 (AND HERE)
        sem_post(&full);        // Line P3
    }
}

void* consumer(void* arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&full);        // Line C1
        sem_wait(&mutex);       // Line C1.5 (MUTEX HERE)
        int tmp = get();        // Line C2
        sem_post(&mutex);       // Line C2.5 (AND HERE)
        sem_post(&empty);       // Line C3
        printf("%d\n", tmp);
    }
}

这样你会发现,消费者或者生产者会先一步进入等待条件变量再获取锁,这种解决方式其实你可以等价于在get和put方法中的开始和结束加入了锁,减小了锁的粒度,避免了死锁。

Read-Write Lock 读写锁

void rwlock_acquire_readlock(rwlock_t* rw) {
    sem_wait(&rw->lock);
    rw->readers++;
    if (rw->readers == 1) // first reader gets writelock
        sem_wait(&rw->writelock);
    sem_post(&rw->lock);
}

void rwlock_release_readlock(rwlock_t* rw) {
    sem_wait(&rw->lock);
    rw->readers--;
    if (rw->readers == 0) // last reader lets it go
        sem_post(&rw->writelock);
    sem_post(&rw->lock);
}

void rwlock_acquire_writelock(rwlock_t* rw) {
    sem_wait(&rw->writelock);
}

void rwlock_release_writelock(rwlock_t* rw) {
    sem_post(&rw->writelock);
}

这个锁的实现其实和锁比较类似,只不过区分读写锁,这里只说下读写锁的特性:

  1. 获取读锁时
  2. 首先读锁操作都需要一个大锁来保证读锁数量++的并发正确性。
  3. 如果当前没有读锁,自己是第一个读锁持有者,那么去获取写锁,获取写锁成功后继续。
  4. 如果已经有读锁了,那么就读锁持有+1
  5. 释放读锁时,判断如果当前自己是最后一个持有读锁的(读锁持有数为0),那么释放写锁。
  6. 获取写锁时,直接获取写锁即可。

这样的方式可以保证读写锁的特性:可以多个线程同时读,但是在读写互斥,写操作之间本身也是互斥的。

读写锁只适合多读少写场景,但是会有较大写饿死的风险。

餐叉问题

这个问题我感觉实际意义不大,这里就不再总结了,主要表达了和前面生产消费者类似的问题,通过更换锁或者条件的依赖,来避免死锁。

Thread Throttling

线程节流,其实就是限流,我们可以通过控制信号量的值n,来做到只有n个线程能进入某段代码执行(相当于一个可以持有n个的令牌),多余的线程会被阻塞。以此来达到对系统某些资源使用的限流控制(比如大内存使用,大量计算,比较重的I/O)。

信号量自身的实现

我们通过锁和条件变量就能实现信号量:

typedef struct __Zem_t {
    int value;
    pthread_cond_t cond;
    pthread_mutex_t lock;
} Zem_t;

// only one thread can call this
void Zem_init(Zem_t* s, int value) {
    s->value = value;
    Cond_init(&s->cond);
    Mutex_init(&s->lock);
}

void Zem_wait(Zem_t* s) {
    Mutex_lock(&s->lock);
    while (s->value <= 0)
        Cond_wait(&s->cond, &s->lock);
    s->value--;
    Mutex_unlock(&s->lock);
}

void Zem_post(Zem_t* s) {
    Mutex_lock(&s->lock);
    s->value++;
    Cond_signal(&s->cond);
    Mutex_unlock(&s->lock);
}

我们通过2种简单的语义实现了一个更好用的工具。

有一个很有意思的点:使用信号量来实现条件变量这件事会很困难。

​ 在windows上很多程序员尝试这么做,但是他们写出了更多的bug。

​ 个人猜测,例如条件变量中的broadcast广播通知,使用信号量就会很难实现。虽然可以通过while+post来实现类似的效果,但是这样就有了次序,不能达成同时唤醒的效果了。同时,如果被唤醒的线程又会对信号量进行处理并且进入wait,那么有些线程就会一直得不到唤醒。

版权声明:除特殊说明,博客文章均为intotw原创,依据CC BY-SA 4.0许可证进行授权,转载请附上出处链接及本声明。
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇