Spring Cloud Stream is used within the Spring Cloud architecture to build highly scalable, event-driven microservices.
Spring Cloud Stream covers a lot of ground and has many external dependencies. To get familiar with it, you need to know the following:
- Spring Framework (Spring Messaging, Spring Environment)
- Spring Boot Actuator
- Spring Boot Externalized Configuration
- Spring Retry
- Spring Integration
- Spring Cloud Stream
The purpose of this article is to introduce Spring Cloud Stream. With so much to cover, we will try to take you through it in the simplest way possible (Spring Cloud Stream is abbreviated as SCS from here on).
Before we look at SCS, we must understand the Spring Messaging and Spring Integration projects, so let's look at these two first.
Spring Messaging
Spring Messaging is a module of the Spring Framework whose role is to provide a unified programming model for messages.
For example, a message is modeled by the Message interface, which consists of a message body (the payload) and message headers.
The message channel MessageChannel is what messages are sent to; calling its send method sends a message to the channel.
How are the messages in a message channel consumed? That is handled by SubscribableChannel, a sub-interface of MessageChannel, to which message handlers (MessageHandler) subscribe.
MessageHandler is what actually consumes and processes messages.
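To make the relationship between these four types concrete, here is a minimal plain-Java sketch. The nested types below (SimpleMessage, Handler, Channel, Subscribable, InMemoryChannel) are simplified stand-ins invented for illustration; they only mirror the shape of the Spring Messaging interfaces and are not the real org.springframework.messaging types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified stand-ins that mirror the shape of the Spring Messaging model.
// These are NOT the real Spring types; all names here are hypothetical.
final class MessagingModelSketch {

    /** A message is a payload plus headers. */
    static final class SimpleMessage<T> {
        final T payload;
        final Map<String, Object> headers;

        SimpleMessage(T payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Consumes a message (the role MessageHandler plays). */
    interface Handler {
        void handleMessage(SimpleMessage<?> message);
    }

    /** Something messages can be sent to (the role MessageChannel plays). */
    interface Channel {
        boolean send(SimpleMessage<?> message);
    }

    /** A channel that handlers can subscribe to (the role SubscribableChannel plays). */
    interface Subscribable extends Channel {
        boolean subscribe(Handler handler);
        boolean unsubscribe(Handler handler);
    }

    /** Trivial channel that hands each message to its subscribers on the caller's thread. */
    static final class InMemoryChannel implements Subscribable {
        private final List<Handler> handlers = new ArrayList<>();

        public boolean subscribe(Handler handler) { return handlers.add(handler); }
        public boolean unsubscribe(Handler handler) { return handlers.remove(handler); }

        public boolean send(SimpleMessage<?> message) {
            for (Handler handler : handlers) {
                handler.handleMessage(message);
            }
            return true;
        }
    }

    /** Sends one message through the channel and returns what the handler saw. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        InMemoryChannel channel = new InMemoryChannel();
        channel.subscribe(m -> seen.add(m.payload + " " + m.headers));
        channel.send(new SimpleMessage<>("hello",
                Collections.<String, Object>singletonMap("id", 1)));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the split is that the sender only needs the channel and the consumer only needs to be subscribed to it; neither knows about the other.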
Spring Messaging builds other features on top of this messaging model, such as:
- Handling of message reception parameters and return values: the parameter handler HandlerMethodArgumentResolver works with annotations such as @Header and @Payload; the return value handler HandlerMethodReturnValueHandler works with annotations such as @SendTo
- Message body content converter MessageConverter
- Unified abstract message sending template AbstractMessageSendingTemplate
- Message channel interceptor ChannelInterceptor
- …
Spring Integration
Spring Integration provides an extension to the Spring programming model to support Enterprise Integration Patterns.
Spring Integration is an extension to Spring Messaging. It introduces a number of new concepts, including message routing (MessageRouter), message distribution (MessageDispatcher), message filtering (Filter), message transformation (Transformer), message aggregation (Aggregator), message splitting (Splitter) and so on. It also provides MessageChannel implementations such as DirectChannel, ExecutorChannel and PublishSubscribeChannel, and MessageHandler implementations such as MessageFilter, ServiceActivatingHandler and MethodInvokingSplitter.
Several ways of processing messages:
- Splitting a message
- Aggregating messages
- Filtering messages
- Distributing messages
Let’s try Spring Integration with the simplest example, which does three things:
- Construct a subscribable message channel, messageChannel
- Use a MessageHandler to consume the messages in this message channel
- Send a message to this message channel; the message is finally consumed by the MessageHandler subscribed to the channel
Finally the console prints: receive: msg from alibaba
DirectChannel has an internal message dispatcher of type UnicastingDispatcher, which dispatches each message to a MessageHandler subscribed to the channel. As the name suggests, UnicastingDispatcher is a unicast dispatcher, so it selects exactly one handler per message. How does it choose? It uses a LoadBalancingStrategy; only a round-robin implementation is provided by default, and the strategy can be extended.
Let’s modify the above code a bit and use multiple MessageHandlers to handle messages.
Since the message dispatcher inside DirectChannel is the unicast UnicastingDispatcher and it uses a round-robin load balancing strategy, the two messages sent here are consumed by the two MessageHandlers in turn, one message each.
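The unicast behaviour can be sketched in plain Java as follows. RoundRobinChannelSketch is a hypothetical stand-in written for this article, not Spring Integration's DirectChannel or UnicastingDispatcher; it only illustrates the idea of picking exactly one handler per message in rotation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for a unicast channel with round-robin load balancing.
final class RoundRobinChannelSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    /**
     * Delivers the payload to exactly one handler, chosen in rotation.
     * Simplification: returns false when nobody is subscribed.
     */
    boolean send(String payload) {
        if (handlers.isEmpty()) {
            return false;
        }
        int index = Math.floorMod(next.getAndIncrement(), handlers.size());
        handlers.get(index).accept(payload);
        return true;
    }

    /** Two handlers, two sends: each handler gets one message. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RoundRobinChannelSketch channel = new RoundRobinChannelSketch();
        channel.subscribe(p -> log.add("handler1: " + p));
        channel.subscribe(p -> log.add("handler2: " + p));
        channel.send("msg from alibaba");
        channel.send("msg from alibaba");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the first send goes to handler1 and the second to handler2, which is the one-message-each pattern described above.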
Since there is a unicast message dispatcher, UnicastingDispatcher, there must also be a broadcast message dispatcher: BroadcastingDispatcher, which is used by the message channel PublishSubscribeChannel. The broadcast message dispatcher distributes each message to all MessageHandlers.
If two messages are sent, both are consumed by every MessageHandler.
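For contrast with the unicast case, here is the same kind of plain-Java sketch for broadcasting. BroadcastChannelSketch is again a hypothetical stand-in, not Spring Integration's PublishSubscribeChannel; it simply hands every message to every subscribed handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a publish-subscribe channel with a broadcast dispatcher.
final class BroadcastChannelSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    /** Delivers the payload to every subscribed handler, in subscription order. */
    boolean send(String payload) {
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return !handlers.isEmpty();
    }

    /** Two handlers, two sends: four deliveries in total. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        BroadcastChannelSketch channel = new BroadcastChannelSketch();
        channel.subscribe(p -> log.add("handler1: " + p));
        channel.subscribe(p -> log.add("handler2: " + p));
        channel.send("msg-1");
        channel.send("msg-2");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The only difference from the round-robin sketch is the body of send: a loop over all handlers instead of a single indexed pick.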
Spring Cloud Stream
SCS encapsulates Spring Integration and introduces concepts such as Binder, Binding, @EnableBinding and @StreamListener. It integrates with Spring Boot Actuator, providing the /bindings and /channels endpoints; it integrates with Spring Boot Externalized Configuration, providing externalized configuration classes such as BindingProperties and BinderProperties; and it adds enhancements to message sending.
SCS is an enhancement to Spring Integration, is integrated with the Spring Boot ecosystem, and is the foundation of Spring Cloud Bus. It shields the application from the implementation details of the underlying messaging middleware and aims to offer a single set of APIs for sending and consuming messages; the middleware-specific details are handled by the Binder for each messaging middleware.
The Binder is the component that integrates with external messaging middleware. It constructs Bindings and provides two methods, bindConsumer and bindProducer, for constructing consumers and producers respectively. The current official implementations are the Rabbit Binder and the Kafka Binder, and Spring Cloud Alibaba provides its own implementation, the RocketMQ Binder.
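The Binder and Binding contract can be sketched with an in-memory "broker". Everything in this block (InMemoryBroker, AppChannel, InMemoryBinder and the simplified method signatures) is an assumption made for illustration; the real SCS Binder interface also takes a consumer group and properties objects, which are left out here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of the Binder/Binding idea; not the SCS API.
final class BinderSketch {

    /** Handle for a live connection between a channel and the broker. */
    interface Binding {
        void unbind();
    }

    /** Stand-in for messaging middleware: topic name -> subscribers. */
    static final class InMemoryBroker {
        private final Map<String, List<Consumer<String>>> topics = new HashMap<>();

        void publish(String topic, String body) {
            List<Consumer<String>> subs = topics.getOrDefault(topic, Collections.emptyList());
            // Iterate over a copy so a subscriber may unsubscribe while being notified.
            for (Consumer<String> s : new ArrayList<>(subs)) {
                s.accept(body);
            }
        }

        Runnable subscribe(String topic, Consumer<String> subscriber) {
            topics.computeIfAbsent(topic, k -> new ArrayList<>()).add(subscriber);
            return () -> topics.get(topic).remove(subscriber);
        }
    }

    /** Application-side channel that forwards each payload to its handlers. */
    static final class AppChannel {
        private final List<Consumer<String>> handlers = new ArrayList<>();
        void subscribe(Consumer<String> h) { handlers.add(h); }
        void unsubscribe(Consumer<String> h) { handlers.remove(h); }
        void send(String payload) { new ArrayList<>(handlers).forEach(h -> h.accept(payload)); }
    }

    static final class InMemoryBinder {
        private final InMemoryBroker broker;
        InMemoryBinder(InMemoryBroker broker) { this.broker = broker; }

        /** Producer side: whatever is sent to the channel is published to the destination. */
        Binding bindProducer(String destination, AppChannel outbound) {
            Consumer<String> forward = payload -> broker.publish(destination, payload);
            outbound.subscribe(forward);
            return () -> outbound.unsubscribe(forward);
        }

        /** Consumer side: whatever arrives at the destination is sent to the channel. */
        Binding bindConsumer(String destination, AppChannel inbound) {
            Runnable cancel = broker.subscribe(destination, inbound::send);
            return cancel::run;
        }
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        InMemoryBinder binder = new InMemoryBinder(new InMemoryBroker());
        AppChannel output = new AppChannel();
        AppChannel input = new AppChannel();
        input.subscribe(received::add);

        Binding producer = binder.bindProducer("test-topic", output);
        Binding consumer = binder.bindConsumer("test-topic", input);
        output.send("first");   // flows output -> broker -> input
        consumer.unbind();
        output.send("second");  // no consumer binding any more, so nothing is received
        producer.unbind();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The application code only ever touches the two channels; swapping the broker would mean swapping the binder, which is the property SCS relies on.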
As you can see from the diagram, the Binding is the bridge between the application and the messaging middleware, and is used to consume and produce messages.
Let’s take a look at the simplest example using RocketMQ Binder and then analyze the underlying processing principles.
The example consists of a startup class that sends messages and a service that receives them.
The code is very simple and contains no RocketMQ-specific code: messages are sent and received purely through the SCS abstractions. If you want to switch to RabbitMQ or Kafka, just change the configuration file; no code changes are needed.
Let's analyze how this code works:
- The two interfaces passed to @EnableBinding, Source and Sink, are provided by SCS itself. SCS constructs a BindableProxyFactory based on Source and Sink, and the MessageChannel returned by the corresponding output and input methods is a DirectChannel. The value of the annotation on the output and input methods is the name of the binding in the configuration file. In this example the binding names in the configuration file are output and input, matching the annotation values on the methods of the Source and Sink interfaces.
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
- Construct the CommandLineRunner; the run method of CustomRunner is executed when the application starts.
- Call the output method of the Source interface to get the DirectChannel and send the message to this channel. This is the same as the code in the Spring Integration section above.
  - After output in Source sends a message to the DirectChannel, the message is processed by the MessageHandler AbstractMessageChannelBinder#SendingHandler, which then delegates to the MessageHandler created by AbstractMessageChannelBinder#createProducerMessageHandler (this method is implemented by each messaging middleware).
  - The MessageHandler returned by each middleware's AbstractMessageChannelBinder#createProducerMessageHandler method internally converts the Spring Message into the message model of that middleware and sends it to the middleware's broker.
- Use @StreamListener to subscribe to messages. Note that the value of Sink.input in the annotation is “input”, so the subscription is configured from the binding named input in the configuration file.
  - Each middleware's implementation of AbstractMessageChannelBinder#createConsumerEndpoint uses the middleware's consumer to subscribe to messages, and internally converts the middleware's message model into a Spring Message once a message is received.
  - After conversion, the Spring Message is sent to the message channel named input.
  - The StreamListenerMessageHandler corresponding to @StreamListener subscribes to the message channel named input and consumes the message.
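The conversion steps in this walkthrough can be sketched in plain Java. AppMessage, BrokerRecord, produce and consume are all hypothetical names invented for this illustration, and the queue stands in for a real broker; the sketch only shows the idea of converting to the middleware's format on the way out and back to the application's format on the way in.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the produce -> broker -> consume path; not SCS code.
final class BindingFlowSketch {

    /** Application-side message: payload plus headers. */
    static final class AppMessage {
        final String payload;
        final Map<String, String> headers;

        AppMessage(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Broker-native record (invented format): topic, raw bytes, string properties. */
    static final class BrokerRecord {
        final String topic;
        final byte[] body;
        final Map<String, String> properties;

        BrokerRecord(String topic, byte[] body, Map<String, String> properties) {
            this.topic = topic;
            this.body = body;
            this.properties = properties;
        }
    }

    /** Producer side: convert the application message to the broker format and enqueue it. */
    static void produce(Queue<BrokerRecord> broker, String topic, AppMessage message) {
        byte[] body = message.payload.getBytes(StandardCharsets.UTF_8);
        broker.add(new BrokerRecord(topic, body, new HashMap<>(message.headers)));
    }

    /** Consumer side: take the next record and convert it back; null if the queue is empty. */
    static AppMessage consume(Queue<BrokerRecord> broker) {
        BrokerRecord record = broker.poll();
        if (record == null) {
            return null;
        }
        Map<String, String> headers = new HashMap<>(record.properties);
        headers.put("receivedTopic", record.topic);
        return new AppMessage(new String(record.body, StandardCharsets.UTF_8), headers);
    }

    static List<String> demo() {
        Queue<BrokerRecord> broker = new ArrayDeque<>();
        List<String> listenerLog = new ArrayList<>();

        // Output side: the application sends a message, the binder converts and publishes it.
        produce(broker, "test-topic",
                new AppMessage("msg-0", Collections.singletonMap("index", "0")));

        // Input side: the binder receives, converts back, and the listener consumes.
        AppMessage inbound = consume(broker);
        listenerLog.add("receive: " + inbound.payload
                + " index=" + inbound.headers.get("index")
                + " topic=" + inbound.headers.get("receivedTopic"));
        return listenerLog;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Note that the listener at the end sees only the application-side message type, never the broker record.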
This process is a bit verbose, so I’ll summarize it in a diagram (the yellow part involves the Binder implementation of each message middleware and the basic subscription publishing function of MQ).
At the end of the SCS chapter, let’s look at a piece of code from SCS about the way messages are handled.
Did you notice that this code is similar to the code that receives requests in a Spring MVC Controller? In fact, the two architectures are similar. The Spring MVC classes for handling parameters and return values in a Controller are:
org.springframework.web.method.support.HandlerMethodArgumentResolver
org.springframework.web.method.support.HandlerMethodReturnValueHandler
The corresponding classes for handling parameters and return values in Spring Messaging, mentioned earlier, are:
org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver
org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler
Their simple class names are exactly the same, and even their method names are the same.
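To show what an argument resolver does, here is a minimal reflection-based sketch. The annotations @Payload and @Header, the Msg class and the ArgumentResolver interface below are simplified stand-ins defined inside the sketch, not the Spring types; only the method names supportsParameter and resolveArgument are borrowed from the Spring interfaces.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of annotation-driven argument resolution.
final class ArgumentResolverSketch {

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER)
    @interface Payload {}

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER)
    @interface Header { String value(); }

    static final class Msg {
        final String payload;
        final Map<String, Object> headers;
        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    interface ArgumentResolver {
        boolean supportsParameter(Parameter parameter);
        Object resolveArgument(Parameter parameter, Msg message);
    }

    static final List<ArgumentResolver> RESOLVERS = Arrays.asList(
        new ArgumentResolver() {   // resolves @Payload parameters from the message body
            public boolean supportsParameter(Parameter p) { return p.isAnnotationPresent(Payload.class); }
            public Object resolveArgument(Parameter p, Msg m) { return m.payload; }
        },
        new ArgumentResolver() {   // resolves @Header("name") parameters from the headers
            public boolean supportsParameter(Parameter p) { return p.isAnnotationPresent(Header.class); }
            public Object resolveArgument(Parameter p, Msg m) {
                return m.headers.get(p.getAnnotation(Header.class).value());
            }
        });

    /** Example listener whose parameters are filled in by the resolvers. */
    static final class Listener {
        public String receive(@Header("index") Object index, @Payload String body) {
            return "index=" + index + ", body=" + body;
        }
    }

    /** Resolves each parameter with the first resolver that supports it, then invokes the method. */
    static Object invoke(Object target, Method method, Msg message) {
        Parameter[] params = method.getParameters();
        Object[] args = new Object[params.length];
        for (int i = 0; i < params.length; i++) {
            for (ArgumentResolver resolver : RESOLVERS) {
                if (resolver.supportsParameter(params[i])) {
                    args[i] = resolver.resolveArgument(params[i], message);
                    break;
                }
            }
        }
        try {
            method.setAccessible(true);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    static String demo() {
        try {
            Method m = Listener.class.getMethod("receive", Object.class, String.class);
            Msg msg = new Msg("hello", Collections.<String, Object>singletonMap("index", 1));
            return (String) invoke(new Listener(), m, msg);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The same supports-then-resolve loop works whether the input is an HTTP request or a message, which is one way to read the similarity between the two sets of classes.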
Summary
This is a summary of the SCS system related class descriptions.
Reference https://fangjian0423.github.io/2019/04/03/spring-cloud-stream-intro/