广告位联系
返回顶部
分享到

java中RabbitMQ高级应用

java 来源:互联网 作者:秩名 发布时间:2022-05-03 21:28:51 人浏览
摘要

1、消息可靠性投递 在使用RabbitMQ的时候,生产者在进行消息投递的时候如果想知道消息是否成功的投递到对应的交换机和队列中,有两种方式可以用来控制消息投递的可靠性模式 。 由

1、消息可靠性投递

 在使用 RabbitMQ 的时候,生产者在进行消息投递的时候如果想知道消息是否成功的投递到对应的交换机和队列中,有两种方式可以用来控制消息投递的可靠性模式 。

 由上图的整个消息的投递过程来看,生产者的消息进入到中间件中会首先到达交换机,然后再从交换机传递到队列中去,也就是分为两步走战略。那么消息的丢失情况也就是会出现在这两个阶段中,RabbitMQ 贴心的为我们提供了针对于这两个部分的可靠新传递模式:

  • confirm 模式。
  • return 模式。

 利用这两个回调模式来确保消息的传递可靠。

 1.1、确认模式

 消息从生产者到交换机之间传递会返回一个 confirmCallback 的回调。可以直接在 rabbitTemplate 实例中进行确认逻辑的设置。如果是使用 XML 配置的话需要在工厂配置开启 publisher-confirms="true",YAML的配置就直接 publisher-confirm-type: correlated,他默认是 NONE ,需要手动开启。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")

public class Producer {

    @Autowired

    private RabbitTemplate rabbitTemplate;

 

    @Test

    public void producer() throws InterruptedException {

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            @Override

            public void confirm(CorrelationData correlationData, boolean b, String s) {

                System.out.println();

                if (!b) {

                    //  消息重发之类的处理

                    System.out.println(s);

                } else {

                    System.out.println("交换机成功接收消息");

                }

            }

        });

        rabbitTemplate.convertAndSend("default_exchange", "default_queue",

                "hello world & beordie");

        TimeUnit.SECONDS.sleep(5);

    }

}

 上面的确认是由一个 confirm 的函数执行的,里面携带了三个参数,第一个是配置的相关信息,第二个表示交换机是否成功的接收到消息,第三个参数是指没有成功接收消息的原因。

 1.2、退回模式

 从交换机到消息队列投递失败会返回一个 returnCallback 。在工厂配置中开启回退模式 publisher-returns="true" ,设置交换机处理消息失败的模式(默认 false 直接将消息进行丢弃),添加退回处理的逻辑。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")

public class Producer {

    @Autowired

    private RabbitTemplate rabbitTemplate;

 

    @Test

    public void producer() throws InterruptedException {

        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            @Override

            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

                //  重发逻辑处理

                System.out.println(message.getBody() + " 投递消息队列失败");

            }

        });

        rabbitTemplate.convertAndSend("default_exchange", "default_queue",

                "hello world & beordie");

        TimeUnit.SECONDS.sleep(5);

    }

}

 returnedMessage 中携带五个参数、分别指的是消息对象、错误码、错误信息、交换机、路由键。

 1.3、确认机制

 在消费者抓取消息队列中的数据取消费之后会有一个确认机制进行消息的确认,防止因为抓取消息之后但没有消费成功而导致的消息丢失。有三种确认方式:

  • 自动确认:acknowledge="none"

  • 手动确认:acknowledge="manual"

  • 根据异常情况确认:acknowledge="auto"

 其中自动确认是指一旦消息被消费者抓取就自动默认成功,并将消息从消息队列中进行移除,如果这个时候消费端消费出现问题,那么也会是默认消息消费成功,但是实际上是没有消费成功的,也就是当前的消息丢失了。默认的情况就是自动确认机制。

 如果设置手动确认的方式,就需要在正常消费消息之后进行回调确认 channel.basicAck(),手动签收。如果业务处理过程中发生了异常则调用 channel.basicNack() 重新发送消息。

 首先需要在队列绑定时进行确认机制的配置,设置为手动签收。

1

2

3

4

<!-- 绑定队列 -->

<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual">

    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>

