apue线程

1.线程的概念

线程就是一个正在运行的函数。

实际项目中多线程用得比较多,因为多线程是先有标准后有实现的,所以不会向多进程那样在不同平台上有许多不同的情况。

C 语言线程有很多标准,POSIX 是其中的一种(还有比如openmp标准线程)。POSIX 是一套标准,而不是一种实现。所以 POSIX 只是规定了 pthread_t 作为线程标识符,但是并没有规定它必须是由什么类型组成的。在有的平台上它可能是 int,有些平台上它可能是 struct,还有些平台上它可能是 union,所以不要直接操作这个类型,而是要使用 POSIX 规定的各种线程函数来操作它。

有木有觉得像标准 IO 里 FILE 的赶脚?没错,标准制定出来的很多东西都是这种风格的,它为你提供一个数据类型而不让你直接对这个类型操作,要通过它定义的一系列函数来实现对这个类型的操作,这样就在各个平台上实现统一的接口了,所以这样做才能让标准制定出来的东西具有较好的可移植性。

pthread_t 是个很重要的东西,我们所有使用 POSIX 标准的线程操作都是围绕着它来进行的,通过它配合各种函数就可以对线程进行各种花样作死的玩了。:)

线程没有父子关系,是兄弟关系,可以相互收尸(别说主线程,建议说main线程)。线程间比进程间通讯简单,因为线程共用一片地址空间。

查看线程

ps axf 查看进程关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# li @ evpower in ~ [14:31:02]
$ ps axf
PID TTY STAT TIME COMMAND
2 ? S 0:00 [kthreadd]
3 ? I< 0:00 \_ [rcu_gp]
4 ? I< 0:00 \_ [rcu_par_gp]
6 ? I< 0:00 \_ [kworker/0:0H-kblockd]
9 ? I< 0:00 \_ [mm_percpu_wq]
...
1 ? Ss 0:03 /sbin/init splash
689 ? Ss 0:00 avahi-daemon: running [evpower.local]
738 ? S 0:00 \_ avahi-daemon: chroot helper
701 ? Ss 0:00 /usr/bin/python3 /usr/bin/networkd-dispatcher --run-startup-triggers
829 ? Ssl 0:00 /usr/sbin/gdm3
1610 ? Sl 0:00 \_ gdm-session-worker [pam/gdm-password]
1712 tty2 Ssl+ 0:00 \_ /usr/lib/gdm3/gdm-x-session --register-session --run-script i3
1714 tty2 Sl+ 2:09 \_ /usr/lib/xorg/Xorg vt2 -displayfd 3 -auth /run/user/1000/gdm/Xa
1745 tty2 S+ 0:02 \_ i3
1824 ? Ss 0:00 \_ /usr/bin/ssh-agent /usr/bin/im-launch i3

