YuXuan
发布于 2020-10-18 / 50 阅读
0

RabbitMQ源码剖析

队列

image.png
声明队列记录:

-record(amqqueue, {
  name :: rabbit_amqqueue:name() | '_', %% immutable
  durable :: boolean() | '_', %% immutable
  auto_delete :: boolean() | '_', %% immutable
  exclusive_owner = none :: pid() | none | '_', %% immutable
  arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
  pid :: pid() | ra_server_id() | none | '_', %% durable (just so we know home node)
  slave_pids = [] :: [pid()] | none | '_', %% transient
  sync_slave_pids = [] :: [pid()] | none| '_',%% transient
  recoverable_slaves = [] :: [atom()] | none | '_', %% durable
  policy :: binary() | none | undefined | '_', %% durable, implicit update as above
  operator_policy :: binary() | none | undefined | '_', %% durable, implicit update as above
  gm_pids = [] :: [{pid(), pid()} | pid()] | none | '_', %% transient
  decorators :: [atom()] | none | undefined | '_', %% transient, recalculated as above
  state = live :: atom() | none | '_', %% durable (have we crashed?)
  policy_version = 0 :: non_neg_integer() | '_',
  slave_pids_pending_shutdown = [] :: [pid()] | '_',
  vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
  options = #{} :: map() | '_',
  type = ?amqqueue_v1_type :: module() | '_',
  type_state = #{} :: map() | '_'
}).

类型声明:

-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().
-type amqqueue_v2() :: #amqqueue{
  name :: rabbit_amqqueue:name(), %% 队列名称
  durable :: boolean(), %% 是否为持久化队列
  auto_delete :: boolean(), %% 是否自动删除
  exclusive_owner :: pid() | none,
  arguments :: rabbit_framing:amqp_table(), %% 属性参数
  pid :: pid() | ra_server_id() | none,
  slave_pids :: [pid()] | none,
  sync_slave_pids :: [pid()] | none,
  recoverable_slaves :: [atom()] | none,
  policy :: binary() | none | undefined,
  operator_policy :: binary() | none | undefined,
  gm_pids :: [pid()] | none,
  decorators :: [atom()] | none | undefined,
  state :: atom() | none,
  policy_version :: non_neg_integer(), %% 策略版本
  slave_pids_pending_shutdown :: [pid()],
  vhost :: rabbit_types:vhost() | undefined, %% 所在的虚拟主机
  options :: map(),
  type :: atom(), %% 什么类型:disk还是ram的
  type_state :: #{}
}.

image.png
消息的确认:
image.png
消息队列消费函数的声明:
image.png
消费消息实现1:
image.png
消费消息实现2:

basic_consume(Q, _NoAck, _ChPid,
              _LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
              _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates)
  when ?amqqueue_is_quorum(Q) ->
    {error, global_qos_not_supported_for_queue_type};
basic_consume(Q, NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser, QStates)
  when ?amqqueue_is_quorum(Q) ->
    {Name, _} = Id = amqqueue:get_pid(Q),
    QName = amqqueue:get_name(Q),
    ok = check_consume_arguments(QName, Args),
    QState0 = get_quorum_state(Id, QName, QStates),
    case rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
                                           ConsumerPrefetchCount,
                                           ConsumerTag,
                                           ExclusiveConsume, Args,
                                           ActingUser,
                                           OkMsg, QState0) of
      {ok, QState} ->
        {ok, maps:put(Name, QState, QStates)};
      {error, Reason} ->
        rabbit_misc:protocol_error(internal_error,
                                   "Cannot consume a message from quorum queue '~s': ~w",
                                   [rabbit_misc:rs(QName), Reason])
  end.

消费消息实现3:

basic_consume(Q, _NoAck, _ChPid,
  _LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
  _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates)
when ?amqqueue_is_quorum(Q) ->
  {error, global_qos_not_supported_for_queue_type};

主动拉消息函数声明:
image.png
声明实现1:
image.png
声明实现2:
image.png
消息的确认:
image.png
消息的重新入列:
image.png
消息失效时间的计算:
image.png
消息的发送确认:

