skynet源码分析(5)--消息机制之消息处理

作者:shihuaping0918@163.com,转载请注明作者

skynet的消息机制准备拆成三个部分来讲,第一部分是接收处理,第二部分是分发,第三部分是消息注册。顺序是倒过来的讲的,我觉得这样更容易被人接受理解。顺过来讲会有一个问题就是,讲到a时,可能牵扯到b.c.d,而b.c.d可能又牵扯到c.d.e,不讲呢又会留下疑惑,讲的话呢又很容易陷入细节的泥潭。干脆就倒过来讲好了。

skynet是单进程多线程的,线程的种类有monitor/timer/socket/worker,monitor在第4篇中讲过了,就是监控服务是不是陷入死循环了。timer是skynet自己实现的定时器。socket是负责网络的,这个应该是最容易被理解的。worker就是工作线程了,monitor/timer/socket都只有一个线程,唯独worker有多个线程,是可配的,不配的话是8个线程。每个工作线程有个叫worker_parm的参数。

在开始讲线程之前还需要回顾一下消息队列,在第2篇中讲过全局消息队列是链表,里面链了工作消息队列,而工作消息队列内部使用的是循环数组。

另外还要回顾一下消息的handle,每个服务都有运行时一个独一无二的handle,这个handle可以跟名字绑定。

好,准备工作差不多完成了,上面提到的都是分析代码所要具备的知识。上代码吧,源码之前了无秘密。

static void *
thread_worker(void *p) {
    struct worker_parm *wp = p;//线程参数
    int id = wp->id; //线程编号
    int weight = wp->weight; //线程权重
    struct monitor *m = wp->m; //monitor,监控器,每个线程有一个
    struct skynet_monitor *sm = m->m[id]; // 线程自己的监控器
    skynet_initthread(THREAD_WORKER);
    struct message_queue * q = NULL;
    while (!m->quit) {
//消息处理
        q = skynet_context_message_dispatch(sm, q, weight);
        if (q == NULL) { //所有消息队列都是空的
            if (pthread_mutex_lock(&m->mutex) == 0) {
                ++ m->sleep;
                // "spurious wakeup" is harmless,
                // because skynet_context_message_dispatch() can be call at any time.
                if (!m->quit) //不退出的时候等待唤醒
                    pthread_cond_wait(&m->cond, &m->mutex);
                -- m->sleep;
                if (pthread_mutex_unlock(&m->mutex)) {
                    fprintf(stderr, "unlock mutex error");
                    exit(1);
                }
            }
        }
    }
    return NULL;
}

上面的代码没涉及太多业务,注意传进去的q刚开始是null。下面看skynet_context_message_dispatch

struct message_queue * 
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
    if (q == NULL) { //第一次传进来的是null
        q = skynet_globalmq_pop(); //全局队列出队一个工作消息队列
        if (q==NULL)
            return NULL;
    }
    //消息队列的handle,就是服务的标识
    uint32_t handle = skynet_mq_handle(q);
    //根据handle取出服务上下文,并将ctx引用计数+1
    struct skynet_context * ctx = skynet_handle_grab(handle);
    if (ctx == NULL) {  //服务被释放了?
        struct drop_t d = { handle };
        skynet_mq_release(q, drop_message, &d); //清空工作队列
        return skynet_globalmq_pop(); //进行下一个工作队列
    }

    int i,n=1;
    struct skynet_message msg;

    for (i=0;i<n;i++) {
        if (skynet_mq_pop(q,&msg)) { //取工作队列中的消息
            skynet_context_release(ctx); //工作队列是空的,ctx引用计数减1
            return skynet_globalmq_pop(); //下一个工作队列
        } else if (i==0 && weight >= 0) { //权重 > 0
            n = skynet_mq_length(q);
            n >>= weight; //权重越大,给的处理时间越少
        }
        //取overlad值,然后把mq里的overload设为0
        //就是防止无限打下面这条日志
        int overload = skynet_mq_overload(q); 
        if (overload) {
            skynet_error(ctx, "May overload, message queue length = %d", overload);
        }
        //在分析monitor时讲过
        //触发monitor,monitor线程会检查是不是进入死循环
        skynet_monitor_trigger(sm, msg.source , handle);
        //如果服务都没提供回调
        if (ctx->cb == NULL) {
            skynet_free(msg.data);
        } else {//消息处理
            dispatch_message(ctx, &msg);
        }
        //调用结束了,当destination为0的时候,不进行死循环检查
        skynet_monitor_trigger(sm, 0,0);
    }

     //下面这段代码是时间片流转
    //把处理机会让给其它服务
    assert(q == ctx->queue);
    struct message_queue *nq = skynet_globalmq_pop();
    if (nq) { //如果全局队列里还有工作队列
        // If global mq is not empty , push q back, and return next queue (nq)
        // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
        skynet_globalmq_push(q); //把当前队列放回去
        q = nq; //把机会让给其它工作队列,雷锋啊
    } 
    skynet_context_release(ctx); //ctx引用计数减1

    return q;
}

先不继续分析dispatch_message,先总结一下,skynet_context_message_dispatch这个函数实际上就是不停地从全局消息队列里取工作队列,取到了以后呢,就一直处理这个队列里的消息。为了避免某个队列占用太多cpu,当前队列处理到一定的量,就把机会让给全局消息队列里的其它工作队列,把自己又放回全局消息队列。而这个处理的量是根据创建线程时thread_param里的weight权重来判定的,权重越大,流转的就越快,也就是说处理某个队列的消息数量就越少。这就是消息处理的主流程机制。

在主流程之外,还有monitor的触发和取消,每次处理前,触发monitor的检查。处理完了,取消monitor的检查。

大体流程清楚了以后,歇口气。可以抽根烟喝杯茶再继续,后面的内容简单些了。下面分析dispatch_message。

static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
    assert(ctx->init);
    CHECKCALLING_BEGIN(ctx)
    pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
    int type = msg->sz >> MESSAGE_TYPE_SHIFT; //消息类型,是请求包还是回应包,参考云风的博客
    size_t sz = msg->sz & MESSAGE_TYPE_MASK; //防止sz过长
    if (ctx->logfile) { //打日志
        skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
    }
    ++ctx->message_count;
    int reserve_msg;
    if (ctx->profile) { //profile
        ctx->cpu_start = skynet_thread_time();
        reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); //调用服务里的回调函数
        uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
        ctx->cpu_cost += cost_time; //cpu时间消耗
    } else {
        reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); //调用服务里的消息回调函数
    }
    if (!reserve_msg) {
        skynet_free(msg->data);
    }
    CHECKCALLING_END(ctx)
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,784评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,745评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,702评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,229评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,245评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,376评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,798评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,471评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,655评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,485评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,535评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,235评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,793评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,863评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,096评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,654评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,233评论 2 341

推荐阅读更多精彩内容