Introduction
The messaging standards that applications use for communication vary, and the Advanced Message Queuing Protocol (AMQP) aims to standardize messaging communication, covering aspects such as the format of the message and the contracts between the participants. Note that AMQP is just a protocol and is not tied to any specific technology or language (unlike JMS, which is tightly coupled with Java). Popular implementations include Qpid (from Apache) and Rabbit MQ (from VMware).
The Spring AMQP framework provides a complete end-to-end solution for integrating messaging solution providers into an application. Note that the framework itself doesn’t provide messaging services such as sending and receiving messages; instead, it simplifies the job of integrating applications with existing messaging solution providers. The framework is split into two divisions, one for the Java platform and the other for the .NET platform. This article covers only the integration support on the Java platform. We will discuss the various capabilities available in the form of classes and interfaces before moving on to the demonstration.
Spring’s AMQP API
We will look into the core interfaces and classes of Spring’s AMQP API in this section.
Message
The Message class in Spring AMQP conforms to AMQP standards and, as the name implies, represents the message that an application can send or receive. However, an application rarely deals with this class directly; there are a number of helper classes available for dealing with the data encapsulated in the message. Closely associated with the Message class is the MessageProperties class, which defines the various properties of the message. One such property is the message type, which can be plain text, serialized form, bytes or JSON (JavaScript Object Notation). There are also other useful message properties such as message id, priority, headers, delivery mode, content length and the reply-to field.
Exchange/Queue/Binding
An Exchange in AMQP is the target to which a sender application sends messages. A Queue is the target from which receiving applications fetch messages. A Binding defines the link between an Exchange and a Queue. As mentioned, the Exchange is the destination where messages are placed. This article covers three of the Exchange types defined in AMQP:
- Direct
- Topic
- Fanout
In a Direct Exchange, a message is delivered to a Queue only when the routing key of the message exactly matches the key with which the Queue is bound to the Exchange. For example, if a message is sent with the routing key ‘test’, then only Queues bound to the Exchange with the key ‘test’ receive it. A common convention, which the demonstration later in this article follows, is to use the queue name itself as the routing key. If no Queue is bound with a key equal to the routing key, the message won’t be delivered to any Queue.
In a Topic Exchange, routing keys are made up of words separated by dots, and the key used in the Binding may include the wildcards ‘*’ (matches exactly one word) and ‘#’ (matches zero or more words). For example, if a Queue is bound with the key ‘test.*’, then messages sent with the routing keys ‘test.1’ or ‘test.2’ are delivered to it. In a Fanout Exchange, the routing key is ignored, which means that every Queue bound to the Exchange will have the messages delivered.
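The matching rules for the three exchange types can be sketched in a few lines of plain Java. This is only an illustrative model with no broker involved: the class ExchangeRoutingSketch and its routes() method are hypothetical names invented for this example and are not part of Spring AMQP or the Rabbit MQ client.

```java
/**
 * Illustrative model of exchange routing rules.
 * Hypothetical helper, not part of Spring AMQP or the Rabbit MQ client.
 */
final class ExchangeRoutingSketch {

    enum ExchangeType { DIRECT, TOPIC, FANOUT }

    /** Returns true if a message sent with routingKey reaches a queue bound with bindingKey. */
    static boolean routes(ExchangeType type, String bindingKey, String routingKey) {
        switch (type) {
            case DIRECT:
                // Direct: the two keys must be identical.
                return bindingKey.equals(routingKey);
            case TOPIC:
                // Topic: compare dot-separated words, honouring '*' and '#'.
                return topicMatches(words(bindingKey), 0, words(routingKey), 0);
            default:
                // Fanout: the routing key is ignored.
                return true;
        }
    }

    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation.
            for (int next = k; next <= key.length; next++) {
                if (topicMatches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && topicMatches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(routes(ExchangeType.DIRECT, "test", "test"));
        System.out.println(routes(ExchangeType.TOPIC, "test.*", "test.1"));
        System.out.println(routes(ExchangeType.TOPIC, "test.#", "test.1.2"));
        System.out.println(routes(ExchangeType.FANOUT, "ignored", "anything"));
    }
}
```

In a real application the broker performs this matching; the sketch is only meant to make the difference between ‘*’ and ‘#’ concrete.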
ConnectionFactory/Connection
ConnectionFactory represents the interface for creating the Connection objects that applications use for sending or receiving messages. One concrete implementation of the ConnectionFactory interface, namely SingleConnectionFactory, is available for creating a connection factory instance. Calling createConnection() on the ConnectionFactory creates and opens a new connection to the messaging broker. The messaging broker properties such as the host name, port, username and password need to be configured at the ConnectionFactory level before acquiring a Connection.
Setting up the environment
In this section, we will see how to make the environment ready for building a sample application that will be illustrated in the next section. The following packages need to be available for building and running the sample:
- Spring AMQP and its dependencies – http://www.springsource.org/spring-amqp
- Rabbit MQ Client libraries – http://www.rabbitmq.com/java-client.html
- Rabbit MQ Server – http://www.rabbitmq.com/server.html
- Erlang OTP – http://www.erlang.org/download/otp_win32_R13B03.exe
Spring AMQP and its dependencies
Unzip the downloaded distribution and place it in a folder, for example C:\Spring\AMQP. The distribution contains the following jars, which have to be added to the class path of any AMQP application:
- spring-amqp-1.0.0.M1.jar
- spring-erlang-1.0.0.M1.jar
- spring-rabbit-1.0.0.M1.jar
- spring-rabbit-admin-1.0.0.M1.jar
Note that the above libraries depend on the core Spring libraries. We have used the Spring 3 libraries in this example, so make sure that the following libraries are also available in the class path.
- spring-aop-3.0.3.RELEASE.jar
- spring-beans-3.0.3.RELEASE.jar
- spring-context-3.0.3.RELEASE.jar
- spring-context-support-3.0.3.RELEASE.jar
- spring-core-3.0.3.RELEASE.jar
- spring-expression-3.0.3.RELEASE.jar
Rabbit MQ Client libraries
Rabbit MQ client libraries will be required for client applications that need to send and receive messages. So make sure that the libraries are present in the classpath of the application.
- commons-cli-1.1.jar
- commons-io-1.2.jar
- rabbitmq-client.jar
Erlang OTP
Rabbit MQ Server depends on Erlang OTP. After downloading Erlang OTP, run the installation with the setup file. Once the installation is complete, make sure to create an environment variable ERLANG_HOME pointing to the installation’s home directory, for example “C:\Languages\Erl\Erl5.7.4”.
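Before starting the server, it can be handy to confirm that the variable is visible to new processes. The following is a minimal sketch of such a check; the class name ErlangHomeCheck and its messages are made up for this example, and it only verifies that the variable points at an existing directory, not that the directory holds a working Erlang installation.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical pre-flight check for the ERLANG_HOME environment variable. */
final class ErlangHomeCheck {

    /** Describes the state of the given ERLANG_HOME value. */
    static String check(String erlangHome) {
        if (erlangHome == null || erlangHome.trim().isEmpty()) {
            return "ERLANG_HOME is not set";
        }
        if (!Files.isDirectory(Paths.get(erlangHome))) {
            return "ERLANG_HOME does not point to a directory: " + erlangHome;
        }
        return "ERLANG_HOME points to a directory: " + erlangHome;
    }

    public static void main(String[] args) {
        // Reads the variable from the environment of the current process.
        System.out.println(check(System.getenv("ERLANG_HOME")));
    }
}
```

Run it from a newly opened command prompt, since consoles that were already open when the variable was created will not see it.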
Rabbit MQ Server
Rabbit MQ Server provides the messaging solution for sending and receiving messages. Installing the server is as simple as unzipping the distribution to a desired location. Assuming that the ERLANG_HOME environment variable is already set, locate the file ‘rabbitmq-server.bat’ in the RABBIT_MQ_SERVER_HOME/sbin directory and run it. It will produce output similar to the following:
    Activating RabbitMQ plugins ...
    0 plugins activated:

    +---+   +---+
    |   |   |   |
    |   |   |   |
    |   |   |   |
    |   +---+   +-------+
    |                   |
    | RabbitMQ  +---+   |
    |           |   |   |
    |   v2.0.0  +---+   |
    |                   |
    +-------------------+
    AMQP 0-9-1 / 0-9 / 0-8
    Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
    Licensed under the MPL. See http://www.rabbitmq.com/

    node           : rabbit@XYZ
    app descriptor : f:/Messaging/RabbitMQ/Server/rabbitmq-server-windows-2.0.0/rabbitmq_server-2.0.0/sbin/../ebin/rabbit.app
    home dir       : C:\Documents and Settings\XYZ
    cookie hash    : R9IMCzDGrduFqrqU3VYieA==
    log            : C:/Documents and Settings/XYZ/Application Data/RabbitMQ/log/[email protected]
    sasl log       : C:/Documents and Settings/XYZ/Application Data/RabbitMQ/log/[email protected]
    database dir   : c:/Documents and Settings/XYZ/Application Data/RabbitMQ/db/rabbit@XYZ-mnesia
    erlang version : 5.7.4

    starting file handle cache server ...done
    starting worker pool ...done
    starting database ...done
    starting codec correctness check ...done
    -- external infrastructure ready
    starting exchange type registry ...done
    starting exchange type topic ...done
    starting exchange type headers ...done
    starting exchange type fanout ...done
    starting exchange type direct ...done
    starting statistics event manager ...done
    starting internal event notification system ...done
    starting logging server ...done
    -- kernel ready
    starting alarm handler ...done
    starting node monitor ...done
    starting cluster delegate ...done
    starting guid generator ...done
    starting memory monitor ...done
    -- core initialized
    starting empty DB check ...done
    starting exchange recovery ...done
    starting queue supervisor and queue recovery ...done
    -- message delivery logic ready
    starting error log relay ...done
    starting networking ...done
    -- network listeners available

    broker running
Demonstration
In this example, we will see a simple demonstration of sending and receiving messages using Spring’s AMQP framework. The messaging broker that we will be using is Rabbit MQ Server. The demonstration is simple: the sender application sends a message to the queue, and the receiver application picks up the message and displays it in the console.
Configuration
The Configuration class listed below returns a suitable Template object that the calling applications will use for sending and receiving messages. At a minimum, an AMQP Template object needs a connection factory instance so that it can open a connection to the specified messaging broker, the exchange mode (direct, topic, fanout) and the queue name. Given these details, the template class is capable of creating connection, exchange and queue objects. One can see that the Connection, Exchange and Queue classes are completely hidden from the application developer, so the focus can remain on the business logic.
TestConfiguration.java
    package net.javabeat.articles.spring.amqp.test;

    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.config.AbstractRabbitConfiguration;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class TestConfiguration extends AbstractRabbitConfiguration {

        // The routing key and the queue name are deliberately identical.
        private String routingKey = "test.queue";
        private String testQueueName = "test.queue";

        // Connection factory for a Rabbit MQ Server running on localhost
        // with the default credentials.
        public ConnectionFactory getConnectionFactory() {
            SingleConnectionFactory connectionFactory = new SingleConnectionFactory("localhost");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            return connectionFactory;
        }

        @Bean
        @Override
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
            rabbitTemplate.setRoutingKey(routingKey);
            rabbitTemplate.setQueue(testQueueName);
            return rabbitTemplate;
        }
    }
The above configuration extends the AbstractRabbitConfiguration class and is annotated with the @Configuration annotation. Later on we will see the usage of this annotation. The template method rabbitTemplate() is overridden to return an instance of the RabbitTemplate class. Before returning the instance, the mandatory properties such as the Connection Factory instance, routing key and queue name are specified. Note that in this example the routing key and the queue name are the same, which means the Binding will be established between a Direct Exchange and the specified Queue. We have used the SingleConnectionFactory class for creating a ConnectionFactory instance for Rabbit MQ Server. The default username, password and port are ‘guest’, ‘guest’ and ‘5672’ respectively. If any of these values were changed during the installation of Rabbit MQ Server, then the appropriate values have to be specified on the ConnectionFactory instance.
Message Sender
The Message Sender application prepares the message to be sent and creates an application context from the Configuration class that we created previously. Because the TestConfiguration class uses the annotations @Configuration and @Bean, the RabbitTemplate bean can be obtained using context.getBean(RabbitTemplate.class).
MessageSender.java
    package net.javabeat.articles.spring.amqp.test;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    public class MessageSender {

        public void send() {
            String messageToBeSent = "Test message";
            ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class);
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
            rabbitTemplate.convertAndSend(messageToBeSent);
            System.out.println("Message '" + messageToBeSent + "' sent.");
        }

        public static void main(String[] args) {
            MessageSender sender = new MessageSender();
            sender.send();
        }
    }
The above class makes use of the method convertAndSend() defined on the RabbitTemplate class for sending the message. In the later sections, we will see the variations of this method for sending messages.
Message Receiver
MessageReceiver.java
    package net.javabeat.articles.spring.amqp.test;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    public class MessageReceiver {

        public void receive() {
            ApplicationContext applicationContext = new AnnotationConfigApplicationContext(TestConfiguration.class);
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            Object message = rabbitTemplate.receiveAndConvert();
            if (message instanceof String) {
                String strMessage = (String) message;
                System.out.println("String message received is '" + strMessage + "'");
            } else {
                System.out.println("Unknown message received '" + message + "'");
            }
        }

        public static void main(String[] args) {
            MessageReceiver receiver = new MessageReceiver();
            receiver.receive();
        }
    }
The above class does almost the same job as the Message Sender, except that it calls the method receiveAndConvert() defined on the RabbitTemplate class for receiving the message. Note that before running the application, the Rabbit MQ Server has to be started using the bat file ‘rabbitmq-server.bat’. Once it has started successfully, open a command prompt and run the class ‘MessageSender’, which sends the message. The message is now held by the Rabbit MQ Server. Then open a different console and run the class ‘MessageReceiver’, which fetches the message from the queue and displays it in the console.
Miscellaneous
In this section, we will look into the overloaded functionalities present in Spring AMQP. We will look into the various ways of sending and receiving messages. We will also have a look at writing custom Message Converters and how to hook them.
Sending and Receiving Messages
A message producer application can send a message to the exchange in several ways; the one that we have already seen is to call the method convertAndSend() defined on the RabbitTemplate object. If we wish to post-process the message once it is created, we can define a custom message post processor for achieving that. Have a look at the following code snippet.
MessagePostProcesserTest.java
    package net.javabeat.articles.spring.amqp.misc;

    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    public class MessagePostProcesserTest {

        public static void main(String[] args) {
            // Placeholder: obtain a configured RabbitTemplate before running.
            RabbitTemplate rabbitTemplate = null;

            // Placeholder: the payload to be sent.
            Object messageObject = null;

            CustomMessagePostProcessor postProcessor = new CustomMessagePostProcessor();
            rabbitTemplate.convertAndSend(messageObject, postProcessor);
        }
    }

    class CustomMessagePostProcessor implements MessagePostProcessor {

        // Called after the payload is converted to a Message and before it is sent.
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setAppId("TestApplication-1");
            return message;
        }
    }
In the above class, we have ensured that the application id is always set on the message object before sending it by attaching a custom message post processor object. This scenario is usually used when the message comes from a different source and the current application has to modify the message properties.
It is also possible to specify the routing key that represents the exchange destination to which the message has to be sent. By setting this property, it will override the routing key that is already set on the Rabbit Template object. Consider the following code which just does that.
SendMessageTest1.java
    package net.javabeat.articles.spring.amqp.misc;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    public class SendMessageTest1 {

        public static void main(String[] args) {
            RabbitTemplate rabbitTemplate = null;
            // Populate the rabbitTemplate object.

            String routingKey = "ABC";
            Object messageObject = null;
            // Populate the message object

            rabbitTemplate.convertAndSend(routingKey, messageObject);
        }
    }
The other way to send a message is to make use of the Message Creator object which actually creates a message before sending it. The following code will demonstrate that.
SendMessageTest2.java
    package net.javabeat.articles.spring.amqp.misc;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageCreator;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    public class SendMessageTest2 {

        public static void main(String[] args) {
            RabbitTemplate rabbitTemplate = null;
            // Populate the rabbitTemplate object.

            String routingKey = "ABC";
            Object messageObject = null;
            // Populate the message object

            MyMessageCreator creator = new MyMessageCreator();
            rabbitTemplate.send(routingKey, creator);
        }
    }

    class MyMessageCreator implements MessageCreator {

        @Override
        public Message createMessage() {
            MessageProperties properties = null;
            // Populate the properties object

            Message myCustomMessage = new Message("ABCD".getBytes(), properties);
            return myCustomMessage;
        }
    }
In the above code, the message creation process is externalized from the code that sends the message with the help of the MessageCreator interface. Messages can be received either synchronously or asynchronously. In the example code that we saw in the last section, we received the message synchronously: the receiver application itself calls receiveAndConvert() and handles the result on the calling thread. The following code shows how messages are received asynchronously.
ReceiveMessageTest1.java
    package net.javabeat.articles.spring.amqp.misc;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

    public class ReceiveMessageTest1 {

        public static void main(String[] args) {
            SimpleMessageListenerContainer container = null;
            // Instantiate the above container

            MyMessageListener listener = new MyMessageListener();
            container.setMessageListener(listener);
        }
    }

    class MyMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            byte[] messageContent = message.getBody();
            // Decode the bytes; printing the array itself would only show its identity.
            System.out.println("Message received is " + new String(messageContent));
        }
    }
In the above code, we have ensured that the Simple Message Listener Container has been set with a message listener object so that the method MessageListener.onMessage() is called whenever a message arrives.
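The difference between the two receiving styles can be illustrated without a broker. The sketch below is a plain-Java model, not Spring AMQP code: the class ListenerContainerSketch and its methods are hypothetical, an in-memory queue stands in for the broker queue, and a single background thread plays the role of the listener container.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical in-memory model of synchronous and asynchronous receiving. */
final class ListenerContainerSketch {

    // Stands in for the broker queue.
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private Thread worker;

    void publish(byte[] body) {
        queue.add(body);
    }

    /** Synchronous style: the caller asks for a message and gets null if none is waiting. */
    byte[] receive() {
        return queue.poll();
    }

    /** Asynchronous style: a background thread hands every arriving message to the listener. */
    void start(Consumer<byte[]> listener) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(queue.take());
                }
            } catch (InterruptedException stopRequested) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    void stop() {
        if (worker != null) {
            worker.interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ListenerContainerSketch container = new ListenerContainerSketch();

        container.publish("first".getBytes(StandardCharsets.UTF_8));
        System.out.println("polled: " + new String(container.receive(), StandardCharsets.UTF_8));

        container.start(body ->
                System.out.println("pushed: " + new String(body, StandardCharsets.UTF_8)));
        container.publish("second".getBytes(StandardCharsets.UTF_8));

        Thread.sleep(200); // give the worker thread time to deliver
        container.stop();
    }
}
```

In the asynchronous style the application code never asks for a message; it only supplies the callback, which mirrors the role of MessageListener.onMessage().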
Message Converters
A Message Converter, represented by the MessageConverter interface, does the job of converting between a Message and a regular Java object. Have a look at the following code.
ConverterTest1.java
    package net.javabeat.articles.spring.amqp.convert;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.SimpleMessageConverter;

    public class ConverterTest1 {

        public static void main(String[] args) {
            RabbitTemplate rabbitTemplate = null;
            // Instantiate the above object.

            SimpleMessageConverter converter = new SimpleMessageConverter();
            rabbitTemplate.setMessageConverter(converter);

            Object message = null;
            // Instantiate the above message object

            rabbitTemplate.convertAndSend(message);
        }
    }
In the above code, before sending the message, we have created an instance of SimpleMessageConverter and attached it to the RabbitTemplate object. However, this is not needed: if the message converter is not explicitly specified by the caller, the template makes use of SimpleMessageConverter for converting the message. If the content type of the message is set to text/plain, then this converter will convert the byte content into a String, whereas if the content type is set to application/x-java-serialized-object, then it will use the Java Serialization mechanism for de-serializing the byte array (found in the message) into a Java object. It is always possible to hook in a custom Message Converter by implementing the MessageConverter interface.
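The content-type dispatch described above can be sketched with nothing but the JDK. The class below is a hypothetical stand-in written for this article, not the SimpleMessageConverter source; in particular, returning the raw bytes for an unrecognised content type and using UTF-8 for text are assumptions of the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of content-type based message conversion. */
final class ContentTypeConversionSketch {

    static final String TEXT = "text/plain";
    static final String SERIALIZED = "application/x-java-serialized-object";

    /** Strings become UTF-8 bytes; any other payload goes through Java serialization. */
    static byte[] toBytes(Object payload) {
        if (payload instanceof String) {
            return ((String) payload).getBytes(StandardCharsets.UTF_8);
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Picks the decoding strategy from the content type of the message. */
    static Object fromBytes(byte[] body, String contentType) {
        if (TEXT.equals(contentType)) {
            return new String(body, StandardCharsets.UTF_8);
        }
        if (SERIALIZED.equals(contentType)) {
            // Only de-serialize data that comes from a trusted sender.
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
                return in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        // Assumption of this sketch: unknown content types are handed back as raw bytes.
        return body;
    }

    public static void main(String[] args) {
        Object text = fromBytes(toBytes("Test message"), TEXT);
        Object number = fromBytes(toBytes(Integer.valueOf(42)), SERIALIZED);
        System.out.println(text + " / " + number);
    }
}
```

A custom converter would follow the same shape: one method that turns an object into bytes plus a content type, and one that reverses the process based on that content type.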
Conclusion
This article provided an introduction to Spring’s AMQP support for sending and receiving messages through Spring’s configuration mechanism. It outlined the benefits of this technology over other messaging solutions. Setting up the environment for running the sample was also explained in depth. Through the sample application, concepts such as the Exchange, Queue and Message objects were demonstrated. Finally, the article concluded by exploring the alternate APIs for sending and receiving messages through Message Creators and Message Post Processors, along with Message Converters.