线程
在linux内核那一部分我们知道,线程其实就是一种特殊的进程,只是他们共享进程的文件和内存等资源,无论如何对于资源共享,就必须处理一致性的问题。
1.线程概念
一个典型的Unix程序可以看做只有一个线程:一个进程在某一时刻只能做一件事情。有了多线程控制以后,在程序设计时就可以把程序设计成在某一时刻能够做不止一件事情,每个线程都能独立处理各自独立的任务。
每个线程包含有表示执行环节所必须的信息,其中包括线程ID、一组寄存器值、栈、调度优先级和策略、信号屏蔽字、errno变量以及线程私有数据。
一个进程的所有信息对该线程的所有线程都是共享的,包括可执行程序的代码、程序的全局内存和堆内存、栈以及文件描述符。
2.线程标识
进程有进程的ID,而线程同样有线程ID。进程ID在系统中是唯一标识,但线程ID不同,线程ID只有在它所属的进程上下文中才有意义。
进程ID使用pid_t数据类型,而线程ID是用pthread_t数据类型标识,通常使用一个结构。下面函数用来比较两个线程ID。
#include <pthread.h>
int pthread_equal(pthread_t tid1, pthread_t tid2);
/* 返回值:相等返回非0,否则返回0*/
获取线程自身ID可以通过调用函数pthread_self。
#include <pthread.h>
pthread_t pthread_self(void);
/* 返回值:调用线程的线程ID */
3.线程创建
新增线程可以通过函数pthread_create函数创建。
#include <pthread.h>
int pthread_create(pthread_t *restrict tidp,
const pthread_attr_t *restrict attr,
void *(*start_rtn)(void *),
void *restrict arg);
/*返回值:成功返回0,否则返回错误编号*/
新创建的线程ID会设置为tidp
指向的内存单元;新创建的线程从start_rtn
函数的地址开始运行,从定义可以看到该函数没有输入参数,因此要传递参数,可以将所有参数放到一个结构中,然后把这个结构的地址作为arg
参数传入。
线程创建时并不能保证哪个线程先执行。新创建的线程可以访问进程的地址空间,并且继承调用线程的浮点环境和信号屏蔽字,但是该线程的挂起信号会被清除。
下面实例创建了一个线程,打印了进程ID、新线程ID以及初始化线程的线程ID:
#include "apue.h"
#include <pthread.h>
pthread_t ntid;
void
printids(const char *s)
{
pid_t pid;
pthread_t tid;
pid = getpid();
tid = pthread_self();
printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid,
(unsigned long)tid, (unsigned long)tid);
}
void *
thr_fn(void *arg)
{
printids("new thread: ");
return((void *)0);
}
int
main(void)
{
int err;
err = pthread_create(&ntid, NULL, thr_fn, NULL);
if (err != 0)
err_exit(err, "can't create thread");
printids("main thread:");
sleep(1);
exit(0);
}
上面的程序主要注意两点:
- 主线程需要休眠,如果不休眠,可能主线程已经退出了,此时新建线程还没机会运行
- 新线程通过pthread_self获取自己的线程ID,而不是从共享内存中读取。虽然调用pthread_create可以通过第一个参数
tidp
指定新线程的ID,但是如果新线程在主线程调用pthread_create返回之前运行了,那么新线程看到的是未经初始化的ntid
的内容。
4.线程终结
进程中任意线程调用了exit、_Exit或者_exit,那么整个进程就会终止。与此类似,如果默认的动作是终止进程,那么,发送到线程的信号就会终止整个进程。
单个线程有3种方式退出,而不终止进程。
- 线程可以简单地从启动例程中返回,返回值是线程的退出码
- 线程可以被同一进程的其他线程取消
- 线程调用pthread_exit
#include <pthread.h>
void pthread_exit(void *rval_ptr);
rval_ptr
参数是一个无类型指针,与传给启动例程的单个参数类似。进程中的其他线程可以通过pthread_join函数访问到这个指针。
#include <pthread.h>
int pthread_join(pthread_t thread, void **rval_ptr);
/* 返回值:成功返回0,失败返回错误编号*/
调用线程将一直阻塞,直到指定的线程调用pthread_exit、从启动例程中返回或者被取消。如果线程简单地从启动例程返回,rval_ptr
就包含返回码。如果线程被取消,由rval_ptr
指定的内存单元就设置为PTHREAD_CANCELED。
可以通过pthread_join自动将线程置为分离状态,这样资源可以恢复。如果线程以及处于分离状态,pthread_join调用会失败,返回EINVAL。
如果对线程的返回值不感兴趣,则可以把rval_ptr
设置为NULL。这种情况下,调用pthread_join函数可以等待指定的线程终止,但并不获取线程的终止状态。
下面实例展示如何获取以及终止的线程的退出码。
#include "apue.h"
#include <pthread.h>
void *
thr_fn1(void *arg)
{
printf("thread 1 returning\n");
return((void *)1);
}
void *
thr_fn2(void *arg)
{
printf("thread 2 exiting\n");
pthread_exit((void *)2);
}
int
main(void)
{
int err;
pthread_t tid1, tid2;
void *tret;
err = pthread_create(&tid1, NULL, thr_fn1, NULL);
if (err != 0)
err_exit(err, "can't create thread 1");
err = pthread_create(&tid2, NULL, thr_fn2, NULL);
if (err != 0)
err_exit(err, "can't create thread 2");
err = pthread_join(tid1, &tret);
if (err != 0)
err_exit(err, "can't join with thread 1");
printf("thread 1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if (err != 0)
err_exit(err, "can't join with thread 2");
printf("thread 2 exit code %ld\n", (long)tret);
exit(0);
}
运行结果为:
可以看到进程1,2分别return或者exit时设定了返回码,进程中其他线程可以通过调用pthread_join函数获得该线程的退出码。
需要注意的是pthread_create和pthread_exit函数的无类型指针参数可以传递的值不止一个,这个指针可以包含复杂信息的结构指针,同时注意,这个结构所使用的内存调用者在完成调用以后必须仍然有效(毕竟前面说了不能确定哪个线程先执行,如果调用线程先执行可能这个结构还没传递给操作的对象线程)。
线程可以通过pthread_cancel函数来请求取消同一进程中的其他线程。
#include<pthread_cancel>
int pthread_cancel(pthread_t tid);
/* 返回值:成功返回0,失败返回错误编号*/
在默认情况下,pthread_cancel函数会使得由tid
标识的线程的行为表现为如同调用了参数为PTHREAD_CANCELED的pthread_exit函数,但是线程可以选择忽略取消或控制如何取消(所以它只是请求取消,而不是控制线程终止)。
线程可以安排它退出时需要的函数,这样的函数称为线程处理程序(thread cleanup handler)。一个线程可以建立多个清楚处理程序,由于处理程序记录在栈中,因此他们的执行顺序和注册时相反:
#include <pthread.h>
void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);
当线程执行以下动作时,清理函数rtn
是由pthread_cleanup_push函数调度的,调用时只有一个参数arg
:
- 调用pthread_exit时
- 响应取消请求时
- 用非0的
execute
参数调用pthread_cleanup_pop时
若execute
参数设置为0,清理函数将不被调用。上述任一情况下,pthread_cleanup_pop都将删除上次pthread_cleanup_push调用建立的清除处理程序。
下面通过一个实例来说明线程清理处理程序的使用。
#include "apue.h"
#include <pthread.h>
void
cleanup(void *arg)
{
printf("cleanup: %s\n", (char *)arg);
}
void *
thr_fn1(void *arg)
{
printf("thread 1 start\n");
pthread_cleanup_push(cleanup, "thread 1 first handler");
pthread_cleanup_push(cleanup, "thread 1 second handler");
printf("thread 1 push complete\n");
if (arg)
return((void *)1);
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
return((void *)1);
}
void *
thr_fn2(void *arg)
{
printf("thread 2 start\n");
pthread_cleanup_push(cleanup, "thread 2 first handler");
pthread_cleanup_push(cleanup, "thread 2 second handler");
printf("thread 2 push complete\n");
if (arg)
pthread_exit((void *)2);
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
pthread_exit((void *)2);
}
int
main(void)
{
int err;
pthread_t tid1, tid2;
void *tret;
err = pthread_create(&tid1, NULL, thr_fn1, (void *)1);
if (err != 0)
err_exit(err, "can't create thread 1");
err = pthread_create(&tid2, NULL, thr_fn2, (void *)1);
if (err != 0)
err_exit(err, "can't create thread 2");
err = pthread_join(tid1, &tret);
if (err != 0)
err_exit(err, "can't join with thread 1");
printf("thread 1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if (err != 0)
err_exit(err, "can't join with thread 2");
printf("thread 2 exit code %ld\n", (long)tret);
exit(0);
}
我的电脑是Mac执行结果如下(Linux下不会出错):
Mac出错的原因是pthread_cleanup_pop和pthread_cleanup_push这两个函数是用宏实现的,而宏把某些上下文放在栈上。线程1在调用push和pop之间返回时,栈已经被改写,而Mac在调用清除处理程序时就用了这个改写的上下文。
在默认情况下,线程的终止状态会保存直到对线程调用pthread_join。如果线程已经被分离,则线程的底层存储资源可以在线程终止时立即被返回。在线程被分离后,我们不能用pthread_join函数等待它的终止状态,因为对分离状态的线程调用pthread_join会产生未定义行为。可以调用pthread_detach分离线程。
#include <pthread.h>
int pthread_detach(pthread_t tid);
/* 返回值:成功返回0,失败返回错误编号*/
关于分离状态的线程内容,在线程控制这篇文章有介绍。
5.线程同步
由于多个线程之间共享相同的内存,因此需要同步技术来保证每个线程看到相同的数据视图。
5.1互斥量
可以使用pthread的互斥接口来保护数据,确保同一时间只有一个线程访问数据。由Linux内核部分的知识可以知道互斥量本质上是一把锁,在访问共享资源前对互斥量进行加锁,在访问之后释放锁。如果释放互斥量时有一个以上的线程阻塞,那么所有该锁上的阻塞线程都会变成可运行状态,第一个变成运行的线程就可以对互斥量加锁,其他线程继续阻塞。因此每次只有一个线程可以执行。
互斥变量是用pthrea_mutex_t数据表示的。在使用互斥变量之前,必须对它进行初始化,可以把它设置为常量PTHREAD_MUTEX_INITIALIZER(只适用于静态分配),也可以通过调用pthread_mutex_init函数进行初始化。若动态分配互斥量(比如malloc分配),在释放内存前需要调用pthread_mutex_destroy。
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
/* 返回值:成功返回0,失败返回错误编号*/
要用默认的属性初始化互斥量只需要把attr
设为NULL。
对互斥量加锁需要调用pthread_mutex_lock。如果互斥量已经上锁,则调用用线程将阻塞直到互斥量被解锁。对互斥量解锁,需要调用pthread_mutex_unlock。
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
/* 返回值:成功返回0,失败返回错误编号*/
如果线程不希望被阻塞,它可以使用pthread_mutex_trylock尝试对互斥量进行加锁。如果调用pthread_mutex_trylock时互斥量处于未锁住状态,那么pthread_mutex_trylock将锁住互斥量,不会出现阻塞直接返回0,否则pthread_mutex_trylock就会失败,并返回EBUSY。
下面一个实例描述了前面所述的函数用法,线程引用结构体时首先获取结构体的锁,然后对引用值+1,减引用时,同样先获取线程锁,防止其他线程修改结构体的值,判定当前结构体的引用次数。
#include <stdlib.h>
#include <pthread.h>
struct foo {
int f_count;
pthread_mutex_t f_lock;
int f_id;
/* ... more stuff here ... */
};
struct foo *
foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
if ((fp = malloc(sizeof(struct foo))) != NULL) {
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
free(fp);
return(NULL);
}
/* ... continue initialization ... */
}
return(fp);
}
void
foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
void
foo_rele(struct foo *fp) /* release a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
if (--fp->f_count == 0) { /* last reference */
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
} else {
pthread_mutex_unlock(&fp->f_lock);
}
}
5.2避免死锁
如果线程试图对同一互斥量加锁两次,那么它自身就会陷入死锁状态。除此之外,如果两个线程相互请求对方占用的互斥量,同样会产生死锁,对于这种情况,通常需要设定程序的执行顺序,下面的程序实例就是实现该功能。
#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo {
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here ... */
};
struct foo *
foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
int idx;
if ((fp = malloc(sizeof(struct foo))) != NULL) {
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
free(fp);
return(NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ... continue initialization ... */
pthread_mutex_unlock(&fp->f_lock);
}
return(fp);
}
void
foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo *
foo_find(int id) /* find an existing object */
{
struct foo *fp;
pthread_mutex_lock(&hashlock);
for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
if (fp->f_id == id) {
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return(fp);
}
void
foo_rele(struct foo *fp) /* release a reference to the object */
{
struct foo *tfp;
int idx;
pthread_mutex_lock(&fp->f_lock);
if (fp->f_count == 1) { /* last reference */
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_lock(&hashlock);
pthread_mutex_lock(&fp->f_lock);
/* need to recheck the condition */
if (fp->f_count != 1) {
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
return;
}
/* remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if (tfp == fp) {
fh[idx] = fp->f_next;
} else {
while (tfp->f_next != fp)
tfp = tfp->f_next;
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
} else {
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
}
}
上面程序中使用了两个互斥量,通过hashlock
互斥量保护散列表fh
和f_next
。foo
结构中的f_lock
互斥量保护对foo
结构中的其他字段的访问。访问这两个互斥量时总是以相同的顺序加锁,避免死锁。在释放函数foo_rele
为了保证加锁的顺序,实现过程过于复杂,下面给出一个优化版本。
#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo {
int f_count; /* protected by hashlock */
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here ... */
};
struct foo *
foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
int idx;
if ((fp = malloc(sizeof(struct foo))) != NULL) {
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
free(fp);
return(NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ... continue initialization ... */
pthread_mutex_unlock(&fp->f_lock);
}
return(fp);
}
void
foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&hashlock);
fp->f_count++;
pthread_mutex_unlock(&hashlock);
}
struct foo *
foo_find(int id) /* find an existing object */
{
struct foo *fp;
pthread_mutex_lock(&hashlock);
for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
if (fp->f_id == id) {
fp->f_count++;
break;
}
}
pthread_mutex_unlock(&hashlock);
return(fp);
}
void
foo_rele(struct foo *fp) /* release a reference to the object */
{
struct foo *tfp;
int idx;
pthread_mutex_lock(&hashlock);
if (--fp->f_count == 0) { /* last reference, remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if (tfp == fp) {
fh[idx] = fp->f_next;
} else {
while (tfp->f_next != fp)
tfp = tfp->f_next;
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
} else {
pthread_mutex_unlock(&hashlock);
}
}
5.3函数pthread_mutex_timedlock
线程获取一个已经加锁的互斥量时,pthread_mutex_timedlock允许绑定线程设定最长阻塞时间。
#include <pthread.h>
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);
/* 返回值:成功返回0,失败返回错误编号 */
这个超时时间用timespec
结构表示,它用秒和纳秒来描述时间。
5.4读写锁
读写锁可以有3种状态:读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。
读写锁是写加锁时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞。当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是任何希望以写模式对此锁进行加锁的线程都会阻塞。当读写锁处于读模式锁住的状态时,如果有一个线程试图以写模式获取锁时,读写锁通常会阻塞随后的读模式锁请求。
与互斥量相比,读写锁在使用之前必须初始化,在释放他们底层的内存之前必须销毁。
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
/* 返回值:成功返回0,失败返回错误编号 */
在读模式下加锁,写模式下加锁分别用到pthread_rwlock_rdlock和pthread_rwlock_wrlock。
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
/* 返回值:成功返回0,失败返回错误编号*/
同样对于读写锁也有条件版本的函数
#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
/* 返回值:成功返回0,失败返回错误编号 */
上面两个函数在可以获得锁时返回0,否则返回错误EBUSY。
下面一个实例用来描述读写锁的使用过程。
#include <stdlib.h>
#include <pthread.h>
struct job {
struct job *j_next;
struct job *j_prev;
pthread_t j_id; /* tells which thread handles this job */
/* ... more stuff here ... */
};
struct queue {
struct job *q_head;
struct job *q_tail;
pthread_rwlock_t q_lock;
};
/*
* Initialize a queue.
*/
int
queue_init(struct queue *qp)
{
int err;
qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qp->q_lock, NULL);
if (err != 0)
return(err);
/* ... continue initialization ... */
return(0);
}
/*
* Insert a job at the head of the queue.
*/
void
job_insert(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = qp->q_head;
jp->j_prev = NULL;
if (qp->q_head != NULL)
qp->q_head->j_prev = jp;
else
qp->q_tail = jp; /* list was empty */
qp->q_head = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
/*
* Append a job on the tail of the queue.
*/
void
job_append(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = NULL;
jp->j_prev = qp->q_tail;
if (qp->q_tail != NULL)
qp->q_tail->j_next = jp;
else
qp->q_head = jp; /* list was empty */
qp->q_tail = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
/*
* Remove the given job from a queue.
*/
void
job_remove(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
if (jp == qp->q_head) {
qp->q_head = jp->j_next;
if (qp->q_tail == jp)
qp->q_tail = NULL;
else
jp->j_next->j_prev = jp->j_prev;
} else if (jp == qp->q_tail) {
qp->q_tail = jp->j_prev;
jp->j_prev->j_next = jp->j_next;
} else {
jp->j_prev->j_next = jp->j_next;
jp->j_next->j_prev = jp->j_prev;
}
pthread_rwlock_unlock(&qp->q_lock);
}
/*
* Find a job for the given thread ID.
*/
struct job *
job_find(struct queue *qp, pthread_t id)
{
struct job *jp;
if (pthread_rwlock_rdlock(&qp->q_lock) != 0)
return(NULL);
for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
if (pthread_equal(jp->j_id, id))
break;
pthread_rwlock_unlock(&qp->q_lock);
return(jp);
}
这个例子中,无论是对队列增加工作或者三处工作都需写模式来锁住队列的读写锁。搜索队列时允许所有的工作线程并发地搜索队列。
5.5带超时的读写锁
与互斥量一样,读写锁也有一组了带超时的读写锁函数。
#include <pthread.h>
#include <time.h>
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr);
/* 返回值:成功返回0,失败返回错误编号 */
tsptr
同样指向timespec结构,指定线程应该停止阻塞的时间,如果超时并未获取到锁则返回ETIMEDOUT错误。
5.6条件变量
条件变量是线程可用的另一种同步机制,条件变量给多线程提供了一个会和的场所。条件变量和互斥量一起使用时,允许线程以无竞争的方式等待特定的条件发生(就是说多个线程等待某一时间发生,该事件一旦发生,这些等待的线程就可以继续工作了)。
条件由互斥量保护,线程在改变条件状态之前必须首先锁住互斥量。
在使用条件变量之前,必须对它初始化。由pthread_cond_t数据类型表示的条件变量可以用两种方式初始化:用常量PTHREAD_COND_INITIALIZER赋给静态分配的条件变量;用pthread_cond_init函数对动态分配的条件变量进行初始化。同样释放条件变量底层内存空间之前,用pthread_cond_destroy函数进行反初始化。
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
/* 返回值:成功返回0,失败返回错误编号 */
若使用默认属性的条件变量则attr
参数可以设置为NULL。
我们可以使用pthread_cond_wait等待条件变量为真,如果在给定时间内条件变量不能满足,那么会生成一个返回错误码的变量。
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);
/* 返回值:成功返回0,失败返回错误编号 */
传递给pthread_cond_wait的互斥量对条件进行保护,调用者把锁住的互斥量传给函数,函数然后自动把调用线程放到等待条件的线程列表上,对互斥量解锁。这就关闭了条件检查和线程进入休眠状态等待条件改变着两个操作之间的时间通道,这样线程就不会错过条件的任何变化。pthread_cond_wait返回时,互斥量再次被锁住。
有两个函数用于通知线程条件已经满足。pthread_cond_signal函数至少能唤醒一个等待该条件的线程,而pthread_cond_broadcast函数则能唤醒等待该条件的所有线程。
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
/* 返回值:成功返回0,失败返回错误编号*/
下面实例给出如何结合条件变量和互斥量进行同步。
#include <pthread.h>
struct msg {
struct msg *m_next;
/* ... more stuff here ... */
};
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void
process_msg(void)
{
struct msg *mp;
for (;;) {
pthread_mutex_lock(&qlock);
while (workq == NULL)
pthread_cond_wait(&qready, &qlock);
mp = workq;
workq = mp->m_next;
pthread_mutex_unlock(&qlock);
/* now process the message mp */
}
}
void
enqueue_msg(struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
pthread_cond_signal(&qready);
}
上述程序中,条件是工作队列的状态,我们用互斥量保护条件,在while循环中判断条件。把消息放到工作队列时,需要占用互斥量,但在给等待线程发信号时,不需要占用互斥量。
5.7自旋锁
自旋锁与互斥量类似,但是它不是通过休眠使线程阻塞,而是在获取锁之前一直处于阻塞状态。因此自旋锁可以用于:锁被持有的时间短,而且线程并不希望在重新调度上花太多成本。
自旋锁的初始化和反初始化函数如下:
#include <pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
/* 返回值:成功返回0,失败返回错误编号 */
自旋锁还有一个特殊的pshared
参数表示进程共享属性,如果该参数设置为PTHREAD_PROCESS_SHARED,则自旋锁可以被访问锁底层内存的线程获取,即使那些线程不属于同一个进程。
可以通过pthread_spin_lock和pthread_spin_trylock对自旋锁加锁,签字在获取锁之前一直自旋,后者如果不能获取锁就立即返回EBUSY错误。
#include <pthread.h>
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
/* 成功返回0,失败返回错误编码*/
5.8屏障(barrier)
屏障同样用于多个线程并行工作的同步。屏障允许每个线程等待,知道所有的合作线程均到达某一点,然后从该点继续执行。
屏障初始化和反初始化函数如下:
#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr,
unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
/* 返回值:成功返回0,失败返回错误编码*/
初始化中,count
指定到达屏障的线程数,attr
指定屏障对象的属性。
可以使用pthread_barrier_wait函数表明线程已经完成工作,等待其他线程赶上来。
#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);
/* 返回值:成功返回0或PTHREAD_BARRIER_SERIAL_THREAD,失败返回错误编号 */
调用pthread_barrier_wait的线程在屏障计数未满足条件时,会进入休眠状态。如果该线程是最后一个调用pthread_barrier_wait的线程,就满足屏障计数,所有的线程被唤醒。
下面通过实例给出在一个任务上合作的多个线程之间如何用屏障进行同步。
#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>
#define NTHR 8 /* number of threads */
#define NUMNUM 8000000L /* number of numbers to sort */
#define TNUM (NUMNUM/NTHR) /* number to sort per thread */
long nums[NUMNUM];
long snums[NUMNUM];
pthread_barrier_t b;
#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t,
int (*)(const void *, const void *));
#endif
/*
* Compare two long integers (helper function for heapsort)
*/
int
complong(const void *arg1, const void *arg2)
{
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;
if (l1 == l2)
return 0;
else if (l1 < l2)
return -1;
else
return 1;
}
/*
* Worker thread to sort a portion of the set of numbers.
*/
void *
thr_fn(void *arg)
{
long idx = (long)arg;
heapsort(&nums[idx], TNUM, sizeof(long), complong);
pthread_barrier_wait(&b);
/*
* Go off and perform more work ...
*/
return((void *)0);
}
/*
* Merge the results of the individual sorted ranges.
*/
void
merge()
{
long idx[NTHR];
long i, minidx, sidx, num;
for (i = 0; i < NTHR; i++)
idx[i] = i * TNUM;
for (sidx = 0; sidx < NUMNUM; sidx++) {
num = LONG_MAX;
for (i = 0; i < NTHR; i++) {
if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) {
num = nums[idx[i]];
minidx = i;
}
}
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}
int
main()
{
unsigned long i;
struct timeval start, end;
long long startusec, endusec;
double elapsed;
int err;
pthread_t tid;
/*
* Create the initial set of numbers to sort.
*/
srandom(1);
for (i = 0; i < NUMNUM; i++)
nums[i] = random();
/*
* Create 8 threads to sort the numbers.
*/
gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR+1);
for (i = 0; i < NTHR; i++) {
err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
if (err != 0)
err_exit(err, "can't create thread");
}
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);
/*
* Print the sorted list.
*/
startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;
elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sort took %.4f seconds\n", elapsed);
for (i = 0; i < NUMNUM; i++)
printf("%ld\n", snums[i]);
exit(0);
}
这个实例使用了8个并行线程和1个合并结果的线程,进行堆排序。