在学习dispatch_once原理过程中,发现了之前因为信号量引起的卡住主线程的问题所在。
所以,了解原理,绝对是提高自己的必备条件。
我们带着两个问题去看
1.单例为什么会造成死锁。
2.滥用单例为什么会导致内存不断增加。
如果对dispatch_once的基础原理还不了解,可以看上一篇文章。
带着问题,我们还是先看dispatch_once_f这个函数。
#include "internal.h"
#undef dispatch_once
#undef dispatch_once_f
struct _dispatch_once_waiter_s
{
volatile struct _dispatch_once_waiter_s *volatile dow_next;
_dispatch_thread_semaphore_t dow_sema;
};
#define DISPATCH_ONCE_DONE ((struct _dispatch_once_waiter_s *)~0l)
#ifdef __BLOCKS__
// 1.应用程序调用的入口
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
struct Block_basic *bb = (void *)block;
// 2. 内部逻辑
dispatch_once_f(val, block, (void *)bb->Block_invoke);
}
#endif
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
struct _dispatch_once_waiter_s * volatile *vval =
(struct _dispatch_once_waiter_s**)val;
// 3. 类似于简单的哨兵位
struct _dispatch_once_waiter_s dow = { NULL, 0 };
// 4. 在Dispatch_Once的block执行期进入的dispatch_once_t更改请求的链表
struct _dispatch_once_waiter_s *tail, *tmp;
// 5.局部变量,用于在遍历链表过程中获取每一个在链表上的更改请求的信号量
_dispatch_thread_semaphore_t sema;
// 6. Compare and Swap(用于首次更改请求)
if (dispatch_atomic_cmpxchg(vval, NULL, &dow))
{
dispatch_atomic_acquire_barrier();
// 7.调用dispatch_once的block
_dispatch_client_callout(ctxt, func);
//在写入端,dispatch_once在执行了block之后,会调用dispatch_atomic_maximally_synchronizing_barrier()
//宏函数,在intel处理器上,这个函数编译出的是cpuid指令。
dispatch_atomic_maximally_synchronizing_barrier();
//dispatch_atomic_release_barrier(); // assumed contained in above
// 8. 更改请求成为DISPATCH_ONCE_DONE(原子性的操作)
tmp = dispatch_atomic_xchg(vval, DISPATCH_ONCE_DONE);
tail = &dow;
// 9. 发现还有更改请求,继续遍历
while (tail != tmp)
{
// 10. 如果这个时候tmp的next指针还没更新完毕,就等待一会,提示cpu减少额外处理,提升性能,节省电力。
while (!tmp->dow_next)
{
_dispatch_hardware_pause();
}
// 11. 取出当前的信号量,告诉等待者,这次更改请求完成了,轮到下一个了
sema = tmp->dow_sema;
tmp = (struct _dispatch_once_waiter_s*)tmp->dow_next;
_dispatch_thread_semaphore_signal(sema);
}
} else
{
// 12. 非首次请求,进入此逻辑块
dow.dow_sema = _dispatch_get_thread_semaphore();
// 13. 遍历每一个后续请求,如果状态已经是Done,直接进行下一个
// 同时该状态检测还用于避免在后续wait之前,信号量已经发出(signal)造成
// 的死锁
for (;;)
{
tmp = *vval;
if (tmp == DISPATCH_ONCE_DONE)
{
break;
}
dispatch_atomic_store_barrier();
// 14. 如果当前dispatch_once执行的block没有结束,那么就将这些
// 后续请求添加到链表当中
if (dispatch_atomic_cmpxchg(vval, tmp, &dow))
{
dow.dow_next = tmp;
_dispatch_thread_semaphore_wait(dow.dow_sema);
}
}
_dispatch_put_thread_semaphore(dow.dow_sema);
}
}
首先我们先来认识几个对象.
struct _dispatch_once_waiter_s
{
volatile struct _dispatch_once_waiter_s *volatile dow_next;
_dispatch_thread_semaphore_t dow_sema;
};
struct _dispatch_once_waiter_s dow = { NULL, 0 };
要对dow.dow_next有个印象,因为后面会用。
**1.dispatch_once_f(dispatch_once_t val, void ctxt, dispatch_function_t func)传入了三个参数ctxt是外部传入的block的指针,func是block里具体执行的函数。
2. dispatch_atomic_cmpxchg 是原子交换函数,dispatch_atomic_cmpxchg(vval, NULL, &dow)也就是吧vval的值赋值给&dow.
3. _dispatch_client_callout(ctxt, func);根据ctxt找到block,并执行block中的函数。
4. dispatch_atomic_maximally_synchronizing_barrier函数的作用,是可以让其他线程来读取到未初始化的对象,从而可以使这些线程进入dispatch_once_f的另外一个分支(else分支)进行等待。
5.tmp = dispatch_atomic_xchg(vval, DISPATCH_ONCE_DONE);使其为DISPATCH_ONCE_DONE,即“完成”。
6.然后比较 tmp和&dow的值,如果这两个相等,分支结束。
7.如果 tmp和&dow的值不相等,为什么会不相等呢。因为在block执行过程中,会有其他线程进入到本函数,我们可以看else后面的内容,会形成一个信号量链表,(vval指向值变为信号量链的头部,链表的尾部为&dow),在这时候,进入分支1的while循环中,因为我们前面,struct _dispatch_once_waiter_s dow = { NULL, 0 }; ,dow.dow_next为null,所以需要一直等待,等待temp.dow_next有值才可以进行后面的操作。然后分支1就会进行等待分支2的进行,只有当分支2的dow_dow_next = tmp被执行了,才可以继续往后面执行。
while (!tmp->dow_next)
{
_dispatch_hardware_pause();
}
8.我们仔细看下分支2的操作。
创建了一个信号量,并把值赋值给dow.dow_sema.
dow.dow_sema = _dispatch_get_thread_semaphore();
然后进入了一个for循环中,如果vval的值已经为DISPATCH_ONCE_DONE,则直接break。
如果vval的值不为DISPATCH_ONCE_DONE,则把vval赋值给&dow.此时val.dow_next还是为null,把dow.dow_next = tmp来增加链表的节点,解决了分支1中while进行等待的问题。
if (dispatch_atomic_cmpxchg(vval, tmp, &dow))
{
dow.dow_next = tmp;
_dispatch_thread_semaphore_wait(dow.dow_sema);
}
然后等待在信号量上,当block执行分支1完成并遍历链表来signal时,唤醒、释放信号量,然后一切就完成了。
分支1的while循环,需要等待分支2的 dow.dow_next = tmp;赋值,然后,分支2的 _dispatch_thread_semaphore_wait(dow.dow_sema);需要等待分支1的_dispatch_thread_semaphore_signal(sema);。
总结下上面的问题。
dispatch_once实际上内部会构建一个俩表来维护,如果在block完成之前,有其它的调用者进来,则会把这些调用者放到一个waiter链表中。
waiter链表中的每个调用者会等待一个信号量(dow.dow_sema)。在block执行完了后,除了将onceToken置为DISPATCH_ONCE_DONE外,还会去遍历waiter链中的所有waiter,抛出相应的信号量,以告知waiter们调用已经结束了
上面的两个问题。
死锁如何形成?
两个类相互调用其单例方法时,调用者TestA作为一个waiter,在等待TestB中的block完成,而TestB中block的完成依赖于TestA中单例函数的block的执行完成,而TestA中的block想要完成还需要TestB中的block完成……两个人都在相互等待对方的完成,这就成了一个死锁。
滥用单例的为什么会死锁。
如果在dispatch_once函数的block块执行期间,循环进入自己的dispatch_once函数,会造成链表一直增长,同样也会造成死锁。(这里只是简单的A->B->A->B->A这样的循环,也可以是A->A->A这样的更加直接的循环.
如果在block执行期间,多次进入调用同类的dispatch_once函数(即单例函数),会导致整体链表无限增长,造成永久性死锁
我觉得这也就是之前,坐那个直播中,用信号量来控制时,为什么会卡主,因为我用单例封装的信号量,然后单例循环调用,发生了死锁。
2021.8.10 补充一下死锁的demo
#import "ShareA.h"
#import "ShareB.h"
@implementation ShareA
+(instancetype)instance {
static ShareA *a;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[[ShareB instance] test];
a = [[ShareA alloc]init];
});
return a;
}
- (void)test {
NSLog(@"ShareA");
}
@end
#import "ShareB.h"
#import "ShareA.h"
@implementation ShareB
+(instancetype)instance {
static ShareB *a;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[[ShareA instance]test];
a = [[ShareB alloc]init];
});
return a;
}
- (void)test {
NSLog(@"ShareB");
}
@end
通过下面的报错位置,在对应着源码,应该可以看出问题所在。