</rabbit:listener-container>

 生产者一端不用更改,只需要改变消费者的实现进行消息自动签收就可以了,正常执行业务则签收消息,业务发生错误则选择消息拒签,消息重发或者丢弃。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

public class ConsumerAck implements ChannelAwareMessageListener {

    @Override

    public void onMessage(Message message, Channel channel) throws Exception {

        //  消息唯一ID

        long tag = message.getMessageProperties().getDeliveryTag();

        try {

            String msg = new String(message.getBody(), "utf-8");

            channel.basicAck(tag, true);

            System.out.println("接收消息: " + msg);

        } catch (Exception e) {

            System.out.println("接收消息异常");

            channel.basicNack(tag, true, true);

            e.printStackTrace();

        }

    }

}

 里面涉及三个简单的签收函数,一是正确签收的 basicAck ,二是单条拒签的 basicReject ,三是批量拒签的 basicNack 。

  • basicAck 第一个参数表示消息在通道中的唯一ID,只针对当前的 Channel;第二个参数表示是否批量同意,如果是 false 的话只会同意签收当前ID的一条消息,将其从消息队列中进行删除,而如果是 true 的话将会把此ID之前的消息一起给同意签收了。
  • basicReject 第一个参数依旧表示消息的唯一ID,第二个参数表示是否重新回队发送,false 表示直接丢弃该条消息或者有死信队列可以接收, true 则表示重新回队进行消息发送,所有操作只针对当前的消息。
  • basicNack 比第二个多了一个参数,也就是处于中间位置的布尔值,表示是否批量进行。

2、消费端限流

 在用户请求和DB服务处理之间增加消息中间件的隔离,使得突发流量全部让消息队列来抗,降低服务端被冲垮的可能性。让所有的请求都往队列中存,消费端只需要匀速的取出消息进行消费,这样就能保证运行效率,也不会因为后台的阻塞而导致客户端得不到正常的响应(当然指的是一些不需要同步回显的任务)。

 只需要在消费者绑定消息队列时指定取出消息的速率即可,需要使用手动签收的方式,每进行一次的签收才会从队列中再取出下一条数据。

1

2

3

4

5

<!-- 绑定队列 -->

<rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true"

                           acknowledge="manual" prefetch="1">

    <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/>

</rabbit:listener-container>

3、消息过期时间

 消息队列提供了存储在队列中消息的过期时间,分为两个方向的实现,一个是针对于整个队列中的所有消息,也就是队列的过期时间,另一个是针对当前消息的过期时间,也就是针对于单条消息单独设置。

 队列的过期时间设置很简单,只需要在创建队列时进行过期时间的指定即可,也可以通过控制台直接创建指定过期时间。一旦队列过期时间到了,队列中还未被消费的消息都将过期,进行队列的过期处理。

1

2

3

4

5

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">

    <rabbit:queue-arguments>

        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>

    </rabbit:queue-arguments>

</rabbit:queue>

 单条消息的过期时间需要在发送的时候进行单独的指定,发送的时候指定配置的额外信息,配置的编写由配置类完成。

 如果一条消息的过期时间到了,但是他此时处于队列的中间,那么他将不会被处理,只有当之后处理到时候才会进行判断是否过期。

1

2

3

4

5

6

7

8

9

10

11

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

    @Override

    public Message postProcessMessage(Message message) throws

        AmqpException {

        //  设置 message 的过期时间

        message.getMessageProperties().setExpiration("5000");

        //  返回该消息

        return message;

    }

};

rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);

 如果说同时设置了消息的过期时间和队列的过期时间,那么最终的过期时间由最短的时间进行决定,也就是说如果当前消息的过期时间没到,但是整个队列的过期时间到了,那么队列中的所有消息也自然就过期了,执行过期的处理策略。

4、死信队列

 4.1、死信概念