confirm_messages(MsgIds, MTC) ->
  {CMs, MTC1} =
    lists:foldl(
      fun(MsgId, {CMs, MTC0}) ->
        case maps:get(MsgId, MTC0, none) of
          none ->
            {CMs, MTC0};
          {SenderPid, MsgSeqNo} ->
            {maps:update_with(SenderPid,
              fun(MsgSeqNos) ->
                [MsgSeqNo | MsgSeqNos]
              end,
              [MsgSeqNo],
              CMs),
            maps:remove(MsgId, MTC0)}
        end
      end, {#{}, MTC}, MsgIds),
    maps:fold(
      fun(Pid, MsgSeqNos, _) ->
        rabbit_misc:confirm_to_sender(Pid, MsgSeqNos)
      end,
      ok,
      CMs),
    MTC1.

死信:
image.png

交换器

direct交换器
rabbit_exchange_type_direct.erl
看其中的路由方法:
image.png
fanout交换器
rabbit_exchange_type_fanout.erl
看其中的路由方法:
image.png
headers交换器
rabbit_exchange_type_headers.erl
看其中的路由方法:
image.png
topic交换器
rabbit_exchange_type_topic.erl
看其中的路由方法:
image.png

持久化

消息流转示意图:
image.png
rabbit_channel进程确定了消息将要投递的目标队列,rabbit_amqqueue_process是队列进程,每个队列都有一个对应的进程,实际上rabbit_amqqueue_process进程只是提供了逻辑上对队列的相关操作,他的真正操作是通过调用指定的backing_queue模块提供的相关接口实现的,默认情况该backing_queue的实现模块为rabbit_variable_queue。 RabbitMQ队列中的消息随着系统的负载会不断的变化,一个消息可能会处于以下4种状态:

%% Definitions:
%%
%% alpha: this is a message where both the message itself, and its
%% position within the queue are held in RAM(消息本身和消息位置索引都只在内存中)
%%
%% beta: this is a message where the message itself is only held on
%% disk (if persisted to the message store) but its position
%% within the queue is held in RAM.(消息本身存储在磁盘中,但是消息的位置索引存在内存中)
%%
%% gamma: this is a message where the message itself is only held on
%% disk, but its position is both in RAM and on disk.(消息本身存储在磁盘中,但是消息的位置索引存在内存中和磁盘中)
%%
%% delta: this is a collection of messages, represented by a single
%% term, where the messages and their position are only held on
%% disk.(消息本身和消息的位置索引都值存储在磁盘中)

对于普通的没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列Q1、Q2、Delta、Q3和Q4来实现这4个状态的转换,其关系如下图所示:
image.png
其中Q1、Q4只包含alpha状态的消息,Q2和Q3包含Beta和gamma状态的消息,Delta只包含delta状态的消息。具体消息的状态转换后续会进行源码分析。

消息入队分析

rabbit_amqqueue_process对消息的主要处理逻辑位于deliver_or_enqueue函数,该方法将消息直接传递给消费者,或者将消息存储到队列当中。
整体处理逻辑如下:

  1. 首先处理消息的mandory标志,和confirm属性。mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者。confirm则是消息的发布确认。
  2. 然后判断队列中是否有消费者正在等待,如果有则直接调用backing_queue的接口给客户端发送消息。
  3. 如果队列上没有消费者,根据当前相关设置判断消息是否需要丢弃,不需要丢弃的情况下调用backing_queue的接口将消息入队。

deliver_or_enqueue函数代码:

deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid, flow = Flow},
                   Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
  %% 如果当前消息mandatory字段为true,则立刻通知该消息对应的rabbit_channel进程
  send_mandatory(Delivery), %% must do this before confirms
  %% 消息队列记录要confirm的消息,如果confirm为false,则不记录要confirm(如果消息需要进行confirm,则将该消息的信息存入msg_id_to_channel字段中)
  {Confirm, State1} = send_or_record_confirm(Delivery, State),
  %% 得到消息特性特性数据结构
  Props = message_properties(Message, Confirm, State1),
  %% 让backing_queue去判断当前消息是否重复(rabbit_variable_queue没有实现,直接返回的false)
  {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
  State2 = State1#q{backing_queue_state = BQS1},
  case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, State2) of
    true ->
      State2;
    %% 已经将消息发送给消费者的情况
    {delivered, State3} ->
      State3;
    %% The next one is an optimisation
    %% 没有消费者来取消息的情况(discard:抛弃)
    %% 当前消息没有发送到对应的消费者,同时当前队列中设置的消息过期时间为0,同时重新发送的exchange交换机为undefined,则立刻将该消息丢弃掉
    {undelivered, State3 = #q{ttl = 0, dlx = undefined, backing_queue_state = BQS2, msg_id_to_channel = MTC}} ->
      %% 直接将消息丢弃掉,如果需要confirm的消息则立刻通知rabbit_channel进程进行confirm操作
      {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
      State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
    %% 没有消费者来取消息的情况
    {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
      %% 将消息发布到backing_queue中
      BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
      %% 判断当前队列中的消息数量超过上限或者消息的占的空间大小超过上限
      {Dropped, State4 = #q{backing_queue_state = BQS4}} = maybe_drop_head(State3#q{backing_queue_state = BQS3}),
      %% 得到当前队列中的消息数量
      QLen = BQ:len(BQS4),
      %% optimisation: it would be perfectly safe to always
      %% invoke drop_expired_msgs here, but that is expensive so
      %% we only do that if a new message that might have an
      %% expiry ends up at the head of the queue. If the head
      %% remains unchanged, or if the newly published message
      %% has no expiry and becomes the head of the queue then
      %% the call is unnecessary.
      case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
        %% 该情况是头部没有变化,同时消息队列消息树立不为一,则不管当前加入的消息是否设置有超时时间,都不执行drop_expired_msgs函数
        {false, false, _} -> State4;
        %% 有丢弃消息,同时当前队列中只有当前这个新的消息,同时消息自己的特性过期时间没有定义,则不检查消息过期
        %% 此时消息的头部有变化,但是消息队列中只有一个消息,该消息还没有设置超时时间,则不执行drop_expired_msgs函数
        {true, true, undefined} -> State4;
        %% 当向队列中插入消息后需要做检查消息过期,同时设置定时器的操作只有三种情况
        %% 1.当消息头部根据队列上限有变化,同时消息插入后当前队列消息数量为一,且该消息设置有过期时间,则需要做一次操作(该情况是消息头部有删除消息,都会进行一次消息过期检查)
        %% 2.当消息头部根据队列上限有变化,同时消息插入后当前队列消息数量不为一,且该消息设置有过期时间,则需要做一次操作(该情况是消息头部有删除消息,都会进行一次消息过期检查)
        %% 3.当消息头部根据队列上限没有变化,同时消息插入后当前队列消息数量为一,不管消息有没有过期时间,都要做一次操作(该情况下是当前队列进入第一条消息)
        %% 最重要的是只要消息队列的头部消息有变化,则立刻执行drop_expired_msgs函数,将队列头部超时的消息删除掉
        {_, _, _} -> drop_expired_msgs(State4)
      end
  end.

如果调用到该方法的BQ:publish则说明当前队列没有消费者正在等待,消息将进入到队列。
backing_queue实现了消息的存储,他会尽力会durable=true的消息做持久化存储。初始默认情况下,非持久化消息直接进入内存队列,此时效率最高,当内存占用逐渐达到一个阈值时,消息和消息索引逐渐往磁盘中移动,随着消费者的不断消费,内存占用的减少,消息逐渐又从磁盘中被转到内存队列中。
消息在这些Queue中传递的"一般"过程q1->q2->delta->q3->q4,一般负载较轻的情况消息不需要走完每个Queue,大部分都可以跳过。rabbit_variable_queue中消息的入队接口源码如下:

%% 消息的发布接口
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
        MsgProps = #message_properties { needs_confirming = NeedsConfirming
},
        IsDelivered, _ChPid, _Flow,
        State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                           next_seq_id = SeqId,
                           in_counter = InCount,
                           durable = IsDurable,
                           unconfirmed = UC }) ->
  %% 只有持久化队列和消息持久化才会对消息进行持久化
  IsPersistent1 = IsDurable andalso IsPersistent,
  %% 组装消息状态(该数据结构是实际存储在队列中的数据)
  MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
  %% 如果队列和消息都是持久化类型,则将消息内容和消息在队列中的索引写入磁盘
  {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
  %% 将消息状态数据结构存入内存(如果Q3队列不为空,则将新消息存入Q1队列,如果为空则将新消息存入Q4队列)
  State2 = case ?QUEUE:is_empty(Q3) of
               %% 如果Q3队列不为空,则将当前的消息写入Q1队列
               false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
               %% 如果Q3队列为空,则将当前的消息写入Q4队列
               true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
           end,
  %% 进入队列中的消息数量加一
  InCount1 = InCount + 1,
  %% 如果消息需要确认,将该消息加入unconfirmed字段
  UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
  %% 更新队列进程中的状态信息
  State3 = stats({1, 0}, {none, MsgStatus1},
                 %% 更新下一个消息在消息中的位置
                 State2#vqstate{ next_seq_id = SeqId + 1, in_counter = InCount1, unconfirmed = UC1 }),
  %% RabbitMQ系统中使用的内存过多,此操作是将内存中的队列数据写入到磁盘中
  a(reduce_memory_use(maybe_update_rates(State3))).

消息入队时先判断Q3是否为空,如果Q3为空,则直接进入Q4,否则进入Q1,这里思考下为什么?
假如Q3为空,Delta一定为空,因为假如Delta不为空,那么Q3取出最后一个消息的时候Delta已经把消息转移到Q3了,这样Q3就不是空了,前后矛盾因此Delta一定是空的。同理可以推测出Q2、Q1都是空的,直接把消息放入Q4即可。
消息入队后,需要判断内存使用,调用reduce_memory_use函数:

reduce_memory_use(State = #vqstate {
                                    ram_pending_ack = RPA,
                                    ram_msg_count = RamMsgCount,
                                    target_ram_count = TargetRamCount,
                                    rates = #rates { in = AvgIngress,
                                                     out = AvgEgress,
                                                     ack_in = AvgAckIngress,
                                                     ack_out = AvgAckEgress } }) -> State1 = #vqstate { q2 = Q2, q3 = Q3 } =
                        %% 得到当前在内存中的数量超过允许在内存中的最大数量的个数
                        case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
                            0 -> State;
                            %% Reduce memory of pending acks and alphas.The order is
                            %% determined based on which is growing faster. Whichever
                            %% comes second may very well get a quota of 0 if the
                            %% first manages to push out the max number of messages.
                            S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
                                                   (AvgIngress - AvgEgress)) of
                                             %% ack操作进入的流量大于消息进入的流量,则优先将等待ack的消息写入磁盘文件
                                             true -> [
                                                      %% 限制内存中的等待ack的消息(将消息内容在内存中的等待ack的消息的消息内容写入磁盘文件)
                                                      fun limit_ram_acks/2,
                                                      %% 将Quota个alphas 类型的消息转化为betas类型的消息(Q1和Q4队列都是alphas类型的消息)
                                                      fun push_alphas_to_betas/2
                                                     ];
                                             %% 消息进入的流量大于ack操作进入的消息流量,则优先将非等待ack的消息写入磁盘文件
                                             false -> [
                                                       %% 将Quota个alphas类型的消息转化为betas类型的消息(Q1和Q4队列都是alphas类型的消息)
                                                       fun push_alphas_to_betas/2,
                                                       %% 限制内存中的等待ack的消息(将消息内容在内存中的等待ack的消息的消息内容写入磁盘文件)
                                                       fun limit_ram_acks/2
                                                      ]
                                          end,
                            %% 真正执行转化的函数
                            {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN)
                                                      end, {S1, State}, Funs), 
                            State2
                       end,
  %% 当前beta类型的消息大于允许的beta消息的最大值,则将beta类型多余的消息转化为deltas类型的消息
  case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
                  permitted_beta_count(State1)) of
      S2 when S2 >= ?IO_BATCH_SIZE ->
          %% 将S2个betas类型的消息转化为deltas类型的消息
          push_betas_to_deltas(S2, State1);
      _ ->
          State1
  end.

