Linux线程同步

本文最后更新于:2021年4月10日 下午

概览:Linux线程

线程同步

  • 线程的优势在于,能够通过全局变量来共享信息,但是可能会造成信息安全问题。
  • 必须要保证多个线程不会同时修改同一个变量,或者某一个线程不会读取正在由其他线程修改的变量。

临界区:只访问某一共享资源的代码片段,并且这段代码的执行应当为原子操作,也就是同时访问同一共享资源的其他线程不应该终止该片段的执行。

线程同步:即当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作,其他线程才能对该内存地址进行操作,而其他线程则处于等待状态。

互斥量 mutex

mutex:mutual exclusion

为避免线程更新共享变量时出现问题,可以使用互斥量来确保同时仅有一个线程可以访问某项共享资源。可以使用互斥量来保证对任意共享资源的原子访问。

  • 互斥量有两种状态:已锁定(locked)**和未锁定(unlocked)**。
  • 任何时候,只有一个线程可以锁定该互斥量。
  • 试图对已经锁定的某一互斥量再次加锁,将可能阻塞线程或者报错失败,这具体取决于加锁时使用的方法。

一旦线程锁定了互斥量,即成为了该互斥量的所有者,只有所有者才能给互斥量解锁。而一般情况下,对每一共享资源会使用不同的互斥量,每一线程在访问同一资源时将采用如下协议:

  • 针对共享资源锁定互斥量
  • 访问共享资源
  • 对互斥量解锁。

互斥量的相关操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//互斥量的类型 pthread_mutex_t
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
/*
- 初始化互斥量
- 参数 :
- mutex : 需要初始化的互斥量变量
- attr : 互斥量相关的属性,NULL
* PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性;
* PTHREAD_MUTEX_RECURSIVE_NP,嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争;
* PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样就保证当不允许多次加锁时不会出现最简单情况下的死锁;
* PTHREAD_MUTEX_ADAPTIVE_NP,适应锁,动作最简单的锁类型,仅等待解锁后重新竞争;

- restrict : C语言的修饰符,被修饰的指针,不能由另外的一个指针进行操作。
pthread_mutex_t *restrict mutex = xxx;
pthread_mutex_t * mutex1 = mutex;
*/
int pthread_mutex_destroy(pthread_mutex_t *mutex);
// - 释放互斥量的资源

int pthread_mutex_lock(pthread_mutex_t *mutex);
// - 加锁,阻塞的,如果有一个线程加锁了,那么其他的线程只能阻塞等待

int pthread_mutex_trylock(pthread_mutex_t *mutex);
// - 尝试加锁,如果加锁失败,不会阻塞,会直接返回。

int pthread_mutex_unlock(pthread_mutex_t *mutex);
//- 解锁

案例:三个窗口卖票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>

//全局变量,所有线程共享
int tickets = 500;
pthread_mutex_t mutex;

void * sellticket(void *arg){

while(1){

//加锁
pthread_mutex_lock(&mutex);

if(tickets > 0){
usleep(600000);
printf("%ld 迈出了 第 %d 张门票\n",pthread_self(),tickets);
tickets--;
}else{
//解锁
pthread_mutex_unlock(&mutex);
break;
}

//解锁
pthread_mutex_unlock(&mutex);
}
return NULL;
}

int main(){

//初始化互斥量
pthread_mutex_init(&mutex,NULL);

pthread_t tid1,tid2,tid3;
pthread_create(&tid1,NULL,sellticket,NULL);
pthread_create(&tid2,NULL,sellticket,NULL);
pthread_create(&tid3,NULL,sellticket,NULL);

//回收子线程的资源 阻塞
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_join(tid3,NULL);

//退出主线程
pthread_exit(NULL);

printf("main thread is exit\n");

//释放互斥量资源
pthread_mutex_destroy(&mutex);
return 0;
}
  • 三个线程开启之后,都会进入while循环,先抢到那个互斥量然后加上锁的人,就可以先买票,卖完一张票之后就释放锁。
  • 其他因为加锁而阻塞的线程就可以枷锁然后进入临界区!

