Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。 应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交
Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。 应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:Spring Cloud Stream能够屏蔽底层消息中间件【RabbitMQ,kafka等】的差异,降低切换成本,统一消息的编程模型。 相关概念Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。 在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。
Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。 Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。 **消费者组:**在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息) 分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理 Spring MessageSpring Message是Spring Framework的一个模块,其作用就是统一消息的编程模型。
消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:
消息通道里的消息由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅
由MessageHandler真正地消费/处理消息
Spring IntegrationSpring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。 它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。 Spring-Cloud-Stream的架构快速入门引入依赖
增加配置文件
生产者
消费者
如何自定义Binder
添加spring-cloud-stream依赖
提供ProvisioningProvider的实现ProvisioningProvider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。
提供MessageProducer的实现MessageProducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。
提供MessageHandler的实现MessageHandler提供产生事件所需的逻辑。
提供Binder的实现 提供自己的Binder抽象实现:
创建Binder的配置严格要求创建一个 Spring 配置来初始化你的绑定器实现的 bean
详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream |
2021-06-05
2021-05-27
2021-05-26
2021-06-05
2021-05-16