RabbitMQ and Spring Cloud Stream

Event-driven micro-services are one of the buzzwords in today's software development. In this article we will go through the basic setup of an event-driven architecture using the RabbitMQ message broker and the Spring Boot and Spring Cloud Stream frameworks.

  • RabbitMQ is a messaging technology used to provide asynchronous communication and decouple processes. More info on it here.
  • Spring Boot is a project built on top of the Spring framework. Its goal is to simplify and speed up the setup, configuration and running of applications.
  • Spring Cloud Stream is a framework for building highly scalable event-driven micro-services connected with shared messaging systems.

Step One: RabbitMQ setup

For this project we will use the RabbitMQ HA setup already discussed in the post RabbitMQ Single Point of Failure.

Step Two: Basic Spring Boot Application

To set up our Spring Boot app, let's use the Spring Initializr website with the following settings:

Download and extract the zip file generated by clicking the “Generate the project” button. There you go: a simple Spring Boot app.

Step Three: Spring Cloud Stream Configuration

For this step we first need to change the build.gradle file of our Spring Boot application to include the spring-cloud-stream-binder-rabbit library, and then run the Gradle build command (‘./gradlew build’) from a terminal.

plugins {
	id 'org.springframework.boot' version '2.1.7.RELEASE'
	id 'io.spring.dependency-management' version '1.0.7.RELEASE'
	id 'java'
}

group = 'com.synchronoss'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
	mavenCentral()
}

ext {
	set('springCloudVersion', "Greenwich.SR2")
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

dependencyManagement {
	imports {
		mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
	}
}

After the Gradle build completes we can start setting up our app. First, let's create an application.yml file under resources and add the following configuration to it.

server:
  port: 8555

spring:
  application:
    name: demo
  rabbitmq:
    addresses: localhost:5673,localhost:5672
    username: admin
    password: guest

  cloud:
    stream:
      bindings:
        outputChannel:
          destination: demo
        inputChannel:
          destination: demo
          group: demo-group

In the configuration under cloud.stream we state that two bindings, outputChannel and inputChannel, will use a RabbitMQ exchange called “demo”. Let’s create them.

OutputBinding:

package com.codespacelab.demo;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

interface OutputBinding {

    @Output("outputChannel")
    MessageChannel channel();

}

InputBinding:

package com.codespacelab.demo;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

interface InputBinding {

    String MESSAGES = "inputChannel";

    @Input(MESSAGES)
    SubscribableChannel channel();

}
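
With both bindings declared, it may help to see how the destination and group values from application.yml turn into names on the broker. The sketch below is only an illustration in plain Java, not binder code: the class BindingNames and its methods are hypothetical, and it assumes the Rabbit binder's default naming with no prefix configured, which matches the "declaring queue for inbound: demo.demo-group, bound to: demo" line that appears in the startup logs later in this article.

```java
// Hypothetical helper that illustrates the default naming only; it does not call Spring or RabbitMQ.
final class BindingNames {

    // Assumption: with no prefix configured, the exchange is named after the binding's destination.
    static String exchangeName(String destination) {
        return destination;
    }

    // Assumption: a consumer with a group gets a queue named "<destination>.<group>".
    static String queueName(String destination, String group) {
        return destination + "." + group;
    }

    public static void main(String[] args) {
        System.out.println(exchangeName("demo"));
        System.out.println(queueName("demo", "demo-group"));
    }
}
```

Because every instance that uses the same group shares this one queue, several copies of the consumer would compete for messages instead of each receiving all of them.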

Then we need to enable the output binding to allow our application to publish events to RabbitMQ.

package com.codespacelab.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(OutputBinding.class)
@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}

}

Next, create the ProducerController as a trigger to send the desired number of messages.

package com.codespacelab.demo;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    private MessageChannel messages;

    public ProducerController(OutputBinding binding) {
        messages = binding.channel();
    }

    @GetMapping("/messages/{number}")
    public String publish(@PathVariable int number) {
        // Note: the upper bound is exclusive, so this loop sends number - 1 messages.
        for (int i = 1; i < number; i++) {
            Message<String> msg = MessageBuilder.withPayload("Event number: " + i)
                    .build();
            messages.send(msg);
        }

        return "Finished";
    }

}
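
The loop bounds in the controller decide how many events a request produces, which matters when reading the test output later on. The following plain Java sketch reproduces only the payload-building part of that loop, without Spring or RabbitMQ. The class PayloadLoop and both method names are hypothetical; the inclusive variant is shown only for comparison and is not part of the original application.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration of the controller's loop bounds.
final class PayloadLoop {

    // Same bounds as ProducerController: starts at 1 and stops before `number`.
    static List<String> exclusive(int number) {
        List<String> payloads = new ArrayList<>();
        for (int i = 1; i < number; i++) {
            payloads.add("Event number: " + i);
        }
        return payloads;
    }

    // Comparison variant (an assumption, not in the original code): includes `number` itself.
    static List<String> inclusive(int number) {
        List<String> payloads = new ArrayList<>();
        for (int i = 1; i <= number; i++) {
            payloads.add("Event number: " + i);
        }
        return payloads;
    }

    public static void main(String[] args) {
        System.out.println(exclusive(10).size()); // prints 9
        System.out.println(inclusive(10).size()); // prints 10
    }
}
```

With the bounds used in the controller, calling /messages/10 therefore publishes the events numbered 1 to 9.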

Now we can send the desired number of messages/events to RabbitMQ by calling our messages API. Let’s set up the consumer which will read these messages.

package com.codespacelab.demo;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(InputBinding.class)
public class DemoListener {

    @StreamListener(target = InputBinding.MESSAGES)
    public void processHelloChannelGreeting(String msg) {
        System.out.println(msg);
    }
}

Voilà, we have a fully functional service which is able to publish events to RabbitMQ and then read them. It doesn’t have to be the same service, though. Usually the service publishing events and the service consuming them from the queue are different ones. This setup was chosen to keep the application simple, but it is a valid use case too. For example, imagine a service which exports all of your clients' purchases from your e-shop over a year. That kind of process might take a long time to finish, and now imagine fifty clients requesting that report at the same time. The service might simply crash, but in our case all requests would be queued on RabbitMQ and handled one by one or on multiple threads. To finish up, let’s test our Spring Boot application.
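
As a side note before moving on to testing, the report scenario described above can be sketched with nothing but the JDK. This is an in-memory stand-in for the idea of queuing requests and draining them with a fixed number of workers; it does not use RabbitMQ or Spring Cloud Stream, and the class QueuedReports, the "report-" labels and the worker counts are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory illustration: a queue absorbs the burst, workers drain it at their own pace.
final class QueuedReports {

    static List<String> process(int requests, int workers) {
        // All requests are accepted immediately and wait in the queue.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int id = 1; id <= requests; id++) {
            queue.add(id);
        }

        List<String> finished = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                Integer id;
                // Each worker takes the next request until the queue is empty.
                while ((id = queue.poll()) != null) {
                    finished.add("report-" + id);
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished;
    }

    public static void main(String[] args) {
        // Fifty simultaneous requests handled by four workers.
        System.out.println(process(50, 4).size()); // prints 50
    }
}
```

With one worker the requests are handled strictly one by one in arrival order. With several workers the order of completion is no longer guaranteed, but every request is still processed exactly once.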

Step Four: Testing

First of all, to test our setup we need to start our Spring Boot app. That can be done by running the ‘./gradlew bootRun’ command in a terminal.

codespace:~ lab$ cd demo
codespace:demo lab$ ./gradlew bootRun

> Task :bootRun

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.7.RELEASE)

2019-08-19 21:27:05.355  INFO 82538 --- [           main] com.codespacelab.demo.DemoApplication    : Starting DemoApplication on IE-C02YH04DJGH7 with PID 82538 (/Users/codespace/demo/build/classes/java/main started by codespace in /Users/codespace/demo)
2019-08-19 21:27:05.357  INFO 82538 --- [           main] com.codespacelab.demo.DemoApplication    : No active profile set, falling back to default profiles: default
2019-08-19 21:27:06.140  INFO 82538 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-19 21:27:06.147  INFO 82538 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-19 21:27:06.153  INFO 82538 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-19 21:27:06.219  INFO 82538 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.239  INFO 82538 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration$$EnhancerBySpringCGLIB$$658d09e9] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.254  INFO 82538 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration$$EnhancerBySpringCGLIB$$411f1509] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.267  INFO 82538 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration$$EnhancerBySpringCGLIB$$7a06bdd6] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.275  INFO 82538 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-19 21:27:06.698  INFO 82538 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8555 (http)
2019-08-19 21:27:06.727  INFO 82538 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2019-08-19 21:27:06.727  INFO 82538 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.22]
2019-08-19 21:27:06.831  INFO 82538 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2019-08-19 21:27:06.831  INFO 82538 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1440 ms
2019-08-19 21:27:07.388  INFO 82538 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2019-08-19 21:27:07.658  INFO 82538 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2019-08-19 21:27:07.782  INFO 82538 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
2019-08-19 21:27:07.797  INFO 82538 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel outputChannel
2019-08-19 21:27:07.836  INFO 82538 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
2019-08-19 21:27:07.861  INFO 82538 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel inputChannel
2019-08-19 21:27:07.871  INFO 82538 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler errorLogger
2019-08-19 21:27:07.893  INFO 82538 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'demo.inputChannel' has 1 subscriber(s).
2019-08-19 21:27:07.898  INFO 82538 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2019-08-19 21:27:07.898  INFO 82538 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'demo.errorChannel' has 1 subscriber(s).
2019-08-19 21:27:07.898  INFO 82538 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2019-08-19 21:27:08.020  INFO 82538 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5673, localhost:5672]
2019-08-19 21:27:08.099  INFO 82538 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#6917bb4:0/SimpleConnection@1d247525 [delegate=amqp://admin@127.0.0.1:5673/, localPort= 50038]
2019-08-19 21:27:08.134  INFO 82538 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'demo.outputChannel' has 1 subscriber(s).
2019-08-19 21:27:08.148  INFO 82538 --- [           main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: demo.demo-group, bound to: demo
2019-08-19 21:27:08.174  INFO 82538 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel demo.demo-group.errors
2019-08-19 21:27:08.245  INFO 82538 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'demo.demo.demo-group.errors' has 1 subscriber(s).
2019-08-19 21:27:08.246  INFO 82538 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'demo.demo.demo-group.errors' has 2 subscriber(s).
2019-08-19 21:27:08.266  INFO 82538 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.demo.demo-group
2019-08-19 21:27:08.303  INFO 82538 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8555 (http) with context path ''
2019-08-19 21:27:08.304  INFO 82538 --- [           main] com.codespacelab.demo.DemoApplication    : Started DemoApplication in 3.338 seconds (JVM running for 3.663)
<=========----> 75% EXECUTING [20s]
> :bootRun

After our Spring Boot app has started, we double-check that our RabbitMQ cluster is running and that the exchange and queue were created, as stated in the logs. To do so, open http://localhost:15672 and log in with the admin/guest credentials.

Under the Exchanges tab we should see our “demo” exchange listed.

To test the full flow, let's send 9 messages to RabbitMQ using our DemoApp. Because the loop in ProducerController stops before the requested number, we pass 10 in the URL. To do so, run this curl command in a terminal:

curl -X GET http://localhost:8555/messages/10

The charts in the Overview section of the RabbitMQ management portal verify that messages were received and sent by RabbitMQ:

The application logs show that the messages were received by DemoListener as well:

Event number: 1
Event number: 2
Event number: 3
Event number: 4
Event number: 5
Event number: 6
Event number: 7
Event number: 8
Event number: 9

The DemoApp’s source code, along with the RabbitMQ docker-compose file, can be found on GitHub.
