广告位联系
返回顶部
分享到

springboot-rabbitmq-reply消息直接回复模式介绍

java 来源:互联网 作者:佚名 发布时间:2022-09-23 08:44:55 人浏览
摘要

一、使用场景 MQ的作用包括了解耦、异步等。 通常生产者只负责生产消息,而不关心消息谁去获取,或者消费结果如何;消费者只负责接收指定的消息进行业务处理而不关心消息从哪里

一、使用场景

MQ的作用包括了解耦、异步等。

通常生产者只负责生产消息,而不关心消息谁去获取,或者消费结果如何;消费者只负责接收指定的消息进行业务处理而不关心消息从哪里来一级回复业务处理情况。但我们项目中有特殊的业务存在,我们作为消息生产者在生产消息后需要接收消费者的响应结果(说白了就是类似同步调用 请求响应的MQ使用),经过研究,MQ的Reply模式(直接回复模式)就是为此种业务模式而产生。

二、Reply实战

(1)依赖与YML配置

依赖:

我这里只列出最核心的rabbitMq所需依赖

1

2

3

4

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

配置:

无其余特殊配置,因为reply就是rabbitmq的一种交互方式而已

1

2

3

4

5

6

spring:

  rabbitmq:

    host: 10.50.40.116

    port: 5673

    username: admin

    password: admin

(2)RabbitMq bean配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

package com.leilei.demo;

 

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

 * @author lei

 * @create 2022-09-19 21:44

 * @desc mq配置

 **/

@Configuration

public class RabbitMqConfig {

    @Bean

    public Queue bizQueue() {

        return new Queue("bizQueue");

    }

    @Bean

    public Queue replyQueue() {

        return new Queue("replyQueue");

    }

    @Bean

    FanoutExchange bizExchange() {

        return new FanoutExchange("bizExchange");

    }

}

业务类:

1

2

3

4

5

6

7

@Data

@NoArgsConstructor

@AllArgsConstructor

public class Vehicle implements Serializable {

    private Integer id;

    private String name;

}

(3)消息生产端

消息生产端需要做的事情:有生产消息、接受消息消费响应

(1)生产消息

  • 1、生产消息,看业务场景选择是否生成全局唯一自定义的消息ID
  • 2、指定消息消费后响应的队列(Reply)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

/**

 * 生产消息

 *

 * @param

 * @return void

 * @author lei

 * @date 2022-09-19 21:59:18

 */

public void replySend() {

    MessageProperties messageProperties = new MessageProperties();

    messageProperties.setReplyTo("replyQueue");

    //todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUID

    String correlationId = UUID.randomUUID().toString();

    // 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理

    messageProperties.setCorrelationId(correlationId);

    Vehicle vehicle = new Vehicle(1, "川A0001");

    Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);

    rabbitTemplate.convertAndSend("bizExchange","",message);

    System.out.println("生产者发送消息,自定义消息ID为:" + correlationId);

}

(2)接受Reply响应

消费者消费消息后会将处理结果进行发送到一个队列,我们读取这里队列就可以拿到对应消息的响应结果进行业务处理了

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

/**

 * 接收消息响应

 *

 * @param message

 * @return void

 * @author lei

 * @date 2022-09-19 21:59:27

 */

@RabbitListener(queues = "replyQueue")

public void replyResponse(Message message) {

    String s = new String(message.getBody());

    String correlationId = message.getMessageProperties().getCorrelationId();

    System.out.println("收到客户端响应消息ID:" + correlationId);

    //todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作

    System.out.println("收到客户端响应消息:" + s);

}

(4)消息消费端

消息消费端需要做的事有:接受消息然后进行业务处理、响应消息

(1)方法一:sendTo注解+方法返回值

一般来说,我们mq消费者监听方法不需要返回值,我们这里使用sendTo注解,则需要将要响应的消息定义为返回值,sendTo注解中指定要响应到哪个队列

重点:

  • 1、sendTo注解指定要相应的队列(注意和生产端保持一致)
  • 2、方法定义的返回值内容就是要响应的消息,最终会发送到sendTo注解指定要相应的队列
  • 3、这种方法的缺点是消费端的主关性很高,因为sendTo指定的目标队列可以自己瞎写,导致生产者端无法正确收到消息响应,但我相信一般项目中也不会这么干

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

/**

 * 方式1   SendTo指定响应队列

 *

 * @param message

 * @return String

 * @author lei

 * @date 2022-09-19 16:17:52

 */

@RabbitListener(queues ="bizQueue")

@SendTo("replyQueue")

public String handleEmailMessage(Message message) {

    try {

        String msg=new String(message.getBody(), StandardCharsets.UTF_8);

        log.info("---consumer接收到消息----{}",msg);

        return "客户端响应消息:"+msg+"处理完成!";

    } catch (Exception e) {

        log.error("处理业务消息失败",e);

    }

    return null;

}

