
Something about sync


  1. 互斥锁
  2. 条件变量
  3. 读写锁
  4. 信号量
  5. 自旋锁
  6. 屏障
  7. 原子操作
  8. 各类IPC机制(包括信号、管道、FIFO、socket、消息队列、共享内存)


1. 互斥量



// Initialize a mutex (attr may be NULL for default attributes)
int pthread_mutex_init (pthread_mutex_t * restrict mutex , \
                        const pthread_mutexattr_t * restrict attr ) ;
// Destroy a mutex
int pthread_mutex_destroy (pthread_mutex_t * mutex ) ;
// Lock
int pthread_mutex_lock (pthread_mutex_t *mutex) ;
// Unlock
int pthread_mutex_unlock (pthread_mutex_t *mutex) ;
// Try to lock
int pthread_mutex_trylock (pthread_mutex_t *mutex) ;
// Try to lock with a timeout; one way to guard against deadlock
int pthread_mutex_timedlock (pthread_mutex_t * restrict mutex , \
                            const struct timespec * restrict abstime ) ;

2. 自旋锁



  • 自旋锁:不断尝试是否可以获得锁,只要操作系统调度到它,它就去尝试是否能获取锁。
  • 互斥锁:尝试一次没有获取到锁之后,就去休息了,然后等待外界(OS)告诉它可以获得锁,之后去加锁。


// Initialize a spin lock
int pthread_spin_init (pthread_spinlock_t *lock , int pshared) ;
// Destroy a spin lock
int pthread_spin_destroy ( pthread_spinlock_t * lock ) ;
// Lock
int pthread_spin_lock ( pthread_spinlock_t *lock) ;
// Unlock
int pthread_spin_unlock ( pthread_spinlock_t *lock ) ;
// Try to lock
int pthread_spin_trylock ( pthread_spinlock_t * lock ) ;

3. 条件变量





// Initialize a condition variable (attr may be NULL for default attributes).
// POSIX declares attr as a pointer to const.
int pthread_cond_init (pthread_cond_t * restrict cond , \
                       const pthread_condattr_t * restrict attr ) ;
// Destroy a condition variable
int pthread_cond_destroy ( pthread_cond_t * cond ) ;
// Wait for the condition to be signalled; mutex must be held by the caller
int pthread_cond_wait (pthread_cond_t * restrict cond , \
                       pthread_mutex_t * restrict mutex ) ;
// Wait with a timeout; one way to guard against deadlock
int pthread_cond_timedwait (pthread_cond_t * restrict cond , \
                       pthread_mutex_t * restrict mutex , \
                       const struct timespec * restrict tsptr ) ;
// Wake at least one thread waiting on the condition variable
int pthread_cond_signal ( pthread_cond_t *cond ) ;
// Wake every thread waiting on the condition variable
int pthread_cond_broadcast ( pthread_cond_t *cond ) ;


#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <abort.h>
#include <errno.h>

// Number of philosophers (and forks) around the table
#define     PHILOSOPHER_COUNT   5
// Index of the neighbour on the left / right of philosopher p (ring order).
// Arguments are parenthesized so expressions can be passed safely, and LEFT
// is derived from PHILOSOPHER_COUNT instead of a hard-coded 4.
#define     LEFT(p)     (((p) + PHILOSOPHER_COUNT - 1) % PHILOSOPHER_COUNT)
#define     RIGHT(p)    (((p) + 1) % PHILOSOPHER_COUNT)

// Philosopher states. THINKING is referenced by the status checks in this
// program, so it has to be declared here; EATING keeps its original value 0.
enum {
    EATING ,
    THINKING ,
} ;

// Per-philosopher record; each philosopher owns one condition variable.
typedef struct {
    int             tid ;       // philosopher number, printed when eating
    pthread_cond_t  cond ;      // this philosopher's condition variable
    int             status ;    // current state (one of the enum values)
} Philopher ;

// Pointer alias used for the shared philosopher array.
typedef Philopher *Philopher_p ;

// table is shared: this mutex is taken around every read/write of a status
pthread_mutex_t     table_lock ;
// Array of PHILOSOPHER_COUNT philosophers, allocated by init_all_phil_status ()
Philopher_p     ps;
// Signal handler installed by init_all_phil_status ()
void sig_handler (int signo) ;

/*
 * Set up all global state for the dining-philosophers demo:
 *   - allocate the philosopher array `ps` (aborts on allocation failure),
 *   - initialise `table_lock` as an error-checking mutex,
 *   - mark every philosopher THINKING and initialise its condition variable,
 *   - install sig_handler for SIGINT and SIGQUIT.
 */
void init_all_phil_status () {

    ps = malloc (sizeof *ps * PHILOSOPHER_COUNT) ;
    if (NULL == ps ) {
        fprintf(stderr , "Malloc error .No free space !\n") ;
        abort () ;
    }

    // Init global lock
    pthread_mutexattr_t     lock_attr ;
    pthread_mutexattr_init (&lock_attr) ;
    pthread_mutexattr_settype ( &lock_attr , PTHREAD_MUTEX_ERRORCHECK ) ;

    if (0 != pthread_mutex_init (&table_lock , &lock_attr)) {
        fprintf(stderr , "pthread_mutex_init error\n") ;
        abort () ;
    }
    // The attribute object is no longer needed once the mutex exists
    pthread_mutexattr_destroy (&lock_attr) ;

    // Init every philosopher's status
    for (int i = 0 ; i < PHILOSOPHER_COUNT ; i++ ) {
        ps[i].status = THINKING ;
        pthread_cond_init (&ps[i].cond , NULL) ;
    }

    // Register the signal handler; the previous dispositions are not needed
    struct sigaction    act ;

    act.sa_handler = sig_handler ;
    sigemptyset (&act.sa_mask) ;
    act.sa_flags = 0 ;

    sigaction (SIGINT , &act , NULL ) ;
    sigaction (SIGQUIT , &act , NULL ) ;

    printf ("Finish init !\n") ;
}

/*
 * Philosopher `index` stops eating and thinks for one second.
 *
 * Under table_lock the status becomes THINKING, then each neighbour that is
 * itself THINKING (and may therefore be blocked in philosopher_eating) is
 * signalled on ITS OWN condition variable, because the forks next to it
 * have just been released.
 */
void philosopher_thinking (int index) {
    // lock table ;
    pthread_mutex_lock(&table_lock) ;
    ps[index].status = THINKING ;
    // judge left and right status
    if (ps[LEFT(index)].status == THINKING) {
        pthread_cond_signal (&ps[LEFT(index)].cond) ;
    }
    if (ps[RIGHT(index)].status == THINKING) {
        pthread_cond_signal (&ps[RIGHT(index)].cond) ;
    }
    pthread_mutex_unlock (&table_lock) ;
    sleep(1) ;
}

/*
 * Philosopher `index` eats for one second.
 *
 * Blocks on the philosopher's own condition variable until both neighbours
 * are THINKING; the predicate is re-checked in a loop because condition
 * variables may wake spuriously. philosopher_thinking () of a neighbour
 * provides the wake-up.
 */
void philosopher_eating (int index) {
    pthread_mutex_lock (&table_lock ) ;
    // judge left and right status
    while ((ps[LEFT(index)].status != THINKING) || (ps[RIGHT(index)].status != THINKING)) {
        pthread_cond_wait (&ps[index].cond , &table_lock) ;
    }
    ps[index].status = EATING ;
    printf ("num %d philosopher is eating now\n" , ps[index].tid) ;
    pthread_mutex_unlock (&table_lock) ;
    sleep(1) ;
}
/*
 * Thread entry point for one philosopher.
 *
 * argu carries the philosopher index encoded as an integer in the pointer
 * value. The thread records its index in ps[index].tid and then alternates
 * thinking and eating forever.
 */
void * th_func (void * argu) {
    long i = (long) argu ;
    int index = (int)i ;
    printf ("tid = %d\n" , index ) ;
    ps[index].tid = index ;
    while (1) {
        philosopher_thinking (index) ;
        philosopher_eating (index) ;
    }
    // Not reached; present so the function has a well-formed return path
    return NULL ;
}

/*
 * Release the philosopher array.
 *
 * main () registers this function with atexit (), and it may also be called
 * directly before exit (), so it can run more than once. Resetting ps to
 * NULL makes the repeated call a harmless free(NULL) instead of a double
 * free.
 */
void destory_all_phol_status () {
    free(ps) ;
    ps = NULL ;
    printf ("Finish clear!\n") ;
}

/*
 * Handler for the signals installed in init_all_phil_status () (SIGINT and
 * SIGQUIT): report which signal arrived and terminate the process.
 *
 * Cleanup is not called explicitly: main () registers
 * destory_all_phol_status () with atexit (), so exit () already runs it, and
 * calling it here as well would free the global array twice.
 *
 * NOTE(review): printf () and exit () are not async-signal-safe; acceptable
 * for a demo, but a production handler should only set a flag.
 */
void sig_handler (int signo) {
    if ( SIGINT == signo ) {
        printf ("Recv a SIGINT signal , clear global var now ~\n") ;
    } else if ( SIGQUIT == signo ) {
        printf ("Recv a SIGQUIT signal , clear global var now ~\n") ;
    }

    exit (0) ;
}

int main() {
    atexit(destory_all_phol_status) ;
    pthread_t   tid ;

    pthread_attr_t thread_attr ;
    pthread_attr_init (&thread_attr) ;
    pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED) ;

    init_all_phil_status() ;

    for (int i = 0 ; i < 5 ; i++) {
        pthread_create (&tid , &thread_attr , th_func , (void*)i) ;
    pthread_attr_destroy (&thread_attr) ;

    sleep (100) ;

    return 0 ;

4. 读写锁



// Initialize a read-write lock (attr may be NULL for default attributes)
int pthread_rwlock_init ( pthread_rwlock_t * restrict rwlock , \
                          const pthread_rwlockattr_t * restrict attr ) ;
// Destroy a read-write lock
int pthread_rwlock_destroy (pthread_rwlock_t * rwlock ) ;
// Acquire the read lock
int pthread_rwlock_rdlock ( pthread_rwlock_t  * rwlock ) ;
// Acquire the write lock
int pthread_rwlock_wrlock ( pthread_rwlock_t * rwlock ) ;
// Unlock
int pthread_rwlock_unlock ( pthread_rwlock_t * rwlock ) ;
// Try to acquire the read lock
int pthread_rwlock_tryrdlock ( pthread_rwlock_t * rwlock ) ;
// Try to acquire the write lock
int pthread_rwlock_trywrlock ( pthread_rwlock_t * rwlock ) ;
// Read/write lock acquisition with a timeout; one way to avoid deadlock
int pthread_rwlock_timedrdlock ( pthread_rwlock_t * restrict rwlock ,\
                                 const struct timespec * restrict tsptr ) ;
int pthread_rwlock_timedwrlock ( pthread_rwlock_t * restrict rwlock , \
                                 const struct timespec * restrict tsptr ) ;


5. 屏障



/// 多线程排序 ,同步使用屏障
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <errno.h>
#include <sys/types.h>
#include <limits.h>
#include <sys/time.h>

// Number of worker threads that each sort one chunk
#define     THREAD_COUNT            4
// Total number of elements to sort
#define     NUM_COUNT               8000000L
// Elements handled by each worker thread. The sort and merge code uses this
// name, so it must be defined; NUM_COUNT is expected to be a multiple of
// THREAD_COUNT.
#define     PRE_THREAD_NUM_COUNT    (NUM_COUNT / THREAD_COUNT)

// Argument handed to each sorting thread.
struct data {
    long    start ;     // index of the first element of this thread's chunk
    int     id ;        // worker number, used in progress messages
} ;

typedef struct data     threadData ;
typedef struct data    *threadDataP ;

// Input array; each worker thread sorts its own chunk of it in place
long        *num ;
// Output array that merge () fills with the fully sorted sequence
long        *savedNum ;

// Barrier shared by the worker threads and the main thread
pthread_barrier_t   barrier ;

/*
 * In-place quicksort of num[start..end] (both bounds inclusive), ascending.
 *
 * The first element is used as the pivot. Ranges with fewer than two
 * elements return immediately, BEFORE the pivot is read: the recursive call
 * for the right part can be made with start == end + 1, and reading
 * num[start] in that case would touch an element outside the range.
 */
void    start_qsort ( long  num[] , long start , long end ) {
    if ( start >= end ) {
        return ;
    }

    long    i = start , j = end ;
    long    base = num[start] ;

    while ( i < j ) {
        // Scan from the right for an element smaller than the pivot
        while ( i < j && base <= num[j] ) {
            j -- ;
        }
        if ( i < j ) {
            num[i] = num[j] ;
        }
        // Scan from the left for an element not smaller than the pivot
        while ( i < j && base > num[i] ) {
            i ++ ;
        }
        if ( i < j ) {
            num[j] = num[i] ;
        }
    }
    num[i] = base ;

    // Recurse into the left part
    start_qsort ( num , start , i-1 ) ;
    // Recurse into the right part
    start_qsort ( num , i+1 , end ) ;
}

/*
 * Worker thread: quicksort one chunk of the global array `num`.
 *
 * start_size points to a threadData giving the chunk's first index and the
 * worker id. After sorting PRE_THREAD_NUM_COUNT elements the thread waits
 * on the shared barrier and then exits.
 */
void * thread_sort_part ( void * start_size ) {
    threadDataP     tdp = (threadDataP) start_size ;

    printf ("thread %d ready to sort !\n",tdp->id) ;

    // Sort this thread's chunk
    start_qsort (num , tdp->start , tdp->start + PRE_THREAD_NUM_COUNT -1) ;

    printf ("thread %d finished sort !\n" , tdp->id ) ;
    pthread_barrier_wait (&barrier) ;

    // Quit
    return NULL ;
}

/*
 * K-way merge of the THREAD_COUNT sorted chunks of `num` into `savedNum`.
 *
 * Each chunk i covers [i * PRE_THREAD_NUM_COUNT, (i+1) * PRE_THREAD_NUM_COUNT).
 * On every step the smallest head element among the non-exhausted chunks is
 * copied to the output and that chunk's cursor advances. Ties go to the
 * chunk with the lowest index.
 */
void    merge () {
    // Current read position inside each chunk
    long    threadBeginIndex [THREAD_COUNT] ;
    // minidx   : chunk holding the current minimum (-1 while none was found)
    // numIndex : write position in the output array
    // tempNum  : smallest head value found so far
    long    i , minidx , numIndex , tempNum ;

    for ( i = 0 ; i < THREAD_COUNT ; ++i ) {
        threadBeginIndex[i] = i * PRE_THREAD_NUM_COUNT ;
    }

    for ( numIndex = 0 ; numIndex < NUM_COUNT ; ++numIndex ) {
        minidx = -1 ;
        tempNum = LONG_MAX ;
        for ( i = 0 ; i < THREAD_COUNT ; i ++ ) {
            if ( threadBeginIndex[i] >= (i+1)*PRE_THREAD_NUM_COUNT ) {
                continue ;      // this chunk is exhausted
            }
            // `minidx < 0` accepts the first candidate even when its value
            // equals LONG_MAX, so minidx is never left unset or stale.
            if ( minidx < 0 || num[threadBeginIndex[i]] < tempNum ) {
                tempNum = num[threadBeginIndex[i]] ;
                minidx = i ;
            }
        }
        if ( minidx < 0 ) {
            break ;             // every chunk is consumed
        }
        savedNum[numIndex] = tempNum ;
        threadBeginIndex[minidx]++ ;
    }

    printf ("Finished!\n") ;
}

/*
 * Multi-threaded sort demo: fill an array with pseudo-random numbers, let
 * THREAD_COUNT detached worker threads quicksort one chunk each, wait for
 * all of them on a barrier, merge the chunks and report the elapsed time.
 */
int main () {

    long                    i ;
    struct      timeval     start , end ;
    long        long        startusec , endusec ;
    double                  elapsed ;
    // error code returned by pthread_create
    int                     err ;
    pthread_t               tid ;
    pthread_attr_t          thread_attr ;
    // per-thread arguments; must outlive the worker threads
    threadData              td[THREAD_COUNT] ;

    // Size the buffers from the element type instead of a hard-coded 8
    if ( NULL == (num = malloc (sizeof *num * NUM_COUNT)) ) {
        fprintf (stderr , "No free memory . Serious error !\n") ;
        exit (1) ;
    }
    if ( NULL == (savedNum = malloc (sizeof *savedNum * NUM_COUNT) )) {
        fprintf (stderr , "No free memory . Serious error !\n") ;
        exit (1) ;
    }

    srandom (1) ;

    for ( i = 0 ; i < NUM_COUNT ; ++i ) {
        num[i] = random ()%10000000L ;
    }

    gettimeofday (&start , NULL ) ;

    // Barrier for all workers plus the main thread
    pthread_barrier_init (&barrier , NULL , THREAD_COUNT+1) ;
    // Workers are created detached, so they are never joined
    pthread_attr_init ( &thread_attr ) ;
    pthread_attr_setdetachstate ( &thread_attr , PTHREAD_CREATE_DETACHED ) ;

    for ( i = 0 ; i <  THREAD_COUNT ; ++i ) {
        td[i].id = (int) i ;
        td[i].start = i * PRE_THREAD_NUM_COUNT ;
        // Pass the prepared attribute object so the detach state takes effect
        err = pthread_create (&tid , &thread_attr , thread_sort_part , (void*)(&td[i])) ;

        if ( 0 != err ) {
            fprintf (stderr , "pthread_create error , serious error \n") ;
            // A fatal error could instead jump to a common cleanup routine.
            exit (1) ;
        }
    }

    // Block until every worker has finished sorting its chunk
    pthread_barrier_wait (&barrier) ;

    printf ("Begin merge !\n") ;
    // Merge all sorted chunks
    merge () ;
    gettimeofday (&end , NULL) ;
    // Compute the elapsed time; widen before multiplying to avoid overflow
    startusec = (long long) start.tv_sec * 1000000 + start.tv_usec ;
    endusec = (long long) end.tv_sec * 1000000 + end.tv_usec ;
    elapsed = (double) (endusec - startusec ) /1000000.0 ;
    printf ("sort tookk %.4f seconds\n" , elapsed) ;

    // Cleanup
    free ( num ) ;
    free ( savedNum ) ;

    pthread_barrier_destroy (&barrier) ;
    pthread_attr_destroy ( &thread_attr ) ;

    sleep (1) ;

    return 0 ;
}

6. 信号量

在生产者消费者模型中,对任务数量的记录就可以使用信号量来做。可以理解为带计数的条件变量。当信号量的值为0时,调用 sem_wait 的工作进程或者线程就会阻塞,等待物品到来。当生产者生产一个物品,会将信号量值加1。这时会唤醒在信号量上阻塞的进程或者线程,它们去争抢物品。

POSIX 对应的API(匿名信号量)

// Initialize a semaphore
int sem_init ( sem_t * sem , int pshared , unsigned int value ) ;
// Destroy a semaphore
int sem_destroy ( sem_t * sem ) ;
// Increment the semaphore by 1
int sem_post ( sem_t * sem ) ;
// Wait until the semaphore value is greater than 0, then decrement it by 1
int sem_wait (sem_t * sem ) ;
// Try to wait
int sem_trywait ( sem_t * sem ) ;



/* task_struct.h - task and task-queue types shared by the producer and
 * worker threads. */
#ifndef  _TASK_STRUCT_H
#define  _TASK_STRUCT_H

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

/* Size of the per-task text buffer */
#define     BUFFER_SIZE     256

/* One unit of work: the handlers, their input text and the selector */
struct task_and_data {
    void (*task_type_one) (const char *) ;      /* basic print job */
    double (*task_type_two) (const char *) ;    /* evaluate the passed-in text */
    char task_buffer[BUFFER_SIZE] ;             /* task input buffer */

    int task_type ;                             /* selects which handler runs */
} ;

/* Singly linked queue node */
typedef struct task_queue {
    int thread_id ;                     /* thread number */
    struct task_and_data task ;         /* the task itself */
    struct task_queue *next ;           /* next queued task */
} task_queue , *task_queue_p ;

/* Head and tail pointers of the queue plus a task counter */
typedef struct task_queue_head_rear {
    task_queue_p  head ;
    task_queue_p  rear ;
    int task_count ;                    /* task counter */
} task_queue_pointer ;

void print_string (const char * str ) ;
double calculate (const char * str ) ;

#endif  /* _TASK_STRUCT_H */

#include "task_struct.h"

pthread_mutex_t task_queue_lock ;                   // mutex protecting the task queue
sem_t sem ;                                 // semaphore posted by the producer after each enqueue
task_queue_pointer task_queue_hr ;                  // head/rear pointers of the task queue

/* Task handler: write str to standard output exactly as given. */
void print_string (const char *str ) {
    printf ("%s" , str ) ;
}

/*
 * Task handler placeholder for expression evaluation.
 *
 * The expression text is not parsed yet: the function prints a notice and
 * always returns 1.
 */
double calculate (const char * str ) {
    double result  = 1 ;

    (void) str ;    // not used yet

    printf ("This expression is *** , now I will calculate it !\n") ;
    return result ;
}

/*
 * Initialise the queue mutex, the task semaphore (value 0, not shared
 * between processes) and an empty task queue.
 */
void init_sem_task_queue () {
    pthread_mutex_init (&task_queue_lock , NULL) ;
    sem_init (&sem, 0 , 0 ) ;                               // no tasks available yet
    task_queue_hr.rear = task_queue_hr.head = NULL ;        // queue starts empty
    task_queue_hr.task_count = 0 ;                          // no tasks counted
}

void * exec_th (void * data ) {
    int tid = data ;
    task_queue_p    task_temp ;
    struct task_and_data    task ;

    printf ("pthread %d waitting \n" , tid) ;

    while (1) {
        sem_wait (&sem) ;                       //等待信号到来
        pthread_mutex_lock (&task_queue_lock) ;
        printf ("Tread %d is running \n" , data);   
        ///当任务队列有任务时, 取出来, 执行
            if (task_queue_hr.head != task_queue_hr.rear) {
                task = (task_queue_hr.head)->task ;
                task_temp = task_queue_hr.head ;
        //  printf ("task pointer value is %p \n" , task_temp) ;
                task_queue_hr.head = (task_queue_hr.head)->next ;
                free (task_temp) ;
            } else {
                if (task_queue_hr.head != NULL ) {
                    task = (task_queue_hr.head)->task ;
                    free (task_queue_hr.head) ;
                    task_queue_hr.head = task_queue_hr.rear = NULL ;                
                } else {
                    printf ("任务队列已空!\n") ;
            pthread_mutex_unlock (&task_queue_lock) ;
            switch (task.task_type) {
                case 1 : (*(task.task_type_one))(task.task_buffer) ;  break;
                case 2 : break ;

void * task_th (void * data ) {
    int count = 100 ;
    task_queue_p temp_task_p ;                  //临时指向申请的任务
    task_queue_p temp_task_before_p ;               //指向先前一个节点

    while (count) {
        pthread_mutex_lock (&task_queue_lock) ;         //锁住队列
        temp_task_p = (task_queue_p)malloc (sizeof (task_queue)) ;
        (temp_task_p ->task).task_type_one = print_string ;
        sprintf (((temp_task_p ->task).task_buffer) , "%s%d%c","Hello this is a task " , count,'\n' ) ;
        (temp_task_p ->task).task_type = 1 ;            //表示执行打印函数
        temp_task_p->next = NULL ;  

        if ((task_queue_hr.task_count) == 0) {
            task_queue_hr.head = task_queue_hr.rear = temp_task_p ;         
            temp_task_before_p = temp_task_p ;
            (task_queue_hr.task_count) ++ ;
        } else {
            task_queue_hr.rear = temp_task_p ;
            temp_task_before_p ->next = temp_task_p ;
            temp_task_before_p = temp_task_p ;
            (task_queue_hr.task_count) ++ ;

        count -- ;                      //添加任务次数减1
        pthread_mutex_unlock (&task_queue_lock) ;
        sem_post (&sem) ;                   //添加信号量

    printf ("任务线程放置所有任务完毕\n") ;

    pthread_exit (NULL) ;

/*
 * Producer/consumer demo: start three detached worker threads and one
 * detached producer thread, then keep the process alive while they run.
 */
int main (int argc , char* argv[]) {
    pthread_t   exec_th1 , exec_th2 , exec_th3 , task_thread ;

    (void) argc ;
    (void) argv ;

    init_sem_task_queue () ;                    // initialise semaphore, mutex and queue

    if (0 != pthread_create (&exec_th1 , NULL ,exec_th , (void*)1 )
        || 0 != pthread_create (&exec_th2 , NULL ,exec_th ,(void*) 2 )
        || 0 != pthread_create (&exec_th3 , NULL ,exec_th , (void*)3 )
        || 0 != pthread_create (&task_thread , NULL , task_th , (void*)4 )) {
        fprintf (stderr , "pthread_create error , serious error \n") ;
        return 1 ;
    }
    pthread_detach (exec_th1) ;
    pthread_detach (exec_th2) ;
    pthread_detach (exec_th3) ;
    pthread_detach (task_thread) ;

    sleep (10000) ;

    return 0 ;
}



  • 信号(异步)
  • 文件
  • 管道
  • Socket
  • 共享内存
  • 消息队列(个人用的比较少)



// NOTE(review): these look like the Linux kernel atomic_t helper declarations — confirm the source header
static __inline__ void atomic_add(int i, atomic_t *v) ;
static __inline__ void atomic_sub(int i, atomic_t *v) ;
static __inline__ int atomic_sub_and_test(int i, atomic_t *v) ;
static __inline__ void atomic_inc(atomic_t *v) ;
static __inline__ void atomic_dec(atomic_t *v) ;
static __inline__ int atomic_dec_and_test(atomic_t *v) ;
... ...
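// GCC 内建原子操作,原型为 type __sync_add_and_fetch (type *ptr, type value, ...),返回更新后的新值(不是 void)
__sync_add_and_fetch((x),1)      // *x 原子加 1
__sync_sub_and_fetch((x),1)      // *x 原子减 1
__sync_add_and_fetch((x),(y))    // *x 原子加 y
__sync_sub_and_fetch((x),(y))    // *x 原子减 y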


#include <stdio.h>
#include <stdlib.h>

// Counter updated only through the atomic builtins below
static  int count = 0 ;

/*
 * Minimal demo of the GCC __sync builtins: atomically add 1 to `count`
 * and then atomically subtract 1 again.
 */
int main() {

    __sync_fetch_and_add (&count , 1 ) ;
    __sync_fetch_and_sub (&count , 1) ;

    return  0;
}


00000000004004f6 <main>:
  4004f6:   55                      push   %rbp
  4004f7:   48 89 e5                mov    %rsp,%rbp
  4004fa:   f0 83 05 2e 0b 20 00    lock addl $0x1,0x200b2e(%rip)        # 601030 <__TMC_END__>
  400501:   01 
  400502:   f0 83 2d 26 0b 20 00    lock subl $0x1,0x200b26(%rip)        # 601030 <__TMC_END__>
  400509:   01 
  40050a:   b8 00 00 00 00          mov    $0x0,%eax
  40050f:   5d                      pop    %rbp
  400510:   c3                      retq   
  400511:   66 2e 0f 1f 84 00 00    nopw   %cs:0x0(%rax,%rax,1)
  400518:   00 00 00 
  40051b:   0f 1f 44 00 00          nopl   0x0(%rax,%rax,1)