%% 将Quota个alphas类型的消息转化为betas类型的消息(Q1和Q4队列都是alphas类型的消息)
push_alphas_to_betas(Quota, State) ->
  %% 将Q1队列中消息转化为betas类型的消息
  %% 如果磁盘中没有消息,则将Q1中的消息存储到Q3队列,如果磁盘中有消息则将Q3队列中的消息存储到Q2队列(将Q1队列头部的元素放入到Q2或者Q3队列的尾部)
  {Quota1, State1} =
    push_alphas_to_betas(
      fun ?QUEUE:out/1,
      fun (MsgStatus, Q1a,
        %% 如果delta类型的消息的个数为0,则将该消息存入存入Q3队列
        State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
          State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
        %% 如果delta类型的消息个数不为0,则将该消息存入Q2队列
        (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
          State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
      end,
      Quota, State #vqstate.q1, State),
  %% 将Q4队列中消息转化为betas类型的消息(Q4 -> Q3)(将Q4队列尾部的元素不断的放入到Q3队列的头部)
  {Quota2, State2} =
    push_alphas_to_betas(
      fun ?QUEUE:out_r/1,
      fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
               State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
      end,
      Quota1, State1 #vqstate.q4, State1),
  {Quota2, State2}.

%% 限制内存中的等待ack的消息(将消息内容在内存中的等待ack的消息的消息内容写入磁盘文件)
limit_ram_acks(0, State) -> {0, State};
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
  case gb_trees:is_empty(RPA) of
    true ->
      {Quota, State};
    false ->
      %% 拿到队列索引最大的消息
      {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
      %% 内存不足,强制性的将等待ack的SeqId消息内容写入磁盘
      {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
      %% 如果成功的将消息写入磁盘,则将内存中的消息体字段清空
      MsgStatus2 = m(trim_msg_status(MsgStatus1)),
      %% 更新存储在磁盘中等待ack的消息字段disk_pending_ack,将刚才从存储在内存中等待ack的消息字段ram_pending_ack中的SeqId存储到disk_pending_ack字段中
      DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
      %% 更新队列状态,同时更新最新的ram_pending_ack和disk_pending_ack字段
      limit_ram_acks(Quota - 1,
                     %% 主要是更新内存中保存的消息大小(ram_bytes减去当前写入磁盘的消息的大小)
                     stats({0, 0}, {MsgStatus, MsgStatus2},
                           State1 #vqstate { ram_pending_ack = RPA1,
                                             disk_pending_ack = DPA1 }))
  end.

每次入队消息后,判断RabbitMQ系统中使用的内存是否过多,此操作是尝试将内存中的队列数据写入到磁盘中. 内存中的消息数量(RamMsgCount)及内存中的等待ack的消息数量(RamAckIndex)的和大于允许的内存消息数量(TargetRamCount)时,多余数量的消息内容会被写到磁盘中。

消息出队源码分析

获取消息:

  1. 尝试从q4队列中获取一个消息,如果成功,则返回获取到的消息,如果失败,则尝试通过试用fetch_from_q3/1从q3队列获取消息,成功则返回,如果为空则返回空;
  2. 注意fetch_from_q3从Q3获取消息,如果Q3为空,则说明整个队列都是空的,无消息,消费者等待即可。

取出消息后:

  1. 如果Q4不为空,取出消息后直接返回;
  2. 如果Q4为空,Q3不为空,从Q3取出消息后,判断Q3是否为空,如果Q3为空,Delta不为空,则将Delta中的消息转移到Q3中,下次直接从Q3消费;
  3. 如果Q3和Delta都是空的,则可以任务Delta和Q2的消息都是空的,此时将Q1的消息转移到Q4,下次直接从Q4消费即可。
%% 从队列中获取消息
queue_out(State = #vqstate { q4 = Q4 }) ->
  %% 首先尝试从Q4队列中取得元素(Q4队列中的消息类型为alpha)
  case ?QUEUE:out(Q4) of
    {empty, _Q4} ->
      %% 如果Q4队列为空则从Q3队列中取得元素(如果Q3也为空,则直接返回空)
      case fetch_from_q3(State) of
        {empty, _State1} = Result -> Result;
        {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
      end;
    {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
  end.

%% 从队列Q3中读取消息
fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4 }) ->
  %% 先从Q3队列中取元素(如果为空,则直接返回为空)
  case ?QUEUE:out(Q3) of
    {empty, _Q3} ->
      {empty, State};
    {{value, MsgStatus}, Q3a} ->
      State1 = State #vqstate { q3 = Q3a },
      State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
                   {true, true} ->
                       %% 当这两个队列都为空时,可以确认q2也为空,也就是这时候,q2,q3,delta,q4都为空,那么,q1队列的消息可以直接转移到q4,下次获取消息时就可以直接从q4获取
                       %% q3 is now empty, it wasn't before;
                       %% delta is still empty. So q2 must be
                       %% empty, and we know q4 is empty
                       %% otherwise we wouldn't be loading from
                       %% q3. As such, we can just set q4 to Q1.
                       %% 当Q3队列为空,且磁盘中的消息数量为空,则断言Q2队列为空
                       true = ?QUEUE:is_empty(Q2), %% ASSERTION
                       %% 当Q3队列为空,且磁盘中的消息数量为空,则断言Q4队列为空
                       true = ?QUEUE:is_empty(Q4), %% ASSERTION
                       %% 从Q3队列中取走消息后发现Q3队列为空,同时磁盘中没有消息,则将Q1队列中的消息放入Q4队列
                       State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
                   {true, false} ->
                       %% 从Q3队列中取走消息后发现Q3队列为空,q3空,delta非空,这时候就需要从delta队列(内容与索引都在磁盘上,通过maybe_deltas_to_betas/1调用)读取消息,并转移到q3队列
                       maybe_deltas_to_betas(State1);
                   {false, _} ->
                       %% q3非空,直接返回,下次获取消息还可以从q3获取
                       %% q3 still isn't empty, we've not
                       %% touched delta, so the invariants
                       %% between q1, q2, delta and q3 are
                       %% maintained
                       State1
               end,
      {loaded, {MsgStatus, State2}}
  end.

转移Delta消息到Q3源码分析:

%% 从磁盘中读取队列数据到内存中来(从队列消息中最小索引ID读取出一个索引磁盘文件大小的消息索引信息)
%% 从队列索引的磁盘文件将单个磁盘文件中的消息索引读取出来
%% 该操作是将单个队列索引磁盘文件中的deltas类型消息转换为beta类型的消息
maybe_deltas_to_betas(State = #vqstate {
                                        q2 = Q2,
                                        delta = Delta,
                                        q3 = Q3,
                                        index_state = IndexState,
                                        ram_msg_count = RamMsgCount,
                                        ram_bytes = RamBytes,
                                        ram_pending_ack = RPA,
                                        disk_pending_ack = DPA,
                                        qi_pending_ack = QPA,
                                        disk_read_count = DiskReadCount,
                                        transient_threshold = 
TransientThreshold }) ->
    #delta { start_seq_id = DeltaSeqId,
             count = DeltaCount,
             end_seq_id = DeltaSeqIdEnd } = Delta,
             %% 根据delta中的开始DeltaSeqId得到存在索引磁盘的最小的磁盘索引号
    DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]),
    %% 从队列索引中读取消息索引(从队列索引的磁盘文件将单个磁盘文件中的消息索引读取出来)
    {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
    %% 过滤掉从rabbit_queue_index中读取过来的消息队列索引(如果该消息不是持久化的则需要删除掉),最后得到当前内存中准备好的消息个数以及内存中的消息的总的大小
    {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
         %% RabbitMQ系统关闭以前非持久化消息存储到磁盘中的索引信息再从磁盘读取出来的时候必须将他们彻底从RabbitMQ系统中删除
         betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState1),
    %% 更新队列消息索引结构,内存中队列中的消息个数,队列内存中消息占的大小,以及从磁盘文件读取的次数
    State1 = State #vqstate { index_state = IndexState2,
                              ram_msg_count = RamMsgCount + RamCountsInc,
                              ram_bytes = RamBytes + RamBytesInc,
                              disk_read_count = DiskReadCount + RamCountsInc},
    case ?QUEUE:len(Q3a) of
      0 ->
        %% we ignored every message in the segment due to it being
        %% transient and below the threshold
        %% 如果读取的当前消息队列索引磁盘文件中的操作项为空,则继续读下一个消息索引磁盘文件中的操作项
        maybe_deltas_to_betas(
          State1 #vqstate {
                           delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
      Q3aLen ->
        %% 将从索引中读取出来的消息索引存储到Q3队列(将新从磁盘中读取的消息队列添加到老的Q3队列的后面)
        Q3b = ?QUEUE:join(Q3, Q3a),
        case DeltaCount - Q3aLen of
          0 ->
            %% 如果读取出来的长度和队列索引的总长度相等,则delta信息被重置为消息个数为0,同时q2中的消息转移到q3队列
            %% delta is now empty, but it wasn't before, so
            %% can now join q2 onto q3
            State1 #vqstate { q2 = ?QUEUE:new(),
                              delta = ?BLANK_DELTA,
                              %% 如果磁盘中已经没有消息,则将Q2队列中的消息放入Q3队列
                              q3 = ?QUEUE:join(Q3b, Q2) };
          N when N > 0 ->
            %% 得到最新的队列消息磁盘中的信息
            Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, end_seq_id = DeltaSeqIdEnd }),
          %% 更新最新的q3队列和磁盘信息结构
          State1 #vqstate { delta = Delta1, q3 = Q3b }
    end
  end.

