广告位联系
返回顶部
分享到

SpringCloudStream原理和深入使用介绍

java 来源:互联网 作者:佚名 发布时间:2024-06-20 21:31:37 人浏览
摘要

Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。 应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交

Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。

应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:Spring Cloud Stream能够屏蔽底层消息中间件【RabbitMQ,kafka等】的差异,降低切换成本,统一消息的编程模型。

相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  • Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息
  • Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

**消费者组:**在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

Spring Message

Spring Message是Spring Framework的一个模块,其作用就是统一消息的编程模型。

1

2

3

4

5

package org.springframework.messaging;

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

1

2

3

4

5

6

7

8

@FunctionalInterface

public interface MessageChannel {

    long INDEFINITE_TIMEOUT = -1;

    default boolean send(Message<?> message) {

        return send(message, INDEFINITE_TIMEOUT);

    }

    boolean send(Message<?> message, long timeout);

}

消息通道里的消息由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅

1

2

3

4

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

由MessageHandler真正地消费/处理消息

1

2

3

4

@FunctionalInterface

public interface MessageHandler {

    void handleMessage(Message<?> message) throws MessagingException;

}

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

Spring-Cloud-Stream的架构

img

快速入门

引入依赖

1

2

3

4

5

<!--stream-->

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

</dependency>

增加配置文件

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

spring:

    cloud:

        stream:

            # 定义消息中间件

            binders:

              MyRabbit:

                  type: rabbit

                  environment:

                    spring:

                        rabbitmq:

                            host: localhost

                            port: 5672

                            username: root

                            password: root

                            vhost: /

            bindings:

            # 生产者中定义,定义发布对象

              myInput:

                destination: myStreamExchange

                group: myStreamGroup

                binder: MyRabbit

            # 消费者中定义,定义订阅的对象

              myOutput-in-0:

                destination: myStreamExchange

                group: myStreamGroup

                binder: MyRabbit

        # 消费者中定义,定义输出的函数

        function:

            definition: myOutput

生产者

1

2

3

4

5

@Resource

    private StreamBridge streamBridge;

    public void sendNormal() {

        streamBridge.send("myInput", "hello world");

    }

消费者

1

2

3

4

5

6

7

8

9

@Bean("myOutput")

    public Consumer<Message<String>> myOutput() {

        return (message) -> {

            MessageHeaders headers = message.getHeaders();

            System.out.println("myOutput head is : " + headers);

            String payload = message.getPayload();

            System.out.println("myOutput payload is : " + payload);

        };

    }

如何自定义Binder

  • 添加spring-cloud-stream依赖
  • 提供ProvisioningProvider的实现提供
  • MessageProducer的实现提供
  • MessageHandler的实现提供
  • Binder的实现创建Binder的配置
  • 在META-INF/spring.binders中定义绑定器

添加spring-cloud-stream依赖

1

2

3

4

5

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-stream</artifactId>

    <version>${spring.cloud.stream.version}</version>

</dependency>

提供ProvisioningProvider的实现

ProvisioningProvider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

public class FileProvisioningProvider implements ProvisioningProvider<

    ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> {

    public FileProvisioningProvider() {

        super();

    }

    @Override

    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException {

        return new FileMessageDestination(name);

    }

    @Override

    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException {

        return new FileMessageDestination(name);

    }

    private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {

            this.destination = destination;

        }

        @Override

        public String getName() {

            return destination.trim();

        }

        @Override

        public String getNameForPartition(int partition) {

            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");

        }

    }

}

提供MessageProducer的实现

MessageProducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

super.onInit();

        executorService = Executors.newScheduledThreadPool(1);

    }

    @Override

    public void doStart() {

        executorService.scheduleWithFixedDelay(() -> {

            String payload = getPayload();

            if (payload != null) {

                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();

                sendMessage(receivedMessage);

            }

        }, 0, 50, TimeUnit.MILLISECONDS);

    }

    @Override

    protected void doStop() {

        executorService.shutdownNow();

    }

    private String getPayload() {

        try {

            List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));

            String currentPayload = allLines.get(allLines.size() - 1);

            if (!currentPayload.equals(previousPayload)) {

                previousPayload = currentPayload;

                return currentPayload;

            }

        } catch (IOException e) {

            FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));

        }

        return null;

    }

}

提供MessageHandler的实现

MessageHandler提供产生事件所需的逻辑。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public class FileMessageHandler extends AbstractMessageHandler {

    FileExtendedBindingProperties fileExtendedBindingProperties;

    ProducerDestination destination;

    public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) {

        this.destination = destination;

        this.fileExtendedBindingProperties = fileExtendedBindingProperties;

    }

    @Override

    protected void handleMessageInternal(Message<?> message) {

        try {

            if (message.getPayload() instanceof byte[]) {

                Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload());

            } else {

                throw new RuntimeException("处理消息失败");

            }

        } catch (IOException e) {

            throw new RuntimeException(e);

        }

    }

}