(2)方法二:读取生产端的消息使用模板发送

与普通的消费者方法一样,只需要RabbitListener注解监听业务队列;但还需要根据消息获取出ReplyTo地址,然后自己消费者方法内部手动发送消息

  • 1、优点,更强烈的感受到消息请求 响应的交互性,流程看起来更清晰
  • 2、缺点,代码不雅

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

/**

 * 方式2  message消息获取内部reply rabbitmq手动发送

 *

 * @param message

 * @return String

 * @author lei

 * @date 2022-09-19 16:17:52

 */

@RabbitListener(queues = "bizQueue")

public void handleEmailMessage2(Message message) {

    try {

        String msg = new String(message.getBody(), StandardCharsets.UTF_8);

        log.info("---consumer接收到消息----{}", msg);

        String replyTo = message.getMessageProperties().getReplyTo();

        System.out.println("接收到的reply:" + replyTo);

        rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> {

            x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());

            return x;

        });

    } catch (Exception e) {

        log.error("处理业务消息失败",e);

    }

}

(3)方法三:方法返回值

这种方式与1其实是一致的,但我经过测试,因为生产者消息指定了ReplyTo的地址,消费者端无需自己再次手动指定,即生产消息到哪里,是否响应以及响应消息发送到哪里全由生产端自己空,消费者只需要处理自身业务以及返回结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

/**

  * 方式三  方法有返回值,返回要响应的数据 (reply 由生产者发送消息时指定,消费者不做任何处理)

  *

  * @param message

  * @return String

  * @author lei

  * @date 2022-09-19 23:17:47

  */

 @RabbitListener(queues ="bizQueue")

 public String handleEmailMessage3(Message message) {

     try {

         String msg=new String(message.getBody(), StandardCharsets.UTF_8);

         log.info("---consumer接收到消息----{}",msg);

         return "客户端响应消息:"+msg+"处理完成!";

     }

     catch (Exception e) {

         log.error("处理业务消息失败",e);

     }

     return null;

 }

(4)测试

生产消息:

消费消息与响应:

收到的响应:

链路:

如此,MQ版本的请求响应模式就完成了,其实很多大佬使用MQ来实现RPC就是用的ReplyTo啦!


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://blog.csdn.net/leilei1366615/article/details/126944190
相关文章
  • SpringBoot自定义错误处理逻辑介绍

    SpringBoot自定义错误处理逻辑介绍
    1. 自定义错误页面 将自定义错误页面放在 templates 的 error 文件夹下,SpringBoot 精确匹配错误信息,使用 4xx.html 或者 5xx.html 页面可以打印错误
  • Java实现手写一个线程池的代码

    Java实现手写一个线程池的代码
    线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和
  • Java实现断点续传功能的代码

    Java实现断点续传功能的代码
    题目实现:网络资源的断点续传功能。 二、解题思路 获取要下载的资源网址 显示网络资源的大小 上次读取到的字节位置以及未读取的字节
  • 你可知HashMap为什么是线程不安全的
    HashMap 的线程不安全 HashMap 的线程不安全主要体现在下面两个方面 在 jdk 1.7 中,当并发执行扩容操作时会造成环形链和数据丢失的情况 在
  • ArrayList的动态扩容机制的介绍

    ArrayList的动态扩容机制的介绍
    对于 ArrayList 的动态扩容机制想必大家都听说过,之前的文章中也谈到过,不过由于时间久远,早已忘却。 所以利用这篇文章做做笔记,加
  • JVM基础之字节码的增强技术介绍

    JVM基础之字节码的增强技术介绍
    字节码增强技术 在上文中,着重介绍了字节码的结构,这为我们了解字节码增强技术的实现打下了基础。字节码增强技术就是一类对现有字
  • Java中的字节码增强技术

    Java中的字节码增强技术
    1.字节码增强技术 字节码增强技术就是一类对现有字节码进行修改或者动态生成全新字节码文件的技术。 参考地址 2.常见技术 技术分类 类
  • Redis BloomFilter布隆过滤器原理与实现

    Redis BloomFilter布隆过滤器原理与实现
    Bloom Filter 概念 布隆过滤器(英语:Bloom Filter)是1970年由一个叫布隆的小伙子提出的。它实际上是一个很长的二进制向量和一系列随机映射
  • Java C++算法题解leetcode801使序列递增的最小交换次

    Java C++算法题解leetcode801使序列递增的最小交换次
    题目要求 思路:状态机DP 实现一:状态机 Java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Solution { public int minSwap(int[] nums1, int[] nums2) { int n
  • Mybatis结果集映射与生命周期介绍

    Mybatis结果集映射与生命周期介绍
    一、ResultMap结果集映射 1、设计思想 对简单的语句做到零配置,对于复杂一点的语句,只需要描述语句之间的关系就行了 2、resultMap的应用场
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计