▶ 使用信号量来进行线程间信息传递
● 代码
1 #include <stdio.h>
2 #include <pthread.h>
3 #include <semaphore.h>
4 #pragma comment(lib, "pthreadVC2.lib")
5
6 const int thread = 8, messageSize = 100;
7 char messageList[thread][messageSize]; // 全局信息列表
8 sem_t sem[thread]; // 各线程信号量,注意每个线程都有一个
9
10 void* sendMessage(void* rank)
11 {
12 const long long localRank = (long long)rank, dest = (localRank + 1) % thread;
13 int i;
14 sprintf_s(messageList[dest], messageSize, "Hello from %2d to %2d.", localRank, dest);
15 sem_post(&sem[dest]); // 解锁 dest 的信号量,因为上一行低吗已经完成了写入,注意每次执行完函数 sem_post() 后 sem[dest] 的值增加 1
16 sem_wait(&sem[localRank]); // 等待自己编号的信号量解锁,注意每次执行完函数 sem_wait() 后 sem[localRank] 的值减小 1(但不会小于0)
17 printf("Thread %2d > %s\n", localRank, messageList[localRank]);
18 return nullptr;
19 }
20
21 int main()
22 {
23 pthread_t pth[thread];
24 int i;
25 long long list[thread];
26
27 for (i = 0; i < thread; i++)
28 {
29 sem_init(&sem[i], 0, 0); // 依次初始化信号量
30 list[i] = i;
31 pthread_create(&pth[i], nullptr, sendMessage, (void *)list[i]);
32 }
33 for (i = 0; i < thread; i++)
34 {
35 pthread_join(pth[i], nullptr);
36 sem_destroy(&sem[i]); // 销毁信号量
37 }
38 printf("\nfinish.\n");
39 getchar();
40 return 0;
41 }
● 输出结果
1 Thread 1 > Hello from 0 to 1.
2 Thread 2 > Hello from 1 to 2.
3 Thread 3 > Hello from 2 to 3.
4 Thread 4 > Hello from 3 to 4.
5 Thread 5 > Hello from 4 to 5.
6 Thread 6 > Hello from 5 to 6.
7 Thread 0 > Hello from 7 to 0.
8 Thread 7 > Hello from 6 to 7.
9
10 finish.
● 用到的定义,注意信号量不是由 phread.h 提供的,而是 semaphore.h
1 typedef struct sem_t_ * sem_t;
2
3 PTW32_DLLPORT int __cdecl sem_init(sem_t * sem, int pshared, unsigned int value);
4 // 初始化信号量,输入一个已经声明的信号量的指针,第二个参数不明,第三个参数为 0 表示初始化完成后信号量为上锁状态
5
6 PTW32_DLLPORT int __cdecl sem_destroy(sem_t * sem);// 销毁信号量
7
8 PTW32_DLLPORT int __cdecl sem_wait(sem_t * sem); // 等待信号量为解锁状态再向下运行
9
10 PTW32_DLLPORT int __cdecl sem_post(sem_t * sem); // 解锁信号量
▶ 使用忙等待和互斥量来实现路障
● 代码
1 #include <stdio.h>
2 #include <pthread.h>
3 #pragma comment(lib, "pthreadVC2.lib")
4
5 const int thread = 8;
6 int count;
7 pthread_mutex_t pmt;
8
9 void* work(void* rank)
10 {
11 const long long localRank = (long long)rank, dest = (localRank + 1) % thread;
12 pthread_mutex_lock(&pmt); // 进入读写区,上锁,计数器加一,解锁
13 printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
14 count++;
15 pthread_mutex_unlock(&pmt);
16 while (count < thread); // 使用忙等待来等所有的线程都达到栅栏
17 printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);
18 return nullptr;
19 }
20
21 int main()
22 {
23 pthread_t pth[thread];
24 int i;
25 long long list[thread];
26 pthread_mutex_init(&pmt, nullptr);
27 for (i = count = 0; i < thread; i++)
28 {
29 list[i] = i;
30 pthread_create(&pth[i], nullptr, work, (void *)list[i]);
31 }
32 for (i = 0; i < thread; i++)
33 pthread_join(pth[i], nullptr);
34 pthread_mutex_destroy(&pmt);
35 printf("\nfinish.\n");
36 getchar();
37 return 0;
38 }
● 输出结果
1 Thread 1 reached the barrier.
2 Thread 5 reached the barrier.
3 Thread 2 reached the barrier.
4 Thread 3 reached the barrier.
5 Thread 4 reached the barrier.
6 Thread 0 reached the barrier.
7 Thread 6 reached the barrier.
8 Thread 7 reached the barrier.
9 Thread 5 passed the barrier.
10 Thread 6 passed the barrier.
11 Thread 3 passed the barrier.
12 Thread 0 passed the barrier.
13 Thread 1 passed the barrier.
14 Thread 4 passed the barrier.
15 Thread 2 passed the barrier.
16 Thread 7 passed the barrier.
17
18 finish.
▶ 使用信号量来实现路障
● 代码
1 #include <stdio.h>
2 #include <pthread.h>
3 #include <semaphore.h>
4 #pragma comment(lib, "pthreadVC2.lib")
5
6 const int thread = 8;
7 int count;
8 sem_t sem_count, sem_barrier;
9
10 void* work(void* rank)
11 {
12 const long long localRank = (long long)rank, dest = (localRank + 1) % thread;
13 printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
14 sem_wait(&sem_count); // 等待允许访问计数器 count,注意执行完该语句时 sem_count 值减 1,自动上锁
15 if (count == thread - 1) // 最后一个到达进入的线程
16 {
17 count = 0; // 计数器清零,以后可以重复使用
18 sem_post(&sem_count); // 计数器解锁,sem_count 值加 1
19 for (int i = 0; i < thread - 1; sem_post(&sem_barrier), i++);// 解锁整个栅栏,
20 } // 每有一个线程通过后面的语句 sem_wait(&sem_barrier);,
21 else // 前面到达的线程 // sem_barrier 的值就减 1,所以这里要为该变量加上 thread - 1
22 {
23 count++; // 计数器加一
24 sem_post(&sem_count); // 解锁计数器
25 sem_wait(&sem_barrier); // 等待栅栏解锁
26 }
27 printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);
28 return nullptr;
29 }
30
31 int main()
32 {
33 pthread_t pth[thread];
34 int i;
35 long long list[thread];
36
37 sem_init(&sem_count, 0, 1); // 计数器锁初始化为 1,开锁状态
38 sem_init(&sem_barrier, 0, 0); // 栅栏初始化为 0,关锁状态
39 for (i = count = 0; i < thread; i++)
40 {
41 list[i] = i;
42 pthread_create(&pth[i], nullptr, work, (void *)list[i]);
43 }
44 for (i = 0; i < thread; i++)
45 pthread_join(pth[i], nullptr);
46 sem_destroy(&sem_count), sem_destroy(&sem_barrier);
47 printf("\nfinish.\n");
48 getchar();
49 return 0;
50 }
● 输出结果
Thread 0 reached the barrier.
Thread 3 reached the barrier.
Thread 4 reached the barrier.
Thread 2 reached the barrier.
Thread 1 reached the barrier.
Thread 5 reached the barrier.
Thread 7 reached the barrier.
Thread 6 reached the barrier.
Thread 4 passed the barrier.
Thread 5 passed the barrier.
Thread 2 passed the barrier.
Thread 7 passed the barrier.
Thread 3 passed the barrier.
Thread 1 passed the barrier.
Thread 6 passed the barrier.
Thread 0 passed the barrier.
finish.
▶ 使用条件变量来实现路障
1 #include <stdio.h>
2 #include <pthread.h>
3 #include <semaphore.h>
4 #pragma comment(lib, "pthreadVC2.lib")
5
6 const int thread = 8;
7 int count;
8 pthread_mutex_t mutex;
9 pthread_cond_t cond;
10
11 void* work(void* rank)
12 {
13 const long long localRank = (long long)rank, dest = (localRank + 1) % thread;
14 printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
15 pthread_mutex_lock(&mutex); // 上锁
16 count++;
17 if (count == thread) // 最后一个进入的线程
18 {
19 count = 0; // 计数器清零
20 pthread_cond_broadcast(&cond); // 广播所有线程继续向下执行
21 }
22 else
23 for (; pthread_cond_wait(&cond, &mutex) != 0;);// 等待其他线程
24 pthread_mutex_unlock(&mutex); // 条件变量阻塞解除后会自动将互斥量上锁,需要手工解除
25
26 printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);
27 return nullptr;
28 }
29
30 int main()
31 {
32 pthread_t pth[thread];
33 int i;
34 long long list[thread];
35 pthread_mutex_init(&mutex, nullptr);
36 pthread_cond_init(&cond, nullptr);
37 for (i = count = 0; i < thread; i++)
38 {
39 list[i] = i;
40 pthread_create(&pth[i], nullptr, work, (void *)list[i]);
41 }
42 for (i = 0; i < thread; i++)
43 pthread_join(pth[i], nullptr);
44 pthread_mutex_destroy(&mutex);
45 pthread_cond_destroy(&cond);
46 printf("\nfinish.\n");
47 getchar();
48 return 0;
49 }
● 输出结果
Thread 0 reached the barrier.
Thread 1 reached the barrier.
Thread 2 reached the barrier.
Thread 4 reached the barrier.
Thread 5 reached the barrier.
Thread 6 reached the barrier.
Thread 7 reached the barrier.
Thread 3 reached the barrier.
Thread 3 passed the barrier.
Thread 0 passed the barrier.
Thread 1 passed the barrier.
Thread 5 passed the barrier.
Thread 4 passed the barrier.
Thread 7 passed the barrier.
Thread 2 passed the barrier.
Thread 6 passed the barrier.
finish.
● 用到的定义,pthread.h
1 typedef struct pthread_cond_t_ * pthread_cond_t;
2
3 PTW32_DLLPORT int PTW32_CDECL pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr);// 初始化已经声明了的条件变量,第二个参数为属性指针
4
5 PTW32_DLLPORT int PTW32_CDECL pthread_cond_destroy(pthread_cond_t * cond); // 销毁条件变量
6
7 PTW32_DLLPORT int PTW32_CDECL pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t * mutex); // 阻塞线程以等待 signal 或 brocast
8
9 PTW32_DLLPORT int PTW32_CDECL pthread_cond_signal(pthread_cond_t * cond); // 解锁一个线程
10
11 PTW32_DLLPORT int PTW32_CDECL pthread_cond_broadcast(pthread_cond_t * cond); // 解锁所有的线程