
Event Driven Microservices with Spring Cloud Stream

4.4.2016 | 4 minutes of reading time

Lately I’ve been much into event driven architectures because I believe it’s the best approach for microservices, allowing for much more decoupled services than point-to-point communication. There are two main approaches for event driven communication:

  • Feed: Each application has a (synchronous) endpoint anybody may pull domain events from in a feed fashion.
  • Broker: There is a dedicated broker responsible for distributing the events, like Kafka.

Each approach has its upsides and downsides. With a broker you have more infrastructure to handle, but you also get a central place where your events are stored. Feeds are not accessible when the producing application is down. Scaling is easier with a broker – what happens if you suddenly need to double your consuming applications because of load? Who subscribes to the feed? If both instances subscribe, events are processed twice. With a broker like Kafka you can easily create consumer groups, and each event is processed by only one application of the group. So we preferred the broker way, and we decided to use Kafka.
So far so good – but we were impatient. We wanted to learn about event driven architectures, not spend weeks fighting with Kafka. And then Spring Cloud Stream came to the rescue.

Yes, we spent a little time setting up our own little playground with docker-compose, including Kafka and Zookeeper of course, but also Spring Cloud Config, Spring Boot Admin and an integrated Continuous Delivery setup with Jenkins, Nexus and Sonar. You can find it here: https://github.com/codecentric/event-driven-microservices-platform. Then we thought the tough part would come – connecting to and using Kafka. We stumbled upon Spring Cloud Stream – and using Kafka was a matter of minutes.

Dependencies

You need to add one dependency to your pom:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

As parent I use the spring-cloud-starter-parent in the most current version (at the time of writing, Brixton.RC1). It handles all the version management for me.

<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-parent</artifactId>
    <version>Brixton.RC1</version>
</parent>

When using Actuator, Spring Cloud Stream automatically adds a HealthIndicator for the Kafka binder, and a new actuator endpoint /channels that lists all the channels used in the application.
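Assuming the application runs locally on the default port 8080, a quick way to inspect both endpoints is curl:

$ curl http://localhost:8080/health
$ curl http://localhost:8080/channels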

Producing events

In our sample application we produce one event every 10 seconds with a Poller.

import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Source.class)
public class EdmpSampleStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(EdmpSampleStreamApplication.class, args);
    }

    // Emits one TimeInfo message every 10 seconds to the "output" channel.
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
        return () -> MessageBuilder.withPayload(new TimeInfo(new Date().getTime() + "", "Label")).build();
    }

    public static class TimeInfo {

        private String time;
        private String label;

        public TimeInfo(String time, String label) {
            this.time = time;
            this.label = label;
        }

        public String getTime() {
            return time;
        }

        public String getLabel() {
            return label;
        }
    }
}

When using @EnableBinding(Source.class), Spring Cloud Stream automatically creates a message channel with the name output, which is used by the @InboundChannelAdapter. You may also autowire this message channel and write messages to it manually – see the sketch below.
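A minimal sketch of the manual variant, assuming field injection of the Source binding (the publisher class and its method name are made up for illustration):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;

// Hypothetical publisher that writes TimeInfo payloads to the same
// "output" channel the @InboundChannelAdapter above uses.
@Component
public class ManualTimeInfoPublisher {

    @Autowired
    private Source source;

    public void publish(EdmpSampleStreamApplication.TimeInfo timeInfo) {
        source.output().send(MessageBuilder.withPayload(timeInfo).build());
    }
}

Our application.properties looks like this: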


spring.cloud.stream.bindings.output.destination=timerTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka

It basically says that we want to bind the output message channel to the Kafka topic timerTopic, and that we want to serialize the payload into JSON. Then we need to tell Spring Cloud Stream the host name where Kafka and Zookeeper are running – the defaults are localhost; in our setup both run in one Docker container named kafka.
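With the content type set to application/json, each TimeInfo ends up on timerTopic as a small JSON document along these lines (the timestamp is just an example value):

{"time":"1459756800000","label":"Label"}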

Consuming events

Our sample application for consuming events looks like this:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class EdmpSampleStreamSinkApplication {

    private static final Logger logger = LoggerFactory.getLogger(EdmpSampleStreamSinkApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EdmpSampleStreamSinkApplication.class, args);
    }

    // Invoked for every message arriving on the "input" channel; the JSON
    // payload is converted into a SinkTimeInfo automatically.
    @StreamListener(Sink.INPUT)
    public void loggerSink(SinkTimeInfo sinkTimeInfo) {
        logger.info("Received: " + sinkTimeInfo.toString());
    }

    public static class SinkTimeInfo {

        private String time;
        private String label;

        public String getTime() {
            return time;
        }

        public void setTime(String time) {
            this.time = time;
        }

        // The setter must be named setLabel so the JSON field "label"
        // is mapped correctly during deserialization.
        public void setLabel(String label) {
            this.label = label;
        }

        public String getLabel() {
            return label;
        }

        @Override
        public String toString() {
            return "SinkTimeInfo [time=" + time + ", label=" + label + "]";
        }
    }
}

When using @EnableBinding(Sink.class), Spring Cloud Stream automatically creates a message channel with the name input, which is used by the @StreamListener above. Since the channels are plain Spring Integration message channels underneath, you could also consume them without @StreamListener – see the sketch below.
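An alternative sketch using Spring Integration's @ServiceActivator instead of @StreamListener – note that @StreamListener is what gives us the automatic JSON-to-SinkTimeInfo conversion, so this variant only sees the raw payload (the class name is made up for illustration):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

// Hypothetical alternative: handles whatever payload arrives on the
// "input" channel, without content type conversion.
@EnableBinding(Sink.class)
public class RawLoggerSink {

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void handle(Object payload) {
        System.out.println("Received raw payload: " + payload);
    }
}

Our application.properties looks like this: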


spring.cloud.stream.bindings.input.destination=timerTopic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=timerGroup
spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
spring.cloud.stream.kafka.binder.zkNodes=kafka
spring.cloud.stream.kafka.binder.brokers=kafka
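Two properties are new compared to the producer side. With spring.cloud.stream.bindings.input.group the application joins the Kafka consumer group timerGroup, so when you scale out the consumer, each event is still processed by only one instance of the group. And resetOffsets=true tells the Kafka binder to reset the consumer offset on startup, so the application re-reads the topic from the start on every run – handy for a demo, not something you would normally want in production. Once everything is running (see below), you could start a second sink instance to watch the consumer group in action, for example (assuming the docker-compose service is named edmp-sample-stream-sink):

$ docker-compose scale edmp-sample-stream-sink=2

Each event should then show up in exactly one of the two logs.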

To see everything in action, create a Docker Machine VM, clone the platform and start it with docker-compose:

$ docker-machine create -d virtualbox --virtualbox-memory "6000" --virtualbox-disk-size "40000" default
$ eval "$(docker-machine env default)"
$ git clone git@github.com:codecentric/event-driven-microservices-platform.git
$ cd event-driven-microservices-platform
$ docker-compose up

Now get a coffee, have a chat with the colleagues, or surf around the internet while Docker downloads the images. Then go to http://${docker-machine ip default}:18080/ and the Jenkins instance of the Continuous Delivery setup should come up.

Then go to Spring Boot Admin at http://${docker-machine ip default}:10001/ and you should see the sample applications registered there.

And if you take a look at the logs of edmp-sample-stream-sink, you'll see the events coming in.
