广告位联系
返回顶部
分享到

RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

java 来源:互联网 作者:佚名 发布时间:2023-04-05 12:34:39 人浏览
摘要

RocketMq消息处理整个流程如下: 本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈 消息接收:消息接收是指接收producer的消息,处理类是SendMe

RocketMq消息处理整个流程如下:

本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

  • 消息接收:消息接收是指接收producer的消息,处理类是SendMessageProcessor,将消息写入到commigLog文件后,接收流程处理完毕;
  • 消息分发:broker处理消息分发的类是ReputMessageService,它会启动一个线程,不断地将commitLong分到到对应的consumerQueue,这一步操作会写两个文件:consumerQueue与indexFile,写入后,消息分发流程处理 完毕;
  • 消息投递:消息投递是指将消息发往consumer的流程,consumer会发起获取消息的请求,broker收到请求后,调用PullMessageProcessor类处理,从consumerQueue文件获取消息,返回给consumer后,投递流程处理完毕。

以上就是rocketMq处理消息的流程了,接下来我们就从源码来分析消息投递的实现。

1. 处理PULL_MESSAGE请求

与producer不同,consumer从broker拉取消息时,发送的请求code为PULL_MESSAGE,processor为PullMessageProcessor,我们直接进入它的processRequest方法:

1

2

3

4

5

6

@Override

public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)

        throws RemotingCommandException {

    // 调用方法

    return this.processRequest(ctx.channel(), request, true);

}

这个方法就只是调用了一个重载方法,多出来的参数true表示允许broker挂起请求,我们继续,

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

/**

 * 继续处理

 */

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,

        boolean brokerAllowSuspend)throws RemotingCommandException {

    RemotingCommand response = RemotingCommand

        .createResponseCommand(PullMessageResponseHeader.class);

    final PullMessageResponseHeader responseHeader

        = (PullMessageResponseHeader) response.readCustomHeader();

    final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader)

        request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    // 省略权限校验流程

    // 1. rocketMq 可以设置校验信息,以阻挡非法客户端的连接

    // 2. 同时,对topic可以设置DENY(拒绝)、ANY(PUB 或者 SUB 权限)、PUB(发送权限)、SUB(订阅权限)等权限,

    //    可以细粒度控制客户端对topic的操作内容

    ...

    // 获取订阅组

    SubscriptionGroupConfig subscriptionGroupConfig =

        this.brokerController.getSubscriptionGroupManager()

        .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());

    ...

    // 获取订阅主题

    TopicConfig topicConfig = this.brokerController.getTopicConfigManager()

        .selectTopicConfig(requestHeader.getTopic());

    ...

    // 处理filter

    // consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag与sql92

    // 这里我们重点关注拉取消息的流程,具体的过滤细节后面再分析

    ...

    // 获取消息

    // 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件

    // 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容

    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(

        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),

        requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

    if (getMessageResult != null) {

        // 省略一大堆的校验过程

        ...

        switch (response.getCode()) {

            // 表示消息可以处理,这里会把消息内容写入到 response 中

            case ResponseCode.SUCCESS:

                ...

                // 处理消息消息内容,就是把消息从 getMessageResult 读出来,放到 response 中

                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {

                    final long beginTimeMills = this.brokerController.getMessageStore().now();

                    // 将消息内容转为byte数组

                    final byte[] r = this.readGetMessageResult(getMessageResult,

                        requestHeader.getConsumerGroup(), requestHeader.getTopic(),

                        requestHeader.getQueueId());

                    ...

                    response.setBody(r);

                } else {

                    try {

                        // 消息转换

                        FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(

                            getMessageResult.getBufferTotalSize()), getMessageResult);

                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {

                            ...

                        });

                    } catch (Throwable e) {

                        ...

                    }

                    response = null;

                }

                break;

            // 未找到满足条件的消息

            case ResponseCode.PULL_NOT_FOUND:

                // 如果支持挂起,就挂起当前请求

                if (brokerAllowSuspend && hasSuspendFlag) {

                    ...

                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,

                        this.brokerController.getMessageStore().now(), offset, subscriptionData,

                        messageFilter);

                    // 没有找到相关的消息,挂起操作

                    this.brokerController.getPullRequestHoldService()

                        .suspendPullRequest(topic, queueId, pullRequest);

                    response = null;

                    break;

                }

            // 省略其他类型的处理

            ...

                break;

            default:

                assert false;

        }

    } else {

        response.setCode(ResponseCode.SYSTEM_ERROR);

        response.setRemark("store getMessage return null");

    }

    ...

    return response;

}

