chromium源码学习——线程池(中)

本篇分析base::SchedulerSequencedTaskRunner实现。
SchedulerSequencedTaskRunner是SequencedTaskRunner的一个主要实现,主要保证了队列中任务的串行执行:前一个任务执行完成后才会进行下一个任务。譬如Profile中的IOTaskRunner就是一个SchedulerSequencedTaskRunner实例。
分析过程中会牵涉到很多个类,这里分析过程我也不知道怎么说起,直接跳到结论吧:


先简单介绍流程中重要的类
base::Sequence 任务的容器,同一个Sequence中的任务保证串行执行,内部存储结构为std::queue。线程安全。
base::PriorityQueue Sequence的容器,内部存储结构为std::priority_queue,可以取出具有最高优先级的sequence。线程安全。
base::PriorityQueue::Transaction 简单封装了一下对PriorityQueue中锁的操作,通过Transaction对PriorityQueue进行操作。保证当存在一个Transaction实例时就没有其他线程可以读写对应的PriorityQueue。
base::SchedulerWorker 线程池中线程的封装
base::SchedulerWorkerPool 明显就是线程池本身咯


SchedulerSequencedTaskRunner的实现位于base/task_scheduler/scheduler_worker_pool_impl.cc,先看一下它的成员:

// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_ = new Sequence;

const TaskTraits traits_;
SchedulerWorkerPool* const worker_pool_;

很明显,每个SchedulerSequencedTaskRunner会对应一个Sequence实例,从而保证向同一个runner投放的任务之间的串行执行。跟踪一下任务的传递:

bool SchedulerWorkerPoolImpl::PostTaskWithSequence(
    std::unique_ptr<Task> task,
    scoped_refptr<Sequence> sequence) {
  if (!task_tracker_->WillPostTask(task.get()))
    return false;

  if (task->delayed_run_time.is_null()) {
    PostTaskWithSequenceNow(std::move(task), std::move(sequence));
  } else {
    delayed_task_manager_->AddDelayedTask(...);
  }
  return true;
}

void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
    std::unique_ptr<Task> task,
    scoped_refptr<Sequence> sequence) {
  const bool sequence_was_empty = sequence->PushTask(std::move(task));
  if (sequence_was_empty) {
    const auto sequence_sort_key = sequence->GetSortKey();
    shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
                                                    sequence_sort_key);

    // Wake up a worker to process |sequence|.
    WakeUpOneWorker();
  }
}

任务会先经过TaskTracker,TaskTracker内的逻辑还没有细看,但是WillPostTask里面大致就是判断是否正在退出流程,根据任务的traits是否允许执行。 如果任务有延时属性,会向DelayTaskManager中一个线程消息队列投放一个延时任务,时间到后将Task加入Sequence。如果没有延时属性,Task将直接被加入Sequence中,如果Sequence中原本没有任务,那么把Sequence加入Pool中的PriorityQueue,并唤醒一个空闲的worker(如果有的话)。
接下来再看Worker。Worker线程的入口函数位于scheduler_worker.cc:39,这里贴一点重要逻辑的代码如下:

void ThreadMain() override {
  ...

  while (!outer_->ShouldExit()) {
    ...

    // Get the sequence containing the next task to execute.
    scoped_refptr<Sequence> sequence =
        outer_->delegate_->GetWork(outer_.get());
    if (!sequence) {
      if (outer_->delegate_->CanDetach(outer_.get())) {
        detached_thread = outer_->DetachThreadObject(DetachNotify::DELEGATE);
        if (detached_thread) {
          DCHECK_EQ(detached_thread.get(), this);
          PlatformThread::Detach(thread_handle_);
          break;
        }
      }
      outer_->delegate_->WaitForWork(&wake_up_event_);
      continue;
    }

    if (outer_->task_tracker_->RunTask(sequence->TakeTask(),
                                       sequence->token())) {
      outer_->delegate_->DidRunTask();
    }

    const bool sequence_became_empty = sequence->Pop();
    if (!sequence_became_empty)
      outer_->delegate_->ReEnqueueSequence(std::move(sequence));

    ...
  }

  ...
}

scoped_refptr<Sequence>
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
    SchedulerWorker* worker) {
  ...

  scoped_refptr<Sequence> sequence;
  {
    std::unique_ptr<PriorityQueue::Transaction> shared_transaction(
        outer_->shared_priority_queue_.BeginTransaction());

    if (shared_transaction->IsEmpty()) {
      outer_->AddToIdleWorkersStack(worker);
      ...
      return nullptr;
    }

    sequence = shared_transaction->PopSequence();
  }

  outer_->RemoveFromIdleWorkersStack(worker);
  ...
  return sequence;
}

分析ThreadMain可以得到以下信息:

  • 每次循环会从SchedulerWorkerPool中的优先队列获取一个Sequence,并从中弹出一个任务执行
  • 执行完任务后如果Sequence不为空,则将Sequence重新放回WorkerPool中的优先队列
  • 空闲线程的生存由SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach控制,主要影响因素为空闲时间,且Pool中会保留一个空闲线程(不会将所有空闲线程都退出)
  • 执行任务时的Sequence是从优先队列中取出的,执行完任务后才会重新将Sequence加回优先队列,所以保证了不会有两个工作线程持有同一个Sequence的引用,从而保证了Sequence中的任务是串行执行的
    如何保证在多线程的情况下同一队列中的任务串行执行,最直观的方式应该就是执行任务的整个过程中对队列加锁,但是这样会造成执行任务时无法向队列中添加任务,降低了性能。chromium中的做法就避免了这个问题,这个思路可以学习。
    下篇准备写一些初始化、退出流程之类的杂七杂八的。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,519评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,842评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,544评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,742评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,646评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,027评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,513评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,169评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,324评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,268评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,299评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,996评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,591评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,667评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,911评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,288评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,871评论 2 341

推荐阅读更多精彩内容