死信队列指的是死信交换机,当一条消息成为死信之后可以重新发送到另一个交换机进行处理,而进行处理的这个交换机就叫做死信交换机。

  • 消息成为死信消息有几种情况

    队列的消息长度达到限制

    消费者拒接消息的时候不把消息重新放入队列中

    队列存在消息过期设置,消息超时未被消费

    消息存在过期时间,在投递给消费者时发现过期

 在创建队列时可以在配置中指定相关的信息,例如死信交换机、队列长度等等,之后的一系列工作就不由程序员进行操作了,MQ 会自己完成配置过的事件响应。

1

2

3

4

5

6

7

8

9

10

11

12

<rabbit:queue id="default_queue" name="default_queue" auto-declare="true">

    <rabbit:queue-arguments>

        <!-- 死信交换机 -->

        <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/>

        <!-- 路由 -->

        <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/>

        <!-- 队列过期时间 -->

        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>

        <!-- 队列长度 -->

        <entry key="x-max-length" value-type="java.lang.Integer" value="10"/>

    </rabbit:queue-arguments>

</rabbit:queue>

 4.2、延迟队列

 延迟队列指的是消息在进入队列后不会立即被消费,只有到达指定时间之后才会被消费,也就是需要有一个时间的判断条件。

 消息队列实际上是没有提供对延迟队列的实现的,但是可以通过 TTL + 死信队列 的方式完成,设置一个队列,不被任何的消费者所消费,所有的消息进入都会被保存在里面,设置队列的过期时间,一旦队列过期将所有的消息过渡到绑定的死信队列中。

 再由具体的消费者来消费死信队列中的消息,这样就实现了延迟队列的功能。

 例如实现一个下单超时支付取消订单的功能:


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://juejin.cn/post/7093299640363122702
相关文章
  • SpringBoot自定义错误处理逻辑介绍

    SpringBoot自定义错误处理逻辑介绍
    1. 自定义错误页面 将自定义错误页面放在 templates 的 error 文件夹下,SpringBoot 精确匹配错误信息,使用 4xx.html 或者 5xx.html 页面可以打印错误
  • Java实现手写一个线程池的代码

    Java实现手写一个线程池的代码
    线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和
  • Java实现断点续传功能的代码

    Java实现断点续传功能的代码
    题目实现:网络资源的断点续传功能。 二、解题思路 获取要下载的资源网址 显示网络资源的大小 上次读取到的字节位置以及未读取的字节
  • 你可知HashMap为什么是线程不安全的
    HashMap 的线程不安全 HashMap 的线程不安全主要体现在下面两个方面 在 jdk 1.7 中,当并发执行扩容操作时会造成环形链和数据丢失的情况 在
  • ArrayList的动态扩容机制的介绍

    ArrayList的动态扩容机制的介绍
    对于 ArrayList 的动态扩容机制想必大家都听说过,之前的文章中也谈到过,不过由于时间久远,早已忘却。 所以利用这篇文章做做笔记,加
  • JVM基础之字节码的增强技术介绍

    JVM基础之字节码的增强技术介绍
    字节码增强技术 在上文中,着重介绍了字节码的结构,这为我们了解字节码增强技术的实现打下了基础。字节码增强技术就是一类对现有字
  • Java中的字节码增强技术

    Java中的字节码增强技术
    1.字节码增强技术 字节码增强技术就是一类对现有字节码进行修改或者动态生成全新字节码文件的技术。 参考地址 2.常见技术 技术分类 类
  • Redis BloomFilter布隆过滤器原理与实现

    Redis BloomFilter布隆过滤器原理与实现
    Bloom Filter 概念 布隆过滤器(英语:Bloom Filter)是1970年由一个叫布隆的小伙子提出的。它实际上是一个很长的二进制向量和一系列随机映射
  • Java C++算法题解leetcode801使序列递增的最小交换次

    Java C++算法题解leetcode801使序列递增的最小交换次
    题目要求 思路:状态机DP 实现一:状态机 Java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Solution { public int minSwap(int[] nums1, int[] nums2) { int n
  • Mybatis结果集映射与生命周期介绍

    Mybatis结果集映射与生命周期介绍
    一、ResultMap结果集映射 1、设计思想 对简单的语句做到零配置,对于复杂一点的语句,只需要描述语句之间的关系就行了 2、resultMap的应用场
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计