RabbitMQ服务端触发Confirm时机

Publisher Confirms 机制通常用来确认消息到达服务端。对于一条持久化消息,通过 channel(confirm模式)发送到持久化队列,如果生产者收到服务端响应关于这条消息的 basic.ack指令,那么生产者可以认为消息已经成功发到服务端(并且已经持久化)。

本文主要探讨 RabbitMQ(v3.6.x) 服务端触发 confirm 的时机

流程

  1. rabbit_channel 进程处理 basic.publish 方法时,执行 deliver_to_queues/2 方法
handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory},
              Content, State = #ch{virtual_host    = VHostPath,
                                   tx              = Tx,
                                   channel         = ChannelNum,
                                   confirm_enabled = ConfirmEnabled,
                                   trace_state     = TraceState,
                                   user            = #user{username = Username},
                                   conn_name       = ConnName,
                                   delivery_flow   = Flow}) ->
    check_msg_size(Content),
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_write_permitted(ExchangeName, State),
    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
    check_internal_exchange(Exchange),
    %% We decode the content's properties here because we're almost
    %% certain to want to look at delivery-mode and priority.
    DecodedContent = #content {properties = Props} =
        maybe_set_fast_reply_to(
          rabbit_binary_parser:ensure_content_decoded(Content), State),
    check_user_id_header(Props, State),
    check_expiration_header(Props),
    DoConfirm = Tx =/= none orelse ConfirmEnabled,
    {MsgSeqNo, State1} =
        case DoConfirm orelse Mandatory of
            false -> {undefined, State};
            true  -> SeqNo = State#ch.publish_seqno,
                     {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
        end,
    case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
        {ok, Message} ->
            Delivery = rabbit_basic:delivery(
                         Mandatory, DoConfirm, Message, MsgSeqNo),
            QNames = rabbit_exchange:route(Exchange, Delivery),
            rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
                                Username, TraceState),
            DQ = {Delivery#delivery{flow = Flow}, QNames},
            {noreply, case Tx of
                          none         -> deliver_to_queues(DQ, State1);
                          {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
                                          State1#ch{tx = {Msgs1, Acks}}
                      end};
        {error, Reason} ->
            precondition_failed("invalid message: ~p", [Reason])
    end;
  1. deliver_to_queues/2 会调用 rabbit_amqqueue 模块 deliver/2方法:通过delegate 进程 同时向队列进程(master、slaves进程)发送消息
deliver(Qs, Delivery = #delivery{flow = Flow}) ->
    {MPids, SPids} = qpids(Qs),
    QPids = MPids ++ SPids,
    %% We use up two credits to send to a slave since the message
    %% arrives at the slave from two directions. We will ack one when
    %% the slave receives the message direct from the channel, and the
    %% other when it receives it via GM.
    case Flow of
        %% Here we are tracking messages sent by the rabbit_channel
        %% process. We are accessing the rabbit_channel process
        %% dictionary.
        flow   -> [credit_flow:send(QPid) || QPid <- QPids],
                  [credit_flow:send(QPid) || QPid <- SPids];
        noflow -> ok
    end,

    %% We let slaves know that they were being addressed as slaves at
    %% the time - if they receive such a message from the channel
    %% after they have become master they should mark the message as
    %% 'delivered' since they do not know what the master may have
    %% done with it.
    MMsg = {deliver, Delivery, false},
    SMsg = {deliver, Delivery, true},
    %% 同时向队列master、slaves进程发
    delegate:cast(MPids, MMsg),
    delegate:cast(SPids, SMsg),
    QPids.

  1. 队列进程 rabbit_amqqueue_process 接收消息
handle_cast({deliver, Delivery = #delivery{sender = Sender,
                                           flow   = Flow}, SlaveWhenPublished},
            State = #q{senders = Senders}) ->
    Senders1 = case Flow of
    %% In both credit_flow:ack/1 we are acking messages to the channel
    %% process that sent us the message delivery. See handle_ch_down
    %% for more info.
                   flow   -> credit_flow:ack(Sender),
                             case SlaveWhenPublished of
                                 true  -> credit_flow:ack(Sender); %% [0]
                                 false -> ok
                             end,
                             pmon:monitor(Sender, Senders);
                   noflow -> Senders
               end,
    State1 = State#q{senders = Senders1},
    noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
  1. deliver_or_enqueue/3 尝试投递给消费者;或者 入队
deliver_or_enqueue(Delivery = #delivery{message = Message,
                                        sender  = SenderPid,
                                        flow    = Flow},
                   Delivered, State = #q{backing_queue       = BQ,
                                         backing_queue_state = BQS}) ->
    send_mandatory(Delivery), %% must do this before confirms
    {Confirm, State1} = send_or_record_confirm(Delivery, State),
    Props = message_properties(Message, Confirm, State1),
    {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
    State2 = State1#q{backing_queue_state = BQS1},
    case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
                                             State2) of
        true ->
            State2;
        {delivered, State3} ->
            State3;
        %% The next one is an optimisation
        {undelivered, State3 = #q{ttl = 0, dlx = undefined,
                                  backing_queue_state = BQS2,
                                  msg_id_to_channel   = MTC}} ->
            {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
            State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
        {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
            BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
            {Dropped, State4 = #q{backing_queue_state = BQS4}} =
                maybe_drop_head(State3#q{backing_queue_state = BQS3}),
            QLen = BQ:len(BQS4),
            %% optimisation: it would be perfectly safe to always
            %% invoke drop_expired_msgs here, but that is expensive so
            %% we only do that if a new message that might have an
            %% expiry ends up at the head of the queue. If the head
            %% remains unchanged, or if the newly published message
            %% has no expiry and becomes the head of the queue then
            %% the call is unnecessary.
            case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
                {false, false,         _} -> State4;
                {true,  true,  undefined} -> State4;
                {_,     _,             _} -> drop_expired_msgs(State4)
            end
    end.
  1. send_or_record_confirm/2,只有当消息队列都设置为持久化时,才会将消息状态保存在 msg_id_to_channel 数据结构中(等到消息最终落盘才会触发 confirm)
send_or_record_confirm(#delivery{confirm    = true,
                                 sender     = SenderPid,
                                 msg_seq_no = MsgSeqNo,
                                 message    = #basic_message {
                                   is_persistent = true,
                                   id            = MsgId}},
                       State = #q{q                 = #amqqueue{durable = true},
                                  msg_id_to_channel = MTC}) ->
    MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
    {eventually, State#q{msg_id_to_channel = MTC1}};
  1. attempt_delivery/4 后根据投递状态,执行 BQ:publish/6BQ:publish_delivered/5 方法,最终底层 rabbit_variable_queue 调用 rabbit_msg_store 进行消息持久化
  1. 步骤3最后执行noreply/1
noreply(NewState) ->
    {NewState1, Timeout} = next_state(NewState),
    {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.

next_state(State = #q{backing_queue       = BQ,
                      backing_queue_state = BQS,
                      msg_id_to_channel   = MTC}) ->
    assert_invariant(State),
    {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
    MTC1 = confirm_messages(MsgIds, MTC),
    State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1},
    case BQ:needs_timeout(BQS1) of
        false -> {stop_sync_timer(State1),   hibernate     };
        idle  -> {stop_sync_timer(State1),   ?SYNC_INTERVAL};
        timed -> {ensure_sync_timer(State1), 0             }
    end.
confirm_messages(MsgIds, MTC) ->
    {CMs, MTC1} =
        lists:foldl(
          fun(MsgId, {CMs, MTC0}) ->
                  case gb_trees:lookup(MsgId, MTC0) of
                      {value, {SenderPid, MsgSeqNo}} ->
                          {rabbit_misc:gb_trees_cons(SenderPid,
                                                     MsgSeqNo, CMs),
                           gb_trees:delete(MsgId, MTC0)};
                      none ->
                          {CMs, MTC0}
                  end
          end, {gb_trees:empty(), MTC}, MsgIds),
    rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
    MTC1.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339