restrict关键字

  • C语言的一种类型限定符号。
  • 用于限定和约束指针!表明这个指针是访问一个数据对象唯一且初始的方式。
  • 即:所有修改该指针所指向内存中的内容都必须通过该指针来修改,而不能通过其他变量或者指针来修改。

死锁

  • 有时,一个线程需要同时访问两个或更多不同的共享资源,而每个资源又都由不同的互斥量管理。当超过一个线程加锁同一组互斥量时,就有可能发生死锁。
  • 两个或两个以上的进程在执行过程中,因争夺共享资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁。

产生死锁的条件

操作系统:

  • 互斥条件:一段时间内某个资源只由一个进程来占用,如果其他线程请求资源,则只能进行等待。
  • 不可抢占条件:已获得的资源,未使用完之前,不能够被剥夺,只能在使用完时自己释放。
  • 请求和保持条件:进程已经保持了至少一个资源,但是又要去申请新的资源,而那个资源又被其他的进程占有,同时进程又对自己的资源保持不放。
  • 循环等待条件:指在发生死锁时,必然存在一个进程——资源的环形链。

死锁的几种情景:

  • 忘记释放锁
  • 重复加同一个锁
    • 有可能是调用的两个函数内都加了同一个锁
  • 多线程,多个资源,多个锁,抢占了资源。

读写锁

当有一个线程已经持有互斥锁时,互斥锁将所有试图进入临界区的线程都阻塞住。但是考虑一种情形,当前持有互斥锁的线程只是要读访问共享资源,而同时有其它几个线程也想读取这个共享资源,但是由于互斥锁的排它性,所有其它线程都无法获取锁,也就无法读访问共享资源了,但是实际上多个线程同时读访问共享资源并不会导致问题。

在对数据的读写操作中,更多的是读操作,写操作较少,例如对数据库数据的读写应用。

为了满足当前能够允许多个读出,但只允许一个写入的需求,线程提供了读写锁来实现。

读写锁的特点:

  • 如果有其它线程读数据,则允许其它线程执行读操作,但不允许写操作。

  • 如果有其它线程写数据,则其它线程都不允许读、写操作。写是独占的,写的优先级高。

1
2
3
4
5
6
7
8
9
10
11
12
//读写锁的类型 pthread_rwlock_t
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
/*
PTHREAD_PROCESS_SHARED:允许可访问用于分配读写锁的内存的任何线程对读写锁进行处理。即使该锁是在由多个进程共享的内存中分配的,也允许对其进行处理;
PTHREAD_PROCESS_PRIVATE:读写锁只能由某些线程处理,这些线程与初始化该锁的线程在同一进程中创建。如果不同进程的线程尝试对此类读写锁进行处理,则其行为是不确定的。缺省值为 PTHREAD_PROCESS_PRIVATE;
*/
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

案例:读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

//共享数据
int num = 1;
pthread_rwlock_t rwlock;//读写锁

void * readcall(void * arg){

while(1){
//读
pthread_rwlock_rdlock(&rwlock);
printf("--- read : %d, tid : %ld \n",num,pthread_self());
pthread_rwlock_unlock(&rwlock);
usleep(100);
}

return NULL;
}

void * writecall(void *arg){

while(1){
pthread_rwlock_wrlock(&rwlock);
num++;
printf("+++ write : %d, tid : %ld \n",num,pthread_self());
pthread_rwlock_unlock(&rwlock);
usleep(100);
}

return NULL;
}

int main(){

pthread_rwlock_init(&rwlock,NULL);

pthread_t rtid[5],wtid[3];//5个线程读,3个线程写

for(int i = 0;i<5;i++){
pthread_create(&rtid[i],NULL,readcall,NULL);
}
for(int i = 0;i<3;i++){
pthread_create(&wtid[i],NULL,writecall,NULL);
}

//设置线程分离
for(int i=0;i<5;i++){
pthread_detach(rtid[i]);
}
for(int i=0;i<3;i++){
pthread_detach(wtid[i]);
}

while(1){
sleep(10);
}

pthread_rwlock_destroy(&rwlock);

pthread_exit(NULL);//主线程退出,防止结束干扰其他线程

return 0;
}