总结

节点消息堆积较多时,这些堆积的消息很快就会进入很深的队列中去,这样会增加处理每个消息的平均开销,整个系统的处理能力就会降低。因为要花更多的时间和资源处理堆积的消息,后流入的消息又被挤压到很深的队列中了,系统负载越来越恶化。
因此RabbitMQ使用时一定要注意磁盘占用监控和流控监控,这些在控制台上都可以看到,一般来说如果消息堆积过多建议增加消费者或者增强每个消费者的消费能力(比如调高prefetch_count消费者一次收到的消息可以提高单个消费者消费能力)。

启动过程

看启动过程源码:
首先从一个脚本开始:启动RabbitMQ需要使用脚本:rabbitmq-server
image.png
在172行调用了start_rabbitmq_server函数
image.png
image.png
该函数在92行设置了RABBITMQ_START_RABBIT变量的值。
为什么是92行?因为在81行判断了环境变量USE_RABBIT_BOOT_SCRIPT的值,false。因为没有USE_RABBIT_BOOT_SCRIPT环境变量。
这里在执行rabbitmq-server脚本的时候,在该脚本执行了一遍rabbitmq-env脚本,在rabbitmqenv中执行了一遍rabbitmq-defaults脚本。看完后发现没有USE_RABBIT_BOOT_SCRIPT。
image.png
在第110行使用了RABBITMQ_START_RABBIT的扩展,也就是 -s rabbit boot ,它表示erlang要调用rabbit模块的boot函数。
image.png
image.png
模块中的boot函数:
image.png
在模块中导出了boot/0函数。
boot/0函数的具体实现:
image.png
调用了start_it(transient)函数,参数的值就是transient。
image.png
start_it函数首先调用了spawn_boot_marker()函数,然后对其结果做分支匹配。
spawn_boot_marker()函数:
image.png
该函数什么也不做,仅仅是注册了一个进程,标志着RabbitMQ正在启动中。。。远程RabbitMQ节点可以访问到这个状态。
该函数中调用了register函数,注册进程。如果注册成功了,则开始启动RabbitMQ,如果注册失败,则表示RabbitMQ已经在启动中了。
image.png
image.png
首先确保该模块已经启动成功了。
Erlang内核的application.erl:
ensure_all_started函数如下:
image.png
函数中调用了ensure_all_started函数:
image.png
首先start该应用:rabbitmq_prelaunch
启动成功了,就返回{ok, [Application|Started]}
rabbit应用的ensure_all_started也是这个流程。
如果一切正常,rabbit和rabbitmq_prelaunch就都启动成功了。
启动的时候要回调rabbit的方法:start
image.png
该方法中:
调用了run_prelaunch_second_phase()函数:
image.png
image.png

