AMQP Messaging mit RabbitMQ und Spring

3 Kommentare

RabbitMQ ist als Messaging-System Teil der vFabric Cloud Application Platform. Die Unterstützung des performanten Messaging Protokolls AMQP prädestiniert RabbitMQ für den Einsatz in Hochverfügbarkeitsszenarien. RabbitMQ ist ein Open-Source-Produkt und kann daher auch außerhalb der vFabric Plattform eingesetzt werden. Kommerzieller Support steht bei Bedarf zur Verfügung.

In diesem Artikel soll gezeigt werden, wie Sie mit Hilfe von Spring AMQP einen RabbitMQ-Broker an Ihre Java-Anwendung anbinden können.

Erlang/OTP Installation

RabbitMQ ist in Erlang implementiert. Da Erlang auf einer eigenen Runtime ausgeführt wird, müssen wir zunächst die Erlang-Runtime OTP (Open Telecom Platform) installieren. Wir verwenden den Release R14B02 für die Windows-Plattform. Als Installationsverzeichnis wählen wir C:erl5.8.3 und definieren eine Umgebungsvariable, die auf dieses Verzeichnis zeigt:

ERLANG_HOME=C:\erl5.8.3

RabbitMQ Installation

Nach dem Download von RabbitMQ entpacken wir das ZIP nach C:rabbitmq_server-2.4.1. Gestartet wird RabbitMQ durch den Aufruf des Skripts

C:\rabbitmq_server-2.4.1\sbin\rabbitmq-server.bat
RabbitMQ Server

RabbitMQ Server

RabbitMQ zeichnet sich durch einen geringen initialen Speicherverbrauch aus und ist schnell hochgefahren, was in elastisch skalierenden Cloud-Umgebungen von Vorteil ist. Client-APIs werden für verschiedene Sprachen angeboten, u.a. gibt es APIs für Java und .NET.

Spring AMQP

Mit Spring AMQP steht ein komfortables API zur Verfügung, um auf AMQP-fähige Message-Broker zugreifen zu können. Wie üblich gibt es ein Template als zentralen Zugriffspunkt. In unserem Fall das AmqpTemplate.

Die Abhängigkeiten zwischen den wesentlichen benötigten Spring-Projekten sind in der folgenden Abbildung dargestellt:

Spring AMQP API

Spring AMQP API

Das Projekt spring-amqp enthält im Wesentlichen allgemeine Interfaces (z.b. das AmqpTemplate) und Schnittstellenklassen, während in spring-rabbitmq Broker-spezifische Implementierungen liegen. spring-rabbitmq stützt sich dabei auf die allg. Java API für RabbitMQ amqp-client.

Ihre Client-Anwendung hängt idealerweise nur von spring-amqp ab, so dass eine lose Kopplung entsteht und der AMQP-Broker bei Bedarf ausgetauscht werden kann.

Für die folgenden Code-Beispiele wurde diese Maven-Artefakte verwendet:

...
<repositories>
	<repository>
		<id>repository.springframework.maven.milestone</id>
		<name>Spring Framework Maven Milestone Repository</name>
		<url>http://maven.springframework.org/milestone</url>
	</repository>
</repositories>
<properties>
	<spring.framework.version>3.0.5.RELEASE</spring.framework.version>
	<spring.amqp.version>1.0.0.M3</spring.amqp.version>
	<rabbitmq.version>2.2.0</rabbitmq.version>