提供Binder的实现

提供自己的Binder抽象实现:

  • 扩展AbstractMessageChannelBinder类
  • 将自定义的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的通用参数
  • 重写createProducerMessageHandler和createConsumerEndpoint方法

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

public class FileMessageChannelBinder extends AbstractMessageChannelBinder

    <ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider>

    implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> {

    FileExtendedBindingProperties fileExtendedBindingProperties;

    public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) {

        super(headersToEmbed, provisioningProvider);

        this.fileExtendedBindingProperties = fileExtendedBindingProperties;

    }

    @Override

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {

        FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties);

        return fileMessageHandler;

    }

    @Override

    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception {

        FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties);

        return fileMessageProducerAdapter;

    }

    @Override

    public FileConsumerProperties getExtendedConsumerProperties(String channelName) {

        return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName);

    }

    @Override

    public FileProducerProperties getExtendedProducerProperties(String channelName) {

        return fileExtendedBindingProperties.getExtendedProducerProperties(channelName);

    }

    @Override

    public String getDefaultsPrefix() {

        return fileExtendedBindingProperties.getDefaultsPrefix();

    }

    @Override

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {

        return fileExtendedBindingProperties.getExtendedPropertiesEntryClass();

    }

}

创建Binder的配置

严格要求创建一个 Spring 配置来初始化你的绑定器实现的 bean

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

@EnableConfigurationProperties(FileExtendedBindingProperties.class)

@Configuration

public class FileMessageBinderConfiguration {

    @Bean

    @ConditionalOnMissingBean

    public FileProvisioningProvider fileMessageBinderProvisioner() {

        return new FileProvisioningProvider();

    }

    @Bean

    @ConditionalOnMissingBean

    public FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) {

        return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties);

    }

    @Bean

    public FileProducerProperties fileConsumerProperties() {

        return new FileProducerProperties();

    }

}

详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • java计算日期相差天数的4种简单方法介绍
    方法1:long值相减(推荐) 1 2 3 4 5 6 7 8 9 10 11 12 public static void main(String[] args) { DateFormat dateFormat = new SimpleDateFormat(yyyy-MM-dd HH:mm:ss); try { Date
  • SpringCloudStream原理和深入使用介绍

    SpringCloudStream原理和深入使用介绍
    Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。 应用程序通过inputs或outputs来与Spring Clo
  • gRPC在Java中的实现与应用介绍
    gRPC是由Google开发的高性能、开源的通用远程过程调用(RPC)框架,它基于HTTP/2标准设计,提供了多语言支持,包括Java、C++、Python等。gRPC特别适
  • SpringBoot中@FeignClient注解的作用
    在微服务架构中,服务之间的调用是非常频繁的。为了简化服务之间的调用,Spring Boot 提供了一个叫做 Feign 的组件。Feign 可以帮助我们定义
  • MyBatis-Plus介绍及Spring Boot 3集成指南

    MyBatis-Plus介绍及Spring Boot 3集成指南
    我们每个Java开发者都在使用springboot+mybatis开发时,我们经常发现自己需要为每张数据库表单独编写XML文件,并且为每个表都需要编写一套增
  • MyBatis与Spring中的SqlSession介绍
    在 MyBatis 中,你可以使用 SqlSessionFactory 来创建 SqlSession。 一旦你获得一个 session 之后,你可以使用它来执行映射了的语句,提交或回滚连接
  • java获取IP和IP的归属地的方法

    java获取IP和IP的归属地的方法
    在Java中,获取IP地址通常指的是获取本地机器的IP地址或者通过某种方式(如HTTP请求)获取的远程IP地址。代码案例如下: 而要获取IP的归属
  • idea没有services窗口、没有springboot启动项问题

    idea没有services窗口、没有springboot启动项问题
    idea没有services窗口、没有springboot启动项 idea没有services窗口 没有springboot启动项。 如果是找不到services窗口,可以在views的tools Windows下找到
  • Springboot限制IP访问指定的网址实现

    Springboot限制IP访问指定的网址实现
    IP黑白名单是网络安全管理中常见的策略工具,用于控制网络访问权限,根据业务场景的不同,其应用范围广泛 方式一: 添加一个简单的白
  • MybatisPlus多条件or()的使用问题小结

    MybatisPlus多条件or()的使用问题小结
    搞混了一次,特此笔记 1、bitCode or iotStr 跟其他ID一个都不能有重复 1 2 queryWrapper.ne(LineProductionPlan::getId,bean.getId()); queryWrapper.and(i - i.eq(LineP
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计