Event-driven micro-services are one of the buzzwords in today's software development. In this article we will go through a basic setup of an event-driven architecture using the RabbitMQ message broker and the Spring Boot and Spring Cloud Stream frameworks.
- RabbitMQ is a messaging technology used to provide asynchronous communication and decouple processes. More info on it here.
- Spring Boot is a project built on top of the Spring framework. Its goal is to simplify and speed up the setup, configuration and running of applications.
- Spring Cloud Stream is a framework for building highly scalable event-driven micro-services connected with shared messaging systems.
Step One: RabbitMQ setup
For this project we will be using the RabbitMQ HA setup we already discussed in the post RabbitMQ Single Point of Failure.
Step Two: Basic Spring Boot Application
To set up our Spring Boot app, let's use the Spring Initializr website with the following settings:

Download and extract the zip file generated by clicking the “Generate the project” button. There you go: a simple Spring Boot app.
Step Three: Spring Cloud Stream Configuration
For this step we first need to change the build.gradle file of our Spring Boot application to include the spring-cloud-stream-binder-rabbit library, and then run the Gradle build, for example with ‘./gradlew build’ from a terminal.
plugins {
    id 'org.springframework.boot' version '2.1.7.RELEASE'
    id 'io.spring.dependency-management' version '1.0.7.RELEASE'
    id 'java'
}

group = 'com.synchronoss'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Greenwich.SR2")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    // RabbitMQ binder for Spring Cloud Stream
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
    imports {
        // Reuse the version declared in ext so the two values cannot drift apart
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
After the Gradle build completes we can start setting up our app. First of all, let's create an application.yml file under resources and add the following config to it.
server:
  port: 8555
spring:
  application:
    name: demo
  rabbitmq:
    addresses: localhost:5673,localhost:5672
    username: admin
    password: guest
  cloud:
    stream:
      bindings:
        outputChannel:
          destination: demo
        inputChannel:
          destination: demo
          group: demo-group
In the configuration under cloud.stream we state that we will be using a RabbitMQ exchange called “demo” in two bindings: outputChannel and inputChannel. Let’s create them.
OutputBinding:
package com.codespacelab.demo;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

interface OutputBinding {

    // Must match the binding name used in application.yml
    @Output("outputChannel")
    MessageChannel channel();
}
InputBinding:
package com.codespacelab.demo;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

interface InputBinding {

    // Must match the binding name used in application.yml
    String MESSAGES = "inputChannel";

    @Input(MESSAGES)
    SubscribableChannel channel();
}
Then we need to enable the output binding to allow our application to publish events to RabbitMQ.
package com.codespacelab.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(OutputBinding.class)
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
Next, create the ProducerController as a trigger to send the desired number of messages.
package com.codespacelab.demo;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    private final MessageChannel messages;

    public ProducerController(OutputBinding binding) {
        messages = binding.channel();
    }

    @GetMapping("/messages/{number}")
    public String publish(@PathVariable int number) {
        // The upper bound is exclusive: events 1 .. number - 1 are published,
        // so /messages/10 sends 9 events.
        for (int i = 1; i < number; i++) {
            Message<String> msg = MessageBuilder.withPayload("Event number: " + i)
                    .build();
            messages.send(msg);
        }
        return "Finished";
    }
}
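To see exactly how many events the loop above publishes, its bounds can be replayed in plain Java without RabbitMQ. This is only a sketch: EventPayloads and its two methods are hypothetical names that are not part of the demo project. The first method mirrors the controller's loop, the second shows what an inclusive upper bound would produce instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the demo project: it only mirrors the
// loop in ProducerController so the bounds can be checked without a broker.
class EventPayloads {

    // Same bounds as the controller: i runs from 1 to number - 1.
    static List<String> exclusive(int number) {
        List<String> payloads = new ArrayList<>();
        for (int i = 1; i < number; i++) {
            payloads.add("Event number: " + i);
        }
        return payloads;
    }

    // Variant with an inclusive upper bound: i runs from 1 to number.
    static List<String> inclusive(int number) {
        List<String> payloads = new ArrayList<>();
        for (int i = 1; i <= number; i++) {
            payloads.add("Event number: " + i);
        }
        return payloads;
    }

    public static void main(String[] args) {
        System.out.println(exclusive(10).size()); // prints 9
        System.out.println(inclusive(10).size()); // prints 10
    }
}
```

If /messages/{number} should publish exactly number events, the inclusive bound is the one to carry over into the controller.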
Now we can send the desired number of messages/events to RabbitMQ by calling our messages API. Let’s set up the consumer which will read these messages.
package com.codespacelab.demo;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(InputBinding.class)
public class DemoListener {

    // Called once for every message arriving on inputChannel
    @StreamListener(target = InputBinding.MESSAGES)
    public void processHelloChannelGreeting(String msg) {
        System.out.println(msg);
    }
}
Voilà, we have a fully functional service which is able to publish events to RabbitMQ and then read them. But it doesn’t have to be the same service. In fact, usually the service publishing events will not be the one consuming them from the queue. This setup was chosen to keep the application simple.
It is a valid use case too. For example, say you have a service which exports all of your clients' purchases from your e-shop over a year. That kind of process might take a long time to finish. Now imagine fifty clients requesting that report at the same time. The service could simply crash, but in our case all requests would be queued on RabbitMQ and handled one by one or on multiple threads.
To finish up, let’s test our Spring Boot application.
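The queueing argument from the export example can be sketched in plain Java. This is an in-process stand-in, not RabbitMQ or Spring Cloud Stream code: the executor's internal queue plays the role of the broker queue, and ReportQueueSketch, run, the 5 ms sleep and the worker count are all assumptions made for illustration. Fifty requests are accepted immediately, but only a fixed number of them are processed at any one time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: a fixed worker pool draining a queue of jobs.
class ReportQueueSketch {

    // Submits `requests` simulated report jobs to a pool of `workers` threads
    // and returns { completed jobs, highest number of jobs running at once }.
    static int[] run(int requests, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < requests; i++) {
            // Jobs beyond the worker count wait in the pool's queue.
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // stand-in for the slow export
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    completed.incrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { completed.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = run(50, 4);
        System.out.println("completed=" + result[0] + ", peak=" + result[1]);
    }
}
```

With one worker the jobs are handled one by one; with more workers they are handled on multiple threads, but never more than the configured number at once. With a real broker the waiting jobs also live outside the service process, which this in-memory sketch does not model.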
Step Four: Testing
First of all, to test our setup we need to start our Spring Boot app. That can be done by running the ‘./gradlew bootRun’ command in a terminal.
codespace:~ lab$ cd demo
codespace:demo lab$ ./gradlew bootRun
> Task :bootRun
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.7.RELEASE)
2019-08-19 21:27:05.355 INFO 82538 --- [ main] com.codespacelab.demo.DemoApplication : Starting DemoApplication on IE-C02YH04DJGH7 with PID 82538 (/Users/codespace/demo/build/classes/java/main started by codespace in /Users/codespace/demo)
2019-08-19 21:27:05.357 INFO 82538 --- [ main] com.codespacelab.demo.DemoApplication : No active profile set, falling back to default profiles: default
2019-08-19 21:27:06.140 INFO 82538 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-19 21:27:06.147 INFO 82538 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-19 21:27:06.153 INFO 82538 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-19 21:27:06.219 INFO 82538 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.239 INFO 82538 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration$$EnhancerBySpringCGLIB$$658d09e9] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.254 INFO 82538 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration$$EnhancerBySpringCGLIB$$411f1509] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.267 INFO 82538 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration$$EnhancerBySpringCGLIB$$7a06bdd6] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.275 INFO 82538 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.698 INFO 82538 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8555 (http)
2019-08-19 21:27:06.727 INFO 82538 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2019-08-19 21:27:06.727 INFO 82538 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.22]
2019-08-19 21:27:06.831 INFO 82538 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-08-19 21:27:06.831 INFO 82538 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1440 ms
2019-08-19 21:27:07.388 INFO 82538 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-08-19 21:27:07.658 INFO 82538 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2019-08-19 21:27:07.782 INFO 82538 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel
2019-08-19 21:27:07.797 INFO 82538 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel outputChannel
2019-08-19 21:27:07.836 INFO 82538 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
2019-08-19 21:27:07.861 INFO 82538 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel inputChannel
2019-08-19 21:27:07.871 INFO 82538 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler errorLogger
2019-08-19 21:27:07.893 INFO 82538 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'demo.inputChannel' has 1 subscriber(s).
2019-08-19 21:27:07.898 INFO 82538 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2019-08-19 21:27:07.898 INFO 82538 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'demo.errorChannel' has 1 subscriber(s).
2019-08-19 21:27:07.898 INFO 82538 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2019-08-19 21:27:08.020 INFO 82538 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5673, localhost:5672]
2019-08-19 21:27:08.099 INFO 82538 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#6917bb4:0/SimpleConnection@1d247525 [delegate=amqp://admin@127.0.0.1:5673/, localPort= 50038]
2019-08-19 21:27:08.134 INFO 82538 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'demo.outputChannel' has 1 subscriber(s).
2019-08-19 21:27:08.148 INFO 82538 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: demo.demo-group, bound to: demo
2019-08-19 21:27:08.174 INFO 82538 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel demo.demo-group.errors
2019-08-19 21:27:08.245 INFO 82538 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'demo.demo.demo-group.errors' has 1 subscriber(s).
2019-08-19 21:27:08.246 INFO 82538 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'demo.demo.demo-group.errors' has 2 subscriber(s).
2019-08-19 21:27:08.266 INFO 82538 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.demo.demo-group
2019-08-19 21:27:08.303 INFO 82538 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8555 (http) with context path ''
2019-08-19 21:27:08.304 INFO 82538 --- [ main] com.codespacelab.demo.DemoApplication : Started DemoApplication in 3.338 seconds (JVM running for 3.663)
<=========----> 75% EXECUTING [20s]
> :bootRun
After our Spring Boot app has started, let's double check that our RabbitMQ cluster is running and that the exchange and queue were created, as stated in the logs. To do so, open http://localhost:15672 and log in with the admin/guest credentials.

Under the Exchanges tab we should see our “demo” exchange listed.
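How the exchange, the queue and the consumer group relate can be sketched with a toy in-memory model. It is not RabbitMQ client code: ExchangeSketch and its methods are hypothetical, and “audit-group” is an invented second group used only for illustration. The queue name follows the pattern visible in the startup log (“declaring queue for inbound: demo.demo-group, bound to: demo”), and each group gets its own queue that receives a copy of every published event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of one exchange with one queue per consumer group.
class ExchangeSketch {

    private final String destination;
    // Queue contents keyed by queue name.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    ExchangeSketch(String destination) {
        this.destination = destination;
    }

    // Mirrors the name seen in the startup log: <destination>.<group>
    static String queueName(String destination, String group) {
        return destination + "." + group;
    }

    // Declares the group's queue (if missing) and returns its name.
    String bindGroup(String group) {
        String name = queueName(destination, group);
        queues.putIfAbsent(name, new ArrayList<>());
        return name;
    }

    // Every bound queue receives its own copy of the payload.
    void publish(String payload) {
        for (List<String> queue : queues.values()) {
            queue.add(payload);
        }
    }

    // Read-only snapshot of a queue; empty if the queue does not exist.
    List<String> messagesIn(String name) {
        List<String> queue = queues.getOrDefault(name, Collections.<String>emptyList());
        return Collections.unmodifiableList(new ArrayList<>(queue));
    }

    public static void main(String[] args) {
        ExchangeSketch demo = new ExchangeSketch("demo");
        System.out.println(demo.bindGroup("demo-group")); // prints demo.demo-group
        demo.bindGroup("audit-group"); // hypothetical second group
        demo.publish("Event number: 1");
        System.out.println(demo.messagesIn("demo.demo-group").size()); // prints 1
        System.out.println(demo.messagesIn("demo.audit-group").size()); // prints 1
    }
}
```

Instances of a service that share the same group read from the same queue and therefore split the events between them, while a service in a different group receives its own copy of each event.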

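To test the full flow, let's send 9 messages to RabbitMQ using our DemoApp. Because the loop in ProducerController stops one short of the requested number, calling /messages/10 publishes 9 events. To do so, run this curl command in a terminal: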
curl -X GET \
  http://localhost:8555/messages/10 \
  -H 'Accept: */*'
The charts in the Overview section of the RabbitMQ management portal verify that messages were received and sent by RabbitMQ:

The application logs show that the messages were received by DemoListener as well:
Event number: 1
Event number: 2
Event number: 3
Event number: 4
Event number: 5
Event number: 6
Event number: 7
Event number: 8
Event number: 9
The DemoApp source code, together with the RabbitMQ docker-compose file, can be found on GitHub.