在内核中,可能出现多个进程(通过系统调用进入内核模式)访问同一个对象,进程和硬中断访问同一个对象,进程和软中断访问同一个对象,多个处理器访问同一个对象等现象,我们需要使用互斥技术,确保在给定的时刻内只有一个主体可以进入临界区访问对象.
如果临界区的执行时间比较长或者可能睡眠,可使用下列互斥技术:
(1)信号量;
(2)读写信号量.
(3)互斥锁.
(4)实时互斥锁.
如果临界区的执行时间很短,并且不会睡眠,那么使用上面的锁不太合适,因为进程切换的代价很高,可以使用下面这些互斥技术:
(1)原子变量.
(2)自旋锁.
(3)读写顺序锁,它是对自旋锁的改进,允许多个读者同时进入临界区.
(4)顺序锁,是对读写自旋锁的改进,读者不会阻塞写者.
进程还可以使用以下互斥技术:
(1)禁止内核抢占,防止被当前处理器上的其他进程抢占,实现和当前处理器上的其他进程互斥.
(2)禁止软中断,防止被当前处理器上的软中断抢占,实现和当前处理器上的软中断互斥.
(3)禁止硬中断,防止被当前处理器上的硬中断抢占,实现和当前处理器上的硬中断互斥.
在多处理器中,为了提高程序的性能,需要尽量减少处理器之间的互斥,使处理器可以最大限度地并行执行.可以通过以下方式避免使用锁的互斥技术:
(1)每处理器变量.
(2)每处理器计数器.
(3)内存屏障.
(4)读-复制更新.
(5)可睡眠RCU.
1 信号量
信号量允许多个进程同时进入临界区,在只允许一个进程进入临 界区时,把信号量的计数值设置为1,即二值信号量,这种信号量也称为互斥信号量.
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
成员lock是自旋锁,用来保护信号量的其他成员.count是计数值,表示还可以允许多少个进程进入临界区.wait_list是等待进入临界区的进程列表.
1.1 信号量初始化
初始化静态信号量,指定名称和计数值,允许n个进程同时进入临界区.
#define __SEMAPHORE_INITIALIZER(name, n) \
{ \
.lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \
.count = n, \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
}
初始化一个互斥信号量
#define DEFINE_SEMAPHORE(name) \
struct semaphore name = __SEMAPHORE_INITIALIZER(name, 1)
在运行时动态初始化信号量的方法
static inline void sema_init(struct semaphore *sem, int val)
{
static struct lock_class_key __key;
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);
}
1.2 获取信号量
(1)获取信号量,如果计数值为0,进程深度睡眠.
void down(struct semaphore *sem);
(2)获取信号量,如果计数值为0,进程轻度睡眠.
int down_interruptible(struct semaphore *sem);
(3)获取信号量,如果计数值为0,进程中度睡眠.
int down_killable(struct semaphore *sem);
(4)获取信号量,如果计数值为0,进程不等待.
int down_trylock(struct semaphore *sem);
(5)获取信号量,指定等待时间.
int down_timeout(struct semaphore *sem, long timeout);
1.3 获取信号量
释放信号量如下:
void up(struct semaphore *sem);
2 读写信号量
读写信号量是对互斥信号量的改进,允许多个读者同时进入临界区,读者和写者互斥,写者和写者互斥,适合在以读为主的情况下使用.
struct rw_semaphore {
long count;
struct list_head wait_list;
raw_spinlock_t wait_lock;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* spinner MCS lock */
/*
* Write owner. Used as a speculative check to see
* if the owner is running on the cpu.
*/
struct task_struct *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
2.1 读写信号量初始化
静态初始化:
#define DECLARE_RWSEM(name) \
struct rw_semaphore name = __RWSEM_INITIALIZER(name)
运行时动态初始化:
void __init_rwsem(struct rw_semaphore *sem, const char *name,
struct lock_class_key *key)
{
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
* Make sure we are not reinitializing a held semaphore:
*/
debug_check_no_locks_freed((void *)sem, sizeof(*sem));
lockdep_init_map(&sem->dep_map, name, key, 0);
#endif
sem->count = 0;
raw_spin_lock_init(&sem->wait_lock);
INIT_LIST_HEAD(&sem->wait_list);
}
2.2 申请读锁
申请读锁,如果写者占有写锁或者正在等待写锁,那么进程深度睡眠.
void __sched down_read(struct rw_semaphore *sem);
申请读锁,不会等待.如果申请成功,返回1;否则返回0.
int down_read_trylock(struct rw_semaphore *sem);
2.3 释放读锁
void up_read(struct rw_semaphore *sem);
2.4 申请写锁
void __sched down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
2.5 释放写锁
void up_write(struct rw_semaphore *sem);
3 互斥锁
互斥锁只允许一个进程进入临界区,适合保护比较长的临界区,因为竞争互斥锁时今晨可能睡眠和再次唤醒,代价很高.
struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
3.1 初始化
#define DEFINE_MUTEX(mutexname) \
struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)
# define mutex_init(mutex) \
do { \
static struct lock_class_key __key; \
\
__mutex_init((mutex), #mutex, &__key); \
} while (0)
3.2 申请互斥锁
void __sched mutex_lock(struct mutex *lock);
int __sched mutex_lock_interruptible(struct mutex *lock);
int __sched mutex_lock_killable(struct mutex *lock);
3.3 释放互斥锁
void __sched mutex_unlock(struct mutex *lock);
4 实时互斥锁
实时互斥锁是对互斥锁的改进,实现了优先级继承,解决了优先级反转问题.
优先级反转问题:假设进程1的优先级低,进城2的优先级高.进程1持有互斥锁,进程2申请了互斥锁,因为进程1已经占有了互斥锁,所以进程2必须睡眠等待,导致优先级高的进程2等待优先级低的进程1.如果进程3,优先级在进程1和进程2之间,情况更糟糕.假设进程1持有互斥锁,进程2正在等待.进程3开始运行,因为它的优先级比进程1高,所以它可以抢占进程1,导致进程1持有互斥锁的时间延长,进程2等待的时间延长.
优先级继承可以解决优先级反转问题.如果有低优先级的进程持有互斥锁,高优先级的进程申请互斥锁,那么把持有互斥锁的进程的优先级临时提升到申请互斥锁的进程的优先级.
如果需要使用实时互斥锁,编译内核时需要开启宏CONFIG_RT_MUTEXS.
定义如下:
struct rt_mutex {
raw_spinlock_t wait_lock;
struct rb_root waiters;
struct rb_node *waiters_leftmost;
struct task_struct *owner;
#ifdef CONFIG_DEBUG_RT_MUTEXES
int save_state;
const char *name, *file;
int line;
void *magic;
#endif
};
4.1 初始化
#define DEFINE_RT_MUTEX(mutexname) \
struct rt_mutex mutexname = __RT_MUTEX_INITIALIZER(mutexname)
# define rt_mutex_init(mutex) __rt_mutex_init(mutex, __func__)
4.2 申请实时互斥锁
void __sched rt_mutex_lock(struct rt_mutex *lock);
int __sched rt_mutex_lock_interruptible(struct rt_mutex *lock);
int rt_mutex_timed_lock(struct rt_mutex *lock, struct hrtimer_sleeper *timeout);
int __sched rt_mutex_trylock(struct rt_mutex *lock);
4.3 释放实时互斥锁
void __sched rt_mutex_unlock(struct rt_mutex *lock);
5 原子变量
原子变量用来实现对整数的互斥访问,通常用来实现计数器.
内核定义了3种原子变量.
(1)整数原子变量,数据类型是atomic_t;
typedef struct {
volatile int val;
} atomic_t;
(2)长整数原子变量,atomic_long_t.
(3)64位整数原子变量,atomic64_t.
5.1 初始化
atomic<name> = ATOMIC_INIT(i);
//在运行中动态初始化原子变量的方法:
atomic_set(v, i);
5.2 常用操作函数
//读取原子变量的v的值.
static inline int atomic_read(const atomic_t *v);
//把原子变量v的值加上i.
static inline void atomic_add(int i, atomic_t *v);
static inline void atomic_and(int i, atomic_t *v);
static inline void atomic_sub(int i, atomic_t *v);
static inline int atomic_sub_return##name(int i, atomic_t *v);
...
5.3 原子变量实现原理
原子变量需要各种处理器架构提供特殊的指令支持.ARM64处理器提供了以下指令:
(1)独占加载指令ldxr;
(2)独占存储指令stxr;
独占加载指令从内存加载32位或64位数据到寄存器中,把访问的物理地址标记为独占访问.
在非常大的系统中,处理器很多,竞争很激烈,使用独占加载指令和独占存储指令可能需要重试很多次才能成功,性能很差.ARMv8.1标准实现了大系统扩展(LSE),专门设计了原子指令,提供了原子加法指令stadd:首先从内存加载32位或64位数据到寄存器中,然后把寄存器加上指定值,把结果写回内存.
6 自旋锁
自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠.申请自旋锁的时候,如果自旋锁被其他处理器占有,本处理器自旋等待(也称为忙等待).
进程,软中断和中断都可以使用自旋锁.
目前内核的自旋锁是排队自旋锁(FIFO ticket spinlock),算法如下:
(1)锁拥有排队号和服务号,服务号是当前占有锁的进程的排队号.
(2)每个进程申请锁的时候,首先申请一个排队号,然后轮询锁的服务号是否等于自己的排队号,如果等于,表示自己占有锁,可以进入临界区,否则继续轮询.
(3)当进程释放锁时,把服务号加1,下一个进程看到服务号等于自己的服务号,退出自旋,进入临界区.
自旋锁的定义如下:
typedef struct spinlock {
union {
struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
} spinlock_t;
可以看到spinlock只是封装raw_spinlock;在未打上实时内核补丁前,它俩完全一样.在打上实时内核补丁后,那么spinlock使用实时互斥锁保护临界区,临界区内可以被抢占和睡眠,但raw_spinlock还是自旋锁.为了使代码可以兼容实时内核,最好坚持以下三个原则:
(1)尽可能使用spinlock.
(2)绝对不允许被抢占和睡眠的地方,使用raw_spinlock,否则使用spinlock.
(3)如果临界区足够小,使用raw_spinlock.
各种处理器架构需要自定义数据类型arch_spinlock_t,ARM64架构的定义如下:
typedef struct {
#ifdef __AARCH64EB__
u16 next;
u16 owner;
#else
u16 owner;
u16 next;
#endif
} __aligned(4) arch_spinlock_t;
其中成员next是排队号,成员owner是服务号.
6.1 初始化
#define DEFINE_SPINLOCK(x) spinlock_t x = __SPIN_LOCK_UNLOCKED(x)
#define spin_lock_init(_lock) \
do { \
spinlock_check(_lock); \
raw_spin_lock_init(&(_lock)->rlock); \
} while (0)
6.2 申请锁
static __always_inline void spin_lock(spinlock_t *lock);
static __always_inline void spin_lock_bh(spinlock_t *lock);
static __always_inline void spin_lock_irq(spinlock_t *lock);
static __always_inline int spin_trylock(spinlock_t *lock);
spin_lock_irqsave(lock, flags);
6.3 释放自旋锁
static __always_inline void spin_unlock(spinlock_t *lock);
static __always_inline void spin_unlock_bh(spinlock_t *lock);
static __always_inline void spin_unlock_irq(spinlock_t *lock);
static __always_inline void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
7 读写自旋锁
读写自旋锁是对自旋锁的改进,区分读者和写者,允许多个读者同时进入临界区,读者和写者互斥,写者与写着互斥.
如果读者占有读锁,写者申请写锁时自旋等待.如果写者占有写锁,读者申请读锁的时候自旋等待.
读写锁的定义如下:
typedef struct {
arch_rwlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} rwlock_t;
各种处理器架构需要自定义数据类型arch_rwlock_t,ARM64架构的定义如下:
typedef struct {
volatile unsigned int lock;
} arch_rwlock_t;
7.1 初始化
#define DEFINE_RWLOCK(x) rwlock_t x = __RW_LOCK_UNLOCKED(x)
#define rwlock_init(lock) \
do { *(lock) = __RW_LOCK_UNLOCKED(lock); } while (0)
7.2 申请读锁
#define read_lock(lock) _raw_read_lock(lock);
#define read_lock_irq(lock) _raw_read_lock_irq(lock);
#define read_lock_bh(lock) _raw_read_lock_bh(lock);
#define read_trylock(lock) __cond_lock(lock, _raw_read_trylock(lock));
read_unlock_irqrestore(lock, flags);
7.3 申请写锁
#define write_lock(lock) _raw_write_lock(lock);
#define write_lock_irq(lock) _raw_write_lock_irq(lock);
#define write_lock_bh(lock) _raw_write_lock_bh(lock);
#define write_trylock(lock) __cond_lock(lock, _raw_write_trylock(lock));
write_lock_irqsave(lock, flags);
7.4 释放读锁
#define read_unlock(lock) _raw_read_unlock(lock);
#define read_unlock_bh(lock) _raw_read_unlock_bh(lock);
#define read_unlock_irq(lock) _raw_read_unlock_irq(lock);
read_unlock_irqrestore(lock, flags);
7.5 释放写锁
#define write_unlock(lock) _raw_write_unlock(lock);
#define write_unlock_bh(lock) _raw_write_unlock_bh(lock);
#define write_unlock_irq(lock) _raw_write_unlock_irq(lock);
write_unlock_irqrestore(lock, flags);
7.6 实现原理
读写自旋锁使用一个无符号32位整数作为计数值,写锁使用最高位,读锁使用其余31位,算法如下:
(1)申请写锁时,如果计数值为0,那么设置计数值的最高位,进入临界区;如果计数值不是0,说明写者占有写锁或者读者占有读锁,那么自旋等待.
(2)申请读锁时,如果计数值的最高位是0,那么把计数值加1,进入临界区;如果计数值的最高位不是0,那么说明写者占有写锁,那么自旋等待.
(3)释放写锁时,把计数值设置为0.
(4)释放读锁时,把计数值减1.
8 顺序锁
顺序锁区分读者和写者,和读写自旋锁相比,它的优点是不会出现写者饿死的情况.顺序锁有序列号,写者把序列号加1,如果读者检测到序列号有变化,发现写者修改了数据,将会重试,读者的代价比较高.
顺序锁支持两种类型的读者:
(1)顺序读者:不会阻塞写者,但是如果读者检测到序列号有变化,发现写者修改了数据,读者将会重试.
(2)持锁读者:退化成自旋锁.
如果使用顺序读者,那么互斥访问的资源不能是指针,因为写者可能使指针失效,读者访问失效的指针会出现致命错误.
顺序锁比读写自旋锁更加高效,但读写自旋锁适用于所有场合,而顺序所不能适用于所有场合,所以顺序锁不能完全替代读写自旋锁.
顺序锁完整定义:
typedef struct {
struct seqcount seqcount;
spinlock_t lock;
} seqlock_t;
8.1 初始化
#define DEFINE_SEQLOCK(x) \
seqlock_t x = __SEQLOCK_UNLOCKED(x)
#define seqlock_init(x) \
do { \
seqcount_init(&(x)->seqcount); \
spin_lock_init(&(x)->lock); \
} while (0)
8.2 申请读锁
顺序读者数据的方式如下:
seqlock_t seqlock;
unsigned int seq;
do{
seq = read_seqbegin(&seqlock);
}while(read_seqretry(&seqlock, seq));
首先调用函数read_seqbegin读取序列号,然后读数据,最后调用函数read_seqretry判断序列号是否发生了变化.如果序列号有变化,说明写者修改了数据,那么读者需要重试.
持锁读者读数据的方法如下:
seqlock_t seqlock;
read_seqlock_excl(&seqlock);
读数据
read_sequnlock_excl(&seqlock);
读者还可以根据情况灵活选择:如果没有写者在写数据,那么读者成为顺序读者;如果写者正在写数据,那么读者成为持锁读者.
seqlock_t seqlock;
unsigned int seq = 0;
do{
read_seqbegin_or_lock(&seqlock, &seq);
读数据;
}while(need_seqretry(&seqlock,seq));
done_seqretry(&seqlock, seq);
8.3 写者写数据
write_seqlock(&seqlock);
write_sequnlock(&seqlock);
函数write_seqlock有一些变体:
static inline void write_seqlock(seqlock_t *sl);
static inline void write_seqlock_bh(seqlock_t *sl);
static inline void write_seqlock_irq(seqlock_t *sl);
9 每处理器变量&每处理器计数器
在多处理器中,每处理器变量是为每个处理器生成一个变量的副本,每个处理器访问自己的副本,从而避免处理器之间的互斥和处理器缓存之间的同步,提高了程序的执行速度.
通常使用原子变量作为计数器,在多处理器系统中,如果处理器越多,那么计数器可能成为瓶颈:每次只能有一个处理器修改计数器,其他处理器必须等待.如果访问计数器很频繁,将会严重降低系统性能.
10 内存屏障
内存屏障(memory barrier)是一种保证内存访问顺序的方法.内核支持3种内存屏障:
(1)编译器屏障.
(2)处理器内存屏障.
(3)内存映射屏障.
10.1 编译器屏障
为了提高程序的执行速度,编译器优化代码,对于不存在数据依赖或控制依赖的汇编指令,重新排列它们的顺序,但是有时候优化产生的指令顺序不符合程序员的真实意图.程序员需要使用编译器屏障知道编译器.
编译器屏障是:
barrier();
它能组织编译器把屏障指令一侧的指令移动到另一侧.C语言的关键字"volatile"也可以阻止编译器对单个变量的优化.
10.2 处理器内存屏障
处理器内存屏障用来解决处理器之间的内存访问乱序问题和处理器访问外设的乱序问题.
内存屏障类型 | 强制性的内存屏障 | SMP内存屏障 |
---|---|---|
通用内存屏障 | mb() | smp_mb() |
写内存屏障 | wmb() | smp_wmb() |
读内存屏障 | rmb() | smp_rmb() |
数据依赖屏障 | read_barrier_depends() | smp_read_barrier_depends() |
10.3 内存映射屏障
内核为内存映射I/O写操作提供了一个特殊的屏障.
mmiowb();
它是wmb的一个变体.这个屏障已经被废弃.
10.4 隐含内存屏障
内核的有些函数隐含内存屏障:
(1)获取或释放函数.
获取函数包括如下:
a.获取锁的函数.锁包括自旋锁,读写自旋锁,互斥锁,信号量和读写信号量.
b.smp_load_acquire(p):加载获取.
c.smp_cond_load_acquire(ptr,cond_expr):带条件的加载获取.
释放函数包括如下:
a.释放锁的函数.
b.smp_store_release():存储释放;
(2)中断禁止函数.
禁止中断和开启中断的函数只充当编译器优化屏障.
10.5 ARM64处理器内存屏障
不同处理器提供的内存屏障指令不同,ARM64处理器提供以下3种内存屏障.
(1)指令同步屏障(ISB):指令是isb.指令同步屏障指令冲刷流水线,在屏障指令执行完毕后重新获取程序中屏障指令后面的所有指令,以便使用最新的内存管理单元配置检查权限和访问,保证以前执行的改变上下文的操作(包括缓存维护指令,页表缓存维护指令或修改系统控制寄存器)在屏障指令执行完的时候已经完成.
(2)数据内存屏障(DMB):指令是dmb.保证屏障前面的内存访问和屏障后面的内存访问的相对顺序.
(3)数据同步屏障(DSB):指令时dsb.保证屏障前的内存访问,缓存维护指令和页表缓存维护指令在屏障指令之前就已经完成,屏障后面的任何指令在屏障完整之后才能开始执行,是比数据内存屏障更强的屏障.
11 RCU
RCU的意思是读-复制更新.写者修改对象的过程是:首先复制生成一个副本,然后更新这个副本,最后使用新的对象替换旧的对象.在写者执行复制更新的时候,读者可以读数据.写者删除对象,必须等到所有访问被删除对象的读者访问结束,才能执行销毁操作.
RCU的优点是读者没有任何同步开销:不需要任何获取任何锁,不需要执行原子指令,不需要执行内存屏障.但是写者的同步开销比较大,写者需要延迟对象的释放,复制被修改的对象,写者之间必须使用锁互斥.
11.1 RCU使用方法
11.1.1 经典RCU
读者使用函数rcu_read_lock标记进入读端临界区,使用函数rcu_read_unlock标记退出读端临界区.读端临界区可以嵌套.
写者可以使用以下4个函数:
(1)synchronize_rcu: 等待宽限期结束,即所有读者退出读端临界区,然后写者执行下一步操作.这个函数可以睡眠.
(2)synchronize_rcu_expedited:等待宽限期结束.与synchronize_rcu的区别在于该函数会向其他处理器发送处理器间中断(IPI)请求,强制宽限期快速结束.
(3)call_rcu: 注册延后执行的回调函数,把回调函数添加到RCU回调函数链表中,立即返回,不会阻塞.
(4)rcu_barrier:等待使用call_rcu注册的所有回调函数执行完.这个函数可能睡眠.
11.1.2 不可抢占RCU
读者使用函数rcu_read_lock_sched标记进入读端临界区,使用函数rcu_read_unlock_sched标记退出读端临界区.读端临界区可以嵌套.在读端临界区应该使用宏rcu_dereference_sched(p)访问指针.
写者可以使用下面4个函数:
(1)synchronize_sched:等待宽限期结束,即所有读者退出读端临界区,然后写者执行下一步操作.这个函数可能睡眠.
(2)synchronize_sched_expedited:等待宽限期结束,和函数synchronize_sched的区别是:该函数会想其他处理器发送处理器间中断,强制宽限期快速结束.
(3)call_rcu_sched: 注册延后执行的回调函数,把回调函数添加到RCU回调函数链表中,立即返回,不会阻塞.
(4)使用函数rcu_barrier_sched等待使用call_rcu_sched注册的所有回调函数执行完.
11.1.3 加速版不可抢占RCU
加速版不可抢占RCU在软中断很多的情况下可以缩短宽限期.
读者使用函数rcu_read_lock_bh标记进入读端临界区,使用函数rcu_read_unlock_bh标记退出读端临界区.读端临界区可以嵌套.在读端临界区应该使用宏rcu_dereference_bh(p)访问指针.
写者可以使用下面4个函数:
(1)synchronize_rcu_bh:等待宽限期结束,即所有读者退出读端临界区,然后写者执行下一步操作.这个函数可能睡眠.
(2)synchronize_rcu_bh_expedited:等待宽限期结束,和函数synchronize_sched的区别是:该函数会想其他处理器发送处理器间中断,强制宽限期快速结束.
(3)call_rcu_bh: 注册延后执行的回调函数,把回调函数添加到RCU回调函数链表中,立即返回,不会阻塞.
(4)使用函数rcu_barrier_bh等待使用call_rcu_bh注册的所有回调函数执行完.
RCU最常见的使用场合是保护大多数时候读的双向链表.内核实现了链表操作的RCU版本.这些操作封装了内核屏障.
11.2 RCU实现原理
读端临界区:读者访问RCU保护的对象的代码区域.
静止状态:处理器的执行状态,处理器没有读RCU保护的对象,也可以理解为读者静止,没有访问临界区.
候选静止状态:读端临界区以外的所有点都是候选静止状态.
宽限期:等待所有处理器上的读者退出临界区的时间称为宽限期.如果所有处理器都至少经历了一个静止状态,那么当前宽限期结束,新的宽限期开始.当前宽限期结束的时候,写者在当前宽限期及以前注册的销毁对象的回调函数就可以安全地执行.
宽限期分为正常宽限期和加速宽限期.调用函数synchronize_rcu_expedited等待宽限期结束的时候,会向其他处理器发送处理器间中断(IPI)请求,强制产生静止状态,使宽限期快速结束,我们把强制快速结束的宽限期称为加速宽限期.
RCU需要使用位图记录哪些处理器经历了一个静止状态.在宽限期开始的时候,在位图中为所有处理器设置对应的位,如果一个处理器经历了一个静止状态,就把自己的位从位图中清楚,处理器修改位图的时候必须使用自旋锁保护.如果处理器很多,对自旋锁的竞争很激烈,会导致系统性能很差.
为了解决性能问题,基于树的分层RCU采用了类似淘汰赛的原理:把处理器分组,当一个处理器经历一个静止状态时,把自己的位从分组的位图中清楚,只需要竞争分组的自旋锁,当前分组的最后一个处理器经历一个静止状态时,代表分组从上一层分组的位图中清除位,竞争上一层的自旋锁.
11.3 RCU简单使用实例
rcu_read_lock/rcu_read_unlock:组成一个RCU读临界区.
rcu_derefence:用于获取被RCU保护的指针.读者线程为了访问RCU保护的共享数据,需要使用该接口函数创建一个新指针,并且指向RCU被保护的指针.
rcu_assign_pointer:通常用于写者线程.在写者线程完成新的数据的修改后,调用该接口函数可以让被RCU保护的指针指向新建的数据,用RCU术语讲就是发布了更新后的数据.
synchronize_rcu:同步等待所有现存的读访问完成.
call_rcu:注册一个回调函数,当所有现存的读访问完成后,调用这个回调函数销毁旧数据.
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo __rcu *gbl_foo;
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = rcu_dereference_protected(gbl_foo, lockdep_is_held(&foo_mutex));
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
call_rcu(&old_fp->rcu, foo_reclaim);
}
void foo_reclaim(struct rcu_head *rp)
{
struct foo *fp = container_of(rp, struct foo, rcu);
foo_cleanup(fp->a);
kfree(fp);
}
int foo_get_a(void)
{
int retval;
rcu_read_lock();
retval = rcu_dereference(gbl_foo)->a;
rcu_read_unlock();
return retval;
}
12 死锁检测
12.1 死锁
常见的死锁有以下4种情况:
(1)进程重复申请同一个锁,称为AA死锁.例如,重复申请同一个自旋锁;使用读写锁,第一次申请读锁,第二次申请写锁.
(2)进程申请自旋锁时没有禁止硬中断,进程获取自旋锁后,硬中断抢占,申请同一个自旋锁.隐蔽的AA死锁.
(3)两个进程都要获取锁L1和L2,进程1持有锁L1,再去获取锁L2,如果这个时候进程2持有锁L2并且尝试获取锁L1,那么进程1和进程2就会死锁,称为AB-BA死锁.
(4)在一个处理器上进程1持有锁L1,再去获取锁L2;在另一个处理器上进程2持有锁L2,硬中断抢占进程2以后获取锁L1.隐蔽的AB-BA死锁.
避免AB-BA死锁最简单的方法就是定义锁的申请顺序,以破坏死锁的环形等待.
12.2 lockdep
内核提供的lockdep用来发现内核的死锁风险.
死锁监测工具lockdep操作的基本对象是锁类,例如结构体里面的锁是一个锁类,结构体的每个实例里面的锁是锁类的一个实例化.lockdep跟踪每个锁类的自身状态,也跟踪各个锁类之间的依赖关系,通过一系列的验证规则,确保锁类状态和锁类之间的依赖总是正确的.另外,锁类一旦在初次使用时被注册,后续就会一直存在,它的所有具体实例都会关联到它.