在源码中,这个方法也是非常长,这里我抹去了各种细枝末节,仅留下了一些重要的流程,整个处理流程如下:

  • 权限校验:rocketMq 可以设置校验信息,以阻挡非法客户端的连接,同时也可以设置客户端的发布、订阅权限,细节度控制访问权限;
  • 获取订阅组、订阅主题等,这块主要是通过请求消息里的内容获取broker中对应的记录
  • 创建过滤组件:consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag与sql92
  • 获取消息:先是根据 topic 与 queueId 获取 ConsumerQueue 文件,根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容,消息的过滤操作也是发生在这一步
  • 转换消息:如果获得了消息,就是把具体的消息内容,复制到reponse中
  • 挂起请求:如果没获得消息,而当前请求又支持挂起,就挂起当前请求

以上代码还是比较清晰的,相关流程代码中都作了注释。

以上流程就是整个消息的获取流程了,在本文中,我们仅关注与获取消息相关的步骤,重点关注以下两个操作:

  • 获取消息
  • 挂起请求

2. 获取消息

获取消息的方法为DefaultMessageStore#getMessage,代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

public GetMessageResult getMessage(final String group, final String topic, final int queueId,

        final long offset, final int maxMsgNums, final MessageFilter messageFilter) {

    // 省略一些判断

    ...

    // 根据topic与queueId一个ConsumeQueue,consumeQueue记录的是消息在commitLog的位置

    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);

    if (consumeQueue != null) {

        minOffset = consumeQueue.getMinOffsetInQueue();

        maxOffset = consumeQueue.getMaxOffsetInQueue();

        if (...) {

            // 判断 offset 是否符合要求

            ...

        } else {

            // 从 consumerQueue 文件中获取消息

            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);

            if (bufferConsumeQueue != null) {

                ...

                for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount;

                    i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

                    // 省略一大堆的消息过滤操作

                    ...

                    // 从 commitLong 获取消息

                    SelectMappedBufferResult selectResult

                            = this.commitLog.getMessage(offsetPy, sizePy);

                    if (null == selectResult) {

                        if (getResult.getBufferTotalSize() == 0) {

                            status = GetMessageStatus.MESSAGE_WAS_REMOVING;

                        }

                        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);

                        continue;

                    }

                    // 省略一大堆的消息过滤操作

                    ...

                }

            }

    } else {

        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;

        nextBeginOffset = nextOffsetCorrection(offset, 0);

    }

    if (GetMessageStatus.FOUND == status) {

        this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();

    } else {

        this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();

    }

    long elapsedTime = this.getSystemClock().now() - beginTime;

    this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);

    getResult.setStatus(status);

    // 又是处理 offset

    getResult.setNextBeginOffset(nextBeginOffset);

    getResult.setMaxOffset(maxOffset);

    getResult.setMinOffset(minOffset);

    return getResult;

}

这个方法不是比较长的,这里仅保留了关键流程,获取消息的关键流程如下:

  • 根据topic与queueId找到ConsumerQueue
  • 从ConsumerQueue对应的文件中获取消息信息,如tag的hashCode、消息在commitLog中的位置信息
  • 根据位置信息,从commitLog中获取完整的消息

经过以上步骤,消息就能获取到了,不过在获取消息的前后,会进行消息过滤操作,即根据tag或sql语法来过滤消息,关于消息过滤的一些细节,我们留到后面消息过滤相关章节作进一步分析。

3. 挂起请求:PullRequestHoldService#suspendPullRequest

当broker无新消息时,consumer拉取消息的请求就会挂起,方法为PullRequestHoldService#suspendPullRequest:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public class PullRequestHoldService extends ServiceThread {

    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =

        new ConcurrentHashMap<String, ManyPullRequest>(1024);

    public void suspendPullRequest(final String topic, final int queueId,

            final PullRequest pullRequest) {

        String key = this.buildKey(topic, queueId);

        ManyPullRequest mpr = this.pullRequestTable.get(key);

        if (null == mpr) {

            mpr = new ManyPullRequest();

            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);

            if (prev != null) {

                mpr = prev;

            }

        }

        mpr.addPullRequest(pullRequest);

    }

    ...

}