</properties>
<dependencies>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-core</artifactId>
		<version>${spring.framework.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-amqp</artifactId>
		<version>${spring.amqp.version}</version>
		<exclusions>
			<exclusion>
				<groupId>com.sun.jmx</groupId>
				<artifactId>jmxri</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-rabbit</artifactId>
		<version>${spring.amqp.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-erlang</artifactId>
		<version>${spring.amqp.version}</version>
	</dependency>
	<dependency>
		<groupId>com.rabbitmq</groupId>
		<artifactId>amqp-client</artifactId>
		<version>${rabbitmq.version}</version>
	</dependency>
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>4.7</version>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-test</artifactId>
		<version>${spring.framework.version}</version>
		<scope>test</scope>
	</dependency>
</dependencies>
...

AMQP Template

Der Einfachheit halber werden die Code-Beispiele in Form eines JUnit-Tests präsentiert. Im Application Context werden eine Connection Factory und das AmqpTemplate definiert. Zusätzlich spendieren wir noch ein Bean zur Administration des Brokers.

<!-- Connection Factory -->
<bean id="rabbitConnFactory" 
	class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory">
	<constructor-arg><value>localhost</value></constructor-arg>
	<property name="username" value="guest" />
	<property name="password" value="guest" />
	<property name="virtualHost" value="/" />
	<property name="port" value="5672" />
</bean>
 
<!-- Spring AMQP Template -->
<bean id="template" 
	class="org.springframework.amqp.rabbit.core.RabbitTemplate">
	<property name="connectionFactory" ref="rabbitConnFactory" />
	<property name="routingKey" value="test.queue"/>
	<property name="queue" value="test.queue"/>
</bean>
 
<!-- Spring AMQP Admin -->
<bean id="admin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
	<constructor-arg ref="rabbitConnFactory" />
</bean>

Die Connection Factory wird im Wesentlichen mit den TCP/IP-Verbindungsdaten zum RabbitMQ Server versorgt. Default-Port nach einer RabbitMQ-Installation ist 5672, als Credentials können guest/guest verwendet werden.

Ein konkretes Template bezieht sich auf eine bestimmte Queue, diese trägt den Namen test.queue.

Im folgenden Beispiel können wir guten Gewissens Autowiring verwenden, da wir genau eine Implementierung konfiguriert haben. Wir lassen uns ein AmqpAdmin und ein AmqpTemplate injizieren:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class RabbitMQClientTest {
 
	@Autowired private AmqpAdmin admin;
	@Autowired private AmqpTemplate template;
 
	@Test public void simpleProducerConsumerTest() {
		try {
			String sent = "Catch the rabbit! " + new Date();
			admin.declareQueue( new Queue("test.queue") );
 
			// write message
			template.convertAndSend( sent );
			// read message
			String received = (String)template.receiveAndConvert();
 
			System.out.println( "Msg: " + received );
			Assert.assertEquals( sent, received );
 
		} catch (AmqpException e) {
			Assert.fail( "Test failed: " + e.getLocalizedMessage() );
		}
	}
}

Der Test verwendet zunächst den AmqpAdmin, um die gewünschte Queue test.queue anzulegen. Diese Operation ist idempotent, d.h. sie legt eine Queue nur dann an, wenn diese noch nicht existiert.

Danach kann mit convertAndSend(...) bequem ein beliebiges Objekt über die Leitung gesendet werden. AMQP kennt intern nur Byte-Arrays als Nachrichtentyp, daher übernimmt das Template automatisch die Konvertierung, sofern kein eigener MessageConverter konfiguriert wird. Für unser Beispiel reichen die Standard-Converter, da sowohl Producer als auch Consumer in Java sind.

Anschließend lesen wir mit receiveAndConvert(...) synchron die nächste Nachricht aus unserer Queue und geben diese aus.

Die AmqpException ist wie üblich eine RuntimeException, müsste also nicht direkt mit try/catch behandelt werden, was wir als gute Tester hier aber dennoch tun.

Zusammenfassung

Wir haben gezeigt, wie wir RabbitMQ und die erforderliche Erlang-Runtime installieren und sind auf die Spring-APIs für AMQP eingegangen. Mit einem kleinen Test haben wir gezeigt, wie man als AMQP Message Producer bzw. Message Consumer agiert.

Autor

Tobias Trelle

Tobias Trelle

Senior IT Consultant

Share on FacebookGoogle+Share on LinkedInTweet about this on TwitterShare on RedditDigg thisShare on StumbleUpon

Kommentare

  • Hi,
    schöner Blog-Post – ich denke, RabbitMQ und AMQP brauchen noch viel mehr Sichtbarkeit in der Java-Welt!
    Einige kleine Hinweise:
    – Die SingleConnectionFactory ist für kleine Beispiele OK, aber in realen Anwendungen sollte man dann die CachingConnectionFactory nutzen.
    – Es gibt noch den rabbit-XML-Namespace, der die Konfiguration vereinfacht. Die Nutzung in dem Beispiel würde nach meinem Empfinden kaum einen Vorteil bringen.
    – Die beiden Test-Methoden haben eine Abhängigkeit – es muss erst queueProducer() und dann queueConsumer() ausgeführt werden. Warum nicht:

    admin.declareQueue( new Queue("test.queue") );
    String msg= date = "Catch the rabbit! " + date;
    template.convertAndSend( msg );
    Assert.assertEquals(msg, template.receiveAndConvert("test.queue"));

    – Und für das Empfangen von Nachrichten sind MessageListenerContainer natürlich toll.

  • Tobias Trelle

    April 21, 2011 von Tobias Trelle

    Habe den Test entsprechend angepasst. In der jetzigen Form ist er auch viel präziser.

    Ansonsten war das Beispiel schon absichtlich so einfach wie möglich gehalten. Wer weiss, vielleicht gibt es bald einen weiteren Post zum Thema „RabbitMQ Advanced“?

    I adjusted the test case. It’s more concise that way.

    The example itself was so simple by intent. How knows, maybe we’ll have another post on advanced RabbitMQ messaging.

  • Hallo,
    sehr schön geschriebener Artikel.
    Zurzeit habe ich RabbitMQ noch nicht im einsatz, aber ich denke ich werde mal ein genaueren Blick darauf werfen.

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind markiert *