ps axm 查看进程详细信息,包括线程(- -表示线程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    PID TTY      STAT   TIME COMMAND
1 ? - 0:04 /sbin/init splash
- - Ss 0:04 -
2 ? - 0:00 [kthreadd]
- - S 0:00 -
3 ? - 0:00 [rcu_gp]
- - I< 0:00 -
...
693 ? - 0:03 /usr/sbin/NetworkManager --no-daemon
- - Ssl 0:03 -
- - Ssl 0:00 -
- - Ssl 0:00 -
700 ? - 0:00 /usr/sbin/irqbalance --foreground
- - Ssl 0:00 -
- - Ssl 0:00 -
701 ? - 0:00 /usr/bin/python3 /usr/bin/networkd-dispatcher --run-startup-triggers
- - Ss 0:00 -

ps ax -L也可以查进程与线程(PID是进程id,LWP是轻量级进程id,常称线程id,也占一个PID)

1
2
3
4
5
6
7
8
9
10
11
12
    PID     LWP TTY      STAT   TIME COMMAND
1 1 ? Ss 0:04 /sbin/init splash
2 2 ? S 0:00 [kthreadd]
3 3 ? I< 0:00 [rcu_gp]
...
708 708 ? Ssl 0:00 /usr/lib/policykit-1/polkitd --no-debug
708 715 ? Ssl 0:00 /usr/lib/policykit-1/polkitd --no-debug
708 769 ? Ssl 0:00 /usr/lib/policykit-1/polkitd --no-debug
710 710 ? Ssl 0:00 /usr/sbin/rsyslogd -n -iNONE
710 764 ? Ssl 0:00 /usr/sbin/rsyslogd -n -iNONE
710 765 ? Ssl 0:00 /usr/sbin/rsyslogd -n -iNONE
710 766 ? Ssl 0:00 /usr/sbin/rsyslogd -n -iNONE

Linux以线程为维护单位,也就是PID,如果线程没有被收尸,那尸体就占用一个PID。

pthread_equal

1
2
//比较两个线程标识符是否相同,相同返回非0,不同则返回0。
int pthread_equal(pthread_t t1, pthread_t t2);

为什么不能使用 if (t1 == t2) 的方式比较两个线程标识符呢?因为各系统实现不一样,你不知道 pthread_t 是什么类型的,所以永远不要自己直接操作它。

pthread_self

1
2
//获得当前线程 ID,永远不出错
pthread_t pthread_self(void);

2.线程的创建&终止&取消&栈清理

pthread_create

1
2
3
4
5
6
7
8
//创建线程,成功返回0,失败返回errno
//某些平台上 errno 是全局变量,为避免在多线程的情况下出现竞争,POSIX 指定在失败的时直接返回 errno
//创建后的线程id回填到thread
//attr指定创建线程具有的属性(常用NULL就够用)
//start_routine 指定函数也就是线程主体
//arg指定传给start_routine的参数
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);

pthread_exit

线程终止有3中方式,pthread_exit 只是其一

  1. 线程从启动例程返回,返回值就是线程退出码
  2. 线程可以被同一进程中的其他线程取消
  3. 线程调用pthread_exit()
1
2
//终止线程,并通过retval返回一个值
void pthread_exit(void *retval);

pthread_join

1
2
3
//收thread的尸体,并把thread返回的值回填到retval
//成功返回0,失败返回errno
int pthread_join(pthread_t thread, void **retval);

类似进程里面的wait()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* Filename: creat1.c
* No.83-84.线程-线程创建&线程终止和栈清理
* $ ps axm
* $ ps ax -L
* Description:
* $ gcc -Wall creat1.c -o creat1 -lpthread
* Last modified: humble 2020-09-01 15:29:48
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
//#include <unistd.h>

static void *func(void *p)
{
printf("func %d\n", *(int *)p);
pthread_exit(3);
//pthread_exit(NULL);
//return NULL; //pthread_exit可以实现线程清理,retuan NULL不可以
}