在suspendPullRequest方法中,所做的工作仅是把当前请求放入pullRequestTable中了。从代码中可以看到,pullRequestTable是一个ConcurrentMap,key 是 topic@queueId,value 就是挂起的请求了。

请求挂起后,何时处理呢?这就是PullRequestHoldService线程的工作了。

3.1 处理挂起请求的线程:PullRequestHoldService

看完PullRequestHoldService#suspendPullRequest方法后,我们再来看看PullRequestHoldService。

PullRequestHoldService是ServiceThread的子类(上一次看到ServiceThread的子类还是ReputMessageService),它也会启动一个新线程来处理挂起操作。

我们先来看看它是在哪里启动PullRequestHoldService的线程的,在BrokerController的启动方法start()中有这么一行:

BrokerController#start

1

2

3

4

5

6

7

public void start() throws Exception {

    ...

    if (this.pullRequestHoldService != null) {

        this.pullRequestHoldService.start();

    }

    ...

}

这里就是启动pullRequestHoldService的线程操作了。

为了探究这个线程做了什么,我们进入PullRequestHoldService#run方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

@Override

public void run() {

    log.info("{} service started", this.getServiceName());

    while (!this.isStopped()) {

        try {

            // 等待中

            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {

                this.waitForRunning(5 * 1000);

            } else {

                this.waitForRunning(

                    this.brokerController.getBrokerConfig().getShortPollingTimeMills());

            }

            long beginLockTimestamp = this.systemClock.now();

            // 检查操作

            this.checkHoldRequest();

            long costTime = this.systemClock.now() - beginLockTimestamp;

            if (costTime > 5 * 1000) {

                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);

            }

        } catch (Throwable e) {

            log.warn(this.getServiceName() + " service has exception. ", e);

        }

    }

    log.info("{} service end", this.getServiceName());

}

从代码来看,这个线程先是进行等待,然后调用PullRequestHoldService#checkHoldRequest方法,看来关注就是这个方法了,它的代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

private void checkHoldRequest() {

    for (String key : this.pullRequestTable.keySet()) {

        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);

        if (2 == kArray.length) {

            String topic = kArray[0];

            int queueId = Integer.parseInt(kArray[1]);

            final long offset = this.brokerController.getMessageStore()

                .getMaxOffsetInQueue(topic, queueId);

            try {

                // 调用notifyMessageArriving方法操作

                this.notifyMessageArriving(topic, queueId, offset);

            } catch (Throwable e) {

                log.error(...);

            }

        }

    }

}

这个方法调用了PullRequestHoldService#notifyMessageArriving(...),我们继续进入:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {

    // 继续调用

    notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);

}

/**

 * 这个方法就是最终调用的了

 */

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset,

    final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {

    String key = this.buildKey(topic, queueId);

    ManyPullRequest mpr = this.pullRequestTable.get(key);

    if (mpr != null) {

        List<PullRequest> requestList = mpr.cloneListAndClear();

        if (requestList != null) {

            List<PullRequest> replayList = new ArrayList<PullRequest>();

            for (PullRequest request : requestList) {

                // 判断是否有新消息到达,要根据 comsumerQueue 的偏移量与request的偏移量判断

                long newestOffset = maxOffset;

                if (newestOffset <= request.getPullFromThisOffset()) {

                    newestOffset = this.brokerController.getMessageStore()

                        .getMaxOffsetInQueue(topic, queueId);

                }

                if (newestOffset > request.getPullFromThisOffset()) {

                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,

                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));

                    if (match && properties != null) {

                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);

                    }

                    if (match) {

                        try {

                            // 唤醒操作

                            this.brokerController.getPullMessageProcessor()

                                .executeRequestWhenWakeup(request.getClientChannel(),

                                request.getRequestCommand());

                        } catch (Throwable e) {

                            log.error("execute request when wakeup failed.", e);

                        }

                        continue;

                    }

                }

                // 超时时间到了

                if (System.currentTimeMillis() >=

                        (request.getSuspendTimestamp() + request.getTimeoutMillis())) {

                    try {

                        // 唤醒操作

                        this.brokerController.getPullMessageProcessor()

                            .executeRequestWhenWakeup(request.getClientChannel(),

                            request.getRequestCommand());

                    } catch (Throwable e) {

                        log.error("execute request when wakeup failed.", e);

                    }

                    continue;

                }

                replayList.add(request);

            }

            if (!replayList.isEmpty()) {

                mpr.addPullRequest(replayList);

            }

        }

    }

}

