Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架,为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及消息分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点,实现了自动化配置的功能帮忙我们可以快速的上手使用,但是目前为止Spring Cloud Stream只支持RabbitMQ和Kafka
一个简单的demo
创建项目:spring-cloud-stream-demo,引入依赖:
1 |
|
配置文件:
1 | server: |
消息消费者类:
1 | /** |
应用启动之后,观察日志,有:
1 | main] o.s.i.codec.kryo.CompositeKryoRegistrar : configured Kryo registration [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer |
得知 ,系统guest用户创建了一个指向192.168.1.10:5672位置的RabbitMQ连接,在RabbitMQ的控制台中我们也可以发现它。
在控制台管理器中也可以看到。
声明了一个名为input.anonymous.bSbkR9fXS8utJPlEn2Cidw的队列,并通过RabbitMessageChannelBinder将自己绑定为它的消费者。
进入input.anonymous.bSbkR9fXS8utJPlEn2Cidw 通过Publish Message功能来发送一条消息到该队列中。
结果可以在当前启动的Spring Boot应用程序的控制台中看到:
[8utJPlEn2Cidw-1] com.hu.mq.ConsumerDemo : Received: [B@3b5ba131
输出的内容就是ConsumerDemo中receive方法定义的,输出的内容就是在控制台输入的信息。由于没有对消息进行序列化,所以看到的就对象的引用。
这是怎么通过消费消息以实现消息驱动业务逻辑的,解释如下:
首先,在应用做的就是引入spring-cloud-starter-stream-rabbit依赖,包是Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容,等价于spring-cloud-stream-binder-rabbit依赖。
这里用到的几个Spring Cloud Stream的核心注解,它们都被定义在SinkReceiver中:
@EnableBinding,该注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。在上面的例子中,我们通过@EnableBinding(Sink.class)绑定了Sink接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义,它的源码如下:
1 | public interface Sink { |
它通过@Input注解绑定了一个名为input的通道。除了Sink之外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,还有结合了Sink和Source的Processor接口,实际使用时我们也可以自己通过@Input和@Output注解来定义绑定消息通道的接口。当我们需要为@EnableBinding指定多个接口来绑定消息通道的时候,可以这样定义:@EnableBinding(value = {Sink.class, Source.class})。
@StreamListener:该注解主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。在上面的例子中,我们通过@StreamListener(Sink.INPUT)注解将receive方法注册为对input消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive方法会做出对应的响应动作。
之前是控制台完成了发送消息来验证了消息消费程序的功能,现在做一个单元测试方式发送消息。
创建单元测试:
1 | /** |
运行后,消费端就输出结果了
1 | [8W8kbDiQJj4QA-1] com.hu.mq.ConsumerDemo : Received: ---------------hello------------- |
Spring Cloud Stream应用模型的结构图
Spring Cloud Stream构建的应用程序与消息中间件之间是通过绑定器 Binder相关联的,绑定器对于应用程序而言起到了隔离作用,它使得不同消息中间件的实现细节对应用程序来说是透明的。所以对于每一个Spring Cloud Stream的应用程序来说,它不需要知晓消息中间件的通信细节,它只需要知道 Binder对应用程序提供的概念去实现即可,而这个概念就是在快速入门中我们提到的消息通道: Channel
绑定器
Binder绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑。这一点在上一章实现消息总线时,从RabbitMQ切换到Kafka的过程中,已经能够让我们体验到这一好处。
发布-订阅模式
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的Topic主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中,Topic可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。
在之前的而例子,通过RabbitMQ的Channel进行发布消息给我们编写的应用程序消费,实际上Spring Cloud Stream应用启动的时候,在RabbitMQ的Exchange中也创建了一个名为input的Exchange交换器,由于Binder的隔离作用,应用程序并无法感知它的存在,应用程序只知道自己指向Binder的输入或是输出通道。
在发布-订阅模式中,消息被分发到多个订阅者,例如,把之前的例子,通过命令行的方式启动两个不同端口的进程。此时,在RabbitMQ控制页面的Channels标签页中看到如下图所示的两个消息通道,它们分别绑定了启动的两个应用程序。
把项目install,生成jar,进入目录,在控制台通过命令启动另外一个实例,指定端口。
java -jar spring-cloud-stream-demo-1.0-SNAPSHOT.jar -server.port 2345
而在Exchanges标签页中,我们还能找到名为input的交换器,点击进入可以看到如下图所示的详情页面,其中在Bindings中的内容就是两个应用程序绑定通道中的消息队列,我们可以通过Exchange页面的Publish Message来发布消息,此时可以发现两个启动的应用程序都输出了消息内容。
相对于点对点队列实现的消息通信来说,Spring Cloud Stream采用的发布-订阅模式可以有效的降低消息生产者与消费者之间的耦合,当我们需要对同一类消息增加一种处理方式时,只需要增加一个应用程序并将输入通道绑定到既有的Topic中就可以实现功能的扩展,而不需要改变原来已经实现的任何内容。
消费组
虽然Spring Cloud Stream通过发布-订阅模式将消息生产者与消费者做了很好的解耦,基于相同主题的消费者可以轻松的进行扩展,但是这些扩展都是针对不同的应用实例而言的,在现实的微服务架构中,我们每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例。
消息生产者发送消息给某个具体微服务时,只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。
如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过spring.cloud.stream.bindings.input.group属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正的收到消息并进行处理。
默认情况下,Spring Cloud Stream会为其分配一个独立的匿名消费组。所以,如果同一主题下所有的应用都没有指定消费组的时候,当有消息被发布之后,所有的应用都会对其进行消费,因为它们各自都属于一个独立的组中。大部分情况下,我们在创建Spring Cloud Stream应用的时候,建议最好为其指定一个消费组,以防止对消息的重复处理,除非该行为需要这样做(比如:刷新所有实例的配置等)。
使用消费组实现消息消费的负载均衡
先创建一个消费者应用,实现了greetings主题上的输入通道绑定;
spring.cloud.stream.bindings.input.group=Service-A #指定了该应用实例都属于Service-A消费组
spring.cloud.stream.bindings.input.destination=greetings #指定了输入通道对应的主题名
完成了消息消费者之后,我们再来实现一个消息生产者应用;
它的输出通道绑定目标也指向greetings主题
spring.cloud.stream.bindings.output.destination=greetings
消息分区
通过引入消费组的概念,已经能够在多实例的情况下,保障每个消息只被组内一个实例进行消费。
通过上面对消费组参数设置后的实验,消费组并无法控制消息具体被哪个实例消费。也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。但是对于一些业务场景,就需要对于一些具有相同特征的消息每次都可以被同一个消费实例处理,比如:一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身内容聚合这些数据,那么消息生产者可以为消息增加一个固有的特征ID来进行分区,使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。而分区概念的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。
Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间件的上层实现分区处理,所以它对于消息中间件自身是否实现了消息分区并不关心,这使得Spring Cloud Stream为不具备分区功能的消息中间件也增加了分区功能扩展。
在消费者应用,增加配置:
1 | spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区功能; |
在生产者应用,增加配置:
1 | spring.cloud.stream.bindings.output.producer.partitionKeyExpression:通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键; |
消息分区配置完成,启动生产者,同时消费者启动多个,注意为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。