RocketMq消息处理整个流程如下: 本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈 消息接收:消息接收是指接收producer的消息,处理类是SendMe
RocketMq消息处理整个流程如下: 本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
以上就是rocketMq处理消息的流程了,接下来我们就从源码来分析消息投递的实现。 1. 处理PULL_MESSAGE请求与producer不同,consumer从broker拉取消息时,发送的请求code为PULL_MESSAGE,processor为PullMessageProcessor,我们直接进入它的processRequest方法:
这个方法就只是调用了一个重载方法,多出来的参数true表示允许broker挂起请求,我们继续,
在源码中,这个方法也是非常长,这里我抹去了各种细枝末节,仅留下了一些重要的流程,整个处理流程如下:
以上代码还是比较清晰的,相关流程代码中都作了注释。 以上流程就是整个消息的获取流程了,在本文中,我们仅关注与获取消息相关的步骤,重点关注以下两个操作:
2. 获取消息获取消息的方法为DefaultMessageStore#getMessage,代码如下:
这个方法不是比较长的,这里仅保留了关键流程,获取消息的关键流程如下:
经过以上步骤,消息就能获取到了,不过在获取消息的前后,会进行消息过滤操作,即根据tag或sql语法来过滤消息,关于消息过滤的一些细节,我们留到后面消息过滤相关章节作进一步分析。 3. 挂起请求:PullRequestHoldService#suspendPullRequest当broker无新消息时,consumer拉取消息的请求就会挂起,方法为PullRequestHoldService#suspendPullRequest:
在suspendPullRequest方法中,所做的工作仅是把当前请求放入pullRequestTable中了。从代码中可以看到,pullRequestTable是一个ConcurrentMap,key 是 topic@queueId,value 就是挂起的请求了。 请求挂起后,何时处理呢?这就是PullRequestHoldService线程的工作了。 3.1 处理挂起请求的线程:PullRequestHoldService看完PullRequestHoldService#suspendPullRequest方法后,我们再来看看PullRequestHoldService。 PullRequestHoldService是ServiceThread的子类(上一次看到ServiceThread的子类还是ReputMessageService),它也会启动一个新线程来处理挂起操作。 我们先来看看它是在哪里启动PullRequestHoldService的线程的,在BrokerController的启动方法start()中有这么一行: BrokerController#start
这里就是启动pullRequestHoldService的线程操作了。 为了探究这个线程做了什么,我们进入PullRequestHoldService#run方法:
从代码来看,这个线程先是进行等待,然后调用PullRequestHoldService#checkHoldRequest方法,看来关注就是这个方法了,它的代码如下:
这个方法调用了PullRequestHoldService#notifyMessageArriving(...),我们继续进入:
这个方法就是用来检查是否有新消息送达的操作了,方法虽然有点长,但可以用一句话来总结:如果有新消息送达,或者pullRquest hold住的时间到了,就唤醒pullRquest(即调用PullMessageProcessor#executeRequestWhenWakeup方法)。
3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup我们再来看看 PullMessageProcessor#executeRequestWhenWakeup 方法:
这个方法准备了一个任务,然后将其提交到线程池中执行,任务内容很简单,仅是调用了PullMessageProcessor#processRequest(...) 方法,这个方法就是本节一始提到的处理consumer拉取消息的方法了。 3.3 消息分发中唤醒consumer请求在分析消息分发流程时,DefaultMessageStore.ReputMessageService#doReput方法中有这么一段:
这段就是用来主动唤醒hold住的consumer请求的,我们进入NotifyMessageArrivingListener#arriving方法:
最终它也是调用了 PullRequestHoldService#notifyMessageArriving(...) 方法。 总结本文主要分析了broker处理PULL_MESSAGE请求的流程,总结如下:
|
2021-06-05
2021-05-27
2021-05-26
2021-06-05
2021-05-16