消息的发送

使用channel.basicPublish()方法发送消息:
image.png
该抽象方法有如下实现:
image.png
究竟是AutorecoveringChannel还是ChannelN还是PublisherCallbackChannelImpl,要看设置。
我们经常用的是ChannelN:
比如发送消息:
image.png
要看channel的来源:
image.png
查看createChannel方法的具体实现:
image.png
究竟是AMQConnection还是AutorecoveringConnection?
需要看
image.png
打开该方法的实现:
image.png
看newConnection方法的实现:
image.png
看newConnection方法的实现:
image.png
看newConnection方法的实现:
image.png
看哪里返回了Connection对象:
image.png
1131行返回Connection的AutorecoveringConnection对象。
前提是isAutomaticRecoveryEnabled()方法返回true。
该方法何时返回true?
image.png
如果在创建ConnectioFactory的时候设置了setAutomaticRecoveryEnabled为true,则1130行的AutorecoveringConnection对象返回。
image.png
1141行返回AMQConnection对象。
在AMQConnection类中,查看createChannel()方法返回的Channel是哪个实现:
image.png
上述源码中,需要查看_channelManager的createChannel方法的返回值。
首先需要知道_channelManager是哪个类的对象:
image.png
通过搜索发现只有414行给_channelManager赋值。通过调用instantiateChannelManager方法赋值的。
看instantiateChannelManager的实现:
image.png
该方法有两个实现,查看AMQConnection中的实现:
image.png
此处使用的是ChannelManager类。
回到前面:
看该类的createChannel方法返回的是哪个对象:
实现一:
image.png
实现二:
image.png
两个实现的区别在于有没有传递通道编号。
回到前面:
在发送消息的时候调用basicPublish方法,实际上就是ChannelN的方法:
image.png
ChannelN中三个重载的basicPublish方法:
第一个方法:
image.png
第二个方法:
image.png
第三个方法:
image.png
最终调用的都是第三个实现。
在第三个实现中,
image.png
如果没有设置消息头,则设置最基本的消息头设置:
image.png
其中,
contentType表示内容类型,也就是MIME类型。
contentEncoding表示编码类型:如UTF-8
headers表示用户自定义的消息属性,键值对形式,Map<String, Object>
deliveryMode表示消息投递的模式,1表示瞬时消息,2表示持久化消息
priority表示消息的优先级,0~9,数字越大,优先级越高。
correlationId表示关联ID,一般用在RabbitMQ的请求/响应模式中,关联请求消息的ID
replyTo表示RabbitMQ的请求/响应模式中,响应消息要发送到的消息队列。
expiration表示消息的过期时间
messageId每个消息都有一个消息ID,该ID值要么手动设置,要么由系统自动生成,用于唯一标识消息。
timestamp表示消息被发送的时间戳。这个时间戳并不是精确的消息被发送出的时间,而是在消息放到发送队列到发送完成之间的任何时间。
type消息的类型,通常用语指定消息的序列化反序列化类型。
userId使用user-id属性来标识已登录的用户
appId在处理消息之前检查app-id允许应用程序丢弃那些来源不明或者不受支持的消息
clusterId:AMQP 0-9-1将cluster-id属性重新命名为reserved,并声明它必须为空,虽然RabbitMQ目前没有根据规范要求它是空的,但是最好规避这个属性。
image.png
AMQCommand是AMQP规定的命令,用于跟RabbitMQ交互。命令中指定具体的操作,比如上文中命令的属性是Basic.Publish,也就是AMQP的发布消息。
mandatory表示如果一个消息无法被交换器路由,则如果该值设置为0,则服务器悄无声息的丢弃,否则使用AMQP的Return退还给发布者。
immediate如果该值设置为0,则当消息一到达交换器,就立即投递给消费者。如果消费者不在线或不能立即投递给消费者,则服务器无法保证该消息被消费。如果设置为1,则如果消息不能被立即投递给消费者,则使用AMQP的Return命令退还给发布者。
transmit用于执行该命令,发布消息。
最后一行用于发送统计消息。
transmit方法的实现:
image.png
quiescingTransmit()用于执行AMQP命令:
image.png
要在通道上执行命令,首先获取通道的共享锁,实际上就是一个Connection可以有多个通道来操作,每个通道属于一个线程,连接是多线程共享的,因此需要获取该共享锁,以操作Connection。在获取锁之后,如果此时发送线程需要阻塞,就让共享锁等待,直到被唤醒。
c.transmit(this)用于通过通道执行命令:
image.png
该方法首先获取通道号码,然后获取AMQConnection连接。