生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

struct Node{
int val;
struct Node *next;
};

//头节点
struct Node *head = NULL;

void * profunc(void * arg){
//生产者
while(1){
//不断地创建新的节点
struct Node * newNode = (struct Node *)malloc(sizeof(struct Node *));
newNode->next = head;
head = newNode;

head->val = rand() % 1000;
printf("pro a new val : %d \n",head->val);

usleep(500);
}

return NULL;
}

void * cumfunc(void * arg){
//消费者
while(1){
//不断地消费节点
struct Node *tmp = head;
if(tmp != NULL){
printf("cum a val : %d \n",tmp->val);
head = head->next;
free(tmp);

usleep(500);
}
}
return NULL;
}

int main(){

pthread_t ptid[5],ctid[5];
for(int i=0;i<5;i++){
pthread_create(&ptid[i],NULL,profunc,NULL);
pthread_create(&ctid[i],NULL,cumfunc,NULL);
}

//设置线程分离
for(int i=0;i<5;i++){
pthread_detach(ptid[i]);
pthread_detach(ctid[i]);
}

while(1){
sleep(10);
}

pthread_exit(NULL);

return 0;
}

问题

完全没有考虑线程同步地问题,会出各种各样的问题,例如消费者重复释放了同一个节点。比如生产者生产了好多个节点到链表中,但是由于没有阻塞,消费者只能读到头部节点。。。

加互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

struct Node{
int val;
struct Node *next;
};

//头节点
struct Node *head = NULL;

pthread_mutex_t mutex;

void * profunc(void * arg){
//生产者
while(1){
//不断地创建新的节点

//加锁
pthread_mutex_lock(&mutex);

struct Node * newNode = (struct Node *)malloc(sizeof(struct Node *));
newNode->next = head;
head = newNode;

head->val = rand() % 1000;
printf("pro a new val : %d \n",head->val);

pthread_mutex_unlock(&mutex);

usleep(500);
}

return NULL;
}

void * cumfunc(void * arg){
//消费者
while(1){
//不断地消费节点

pthread_mutex_lock(&mutex);

struct Node *tmp = head;
if(tmp != NULL){
printf("cum a val : %d \n",tmp->val);
head = head->next;
free(tmp);

pthread_mutex_unlock(&mutex);
usleep(500);
}else{
pthread_mutex_unlock(&mutex);
}
}
return NULL;
}

int main(){

//初始化互斥锁
pthread_mutex_init(&mutex,NULL);

pthread_t ptid[5],ctid[5];
for(int i=0;i<5;i++){
pthread_create(&ptid[i],NULL,profunc,NULL);
pthread_create(&ctid[i],NULL,cumfunc,NULL);
}

//设置线程分离
for(int i=0;i<5;i++){
pthread_detach(ptid[i]);
pthread_detach(ctid[i]);
}

while(1){
sleep(10);
}

pthread_mutex_destroy(&mutex);

pthread_exit(NULL);

return 0;
}

这样结果稍微好一些!

但是依旧有问题:如果没有数据了,那么消费者就要一直循环等待,更好地方式是如果有了数据,生产者就通知消费者去消费!

条件变量

1
2
3
4
5
6
7
8
9
10
11
12
//条件变量的类型 pthread_cond_t
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *co nd);
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
// - 等待,调用了该函数,线程会阻塞。只要进入这个函数就会对锁进行解锁
// 当不阻塞地时候,继续向下执行,就会加锁
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
// - 等待多长时间,调用了这个函数,线程会阻塞,直到指定的时间结束。
int pthread_cond_signal(pthread_cond_t *cond);
// - 唤醒一个或者多个等待的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
// - 唤醒所有的等待的线程

采用条件变量来优化生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

struct Node{
int val;
struct Node *next;
};

//头节点
struct Node *head = NULL;

pthread_mutex_t mutex;
pthread_cond_t cond;//条件变量