int main(int argc, char **argv)
{
int i = 2;
int retval = -1;
pthread_t tid;

puts("begin");
int err = pthread_create(&tid, NULL, func, (void *)&i);
if(err){
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}

//线程的调度取决于调度器的策略
//调用pthread_create后,可能main线程先往下运行,也可能func线程先运行
//所以"func 7"和"main"打印顺序可能不确定

//sleep(1);
puts("main");

//为tid收尸,并用retval接收tid返回值
pthread_join(tid, &retval); //类似进程的wati()

printf("end %d\n", retval);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [18:32:28]
$ gcc -Wall creat1.c -o creat1 -lpthread
creat1.c: In function ‘func’:
creat1.c:18:18: warning: passing argument 1 of ‘pthread_exit’ makes pointer from integer without a cast [-Wint-conversion]
18 | pthread_exit(3);
| ^
| |
| int
In file included from creat1.c:12:
/usr/include/pthread.h:207:33: note: expected ‘void *’ but argument is of type ‘int’
207 | extern void pthread_exit (void *__retval) __attribute__ ((__noreturn__));
| ~~~~~~^~~~~~~~
creat1.c: In function ‘main’:
creat1.c:44:23: warning: passing argument 2 of ‘pthread_join’ from incompatible pointer type [-Wincompatible-pointer-types]
44 | pthread_join(tid, &retval); //类似进程的wati()
| ^~~~~~~
| |
| int *
In file included from creat1.c:12:
/usr/include/pthread.h:215:49: note: expected ‘void **’ but argument is of type ‘int *’
215 | extern int pthread_join (pthread_t __th, void **__thread_return);
| ~~~~~~~^~~~~~~~~~~~~~~

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [18:40:25]
$ ./creat1
begin
main
func 2
end 3

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [18:40:27]
$

栈清理

1
2
3
4
5
6
//挂钩子函数 routine
//arg指定传给routine 的参数
void pthread_cleanup_push(void (*routine)(void *), void *arg);

//根据 execute 值为真/假判断是否调用钩子函数
void pthread_cleanup_pop(int execute);

类似atexit()挂钩子函数。用钩子函数对线程进行清理,钩子函数被调用顺序跟注册时相反。

这两个是带参的宏而不是函数,所以必须成对使用,而且必须先使用 pthread_cleanup_push 再使用 pthread_cleanup_pop,否则会报语法错误,括号不匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/* Filename: cleanup.c
* No.84.线程-线程终止和栈清理
* $ ps axm
* $ ps ax -L
* Description:
* $ gcc -Wall cleanup.c -o cleanup -lpthread
* Last modified: humble 20200411 17:32
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

//pthread_cleanup_push 和 pthread_cleanup_pop 需要成对使用,位置无所谓
//demo 不为0或1,则编译报错
#define demo (0) //1

static void func_cleanup(void *p)
{
puts(p);
return;
}

static void *func(void *p)
{
puts("pthread func is working");
pthread_cleanup_push(func_cleanup, "cleanup1");
pthread_cleanup_push(func_cleanup, "cleanup2");
pthread_cleanup_push(func_cleanup, "cleanup3");

puts("push over");

#if (demo == 0)
pthread_cleanup_pop(1);
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
#endif

pthread_exit(NULL);

#if (demo == 1)
//如果 pthread_cleanup_pop 放在pthread_exit后面,传参execute永远为真
pthread_cleanup_pop(1);
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
#endif
}

int main(int argc, char **argv)
{
pthread_t tid;

puts("begin");
int err = pthread_create(&tid, NULL, func, NULL);
if(err){
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
pthread_join(tid, NULL);
puts("end");
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [19:07:58]
$ gcc -Wall cleanup.c -o cleanup -lpthread #demo0

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [19:08:00]
$ ./cleanup
begin
pthread func is working
push over
cleanup3
end

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [19:08:04]
$ vi cleanup.c

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [19:08:13]
$ gcc -Wall cleanup.c -o cleanup -lpthread #demo1

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [19:08:24]
$ ./cleanup
begin
pthread func is working
push over
cleanup3
cleanup2
cleanup1
end

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [19:08:26]
$

pthread_cancel

1
2
3
//取消同一进程中的线程thread
//成功返回0,失败返回errno
int pthread_cancel(pthread_t thread);

当一个线程没有必要继续执行下去时,又没法直接为它收尸,就需要先pthread_cancel取消这个线程,再pthread_join收尸。

如:多线程遍历一个很大的二叉树查找一个数据时,某一个线程找到了数据,那其它线程就没有必要继续执行了,可以取消它们了。

注意pthread_cancel()并不等待线程终止,它仅仅提出请求。 而目标线程收到这个请求也不会立即终止,要执行到取消点才能被取消。

取消有2种状态:

  • 允许
    • 异步cancel
    • 推迟cancel(默认) 推迟至cancel点再取消
  • 不允许。

POSIX 定义的cancel点,都是可能引发阻塞的系统调用(比如open(),read(),write()..)

1
2
3
4
5
6
7
8
9
10
11
12
13
fd1 = open();
// if err

//fd1刚打开,就收到一个pthread_cancel(但这里不是cancel点,所以不会响应)
//'可能引发阻塞的系统调用'才是POSIX定义的cancel点,要执行到下一个cancel点才会响应
//所以,在下一个cancel点之前可以挂钩子函数去关闭/释放资源
pthread_cleanup_push(close(fd1));

fd2 = open();
// if err

pthread_cleanup_push(close(fd2));
...

pthread_setcanceltype

1
2
3
4
//设置是否允许被取消
int pthread_setcancelstate(int state, int *oldstate);
//设置取消方式(异步cancel还是推迟cancel)
int pthread_setcanceltype(int type, int *oldtype);

pthread_testcancel

1
2
//什么都不做,充当一个cancel点
void pthread_testcancel(void);

pthread_detach

1
2
//分离一个线程
int pthread_detach(pthread_t thread);

习惯上谁创建线程谁pthread_join收尸。当分离了thread线程,那就不能为其收尸。

例0:多线程筛质数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/* Filename: primer0.c
* No.86.线程-线程竞争实例
* Description:
* 多线程去计算某一范围内的质数
* $ gcc -Wall primer0.c -o primer0 -lpthread
* Last modified: humble 2020-09-01 21:04:58
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#define LEFT (30000000)
#define RIGHT (30000200)
#define THRNUM (RIGHT - LEFT +1)

static void *thr_primer(void *p);

int main(int argc, char **argv)
{
int i,err;
pthread_t tid[THRNUM];

for(i = LEFT; i <= RIGHT; i++){
err = pthread_create(tid + (i - LEFT), NULL, thr_primer, (void *)i); //如果把i的地址则会有竞争
if(err){
//发生错误就收尸已经创建成功的线程
//while(--i < 0){ pthread_join(tid[i - LEFT], NULL); } //未测
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}

for(i = LEFT; i <= RIGHT; i++){ //收尸
pthread_join(tid[i - LEFT], NULL);
}

exit(0);
}

static void *thr_primer(void *p)
{
int i, j, mark;
i = (int)p;
mark = 1;
for(j = 2; j < i/2; j++) {
if(i % j == 0){
mark = 0;
break;
}
}

if(mark == 1){
printf("%d is a primer\n", i);
}

pthread_exit(NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [21:08:38]
$ gcc -Wall primer0.c -o primer0 -lpthread
primer0.c: In function ‘main’:
primer0.c:25:66: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
25 | err = pthread_create(tid + (i - LEFT), NULL, thr_primer, (void *)i); //如果把i的地址则会有竞争
| ^
primer0.c: In function ‘thr_primer’:
primer0.c:45:9: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
45 | i = (int)p;
| ^

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [21:08:40]
$ ./primer0
30000059 is a primer
30000071 is a primer
30000083 is a primer
30000023 is a primer
30000137 is a primer
30000001 is a primer
30000041 is a primer
30000199 is a primer
30000193 is a primer
30000037 is a primer
30000149 is a primer
30000049 is a primer
30000109 is a primer
30000169 is a primer
30000163 is a primer
30000167 is a primer
30000133 is a primer
30000079 is a primer

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [21:09:12]
$

例e:多线程筛质数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/* Filename: primer0_e.c
* No.87.线程-线程竞争实例2
* Description:
* 多线程去计算某一范围内的质数
* $ gcc -Wall primer0_e.c -o primer0_e -lpthread
* Last modified: humble 2020-09-01 21:14:22
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#define LEFT (30000000)
#define RIGHT (30000200)
#define THRNUM (RIGHT - LEFT +1)

typedef struct{
int n;
}thrarg_t;

static void *thr_primer(void *p);

int main(int argc, char **argv)
{
int i,err;
pthread_t tid[THRNUM];
thrarg_t *p = NULL;
void *ptr;

for(i = LEFT; i <= RIGHT; i++)
{
p = malloc(sizeof(thrarg_t));
if(!p){
perror("malloc()");
exit(1);
}
p->n = i;

err = pthread_create(tid + (i - LEFT), NULL, thr_primer, p);
if(err){
//while(--i < 0){ pthread_join(tid[i - LEFT], NULL);free(.ptr.); } //未测
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}

for(i = LEFT; i <= RIGHT; i++){
pthread_join(tid[i - LEFT], &ptr);
free(ptr);
}
exit(0);
}

static void *thr_primer(void *p)
{
int i, j, mark;
i = ((thrarg_t *)p)->n;
//free(p);
mark = 1;
for(j = 2; j < i/2; j++){
if(i % j == 0){
mark = 0;
break;
}
}

if(mark == 1){
printf("%d is a primer\n", i);
}
pthread_exit(p); //把p返回给pthread_join
}

运行结果同上。例子创建了200个线程,但能进程空间允许创建的线程数量有限(可能PID耗尽,也可能栈空间耗尽)。每个线程消耗一个PID,并且消耗一份栈空间ulimit -s(32位进程栈最多创建约300个线程),使用线程属性修改线程栈大小可提高最大线程创建量。使用ulimit -a命令可以查看所有资源上限。

3.线程同步

互斥量(pthead_mutex_t)

可以使各个线程实现互斥的效果。由它来保护临界区每次只能由一个线程进入。当一个线程想要进入临界区之前需要先抢锁(加锁),如果能抢到锁就进入临界区工作,并且要在离开的时候解锁以便让其它线程可以抢到锁进入临界区;如果没有抢到锁则进入阻塞状态等待锁被释放然后再抢锁。

要在进入临界区之前加锁,在退出临界区的时候解锁。临界区是每个线程要单独执行的,所以临界区中的代码执行时间越短越好。互斥量限制的是一段代码能否执行,而不是一个变量或一个资源。

防止死锁:在临界区内跳转到临界区外时记得先解锁。

pthread_mutex_xxx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//静态初始化,定义mutex时用宏初始化,使用默认属性
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

//动态初始化,先定义未被初始化的mutex,再用 pthread_mutex_init 函数对mutex初始化,并指定属性mutexattr。返回0(永远不出错)
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);

//销毁mutex锁,成功返回0,失败返回errno
int pthread_mutex_destroy(pthread_mutex_t *mutex);

//阻塞死等抢锁,成功返回0,失败返回errno
int pthread_mutex_lock(pthread_mutex_t *mutex);

//非死等抢锁,成功返回0,失败返回errno
int pthread_mutex_trylock(pthread_mutex_t *mutex);

//解锁,成功返回0,失败返回errno
int pthread_mutex_unlock(pthread_mutex_t *mutex);

例:多线程读写文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/* Filename: add.c
* No.88.线程-竞争故障
* Description: 线程冲突(线程互斥量)
* $ gcc -Wall add.c -o add -lpthread
* $ echo 1 > /tmp/out
* $ ./add && cat /tmp/out
* Last modified: humble 2020-09-02 09:03:58
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define THRNUM (20)
#define LINEBUFSIZE (1024)
#define FNAME "/tmp/out"

//demo0 不带互斥量,同时读写资源发生冲突
//demo1 使用互斥量,争抢资源,非同时读写
#define demo (1) //0,1

#if (demo == 1)
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
#endif

static void *thr_add(void *p)
{
FILE *fp;
char linebuf[LINEBUFSIZE];

fp = fopen(FNAME, "r+");
if(!fp){
perror("fopen()");
exit(1);
}

#if (demo == 1)
pthread_mutex_lock(&mut);
#endif

fgets(linebuf, LINEBUFSIZE, fp);
fseek(fp, 0, SEEK_SET);
#if (demo == 0)
sleep(1); //扩大竞争时间域,等全部线程都读取完毕,再往下操作
#endif
fprintf(fp, "%d\n", atoi(linebuf) + 1);
fclose(fp); //刷新缓冲区,不能放到unlock后面

#if (demo == 1)
pthread_mutex_unlock(&mut);
#endif

pthread_exit(NULL);
}

int main(int argc, char **argv)
{
int i, err;
pthread_t tid[THRNUM];

for(i = 0; i < THRNUM; i++){
err = pthread_create(tid + i, NULL, thr_add, NULL);
if(err){
//while(--i < 0){ pthread_join(tid[i - LEFT], NULL); } //未测
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}

for(i = 0; i < THRNUM; i++){
pthread_join(tid[i], NULL);
}

#if (demo == 1)
pthread_mutex_destroy(&mut); //对应 pthread_mutex_init()
#endif
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:46:35]
$ gcc -Wall add.c -o add -lpthread #demo0

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:46:40]
$ echo 1 > /tmp/out

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:46:42]
$ ./add

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:46:46]
$ cat /tmp/out
2

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:46:48]
$ vi add.c

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:47:07]
$ gcc -Wall add.c -o add -lpthread #demo1

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:47:10]
$ echo 1 > /tmp/out

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:47:12]
$ ./add

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:47:14]
$ cat /tmp/out
21

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [9:47:16]
$

例:4个线程分别打印abcd,且按顺序,持续5秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/* Filename: abcd.c
* No.89.线程-互斥量
* Description:4个线程分别打印abcd,且按顺序,持续5秒
* $ gcc -Wall abcd.c -o abcd -lpthread
* Last modified: humble 2020-09-02 10:06:45
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define THRNUM (4)

static pthread_mutex_t mut[THRNUM];

static int next(int n)
{
if(n + 1 == THRNUM){
return 0;
}
return n + 1;
}


static void *thr_abcd(void *p)
{
int n = (int)p;
int c = 'a' + (int)p;

while(1){
pthread_mutex_lock(mut + n); //死等着抢属于自己的锁
write(1, &c, 1);
pthread_mutex_unlock(mut + next(n)); //放开下一个人的锁
}
pthread_exit(NULL);
}

int main(int argc, char **argv)
{
int i, err;
pthread_t tid[THRNUM];

for(i = 0; i < THRNUM; i++)
{
pthread_mutex_init(mut + i, NULL);
pthread_mutex_lock(mut + i); //初始化完,立刻抢锁
err = pthread_create(tid + i, NULL, thr_abcd, (void *)i);
if(err){
//while(--i < 0){ pthread_join(tid[i - LEFT], NULL); } //未测
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}
//线程创建完毕时全部锁都抢到了

pthread_mutex_unlock(mut + 0); //放开a的锁
alarm(5); //5秒钟后进程被信号杀死

for(i = 0; i < THRNUM; i++)
{
pthread_join(tid[i], NULL);
}
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [11:20:10] C:142
$ gcc -Wall abcd.c -o abcd -lpthread
abcd.c: In function ‘thr_abcd’:
abcd.c:29:13: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
29 | int n = (int)p;
| ^
abcd.c:30:19: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
30 | int c = 'a' + (int)p;
| ^
abcd.c: In function ‘main’:
abcd.c:49:55: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
49 | err = pthread_create(tid + i, NULL, thr_abcd, (void *)i);
| ^

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [11:21:54]
$ ./abcd
abcdabcdabcdabcdabcdabcdabcda...
[1] 20197 alarm ./abcd

让出cpu

1
2
3
//让出当前线程所占用的调度器给其它线程使用,而不必等待时间片耗尽才切换调度器
//成功返回0,失败返回-1并设置errno
int sched_yield(void);

可以把它理解成一个很短暂的 sleep() 。一般用于在使用一个资源时需要同时获得多把锁但是却没法一次性获得全部的锁的场景下,只要有任何一把锁没有抢到,那么就立即释放已抢到的锁,并让出自己的调度器让其它线程有机会获得被自己释放的锁。当再次调度到自己时再重新抢锁,直到能一次性抢到所有的锁时再进入临界区,这样就避免了出现死锁的情况。

例:N个线程去计算某一范围内的质数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/* Filename: primer0_pool.c
* No.90.线程-线程池实现
* Description: N个线程去计算某一范围内的质数
* 查询法/忙等版,main和多个thr_primer一起抢锁,会出现main长时间抢不到锁的情况
* $ gcc -Wall primer0_pool.c -o primer0_pool -lpthread
* Last modified: humble 2020-09-02 15:18:52
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sched.h>

#define LEFT (30000000)
#define RIGHT (30000200)
#define THRNUM (4)

static int num = 0; //猪圈
static pthread_mutex_t mut_num = PTHREAD_MUTEX_INITIALIZER; //猪圈锁

static void *thr_primer(void *p);

int main(int argc, char **argv)
{
int i,err;
pthread_t tid[THRNUM];

//创建线程(创建4头猪)
for(i = 0; i <= THRNUM; i++){
err = pthread_create(tid + i, NULL, thr_primer, (void *)i);
if(err){
//while(--i < 0){ pthread_join(tid[i], NULL); } //未测
fprintf(stderr, "pthread_create():%s\n", strerror(err));
exit(1);
}
}

//main把数字投放到num里面(主人把猪食扔到猪圈里)
for(i = LEFT; i <= RIGHT; i++){
pthread_mutex_lock(&mut_num); //抢猪圈的锁
while(num != 0){
//上一份猪食居然还没被猪吃掉
//解锁猪圈,好让猪能抢锁进猪圈吃掉
pthread_mutex_unlock(&mut_num);
//因为有可能某个thr_primer(猪)抢到也可能又被main(主人)抢到
//所以操作调度器,让出cpu(可理解为段时间sleep),避免接下来又是main(主人)抢到锁
sched_yield();
pthread_mutex_lock(&mut_num);
}
num = i; //上一份猪食被猪吃掉了,投放猪食
pthread_mutex_unlock(&mut_num); //解锁猪圈,好让猪能抢锁进猪圈吃掉
}
//到此,猪食已经投完了

//等猪取走猪食后,往猪圈投放一份猪粪(-1),让猪别再抢锁进猪圈
pthread_mutex_lock(&mut_num);
while(num != 0){
//上一份猪食居然还没被猪吃掉
//解锁猪圈,好让猪能抢锁进猪圈吃掉
pthread_mutex_unlock(&mut_num);
//因为有可能别的thr_primer(猪)抢到也可能又被自己抢到
//所以操作调度器,让出cpu(可理解为段时间sleep),避免接下来又是main(主人)抢到锁
sched_yield();
pthread_mutex_lock(&mut_num);
}
num = -1; //猪食被猪吃光了,投放猪粪(-1)
pthread_mutex_unlock(&mut_num); //解锁猪圈,好让猪能抢锁进猪圈来发现猪粪(-1)而结束

for(i = 0; i <= THRNUM; i++)
{
pthread_join(tid[i], NULL);
}

pthread_mutex_destroy(&mut_num);
exit(0);
}

static void *thr_primer(void *p)
{
int i, j, mark;

while(1){
pthread_mutex_lock(&mut_num); //猪抢到锁
while(num == 0){
//发现猪圈是空的
//解锁给main(主人)去投放猪食(有可能被其他thr_primer(猪)抢到)
pthread_mutex_unlock(&mut_num);
sched_yield(); //让出cpu,避免接下来又是自己抢到锁
pthread_mutex_lock(&mut_num);
}

//猪圈终于有东西了
if(num == -1){
//居然是猪粪,释放锁
pthread_mutex_unlock(&mut_num);
break; //准备自杀
}

//是猪食,取走,释放锁给主人或其他猪去抢
i = num;
num = 0;
pthread_mutex_unlock(&mut_num);

//开吃
mark = 1;
for(j = 2; j < i/2; j++)
{
if(i % j == 0){
mark = 0;
break;
}
}

if(mark == 1){
printf("[%d] %d is a primer\n", (int)p, i);
}
}

pthread_exit(NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [15:10:33]
$ gcc -Wall primer0_pool.c -o primer0_pool -lpthread
primer0_pool.c: In function ‘main’:
primer0_pool.c:30:57: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
30 | err = pthread_create(tid + i, NULL, thr_primer, (void *)i);
| ^
primer0_pool.c: In function ‘thr_primer’:
primer0_pool.c:115:45: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
115 | printf("[%d] %d is a primer\n", (int)p, i);
| ^

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [15:17:54]
$ ./primer0_pool
[0] 30000001 is a primer
[1] 30000023 is a primer
[3] 30000041 is a primer
[2] 30000037 is a primer
[4] 30000049 is a primer
[0] 30000059 is a primer
[4] 30000083 is a primer
[3] 30000071 is a primer
[2] 30000109 is a primer
[1] 30000079 is a primer
[4] 30000137 is a primer
[3] 30000149 is a primer
[0] 30000133 is a primer
[4] 30000169 is a primer
[3] 30000193 is a primer
[2] 30000163 is a primer
[1] 30000167 is a primer
[0] 30000199 is a primer

# li @ evpower in ~/humble/tmp/lhq/parallel/thread/posix on git:master x [15:17:56]
$

pthread_once

1
2
3
4
5
6
7
//定义一个 控制执行一次 的变量
pthread_once_t once_control = PTHREAD_ONCE_INIT;

//使用 once_control 的变量判断对init_routine()进行调用
//动态单次初始化,它能保证 init_routine 函数仅被调用一次
//返回0永远不出错
int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));

例:查询法令牌桶

把令牌桶写成库的方式提供给它人调用,需要解决并发调用的资源竞争问题。另外实现只在第一次使用时才加载模块module_load(),类似C++的构造函数。

在临界区需要调用一个函数,但是从程序的布局来看该函数内无法通过加锁来避免并发冲突/函数重入。根据 POSIX 标准的约定,这种函数的命名规则是必须以 _unlocked 作为后缀,所以大家在看到这样的函数时在调用之前一定要先加锁。例如下面代码中的 get_free_pos_unlocked()

查看代码

1
2
./parallel/thread/posix/mytbf_mt/
#define demo (mutex_only)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix/mytbf_mt on git:master x [19:17:09] C:2
$ tree
.
├── main.c
├── Makefile
├── mytbf.c
└── mytbf.h

0 directories, 4 files

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix/mytbf_mt on git:master x [19:17:10]
$ make
cc -Wall -c -o main.o main.c
cc -Wall -c -o mytbf.o mytbf.c
gcc -Wall main.o mytbf.o -o mytbf -lpthread

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix/mytbf_mt on git:master x [19:17:16]
$ ./mytbf /etc/services
# Network services, Internet style
#
# Note that i^C

# mi @ evpower in ~/humble/tmp/lhq/parallel/thread/posix/mytbf_mt on git:master x [19:17:29] C:130
$

上面的程序经过测试,发现 CPU 正在满负荷工作,说明程序中出现了忙等。就是 mytbf_fetchtoken() 函数获得锁的时候采用了忙等的方式。异步程序有两种处理方式,一种是通知法,一种是查询法。我们这里用的就是查询法,下面改成通知法实现。

条件变量(pthead_cond_t)

让线程以无竞争的形式等待某个条件的发生,当条件发生时通知等待的线程醒来去做某件事。另一种方式是把所有等待同一个条件的线程都唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//静态初始化,定义cond时用宏初始化,使用默认属性
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

//动态初始化,先定义未被初始化的cond,再用 pthread_cond_init 函数对cond初始化,并指定属性cond_attr
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

//函数用于唤醒当前多个等待的线程中的任何一个
int pthread_cond_signal(pthread_cond_t *cond);

//惊群,将现在正在等待的线程全部唤醒
int pthread_cond_broadcast(pthread_cond_t *cond);

//在临界区外阻塞等待某一个条件发生变化,(死等)直到有一个通知到来打断它的等待,被唤醒后开始抢mutex锁
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

//增加了超时功能的等待,(尝试等)超时之后无论能否拿到锁都返回
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec
*abstime);

//销毁cond锁
int pthread_cond_destroy(pthread_cond_t *cond);

//全部函数成功返回0,失败返回errno

把查询法(忙等)修改为通知法(非忙等)仅仅加一个条件变量(pthread_cond_t) 就行了。

例:通知法实现令牌桶

查看代码

1
2
./parallel/thread/posix/mytbf_mt/
#define demo (with_cond)

通常cond_wait()等待会放在一个循环中,就像上面的令牌桶栗子一样,因为可能有多个线程都在等待条件满足,当前的线程’等待条件’被唤醒时不代表’执行条件’一定满足。可能先被唤醒的线程发现条件满足已经去工作了,等轮到当前线程调度的时候条件可能就又不满足了,所以如果条件不满足又通过while继续进入等待。

用 pthread_cond_signal(3) 还是用 pthread_cond_broadcast(3) 呢?

根据具体场景来选择。一般只有一个线程在等待或者明确知道哪个线程应该被唤醒的时候使用 _signal() 函数,如果有多个线程在等待并且不确定应该由谁起来工作的时候使用惊群。不确定是指业务上不能确定哪个线程应该工作,而不是你作为程序猿稀里糊涂的不知道哪个线程该工作。程序猿应该保证了解你的每一行代码在做什么,而不要写出一坨自己都不知道它在做什么的代码。

4.线程属性&同步属性

5.重入&线程与信号&线程与fork