/**
* Sends this command down the named channel on the channel's
* connection, possibly in multiple frames.
* @param channel the channel on which to transmit the command
* @throws IOException if an error is encountered
*/
public void transmit(AMQChannel channel) throws IOException {
  int channelNumber = channel.getChannelNumber();
  AMQConnection connection = channel.getConnection();
  synchronized (assembler) {
    Method m = this.assembler.getMethod();
    if (m.hasContent()) {
      byte[] body = this.assembler.getContentBody();
      Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
      // 获取协议协商的帧大小
      int frameMax = connection.getFrameMax();
      Boolean cappedFrameMax = frameMax > 0;
      int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
      // 如果消息头帧大小大于协议协商的帧大小,则抛异常。
      if (cappedFrameMax && headerFrame.size() > frameMax) {
        String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
        throw new IllegalArgumentException(msg);
      }
      // 发送要执行的AMQP方法
      connection.writeFrame(m.toFrame(channelNumber));
      // 发送消息头帧
      connection.writeFrame(headerFrame);
      // 封装消息帧,有可能有多个消息帧需要发送
      for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
        int remaining = body.length - offset;
        int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
        Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength);
        // 发送消息体帧,有可能多个
        connection.writeFrame(frame);
      }
    } else {
      // 如果要执行的AMQP方法没有数据,则只发送命令帧。
      connection.writeFrame(m.toFrame(channelNumber));
    }
  }
  connection.flush();
}

