Librdkafka的基础数据结构 2 --- 定时器 原子操作与引用计数

  • Timer
  • 原子操作
  • 引用计数

Timer
  • 所在文件: sr/rdkafka_timer.c(h)
  • 主要是通过TimerManager来管理多个timer, 达到处理定时任务的效果
  • TimerManager定义:
typedef struct rd_kafka_timers_s {
        TAILQ_HEAD(, rd_kafka_timer_s) rkts_timers;
        struct rd_kafka_s *rkts_rk;
    mtx_t       rkts_lock;
    cnd_t       rkts_cond;

        int         rkts_enabled;
} rd_kafka_timers_t;
  1. 使用TAILQ来管理多个timer, 这个 队列是个有序队列, 按rd_kafka_timer_s中的rtmr_next从小到大排列;
  2. 对timer 队列的操作需要加锁保护: rkts_lock
  • Timer定义:
typedef struct rd_kafka_timer_s {
    TAILQ_ENTRY(rd_kafka_timer_s)  rtmr_link;

    rd_ts_t rtmr_next;
    rd_ts_t rtmr_interval;   /* interval in microseconds */

    void  (*rtmr_callback) (rd_kafka_timers_t *rkts, void *arg);
    void   *rtmr_arg;
} rd_kafka_timer_t;
  1. rtmr_link : TAILQ元素
  2. rtmr_next: 当前timer的下一次到期时间, 绝对时间;
  3. rtmr_interval: 执行的时间间隔;
  4. rtmr_callback: 时期时执行的回调函数;
  • 加入新的timer到TimerManager中:
void rd_kafka_timer_start (rd_kafka_timers_t *rkts,
               rd_kafka_timer_t *rtmr, rd_ts_t interval,
               void (*callback) (rd_kafka_timers_t *rkts, void *arg),
               void *arg) {
    rd_kafka_timers_lock(rkts);
    rd_kafka_timer_stop(rkts, rtmr, 0/*!lock*/); 

    rtmr->rtmr_interval = interval;
    rtmr->rtmr_callback = callback;
    rtmr->rtmr_arg      = arg;

    rd_kafka_timer_schedule(rkts, rtmr, 0);
    rd_kafka_timers_unlock(rkts);
}
  1. 此timer已经在队列中的话,要先stop;
  2. 重新设置 timer的各参数;
  3. 加入队列;
  • Timer的插入: 根据rtmr_next值在队列中找到合适的位置后插入;
static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
                     rd_kafka_timer_t *rtmr, int extra_us) {
    rd_kafka_timer_t *first;

    /* Timer has been stopped */
    if (!rtmr->rtmr_interval)
        return;

        /* Timers framework is terminating */
        if (unlikely(!rkts->rkts_enabled))
                return;

    rtmr->rtmr_next = rd_clock() + rtmr->rtmr_interval + extra_us;

    if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) ||
        first->rtmr_next > rtmr->rtmr_next) {
        TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
                cnd_signal(&rkts->rkts_cond);
    } else
        TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
                                    rd_kafka_timer_t *, rtmr_link,
                    rd_kafka_timer_cmp);
}
  • Timer的调度执行:
void rd_kafka_timers_run (rd_kafka_timers_t *rkts, int timeout_us) {
    rd_ts_t now = rd_clock();
    rd_ts_t end = now + timeout_us;

        rd_kafka_timers_lock(rkts);

    while (!rd_atomic32_get(&rkts->rkts_rk->rk_terminate) && now <= end) {
        int64_t sleeptime;
        rd_kafka_timer_t *rtmr;

        if (timeout_us != RD_POLL_NOWAIT) {
            sleeptime = rd_kafka_timers_next(rkts,
                             timeout_us,
                             0/*no-lock*/);

            if (sleeptime > 0) {
                cnd_timedwait_ms(&rkts->rkts_cond,
                         &rkts->rkts_lock,
                         (int)(sleeptime / 1000));

            }
        }

        now = rd_clock();

        while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) &&
               rtmr->rtmr_next <= now) {

            rd_kafka_timer_unschedule(rkts, rtmr);
                        rd_kafka_timers_unlock(rkts);

            rtmr->rtmr_callback(rkts, rtmr->rtmr_arg);

                        rd_kafka_timers_lock(rkts);
            /* Restart timer, unless it has been stopped, or
             * already reschedueld (start()ed) from callback. */
            if (rd_kafka_timer_started(rtmr) &&
                !rd_kafka_timer_scheduled(rtmr))
                rd_kafka_timer_schedule(rkts, rtmr, 0);
        }

        if (timeout_us == RD_POLL_NOWAIT) {
            /* Only iterate once, even if rd_clock doesn't change */
            break;
        }
    }

    rd_kafka_timers_unlock(rkts);
}
  1. 通过 rd_kafka_timers_next获取需要wait的时间 ;
  2. 需要wait就 cnd_timedwait_ms;
  3. 执行到期 timer的回调函数, 根据需要将此timer再次加入队列;
原子操作
  • 所在文件: src/rdatomic.h
  • 如果当前GCC支持_atomic组操作,就使用GCC的build-in函数
  • 如果不支持, 原子操作用锁来模拟实现;
  • 在Windows上用Interlocked族函数实现;
引用计数
  • 所在文件: src/rd.h
  • 定义:
