简介
在多线程(进程)并发模型中,多个线程如果同时处理同一块数据的话,会引发竞态问题,以及随之而来的线程安全问题。而锁是解决线程安全的重要手段,其中主要包括原子性操作atomic,自旋锁spin_lock,信号量semaphore,互斥信号量mutex,读写锁rw_lock等等。
临界区
临界区是指一个访问共用资源(例如:共用设备或是共用存储器)的程序片段,而这些共用资源又无法同时被多个访问的特性。
当一个进程进入一段代码时被中断,而另一个进程刚好也进入相关代码片段操作了同一块数据导致数据的不正确,从而导致了进程工作不正确。最简单的方式是关中断,但这容易出问题,而且在多核操作系统中,关中断并不能阻止其他CPU上跑了同一代码片段,因此必须有些机制来保证只有一个进程进入临界区问题,我们称之为互斥。
原子操作
atomic是最简单的锁操作,可以保证最简单的操作,诸如计数器加1之类的,可以不被中断的执行代码,在SMP系统中需要特殊的锁机制来保证只在一个CPU上执行,在单CPU上通过关中断可以很好实现也很好理解atomic,但是SMP系统中的锁保证就会稍微理解起来比较复杂一点。这种特殊的锁指令比较偏硬件底层,就不细探究了。我们简单看一下atomic相关数据结构和对于int相关操作。(相关操作与具体的体系结构有关,这是因为不同的体系结构锁指令不一样)
typedef struct {
int counter;
} atomic_t;
// x86
static inline void atomic_inc(atomic_t *v)
{
asm_volatile(LOCK_PREFIX "incl %0"
: "+m" (v->counter));
}
#ifdef CONFIG_SMP
#define LOCK_PREFIX_HERE \
".pushsection .smp_locks,\"a\"\n" \
".balign 4\n" \
".long 671f - .\n" /* offset */ \
".popsection\n" \
"671:"
#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "
#else /* ! CONFIG_SMP */
#define LOCK_PREFIX_HERE ""
#define LOCK_PREFIX ""
#endif
static __always_inline int arch_atomic_read(const atomic_t *v)
{
/*
* Note for KASAN: we deliberately don't use READ_ONCE_NOCHECK() here,
* it's non-inlined function that increases binary size and stack usage.
*/
return READ_ONCE((v)->counter);
}
#define __READ_ONCE_SIZE \
({ \
switch (size) { \
case 1: *(__u8 *)res = *(volatile __u8 *)p; break; \
case 2: *(__u16 *)res = *(volatile __u16 *)p; break; \
case 4: *(__u32 *)res = *(volatile __u32 *)p; break; \
case 8: *(__u64 *)res = *(volatile __u64 *)p; break; \
default: \
barrier(); \
__builtin_memcpy((void *)res, (const void *)p, size); \
barrier(); \
} \
})
atomic的数据结构其实就是一个int 变量,读操作通过volatile读取(相关内容见内存屏障,由于只涉及到一个操作,不需要额外保证),而inc操作通过lock指令+inc指令联合实现。这样就可以实现对于
自旋锁
自旋锁用于保护执行很快的锁操作,等待锁的时间不会太长,所以不释放CPU,自循环忙等直到获取锁后继续执行临界区代码。被自旋锁保护的代码在获取锁期间必须得是完整执行的(不能被其他进程打断,类似于原子性的),如果被自旋锁保护的临界区代码执行到一半被打断,就会引起不正确状态。自旋锁会通过关抢占来保证不被打断;而且使用者也要保证被临界区代码不能主动睡眠(这样可能会被别人抢占,打破了原子性);自旋锁是不可重入的
相关数据结构和代码:(省略了debug相关难懂的代码)
typedef struct spinlock {
union {
struct raw_spinlock rlock;
... // debug related struct
};
} spinlock_t;
typedef struct raw_spinlock {
arch_spinlock_t raw_lock;
... // debug related struct
} raw_spinlock_t;
// arm
typedef struct {
union {
u32 slock;
struct __raw_tickets {
#ifdef __ARMEB__
u16 next;
u16 owner;
#else
u16 owner;
u16 next;
#endif
} tickets;
};
} arch_spinlock_t;
spin_lock => raw_spin_lock => _raw_spin_lock => __raw_spin_lock => do_raw_spin_lock => arch_spin_lock
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
unsigned long tmp;
u32 newval;
arch_spinlock_t lockval;
prefetchw(&lock->slock);
__asm__ __volatile__( // 这段汇编要做的事情就是原子性操作 localval = lock->tickets.next++
"1: ldrex %0, [%3]\n" // 标记lock->slock独占访问,lockval = lock->slock
" add %1, %0, %4\n" // newval = lockval + (1 << 16) => next ++
" strex %2, %1, [%3]\n" // lock->slock = newval; tmp=0 清除独占访问标记
" teq %2, #0\n" // tmp == 0
" bne 1b" // tmp != 0,strex操作失败,自旋重试
: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
: "cc");
while (lockval.tickets.next != lockval.tickets.owner) { // 通过判断localval ==
lock->tickets.owner 来判断锁是否由当前CPU占有
wfe();
lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
}
smp_mb();
}
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
smp_mb();
lock->tickets.owner++; // 释放锁
dsb_sev();
}
arm的自旋锁逻辑比较简单,所以就以arm为例来说一下spinlock的实现,在arm的arch_spinlock_t中额外维护了own和next两个变量来判断当前锁获取者是谁,own记录了当前锁的占有者,next记录了当前CPU的排队序号。每次lock时将next自增,并记录自己占有的next,称之为localnext,unlock时own自增,如果own != localnext说明当前锁由其他CPU占有,等待其他CPU释放锁,直到own与localnext相等,则判断为当前CPU是锁的占有者。
信号量
实质上信号量只是受保护的特别变量,能够表示为正负数,初始化为1。为操作信号量定义了两个标准操作,up 和 down。在进程进入临界区时,执行down操作,该操作会将值减1,如果之前已经是负数则会将当前进程放入等待队列中,并将当前进程标记为UnInterruptable or Interruptable状态进入block状态,等待被唤醒重新调度;获取到信号量的进程在退出临界区时,执行up操作,该操作将值加1,并从等待队列中选取一个进程唤醒。
相关数据结构和代码如下:
struct semaphore {
raw_spinlock_t lock; // 自旋锁,用于更新count及wait_list
unsigned int count; // 信号量值
struct list_head wait_list; // 等待队列
};
void down(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0)) // 倾向于有足够的count可供使用
sem->count--;
else
__down(sem); // 需要block当前process
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __down(struct semaphore *sem)
{
__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
static inline int __sched __down_common(struct semaphore *sem, long state,
long timeout)
{
struct semaphore_waiter waiter;
list_add_tail(&waiter.list, &sem->wait_list); // 放入等待队列
waiter.task = current;
waiter.up = false;
for (;;) {
if (signal_pending_state(state, current)) // 被中断唤醒
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
__set_current_state(state); // TASK_UNINTERRUPTIBLE
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout); // 启动超时定时器,超时后被唤醒并通过调用schedule放弃CPU调度。(定时器相关目前还没怎么看)
raw_spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}
timed_out:
list_del(&waiter.list);
return -ETIME;
interrupted:
list_del(&waiter.list);
return -EINTR;
}
signed long __sched schedule_timeout(signed long timeout)
{
struct process_timer timer;
unsigned long expire;
switch (timeout)
{
case MAX_SCHEDULE_TIMEOUT:
/*
* These two special cases are useful to be comfortable
* in the caller. Nothing more. We could take
* MAX_SCHEDULE_TIMEOUT from one of the negative value
* but I' d like to return a valid offset (>=0) to allow
* the caller to do everything it want with the retval.
*/
schedule();
goto out;
default:
/*
* Another bit of PARANOID. Note that the retval will be
* 0 since no piece of kernel is supposed to do a check
* for a negative retval of schedule_timeout() (since it
* should never happens anyway). You just have the printk()
* that will tell you if something is gone wrong and where.
*/
if (timeout < 0) {
printk(KERN_ERR "schedule_timeout: wrong timeout "
"value %lx\n", timeout);
dump_stack();
current->state = TASK_RUNNING;
goto out;
}
}
expire = timeout + jiffies;
timer.task = current;
timer_setup_on_stack(&timer.timer, process_timeout, 0); // 初始化定时器,到点后执行process_timeout唤醒当前task,
__mod_timer(&timer.timer, expire, 0);
schedule(); // 放弃cpu,调度next se,相关内容见核心调度器
del_singleshot_timer_sync(&timer.timer);
/* Remove the timer from the object tracker */
destroy_timer_on_stack(&timer.timer);
timeout = expire - jiffies;
out:
return timeout < 0 ? 0 : timeout;
}
static void process_timeout(struct timer_list *t)
{
struct process_timer *timeout = from_timer(timeout, t, timer);
wake_up_process(timeout->task); // 唤醒task
}
void up(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list))) // 更倾向于没有等待锁的
sem->count++;
else
__up(sem); // 唤醒task
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true; // 修改对应waiter状态
wake_up_process(waiter->task); // 唤醒对应task
}
如果对CPU调度有大致理解的话不难理解相关代码,其中主要关注的就是schedule函数,这个函数大致语义就是调度当前CPU下一个task,不会将当前task放入等待队列rq。
这里需要单独拿出来额外解析一下wake_up_process函数。后续我也会借助java中的锁来分析一下user space lock与内核是如何协同工作的。
int wake_up_process(struct task_struct *p)
{
return try_to_wake_up(p, TASK_NORMAL, 0);
}
static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
unsigned long flags;
int cpu, success = 0;
/*
* If we are going to wake up a thread waiting for CONDITION we
* need to ensure that CONDITION=1 done by the caller can not be
* reordered with p->state check below. This pairs with mb() in
* set_current_state() the waiting thread does.
*/
raw_spin_lock_irqsave(&p->pi_lock, flags);
smp_mb__after_spinlock();
if (!(p->state & state))
goto out;
trace_sched_waking(p);
/* We're going to change ->state: */
success = 1;
cpu = task_cpu(p);
/*
* Ensure we load p->on_rq _after_ p->state, otherwise it would
* be possible to, falsely, observe p->on_rq == 0 and get stuck
* in smp_cond_load_acquire() below.
*
* sched_ttwu_pending() try_to_wake_up()
* STORE p->on_rq = 1 LOAD p->state
* UNLOCK rq->lock
*
* __schedule() (switch to task 'p')
* LOCK rq->lock smp_rmb();
* smp_mb__after_spinlock();
* UNLOCK rq->lock
*
* [task p]
* STORE p->state = UNINTERRUPTIBLE LOAD p->on_rq
*
* Pairs with the LOCK+smp_mb__after_spinlock() on rq->lock in
* __schedule(). See the comment for smp_mb__after_spinlock().
*/
smp_rmb();
if (p->on_rq && ttwu_remote(p, wake_flags))
goto stat;
#ifdef CONFIG_SMP
/*
* Ensure we load p->on_cpu _after_ p->on_rq, otherwise it would be
* possible to, falsely, observe p->on_cpu == 0.
*
* One must be running (->on_cpu == 1) in order to remove oneself
* from the runqueue.
*
* __schedule() (switch to task 'p') try_to_wake_up()
* STORE p->on_cpu = 1 LOAD p->on_rq
* UNLOCK rq->lock
*
* __schedule() (put 'p' to sleep)
* LOCK rq->lock smp_rmb();
* smp_mb__after_spinlock();
* STORE p->on_rq = 0 LOAD p->on_cpu
*
* Pairs with the LOCK+smp_mb__after_spinlock() on rq->lock in
* __schedule(). See the comment for smp_mb__after_spinlock().
*/
smp_rmb();
/*
* If the owning (remote) CPU is still in the middle of schedule() with
* this task as prev, wait until its done referencing the task.
*
* Pairs with the smp_store_release() in finish_task().
*
* This ensures that tasks getting woken will be fully ordered against
* their previous state and preserve Program Order.
*/
smp_cond_load_acquire(&p->on_cpu, !VAL);
p->sched_contributes_to_load = !!task_contributes_to_load(p);
p->state = TASK_WAKING;
if (p->in_iowait) {
delayacct_blkio_end(p);
atomic_dec(&task_rq(p)->nr_iowait);
}
cpu = select_task_rq(p, p->wake_cpu, SD_BALANCE_WAKE, wake_flags);
if (task_cpu(p) != cpu) {
wake_flags |= WF_MIGRATED;
psi_ttwu_dequeue(p);
set_task_cpu(p, cpu);
}
#else /* CONFIG_SMP */
if (p->in_iowait) {
delayacct_blkio_end(p);
atomic_dec(&task_rq(p)->nr_iowait);
}
#endif /* CONFIG_SMP */
ttwu_queue(p, cpu, wake_flags); // 将对应的task放入唤醒队列
stat:
ttwu_stat(p, cpu, wake_flags);
out:
raw_spin_unlock_irqrestore(&p->pi_lock, flags);
return success;
}
ttwu_queue是唤醒task的重要入口,相关调度关系如下:
互斥信号量 mutex
互斥信号量其实和信号量本质上其实没有什么区别,基本实现原理也大同小异,唯一的区别就是互斥信号量count的取值限制为1或者-1,从而可以做一些特殊的优化来提高性能,这也是他存在的理由。在早之前mutex是会切换上下文的,但是最新的版本都是自旋等待了。
mutex涉及的操作包括mutex_lock及mutex_unlock。
相关数据结构及代码如下:
struct mutex {
atomic_long_t owner; // 当前信号量被那个task占用
spinlock_t wait_lock; // 保护owner及wait_list
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
};
mutex_lock => __mutex_lock_slowpath => __mutex_lock => __mutex_lock_common
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
bool first = false;
struct ww_mutex *ww;
int ret;
might_sleep(); // 提示该函数会sleep,不要在这个函数上使用自旋锁
...
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (__mutex_trylock(lock) || // 再次尝试快速获取锁
mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) { // 尝试自旋的方式获取锁,以提高性能,失败的判断依据是:1. 被抢占过 2. 第一个等待的task不等于上面最后一个变量,这里是NULL,也就是之前没有task在等待
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_set_context_fastpath(ww, ww_ctx);
preempt_enable();
return 0;
}
spin_lock(&lock->wait_lock);
/*
* After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) {
if (use_ww_ctx && ww_ctx)
__ww_mutex_check_waiters(lock, ww_ctx);
goto skip_wait;
}
...
if (!use_ww_ctx) {
/* add waiting tasks to the end of the waitqueue (FIFO): */
__mutex_add_waiter(lock, &waiter, &lock->wait_list); // FIFO加入等待队列队尾
#ifdef CONFIG_DEBUG_MUTEXES
waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
} else {
/*
* Add in stamp order, waking up waiters that must kill
* themselves.
*/
ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx); // 加入等待队列队尾并做一些标记工作
if (ret)
goto err_early_kill;
waiter.ww_ctx = ww_ctx;
}
waiter.task = current;
set_current_state(state);
for (;;) {
/*
* Once we hold wait_lock, we're serialized against
* mutex_unlock() handing the lock off to us, do a trylock
* before testing the error conditions to make sure we pick up
* the handoff.
*/
if (__mutex_trylock(lock)) // 获取成功
goto acquired;
/*
* Check for signals and kill conditions while holding
* wait_lock. This ensures the lock cancellation is ordered
* against mutex_unlock() and wake-ups do not go missing.
*/
if (signal_pending_state(state, current)) { //被中断
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx) {
ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
if (ret) // 被
goto err;
}
spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();
/*
* ww_mutex needs to always recheck its position since its waiter
* list is not FIFO ordered.
*/
if ((use_ww_ctx && ww_ctx) || !first) {
first = __mutex_waiter_is_first(lock, &waiter);
if (first)
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
}
set_current_state(state);
/*
* Here we order against unlock; we must either see it change
* state back to RUNNING and fall through the next schedule(),
* or we must see its unlock and acquire.
*/
if (__mutex_trylock(lock) ||
(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter))) //自旋等待
break;
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
acquired: // 获取成功,设置状态并清除相关队列信息
__set_current_state(TASK_RUNNING);
if (use_ww_ctx && ww_ctx) {
/*
* Wound-Wait; we stole the lock (!first_waiter), check the
* waiters as anyone might want to wound us.
*/
if (!ww_ctx->is_wait_die &&
!__mutex_waiter_is_first(lock, &waiter))
__ww_mutex_check_waiters(lock, ww_ctx);
}
mutex_remove_waiter(lock, &waiter, current);
if (likely(list_empty(&lock->wait_list)))
__mutex_clear_flag(lock, MUTEX_FLAGS);
debug_mutex_free_waiter(&waiter);
skip_wait: // 快速获取成功,未操作队列以及状态信息
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_lock_acquired(ww, ww_ctx);
spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;
err: // 被中断之类的
__set_current_state(TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current);
err_early_kill: //被kill
spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
}
/*
* Release the lock, slowpath:
*/
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
struct task_struct *next = NULL;
DEFINE_WAKE_Q(wake_q);
unsigned long owner;
mutex_release(&lock->dep_map, 1, ip);
/*
* Release the lock before (potentially) taking the spinlock such that
* other contenders can get on with things ASAP.
*
* Except when HANDOFF, in that case we must not clear the owner field,
* but instead set it to the top waiter.
*/
owner = atomic_long_read(&lock->owner); // 获取当前owner
for (;;) {
unsigned long old;
#ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif
if (owner & MUTEX_FLAG_HANDOFF)
break;
old = atomic_long_cmpxchg_release(&lock->owner, owner,
__owner_flags(owner)); // 释放锁
if (old == owner) {
if (owner & MUTEX_FLAG_WAITERS)
break;
return;
}
owner = old;
}
spin_lock(&lock->wait_lock);
debug_mutex_unlock(lock);
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_first_entry(&lock->wait_list,
struct mutex_waiter, list);
next = waiter->task;
debug_mutex_wake_waiter(lock, waiter);
wake_q_add(&wake_q, next); // 加入唤醒队列
}
if (owner & MUTEX_FLAG_HANDOFF)
__mutex_handoff(lock, next); // 直接将owner转交给下一个task
spin_unlock(&lock->wait_lock);
wake_up_q(&wake_q); // 唤醒对应的task
}
mutex_lock先会尝试__mutex_trylock_fast,也就是当锁未被占用时,通过cas操作将owner置为当前task。__mutex_lock_slowpath则是相关的队列操作的入口函数,最终会委托给__mutex_lock_common执行相关操作。在我看的版本(5.0.8)已经是自旋等待信号量了,不过在2.6.37版本还是通过上下文切换等待,具体哪个版本变化的,我也没深究了。lock的操作就是通过原子性操作cas将owner置为当前task_struct地址,如果成功或者owner中的值已经被设为自己则表示拿锁成功,否则继续自旋等待。
mutex_unlock同mutex_lock一样也会有fast和slow两种方式,其负责将owner置空,如果有MUTEX_FLAG_HANDOFF flag,则直接将owner值设为下一个task,如果有task等待该信号量,则将其唤醒。
RCU
read-copy_update这个机制是一个通过空间换时间的同步机制,性能很好,但是使用有一些限制:
- 共享资源大部分时间都是只读的,写比较少。
- RCU保护的代码,内核也不能进入睡眠。
- 受保护的资源必须通过指针访问。
其原理很简单,读取是访问实际存储,而修改则是通过创建一个副本,在副本中修改相关内容,在所有读使用者结束旧副本的访问后将指针替换为新的副本指针。RCU机制主要用于内核list相关操作中。相关实现细节有点繁琐,而思想比较简单,就不仔细看了。
读写锁
上述的各个机制,没有很好的区分读写访问,一般来说有很多场景读请求是要远大于写的,而又有少数场景写请求远大于读,所以需要一种更加灵活的机制来提高锁的性能,读写锁就横空出世了。内核提供了额外的信号量和自旋锁版本,分别称之为读者/写者信号量和读者/写者自旋锁,其相关实现与体系结构有关,但基本大同小异思想相同,我们以arm32为例来阅读一下相关源码。
读写自旋锁
arm32体系结构读写自旋锁相关代码如下:
typedef struct {
u32 lock;
} arch_rwlock_t;
//类似于执行while(!atomic_cas(rw->lock, 0, 0x80000000));
static inline void arch_write_lock(arch_rwlock_t *rw)
{
unsigned long tmp;
prefetchw(&rw->lock);
__asm__ __volatile__(
"1: ldrex %0, [%1]\n" // 标记rw->lock 独占访问,tmp = rw->lock
" teq %0, #0\n" // 判断 tmp 是否等于0
WFE("ne") // tmp != 0,表示有读请求或者请求正在执行,进入WFE状态
" strexeq %0, %2, [%1]\n" // 如果rw->lock==0 这执行 rw->lock= 0x80000000,如果成功则让tmp=0
" teq %0, #0\n"
" bne 1b" // tmp != 0,则说明获取锁失败,自旋重试
: "=&r" (tmp)
: "r" (&rw->lock), "r" (0x80000000)
: "cc");
smp_mb();
}
static inline void arch_write_unlock(arch_rwlock_t *rw)
{
smp_mb();
__asm__ __volatile__(
"str %1, [%0]\n" // rw->lock = 0
:
: "r" (&rw->lock), "r" (0)
: "cc");
dsb_sev(); //唤醒WFE cpu
}
// 类似于执行while(atomic_load(rw->lock) + 1 < 0);
static inline void arch_read_lock(arch_rwlock_t *rw)
{
unsigned long tmp, tmp2;
prefetchw(&rw->lock);
__asm__ __volatile__(
"1: ldrex %0, [%2]\n" // tmp = rw->lock
" adds %0, %0, #1\n" // tmp++
" strexpl %1, %0, [%2]\n" // 如果tmp非负,则执行rw->lock = tmp
WFE("mi") // 如果tmp是负数则进入 wait for event 状态
" rsbpls %0, %1, #0\n" // 判断strexpl是否执行成功,也就是tmp非负
" bmi 1b" // strexpl 执行不成功,自旋重试
: "=&r" (tmp), "=&r" (tmp2)
: "r" (&rw->lock)
: "cc");
smp_mb();
}
// 类似于执行while(!atomic_dec(rw->lock));
static inline void arch_read_unlock(arch_rwlock_t *rw)
{
unsigned long tmp, tmp2;
smp_mb();
prefetchw(&rw->lock);
__asm__ __volatile__(
"1: ldrex %0, [%2]\n" // tmp = rw->lock,标记独占访问rw->lock,
" sub %0, %0, #1\n" // tmp --;
" strex %1, %0, [%2]\n" // 清除独占访问rw->lock;
" teq %1, #0\n"
" bne 1b" // tmp2 !=0, strex失败,自旋重试到1
: "=&r" (tmp), "=&r" (tmp2)
: "r" (&rw->lock)
: "cc");
if (tmp == 0)
dsb_sev(); // 说明是最后一个离开临界区的reader,调用sev唤醒WFE的cpu core
}
arm读写自旋锁的实现其实和自旋锁的实现,差距不大,其基本思想就是:
通过一个int 变量lock来记录锁状态:正数,说明只有读者,读锁不需要自旋等待;负数,说明有写访问,读锁需要进入WFE自旋状态。
- 对于read lock,让lock值原子性自增1,如果lock值为正数,直接返回,否则自旋等待。
- 对于read unlock, 让lock值原子性自减1,如果lock==0,则做一次WFE唤醒
- 对于write lock,如果执行cas(lock, 0, 0x80000000)成功,说明之前锁free,直接返回,否则说明有其他读者或者写者占用锁,进入WFE自旋状态。
- 对于write unlock,直接让lock=0 表示锁无占用。
可以看出这是一个不公平锁,写操作明显具有劣势,只要lock值不为0,则写操作就永远获取不到锁,即使一个写请求先于一个读请求到达,如果当时已有读者占用的话。那么我们基于一个公平锁来介绍一下读者/写者信号量(也有不公平的但实现跟上面差不多)。
读写信号量
与读写自旋锁不同的是,读写信号量,当获取不到锁时,需要将对应的task挂起等待唤醒,其也有公平和非公平两种实现模式,非公平的比较简单和上述读写自旋锁差不多,除了需要schedule切换上下文,所以下面只讲公平锁的实现。
我们直接来看代码:
source: /include/linux/rwsem-spinlock.h && /include/linux/rwsem-spinlock.c
struct rw_semaphore {
__s32 count; // 记录读者数
raw_spinlock_t wait_lock; // 保护count及wait_list的修改
struct list_head wait_list; // 堵塞队列
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
down_write => __down_write => ____down_write_common
/*
* get a write lock on the semaphore
*/
int __sched __down_write_common(struct rw_semaphore *sem, int state)
{
struct rwsem_waiter waiter;
unsigned long flags;
int ret = 0;
raw_spin_lock_irqsave(&sem->wait_lock, flags);
/* set up my own style of waitqueue */
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
list_add_tail(&waiter.list, &sem->wait_list); // 加入等待队列
/* wait for someone to release the lock */
for (;;) {
/*
* That is the key to support write lock stealing: allows the
* task already on CPU to get the lock soon rather than put
* itself into sleep and waiting for system woke it or someone
* else in the head of the wait list up.
*/
if (sem->count == 0) // 获取到锁
break;
if (signal_pending_state(state, current))
goto out_nolock;
set_current_state(state);
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
schedule(); // 挂起当前task,切换到下一个task
raw_spin_lock_irqsave(&sem->wait_lock, flags);
}
/* got the lock */
sem->count = -1;
list_del(&waiter.list);
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
return ret;
out_nolock:
list_del(&waiter.list);
if (!list_empty(&sem->wait_list) && sem->count >= 0)
__rwsem_do_wake(sem, 0);
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
return -EINTR;
}
up_write => __up_write
void __up_write(struct rw_semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->wait_lock, flags);
sem->count = 0;
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem, 1); // 从sem等待队列中获取一个task放入唤醒队列
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
down_read => __down_read => __down_read_common
int __sched __down_read_common(struct rw_semaphore *sem, int state)
{
struct rwsem_waiter waiter;
unsigned long flags;
raw_spin_lock_irqsave(&sem->wait_lock, flags);
if (sem->count >= 0 && list_empty(&sem->wait_list)) { // 如果count大于0且sem->wait_list为空时才认为是可以获取读锁的
/* granted */
sem->count++;
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
goto out;
}
/* set up my own style of waitqueue */
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_READ;
get_task_struct(current);
list_add_tail(&waiter.list, &sem->wait_list); // 加入等待队列
/* wait to be given the lock */
for (;;) {
if (!waiter.task) // 获取到锁(只有写锁释放会调用到这块逻辑,而__rwsem_do_wake会把这个指针置空)
break;
if (signal_pending_state(state, current)) // 被中断
goto out_nolock;
set_current_state(state);
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
schedule();
raw_spin_lock_irqsave(&sem->wait_lock, flags);
}
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
out:
return 0;
out_nolock:
/*
* We didn't take the lock, so that there is a writer, which
* is owner or the first waiter of the sem. If it's a waiter,
* it will be woken by current owner. Not need to wake anybody.
*/
list_del(&waiter.list);
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
return -EINTR;
}
up_read => __up_read
/*
* release a read lock on the semaphore
*/
void __up_read(struct rw_semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->wait_lock, flags);
if (--sem->count == 0 && !list_empty(&sem->wait_list))
sem = __rwsem_wake_one_writer(sem); // 如果是最后一个占用读锁,则唤醒一个等待写的task
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
通过上述代码发现,公平锁的实现主要区别就是增加了对wait_list的size判断处理逻辑,这也是内核实现的精妙之处。read获取锁时不仅仅需要count非负,而且wait_list必须得为空。如果wait_list非空则说明已有写操作等待在队列中了,后续的读操作得排队在写请求之后,在非公平锁中,这些读请求会优于写请求。write lock如果当前count不等于0则加入等待队列wait_list,并放弃CPU,等待唤醒,否则获取成功。write unlock则将count置为0,并唤醒等待队列中的第一个task执行。read lock如果count非负,且wait_list为空,则获取锁成功,count自增1返回,否则,放入就绪队列中,等待被唤醒。read unlock 将count自减1,如果count==0,则尝试唤醒一个等待写的task(如果存在的话)。
总结
至此内核锁相关的内容介绍就差不多了,但是用户空间的锁和内核空间的锁存在什么关系呢?比如java中的ReentrantLock,ReadWriteLock这些的实现和内核锁有没有什么关联呢?
用户空间锁相关
内核可以很好地通过schedule调度来进行相关task等待以及唤醒,但是用户空间并不能直接去处理调度相关内容,所以用户空间锁等待锁时是如何挂起线程的呢?我们带着这个问题来探讨一下相关内容。
对于pthread,大家应该不陌生,具体见链接,该标准定义了一大堆与thread相关的接口函数,包括线程,锁,优先级,信号等等各种资源。用户空间相关的锁就是通过这一套标准与内核交互的。
其中与锁相关的主要包括:
pthread_spin_lock,自旋锁
pthread_mutex_lock/unlock,互斥锁
pthread_rwlock_rdlock,读锁
pthread_rwlock_timedrdlock,读锁超时
pthread_rwlock_wrlock,写锁
pthread_rwlock_timedwrlock,写锁超时
pthread_rwlock_unlock,释放读写锁
pthread_cond_wait,条件变量等待
pthread_cond_signal,条件变量通知
pthread_cond_broadcast,条件变量通知所有等待thread
等等函数,在linux中,这些都实现在glibc库中,有兴趣的同学可以看一下:https://github.com/bminor/glibc
其中
pthread_spin_lock就是通过while循环执行cas操作
pthread_mutex_lock及pthread_rwlock实际是通过futex系统调用来执行的,具体见http://man7.org/linux/man-pages/man2/futex.2.html
,其基本思想就是想尝试在user space通过cas获取锁,如果成功就直接返回,如果失败则进入内核调用mutex_lock相关逻辑,而读写锁相关逻辑是自己实现的,其思想与内核相关实现差不多。
pthread_cond_wait/notify会结合信号机制及锁机制一起合作实现,通过维护pthread_cond_t中的thread队列来进行相关通信及等待,具体咋搞的,我也没看,有点复杂,相比内核代码而言,他这个简直是太难看懂了0.0。想看的可以去看一看pthread_cond_wait,pthread_cond_signal
java相关的锁便是基于pthread 通过JNI封装了一层调用,比如
Object wait notify notifyall 对应于 pthread_cond_wait pthread_cond_signal
LockSupport park 及 unpark 是 相对于 Object wait notify 的一种更加安全的实现机制,其底层也是通过 pthread_cond_wait pthread_cond_signal 实现。
Object 及 LockSupport c++实现都是基于os相关的os::PlatformEvent::park()及os::PlatformEvent::unpark(),而这两个函数在linux中是通过以上两个操作pthread_cond_wait 及pthread_cond_signal完成的
ReentrantLock 与 ReadWriteLock这对应于 pthread_mutex及pthread_rwlock相关操作。
相关内容可以看java 8 hotspot park