connection.writeFrame将消息帧发送到哪里了?
image.png
注释说直接将消息帧发送给broker,但实际上并非如此。
_frameHandler.writeFrame(f)用于写消息帧,写到哪里了?
image.png
有两个实现,究竟是哪个?需要判断。且看_frameHandler的源码:
image.png
该属性只在构造器中初始化过,传过来的frameHandler是哪个?
要看在哪里创建AMQConnection对象的。
image.png
调用方法createConnection创建AMQConnection对象:
image.png
下图的FrameHandler对象是哪个?
image.png
image.png
两个实现,该用哪个?
看fhFactory是哪个:
image.png
看createFrameHandlerFactory的实现:
image.png
如果使用nio,则是SocketChannelFrameHandlerFactory,否则是SocketFrameHandlerFactory。
看nio的值:
image.png
究竟调用了什么方法?
useNio还是useBlockingIo?
image.png
在我们的代码中可以手动调用。默认nio的值是:
image.png
默认是false,非nio。
默认使用的就是SocketFrameHandlerFactory这个类。
image.png
首先查看SocketFrameHandler的writeFrame实现:
image.png
由于是阻塞IO,此处直接使用输出流输出:
输出流:_outputStream的赋值:
image.png
平淡无奇。
frame.writeTo(_outputStream)的实现:
image.png