这个方法就是用来检查是否有新消息送达的操作了,方法虽然有点长,但可以用一句话来总结:如果有新消息送达,或者pullRquest hold住的时间到了,就唤醒pullRquest(即调用PullMessageProcessor#executeRequestWhenWakeup方法)。

  • 在判断是否有新消息送达时,会获取comsumerQueue文件中的最大偏移量,与当前pullRquest中的偏移量进行比较,如果前者大,就表示有新消息送达了,需要唤醒pullRquest
  • 前面说过,当consumer请求没获取到消息时,broker会hold这个请求一段时间(30s),当这个时间到了,也会唤醒pullRquest,之后就不会再hold住它了

3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup

我们再来看看 PullMessageProcessor#executeRequestWhenWakeup 方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public void executeRequestWhenWakeup(final Channel channel,

    final RemotingCommand request) throws RemotingCommandException {

    // 关注 Runnable#run() 方法即可

    Runnable run = new Runnable() {

        @Override

        public void run() {

            try {

                // 再一次调用 PullMessageProcessor#processRequest(...) 方法

                final RemotingCommand response = PullMessageProcessor.this

                    .processRequest(channel, request, false);

                ...

            } catch (RemotingCommandException e1) {

                log.error("excuteRequestWhenWakeup run", e1);

            }

        }

    };

    // 提交任务

    this.brokerController.getPullMessageExecutor()

        .submit(new RequestTask(run, channel, request));

}

这个方法准备了一个任务,然后将其提交到线程池中执行,任务内容很简单,仅是调用了PullMessageProcessor#processRequest(...) 方法,这个方法就是本节一始提到的处理consumer拉取消息的方法了。

3.3 消息分发中唤醒consumer请求

在分析消息分发流程时,DefaultMessageStore.ReputMessageService#doReput方法中有这么一段:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

private void doReput() {

    ...

    // 分发消息

    DefaultMessageStore.this.doDispatch(dispatchRequest);

    // 长轮询:如果有消息到了主节点,并且开启了长轮询

    if (BrokerRole.SLAVE != DefaultMessageStore.this

            .getMessageStoreConfig().getBrokerRole()

            &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){

        // 调用NotifyMessageArrivingListener的arriving方法

        DefaultMessageStore.this.messageArrivingListener.arriving(

            dispatchRequest.getTopic(),

            dispatchRequest.getQueueId(),

            dispatchRequest.getConsumeQueueOffset() + 1,

            dispatchRequest.getTagsCode(),

            dispatchRequest.getStoreTimestamp(),

            dispatchRequest.getBitMap(),

            dispatchRequest.getPropertiesMap());

    }

    ...

}

这段就是用来主动唤醒hold住的consumer请求的,我们进入NotifyMessageArrivingListener#arriving方法:

1

2

3

4

5

6

@Override

public void arriving(String topic, int queueId, long logicOffset, long tagsCode,

    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {

    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,

        msgStoreTime, filterBitMap, properties);

}

最终它也是调用了 PullRequestHoldService#notifyMessageArriving(...) 方法。

总结

本文主要分析了broker处理PULL_MESSAGE请求的流程,总结如下:

  • broker处理PULL_MESSAGE的processor为PullMessageProcessor,PullMessageProcessor的processRequest(...)就是整个消息获取流程了
  • broker在获取消息时,先根据请求的topic与queueId找到consumerQueue,然后根据请求中的offset参数从consumerQueue文件中找到消息在commitLog的位置信息,最后根据位置信息从commitLog中获取消息内容
  • 如果broker中没有当前consumerQueue的消息,broker会挂起当前线程,直到超时(默认30s)或收到新的消息时再唤醒

版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。

您可能感兴趣的文章 :

原文链接 :
相关文章
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计