#ifdef RD_REFCNT_USE_LOCKS
typedef struct rd_refcnt_t {
        mtx_t lock;
        int v;
} rd_refcnt_t;
#else
typedef rd_atomic32_t rd_refcnt_t;
#endif
  1. 由定义我们可以看出可以通过锁来实现,也可以通过上面介绍的原子类型来实现这个计数;
  • 引用计数的操作接口, 也是分成了锁(实现成函数)和原子类型(实现成宏)两种不同的实现
static RD_INLINE RD_UNUSED int rd_refcnt_init (rd_refcnt_t *R, int v)
static RD_INLINE RD_UNUSED void rd_refcnt_destroy (rd_refcnt_t *R)
static RD_INLINE RD_UNUSED int rd_refcnt_set (rd_refcnt_t *R, int v) 
static RD_INLINE RD_UNUSED int rd_refcnt_add0 (rd_refcnt_t *R)
static RD_INLINE RD_UNUSED int rd_refcnt_sub0 (rd_refcnt_t *R)
static RD_INLINE RD_UNUSED int rd_refcnt_get (rd_refcnt_t *R)
智能指针
  • 所在文件: src/rd.h
  • 智能指针就是加了上面的引用计数的指针
  • 定义:
#define RD_SHARED_PTR_TYPE(STRUCT_NAME,WRAPPED_TYPE) WRAPPED_TYPE
//get的同时会将此用计数 +1
#define rd_shared_ptr_get_src(FUNC,LINE,OBJ,REFCNT,SPTR_TYPE)   \
        (rd_refcnt_add(REFCNT), (OBJ))
#define rd_shared_ptr_get(OBJ,REFCNT,SPTR_TYPE)          \
        (rd_refcnt_add(REFCNT), (OBJ))

#define rd_shared_ptr_obj(SPTR) (SPTR)

// put使用rd_refcnt_destroywrapper实现, 引用计数减为0,则调用DESTRUCTOR作清理释放
#define rd_shared_ptr_put(SPTR,REF,DESTRUCTOR)                  \
                rd_refcnt_destroywrapper(REF,DESTRUCTOR)
  • 在C中实现引用计数, 哪里要+1, 哪里要-1, 全凭使用者自己根据代码逻辑需要来控制,因此很容易导致少+1, 多+1, 少-1, 多-1的情况, 因此rdkafka作者又提供了一个debug版本的实现, 跟踪了调用函数, 所在行等信息, 供调试排查问题用,其实实现也很简单, 但还是比较巧妙的
#define RD_SHARED_PTR_TYPE(STRUCT_NAME, WRAPPED_TYPE) \
        struct STRUCT_NAME {                          \
                LIST_ENTRY(rd_shptr0_s) link;         \
                WRAPPED_TYPE *obj;                     \
                rd_refcnt_t *ref;                     \
                const char *typename;                 \
                const char *func;                     \
                int line;                             \
        }
/* Common backing struct compatible with RD_SHARED_PTR_TYPE() types */
typedef RD_SHARED_PTR_TYPE(rd_shptr0_s, void) rd_shptr0_t;

LIST_HEAD(rd_shptr0_head, rd_shptr0_s);
extern struct rd_shptr0_head rd_shared_ptr_debug_list;
extern mtx_t rd_shared_ptr_debug_mtx;

引用了一个新的struct来将引用计数和调用信息结合起来, 使用链表来管理这个struct的对象. 每次对引用计数的操作都要操作这个链表.

static RD_INLINE RD_UNUSED RD_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
rd_shptr0_t *rd_shared_ptr_get0 (const char *func, int line,
                                 const char *typename,
                                 rd_refcnt_t *ref, void *obj) {
        //创建shared ptr struct结构
        rd_shptr0_t *sptr = rd_calloc(1, sizeof(*sptr));
        sptr->obj = obj;
        sptr->ref = ref;
        sptr->typename = typename;
        sptr->func = func;
        sptr->line = line;

       //加入链表
        mtx_lock(&rd_shared_ptr_debug_mtx);
        LIST_INSERT_HEAD(&rd_shared_ptr_debug_list, sptr, link);
        mtx_unlock(&rd_shared_ptr_debug_mtx);
        return sptr;
}

#define rd_shared_ptr_put(SPTR,REF,DESTRUCTOR) do {               \
               // 引用计数 -1, 到0话清理释放
                if (rd_refcnt_sub(REF) == 0)                      \
                        DESTRUCTOR;                               \
                mtx_lock(&rd_shared_ptr_debug_mtx);               \
                //从链表中移除struct 对象
                LIST_REMOVE(SPTR, link);                          \
                mtx_unlock(&rd_shared_ptr_debug_mtx);             \
                rd_free(SPTR);                                    \
        } while (0)

Librdkafka源码分析-Content Table

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,787评论 1 19
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,398评论 25 707
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • 轻柔、和缓的音乐正适合这半夜0点的氛围。 我不能明白,为什么我的朋友和我做一样工作的时候生活过得比我有味道的多,现...
    麦克斯韦L阅读 213评论 0 0
  • 集合了一些关于香港ins上网红打卡地 除了在香港买买买 还有超适合拍照的小众地方 试一下这些小众玩法 你可能会发现...
    catingtan阅读 389评论 0 1