消息的消费

两种方式:推拉

拉消息:
image.png
推消息:
image.png

拉消息的代码实现

image.png
上图中,basicGet的具体实现是哪个?
image.png
现在的Channel究竟是哪个类型?ChannelN还是AurecoveringChannel?
image.png
看该方法的返回值
image.png
该方法在两个类中都存在,需要查看ConnectionFactory的方法返回的是哪个Connection:
image.png
image.png
如果isAutomaticRecoveryEnabled()返回true,则返回的Connection是AutorecoveringConnection的实例。
如果isAutomaticRecoveryEnabled()返回false,则返回的是:
image.png
看createConnection方法的返回值是什么类型的:
image.png
就是AMQConnection类型的对象。
最简单的判断方式就是直接打印connection的class信息:
image.png
发现connection是AutoreceoveringConnection类型的对象。
isAutomaticRecoveryEnabled()返回的是true还是false?
image.png
ConnectionFactory类中该属性的默认值是true。
还有两处涉及到变量automaticRecovery的:
image.png
image.png
上图中的方法表示可以使用ConnectionFactory对象设置是否启用自动恢复特性。
默认Connection是AutorecoveringConnection类型的对象。
看下面的代码中channel的类型:
image.png
image.png
最终的返回值是wrapChannel方法调用的返回值:
image.png
我们使用的channel的默认类型是AutorecoveringChannel。
image.png
看AutorecoveringChannel的basicGet实现:
image.png
delegate是哪个?
image.png
ChannelN.java中1149行是该方法的实现:

@Override
public GetResponse basicGet(String queue, Boolean autoAck) throws IOException {
  validateQueueNameLength(queue);
  // 发送RPC请求,返回AMQCommand响应信息。
  AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder().queue(queue).noAck(autoAck).build());
  // 获取响应的方法
  Method method = replyCommand.getMethod();
  // 如果响应的方法是Basic.Ok类型的,则表示获取消息成功
  if (method instanceof Basic.GetOk) {
    // 向下转型
    Basic.GetOk getOk = (Basic.GetOk)method;
    // 使用Envelop封装响应的信息,包括消息ID,是否是重发的,交换器,路由键。
    Envelope envelope = new Envelope(getOk.getDeliveryTag(), getOk.getRedelivered(), getOk.getExchange(), getOk.getRoutingKey());
    // 获取消息的属性
    BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
    // 获取消息体内容
    byte[] body = replyCommand.getContentBody();
    // 获取basic.getok.messagecount的值,此处是5
    int messageCount = getOk.getMessageCount();
    metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
    // 实例化GetResponse对象并赋值返回。
    return new GetResponse(envelope, props, body, messageCount);
  } else if (method instanceof Basic.GetEmpty) {
    return null;
  } else {
    throw new UnexpectedMethodError(method);
  }
}

如何获取消息的?发送RPC请求:
image.png
看该方法的实现:
image.png
privateRpc(m)的实现:
image.png
上述代码中,rpc(m, k)发送请求消息。
k.getReply()方法是一个阻塞的方法,等待broker返回响应。
image.png
rpc方法的具体私实现:
image.png
quiescingRpc(m, k)的具体实现:
image.png
enqueueRpc(k)具体实现:
image.png
使用的channel的默认实现是:AutorecoveringChannel,该类中包含
image.png
RecoveryAwareChannelN是
image.png
ChannelN的子类。
ChannelN又是AMQChannel的子类。
所以enqueueRpc方法应该看ChannelN的实现方式:
image.png
调用了父类的enqueueRpc方法是父类的:
image.png
看doEnqueueRpc的具体实现:

推消息的代码实现

image.png
看basicConsume的具体实现:
ChannelN中1343行代码:

public String basicConsume(String queue, final Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException {
  // 构建请求方法
  final Method m = new Basic.Consume.Builder().queue(queue).consumerTag(consumerTag).noLocal(noLocal).noAck(autoAck).exclusive(exclusive).arguments(arguments).build();
  BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String> (m) {
    @Override
    public String transformReply(AMQCommand replyCommand) {
      String actualConsumerTag = ((Basic.ConsumeOk)
      replyCommand.getMethod()).getConsumerTag();
      _consumers.put(actualConsumerTag, callback);
      // need to register consumer in stats before it actually starts consuming
      metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);
      dispatcher.handleConsumeOk(callback, actualConsumerTag);
      return actualConsumerTag;
    }
  }
  ;
  rpc(m, k);
  try {
    if(_rpcTimeout == NO_RPC_TIMEOUT) {
      return k.getReply();
    } else {
      try {
        return k.getReply(_rpcTimeout);
      }
      catch (TimeoutException e) {
        throw wrapTimeoutException(m, e);
      }
    }
  }
  catch(ShutdownSignalException ex) {
    throw wrap(ex);
  }
}