void * profunc(void * arg){
//生产者
while(1){
//不断地创建新的节点

//加锁
pthread_mutex_lock(&mutex);

struct Node * newNode = (struct Node *)malloc(sizeof(struct Node *));
newNode->next = head;
head = newNode;

head->val = rand() % 1000;
printf("pro a new val : %d \n",head->val);

//只要生产了一个,就通知消费者
pthread_cond_signal(&cond);

pthread_mutex_unlock(&mutex);

usleep(500);
}

return NULL;
}

void * cumfunc(void * arg){
//消费者
while(1){
//不断地消费节点

pthread_mutex_lock(&mutex);

struct Node *tmp = head;
if(tmp != NULL){
printf("cum a val : %d \n",tmp->val);
head = head->next;
free(tmp);

pthread_mutex_unlock(&mutex);
usleep(500);
}else{
//没有数据地时候,就需要阻塞等待

//wait函数调用阻塞地时候,会对互斥锁进行解锁
//当不阻塞地时候,继续向下执行,会重新枷锁
pthread_cond_wait(&cond,&mutex);
pthread_mutex_unlock(&mutex);
}
}
return NULL;
}

int main(){

//初始化互斥锁
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond,NULL);

pthread_t ptid[5],ctid[5];
for(int i=0;i<5;i++){
pthread_create(&ptid[i],NULL,profunc,NULL);
pthread_create(&ctid[i],NULL,cumfunc,NULL);
}

//设置线程分离
for(int i=0;i<5;i++){
pthread_detach(ptid[i]);
pthread_detach(ctid[i]);
}

while(1){
sleep(10);
}

pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);

pthread_exit(NULL);

return 0;
}

信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <semaphore.h>
//信号量的类型 sem_t
int sem_init(sem_t *sem, int pshared, unsigned int value);
//- 初始化信号量
//- 参数:
//- sem : 信号量变量的地址
//- pshared : 0 用在线程间 ,非0 用在进程间
//- value : 信号量中的值

int sem_destroy(sem_t *sem);
//- 释放资源

int sem_wait(sem_t *sem);
//- 对信号量加锁,调用一次对信号量的值-1,如果值为0,就阻塞

int sem_trywait(sem_t *sem);

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
int sem_post(sem_t *sem);
//- 对信号量解锁,调用一次对信号量的值+1

int sem_getvalue(sem_t *sem, int *sval);

信号量机制,可以用在进程以及线程之中!

使用信号量来解决生产者消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>

struct Node{
int val;
struct Node *next;
};

//头节点
struct Node *head = NULL;

pthread_mutex_t mutex;

//创建两个信号量
sem_t psem;
sem_t csem;

void * profunc(void * arg){
//生产者
while(1){
//不断地创建新的节点
sem_wait(&psem);//信号量减一

//加锁
pthread_mutex_lock(&mutex);

struct Node * newNode = (struct Node *)malloc(sizeof(struct Node *));
newNode->next = head;
head = newNode;

head->val = rand() % 1000;
printf("pro a new val : %d \n",head->val);

pthread_mutex_unlock(&mutex);

sem_post(&csem);//信号量加1,消费者可以采用
}

return NULL;
}

void * cumfunc(void * arg){
//消费者
while(1){
//不断地消费节点

sem_wait(&csem);//消费者使用

pthread_mutex_lock(&mutex);

struct Node *tmp = head;

printf("cum a val : %d \n",tmp->val);
head = head->next;
free(tmp);

pthread_mutex_unlock(&mutex);

sem_post(&psem);//生产者有了多余资源

}
return NULL;
}

int main(){

//初始化互斥锁
pthread_mutex_init(&mutex,NULL);
sem_init(&psem,0,5);
sem_init(&csem,0,0);//初始为0

pthread_t ptid[5],ctid[5];
for(int i=0;i<5;i++){
pthread_create(&ptid[i],NULL,profunc,NULL);
pthread_create(&ctid[i],NULL,cumfunc,NULL);
}

//设置线程分离
for(int i=0;i<5;i++){
pthread_detach(ptid[i]);
pthread_detach(ctid[i]);
}

while(1){
sleep(10);
}

pthread_mutex_destroy(&mutex);

pthread_exit(NULL);

return 0;
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!