Showing posts with label JMS. Show all posts
Showing posts with label JMS. Show all posts

Wednesday, November 26, 2014

JMS - Publish/Subscribe messaging example using ActiveMQ & Maven


In a publish/subscribe (pub/sub) product or application, clients address messages to a topic, which functions somewhat like a bulletin board. Subscribers can receive information, in the form of messages, from publishers. Topics retain messages only as long as it takes to distribute them to current subscribers. The following post introduces the basic concepts of JMS point-to-point messaging and illustrates them with a code sample using ActiveMQ and Maven.

Publish/Subscribe Messaging

Pub/sub messaging has the following characteristics:
  • Each message can have multiple consumers.
  • Publishers and subscribers have a timing dependency. A client that subscribes to a topic can consume only messages published after the client has created a subscription, and the subscriber must continue to be active in order for it to consume messages.
The JMS API relaxes this timing dependency mentioned in the second bullet to some extent by allowing subscribers to create durable subscriptions, which receive messages sent while the subscribers are not active. Durable subscriptions provide the flexibility and reliability of queues but still allow clients to send messages to many recipients.

ActiveMQ Example

Let's illustrate the above characteristics by creating a message producer that sends a message containing a first and last name to a topic. In turn a message consumer will read the message and transform it into a greeting. The code is very similar to the JMS Hello World example but contains a few key differences explained below.

Tools used:
  • ActiveMQ 5.10
  • Maven 3

The code is built and run using Maven. Specified below is the Maven POM file which contains the needed dependencies for Logback, JUnit and ActiveMQ.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>info.source4code</groupId>
<artifactId>jms-activemq-publish-subscribe</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<name>JMS - Publish/Subscribe messaging using ActiveMQ</name>
<url>http://www.source4code.info/2014/11/jms-publish-subscribe-messaging-example-activemq-maven.html</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.6</java.version>

<logback.version>1.1.2</logback.version>
<slf4j.version>1.7.7</slf4j.version>
<junit.version>4.12-beta-2</junit.version>
<activemq.version>5.10.0</activemq.version>

<maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
</properties>

<dependencies>
<!-- Logging -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- JUnit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

Nondurable Subscription

The Publisher class contains a constructor which creates a message producer and needed connection and session objects. The sendName() operation takes as input a first and last name which are set on a TextMessage which in turn is sent to the topic set on the message producer.
package info.source4code.jms.activemq.pubsub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Publisher {

private static final Logger LOGGER = LoggerFactory
.getLogger(Publisher.class);

private String clientId;
private Connection connection;
private Session session;
private MessageProducer messageProducer;

public void create(String clientId, String topicName) throws JMSException {
this.clientId = clientId;

// create a Connection Factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);

// create a Connection
connection = connectionFactory.createConnection();
connection.setClientID(clientId);

// create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// create the Topic to which messages will be sent
Topic topic = session.createTopic(topicName);

// create a MessageProducer for sending messages
messageProducer = session.createProducer(topic);
}

public void closeConnection() throws JMSException {
connection.close();
}

public void sendName(String firstName, String lastName) throws JMSException {
String text = firstName + " " + lastName;

// create a JMS TextMessage
TextMessage textMessage = session.createTextMessage(text);

// send the message to the topic destination
messageProducer.send(textMessage);

LOGGER.debug(clientId + ": sent message with text='{}'", text);
}
}

The Subscriber class contains a constructor which creates a message consumer and needed connection and session objects. The getGreeting() operation reads a message from the topic and creates a greeting which is returned. A timeout parameter is passed to assure that the method does not wait indefinitely for a message to arrive.
package info.source4code.jms.activemq.pubsub;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Subscriber {

private static final Logger LOGGER = LoggerFactory
.getLogger(Subscriber.class);

private static final String NO_GREETING = "no greeting";

private String clientId;
private Connection connection;
private Session session;
private MessageConsumer messageConsumer;

public void create(String clientId, String topicName) throws JMSException {
this.clientId = clientId;

// create a Connection Factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);

// create a Connection
connection = connectionFactory.createConnection();
connection.setClientID(clientId);

// create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// create the Topic from which messages will be received
Topic topic = session.createTopic(topicName);

// create a MessageConsumer for receiving messages
messageConsumer = session.createConsumer(topic);

// start the connection in order to receive messages
connection.start();
}

public void closeConnection() throws JMSException {
connection.close();
}

public String getGreeting(int timeout) throws JMSException {

String greeting = NO_GREETING;

// read a message from the topic destination
Message message = messageConsumer.receive(timeout);

// check if a message was received
if (message != null) {
// cast the message to the correct type
TextMessage textMessage = (TextMessage) message;

// retrieve the message content
String text = textMessage.getText();
LOGGER.debug(clientId + ": received message with text='{}'", text);

// create greeting
greeting = "Hello " + text + "!";
} else {
LOGGER.debug(clientId + ": no message received");
}

LOGGER.info("greeting={}", greeting);
return greeting;
}
}

The below JUnit test class will be used to illustrate the Pub/Sub messaging characteristics mentioned at the beginning of this post. The testGreeting() test case verifies the correct working of the getGreeting() method of the Subscriber class.

The testMultipleConsumers() test case will verify that the same message can be read by multiple consumers. In order to test this, two Subscriber instances are created on the same 'multipleconsumers.t' topic.

Finally the testNonDurableSubscriber() test case will illustrate the timing dependency between publisher and subscriber. First a message is sent to a topic on which only one subscriber listens. Then a second subscriber is added to the same topic and a second message is sent. The result is that the second subscriber only receives the second message and not the first one whereas the first subscriber has received both messages.
package info.source4code.jms.activemq.pubsub;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import javax.jms.JMSException;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class SubscriberTest {

private static Publisher publisherPublishSubscribe,
publisherMultipleConsumers, publisherNonDurableSubscriber;
private static Subscriber subscriberPublishSubscribe,
subscriber1MultipleConsumers, subscriber2MultipleConsumers,
subscriber1NonDurableSubscriber, subscriber2NonDurableSubscriber;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
publisherPublishSubscribe = new Publisher();
publisherPublishSubscribe.create("publisher-publishsubscribe",
"publishsubscribe.t");

publisherMultipleConsumers = new Publisher();
publisherMultipleConsumers.create("publisher-multipleconsumers",
"multipleconsumers.t");

publisherNonDurableSubscriber = new Publisher();
publisherNonDurableSubscriber.create("publisher-nondurablesubscriber",
"nondurablesubscriber.t");

subscriberPublishSubscribe = new Subscriber();
subscriberPublishSubscribe.create("subscriber-publishsubscribe",
"publishsubscribe.t");

subscriber1MultipleConsumers = new Subscriber();
subscriber1MultipleConsumers.create("subscriber1-multipleconsumers",
"multipleconsumers.t");

subscriber2MultipleConsumers = new Subscriber();
subscriber2MultipleConsumers.create("subscriber2-multipleconsumers",
"multipleconsumers.t");

subscriber1NonDurableSubscriber = new Subscriber();
subscriber1NonDurableSubscriber.create(
"subscriber1-nondurablesubscriber", "nondurablesubscriber.t");

subscriber2NonDurableSubscriber = new Subscriber();
subscriber2NonDurableSubscriber.create(
"subscriber2-nondurablesubscriber", "nondurablesubscriber.t");
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
publisherPublishSubscribe.closeConnection();
publisherMultipleConsumers.closeConnection();
publisherNonDurableSubscriber.closeConnection();

subscriberPublishSubscribe.closeConnection();
subscriber1MultipleConsumers.closeConnection();
subscriber2MultipleConsumers.closeConnection();
subscriber1NonDurableSubscriber.closeConnection();
subscriber2NonDurableSubscriber.closeConnection();
}

@Test
public void testGetGreeting() {
try {
publisherPublishSubscribe.sendName("Peregrin", "Took");

String greeting1 = subscriberPublishSubscribe.getGreeting(1000);
assertEquals("Hello Peregrin Took!", greeting1);

String greeting2 = subscriberPublishSubscribe.getGreeting(1000);
assertEquals("no greeting", greeting2);

} catch (JMSException e) {
fail("a JMS Exception occurred");
}
}

@Test
public void testMultipleConsumers() {
try {
publisherMultipleConsumers.sendName("Gandalf", "the Grey");

String greeting1 = subscriber1MultipleConsumers.getGreeting(1000);
assertEquals("Hello Gandalf the Grey!", greeting1);

String greeting2 = subscriber2MultipleConsumers.getGreeting(1000);
assertEquals("Hello Gandalf the Grey!", greeting2);

} catch (JMSException e) {
fail("a JMS Exception occurred");
}
}

@Test
public void testNonDurableSubscriber() {
try {
// nondurable subscriptions, will not receive messages sent while
// the subscribers are not active
subscriber2NonDurableSubscriber.closeConnection();

publisherNonDurableSubscriber.sendName("Bilbo", "Baggins");

// recreate a connection for the nondurable subscription
subscriber2NonDurableSubscriber.create(
"subscriber2-nondurablesubscriber",
"nondurablesubscriber.t");

publisherNonDurableSubscriber.sendName("Frodo", "Baggins");

String greeting1 = subscriber1NonDurableSubscriber
.getGreeting(1000);
assertEquals("Hello Bilbo Baggins!", greeting1);
String greeting2 = subscriber1NonDurableSubscriber
.getGreeting(1000);
assertEquals("Hello Frodo Baggins!", greeting2);

String greeting3 = subscriber2NonDurableSubscriber
.getGreeting(1000);
assertEquals("Hello Frodo Baggins!", greeting3);
String greeting4 = subscriber2NonDurableSubscriber
.getGreeting(1000);
assertEquals("no greeting", greeting4);

} catch (JMSException e) {
fail("a JMS Exception occurred");
}
}
}

Make sure a default ActiveMQ message broker is up and running, open a command prompt and execute following Maven command:
mvn -Dtest=SubscriberTest test

This will trigger Maven to run the above test cases which should result in the following log statements.
07:24:00.299 DEBUG [main][Publisher]
publisher-multipleconsumers: sent message with text='Gandalf the Grey'
07:24:00.303 DEBUG [main][Subscriber]
subscriber1-multipleconsumers: received message with text='Gandalf the Grey'
07:24:00.303 INFO [main][Subscriber]
greeting=Hello Gandalf the Grey!
07:24:00.304 DEBUG [main][Subscriber]
subscriber2-multipleconsumers: received message with text='Gandalf the Grey'
07:24:00.304 INFO [main][Subscriber]
greeting=Hello Gandalf the Grey!
07:24:00.306 DEBUG [main][Publisher]
publisher-publishsubscribe: sent message with text='Peregrin Took'
07:24:00.306 DEBUG [main][Subscriber]
subscriber-publishsubscribe: received message with text='Peregrin Took'
07:24:00.307 INFO [main][Subscriber]
greeting=Hello Peregrin Took!
07:24:01.307 DEBUG [main][Subscriber]
subscriber-publishsubscribe: no message received
07:24:01.307 INFO [main][Subscriber]
greeting=no greeting
07:24:01.320 DEBUG [main][Publisher]
publisher-nondurablesubscriber: sent message with text='Bilbo Baggins'
07:24:01.337 DEBUG [main][Publisher]
publisher-nondurablesubscriber: sent message with text='Frodo Baggins'
07:24:01.338 DEBUG [main][Subscriber]
subscriber1-nondurablesubscriber: received message with text='Bilbo Baggins'
07:24:01.338 INFO [main][Subscriber]
greeting=Hello Bilbo Baggins!
07:24:01.338 DEBUG [main][Subscriber]
subscriber1-nondurablesubscriber: received message with text='Frodo Baggins'
07:24:01.338 INFO [main][Subscriber]
greeting=Hello Frodo Baggins!
07:24:01.339 DEBUG [main][Subscriber]
subscriber2-nondurablesubscriber: received message with text='Frodo Baggins'
07:24:01.339 INFO [main][Subscriber]
greeting=Hello Frodo Baggins!
07:24:02.339 DEBUG [main][Subscriber]
subscriber2-nondurablesubscriber: no message received
07:24:02.339 INFO [main][Subscriber]
greeting=no greeting

Durable Subscription

As mentioned in the beginning of this post it is also possible to create a durable subscription which allows to receive messages sent while the subscribers are not active. The JMS specification dictates that the identification of a specific durable subscription is done by a combination of the client ID, the durable subscription name and the topic name.

As a result the below DurableSubscriber has three main differences with the previous Subscriber class:
  • A clientId is mandatory on the connection in order to allow a JMS provider to uniquely identify a durable subscriber.
  • A durable subscriber is created using Session.CreateDurableSubscriber.
  • A subscriptionName is needed when creating the durable subscriber.
Note that creating a MessageConsumer provides the same features as creating a TopicSubscriber. The TopicSubscriber is provided to support existing code.
    package info.source4code.jms.activemq.pubsub;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DurableSubscriber {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(DurableSubscriber.class);

    private static final String NO_GREETING = "no greeting";

    private String clientId;
    private Connection connection;
    private Session session;
    private MessageConsumer messageConsumer;

    private String subscriptionName;

    public void create(String clientId, String topicName,
    String subscriptionName) throws JMSException {
    this.clientId = clientId;
    this.subscriptionName = subscriptionName;

    // create a Connection Factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_BROKER_URL);

    // create a Connection
    connection = connectionFactory.createConnection();
    connection.setClientID(clientId);

    // create a Session
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // create the Topic from which messages will be received
    Topic topic = session.createTopic(topicName);

    // create a MessageConsumer for receiving messages
    messageConsumer = session.createDurableSubscriber(topic,
    subscriptionName);

    // start the connection in order to receive messages
    connection.start();
    }

    public void removeDurableSubscriber() throws JMSException {
    messageConsumer.close();
    session.unsubscribe(subscriptionName);
    }

    public void closeConnection() throws JMSException {
    connection.close();
    }

    public String getGreeting(int timeout) throws JMSException {

    String greeting = NO_GREETING;

    // read a message from the topic destination
    Message message = messageConsumer.receive(timeout);

    // check if a message was received
    if (message != null) {
    // cast the message to the correct type
    TextMessage textMessage = (TextMessage) message;

    // retrieve the message content
    String text = textMessage.getText();
    LOGGER.debug(clientId + ": received message with text='{}'", text);

    // create greeting
    greeting = "Hello " + text + "!";
    } else {
    LOGGER.debug(clientId + ": no message received");
    }

    LOGGER.info("greeting={}", greeting);
    return greeting;
    }
    }

    The below JUnit test class will be used to illustrate the durable subscriber messaging characteristics. It contains a testDurableSubscriber() test case that will first remove one of the two durable subscribers that are listening on the 'durablesubscriber.t' topic by closing it's connection to the broker. Then a first message is sent to this topic on which only one subscribers is still actively listening. The second subscriber is recreated using the same client ID and subscription name and a second message is sent. The expected result is that both subscribers receive the two messages.
    Note that in the tearDownAfterClass() method the durable subscriptions are removed in order to avoid an error when rerunning the test case.
    package info.source4code.jms.activemq.pubsub;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.fail;

    import javax.jms.JMSException;

    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class DurableSubscriberTest {

    private static Publisher publisherPublishSubscribe,
    publisherDurableSubscriber;
    private static DurableSubscriber subscriberPublishSubscribe,

    subscriber1DurableSubscriber, subscriber2DurableSubscriber;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
    publisherPublishSubscribe = new Publisher();
    publisherPublishSubscribe.create("publisher-publishsubscribe",
    "publishsubscribe.t");

    publisherDurableSubscriber = new Publisher();
    publisherDurableSubscriber.create("publisher-durablesubscriber",
    "durablesubscriber.t");

    subscriberPublishSubscribe = new DurableSubscriber();
    subscriberPublishSubscribe.create("subscriber-publishsubscribe",
    "publishsubscribe.t", "publishsubscribe");

    subscriber1DurableSubscriber = new DurableSubscriber();
    subscriber1DurableSubscriber.create("subscriber1-durablesubscriber",
    "durablesubscriber.t", "durablesubscriber1");

    subscriber2DurableSubscriber = new DurableSubscriber();
    subscriber2DurableSubscriber.create("subscriber2-durablesubscriber",
    "durablesubscriber.t", "durablesubscriber2");
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
    publisherPublishSubscribe.closeConnection();
    publisherDurableSubscriber.closeConnection();

    // remove the durable subscriptions
    subscriberPublishSubscribe.removeDurableSubscriber();
    subscriber1DurableSubscriber.removeDurableSubscriber();
    subscriber2DurableSubscriber.removeDurableSubscriber();

    subscriberPublishSubscribe.closeConnection();
    subscriber1DurableSubscriber.closeConnection();
    subscriber2DurableSubscriber.closeConnection();
    }

    @Test
    public void testGetGreeting() {
    try {
    publisherPublishSubscribe.sendName("Peregrin", "Took");

    String greeting1 = subscriberPublishSubscribe.getGreeting(1000);
    assertEquals("Hello Peregrin Took!", greeting1);

    String greeting2 = subscriberPublishSubscribe.getGreeting(1000);
    assertEquals("no greeting", greeting2);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }

    @Test
    public void testDurableSubscriber() {
    try {
    // durable subscriptions, receive messages sent while the
    // subscribers are not active
    subscriber2DurableSubscriber.closeConnection();

    publisherDurableSubscriber.sendName("Bilbo", "Baggins");

    // recreate a connection for the durable subscription
    subscriber2DurableSubscriber.create(
    "subscriber2-durablesubscriber", "durablesubscriber.t",
    "durablesubscriber2");

    publisherDurableSubscriber.sendName("Frodo", "Baggins");

    String greeting1 = subscriber1DurableSubscriber.getGreeting(1000);
    assertEquals("Hello Bilbo Baggins!", greeting1);
    String greeting2 = subscriber2DurableSubscriber.getGreeting(1000);
    assertEquals("Hello Bilbo Baggins!", greeting2);

    String greeting3 = subscriber1DurableSubscriber.getGreeting(1000);
    assertEquals("Hello Frodo Baggins!", greeting3);
    String greeting4 = subscriber2DurableSubscriber.getGreeting(1000);
    assertEquals("Hello Frodo Baggins!", greeting4);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }
    }

    Make sure a default ActiveMQ message broker is up and running, open a command prompt and execute following Maven command:
    mvn -Dtest=DurableSubscriberTest test

    This will trigger Maven to run the above test cases which should result in the following log statements.
    18:58:54.591 DEBUG [main][Publisher]
    publisher-durablesubscriber: sent message with text='Bilbo Baggins'
    18:58:54.632 DEBUG [main][Publisher]
    publisher-durablesubscriber: sent message with text='Frodo Baggins'
    18:58:54.633 DEBUG [main][DurableSubscriber]
    subscriber1-durablesubscriber: received message with text='Bilbo Baggins'
    18:58:54.634 INFO [main][DurableSubscriber]
    greeting=Hello Bilbo Baggins!
    18:58:54.635 DEBUG [main][DurableSubscriber]
    subscriber2-durablesubscriber: received message with text='Bilbo Baggins'
    18:58:54.635 INFO [main][DurableSubscriber]
    greeting=Hello Bilbo Baggins!
    18:58:54.636 DEBUG [main][DurableSubscriber]
    subscriber1-durablesubscriber: received message with text='Frodo Baggins'
    18:58:54.636 INFO [main][DurableSubscriber]
    greeting=Hello Frodo Baggins!
    18:58:54.636 DEBUG [main][DurableSubscriber]
    subscriber2-durablesubscriber: received message with text='Frodo Baggins'
    18:58:54.637 INFO [main][DurableSubscriber]
    greeting=Hello Frodo Baggins!
    18:58:54.669 DEBUG [main][Publisher]
    publisher-publishsubscribe: sent message with text='Peregrin Took'
    18:58:54.670 DEBUG [main][DurableSubscriber]
    subscriber-publishsubscribe: received message with text='Peregrin Took'
    18:58:54.670 INFO [main][DurableSubscriber]
    greeting=Hello Peregrin Took!
    18:58:55.670 DEBUG [main][DurableSubscriber]
    subscriber-publishsubscribe: no message received
    18:58:55.670 INFO [main][DurableSubscriber]
    greeting=no greeting


    github icon
    If you would like to run the above code sample you can download the full source code and their corresponding JUnit test cases here.

    This concludes the JMS publish/subscribe example using ActiveMQ. If you found this post helpful or have any questions or remarks, please leave a comment.

    Tuesday, November 18, 2014

    JMS - Install RabbitMQ on Windows

    RabbitMQ is an open source message broker software that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in the Erlang programming language and client libraries to interface with the broker are available for all major programming languages. Following tutorial shows how to install RabbitMQ and perform a start/stop of the installed instance on Windows.

    Install Erlang

    Erlang is a general-purpose concurrent, garbage-collected programming language and runtime system. It was designed by Ericsson to support distributed, fault-tolerant applications. It was originally a proprietary language within Ericsson, but was released as open source in 1998. OTP (Open Telecom Platform) is the open source distribution of Erlang.

    First thing to do is to download the OTP binaries. Go the the Erlang download page and click on the Windows binary link for your system (32-bit or 64-bit). At the time of writing the latest stable release was 'otp_win64_17.3.exe'. Note that there are also pre-built packages for platforms such as: Raspbian, Ubuntu, Fedora, OS X, and more.

    Double click to run the downloaded '.exe' file and click Next keeping the default settings on the first installer step.

    erlang installer welcome

    Optionally change the default destination folder and click Next and then Install. In the example below the install location was change to 'D:\source4code\tools\erl6.2'. From now on we will refer to this directory as: [erlang_install_dir].

    erlang installer installation path

    If Microsoft Visual C++ is not already setup on your system, a second installer window will pop-up. Click the 'I have read and accept the license terms' check-box and click Install.

    microsoft visual c++ installer

    Click Finish when the Microsoft Visual C++ setup is complete and then click Close to finish the OTP installation.

    microsoft visual c++ installer completed

    In order for Erlang applications to be able to run we need to setup an 'ERLANG_HOME' environment variable that will point to the Erlang installation directory. When using Windows the above parameters can be configured on the Environment Variables panel. Click on the Windows Start button and enter "env" without quotes as shown below.

    edit environment variables for your account

    Environment variables can be set at account level or at system level. For this example click on Edit environment variables for your account and following panel should appear.

    environment variables panel

    Click on the New button and enter "ERLANG_HOME" as variable name and the [erlang_install_dir] as variable value. In this tutorial the installation directory is "D:\source4code\tools\erl6.2". Click OK to to save.

    erlang_home user variable

    Install RabbitMQ

    RabbitMQ can be downloaded from the RabbitMQ download page. There are a number of different download packages available, for this tutorial we will be installing the manual install package on Windows.At the time of writing the latest stable release was 'rabbitmq-server-windows-3.4.1.zip'.

    Extract the binaries archive downloaded in the previous step. The extracted root directory should contain a number of files and subdirectories as shown below. From now on we will refer to this directory as: [rabbitmq_install_dir].

    In order to start RabbitMQ, open a command prompt by clicking on the Windows Start button and typing "cmd" followed by pressing ENTER. A new command prompt window should open. Navigate to the [rabbitmq_install_dir]/sbin and enter following command:
    rabbitmq-server

    rabbitmq start command

    In order to stop RabbitMQ, open another command prompt at the [rabbitmq_install_dir]/sbin and enter following command:
    rabbitmqctl stop

    rabbitmq stop command

    Setup RabbitMQ

    The 'rabbitmq-management' plugin provides a browser-based UI for management and monitoring of the RabbitMQ server . In order to enable the UI, make sure RabbitMQ is running and open a new command prompt at [rabbitmq_install_dir]/sbin in which you enter following:
    rabbitmq-plugins enable rabbitmq_management

    rabbitmq enable web console

    Open the RabbitMQ web console in a browser using: http://localhost:15672 and following page should be displayed:

    rabbitmq web console login

    Enter following default credentials: Username="guest" and Password="guest" and click on Login. The overview page will be displayed that shows some basic information on the RabbitMQ server:

    rabbitmq web console


    This concludes setting up and configuring RabbitMQ. If you found this post helpful or have any questions or remarks, please leave a comment.

    Sunday, October 26, 2014

    JMS - Point-to-Point messaging example using ActiveMQ and Maven

    jms logo
    A point-to-point (PTP) product or application is built on the concept of message queues, senders and receivers. Each message is addressed to a specific queue and receiving clients extract messages from the queues established to hold their messages. Queues retain all messages sent to them until the messages are consumed. The following post introduces the basic concepts of JMS point-to-point messaging and illustrates them with a code sample using ActiveMQ and Maven.

    Point-to-Point Messaging

    point-to-point messaging
    PTP messaging has the following characteristics:
    • Each message has only one consumer.
    • A sender and a receiver of a message have no timing dependencies. The receiver can fetch the message whether or not it was running when the client sent the message.
    • The receiver acknowledges the successful processing of a message.

    ActiveMQ Example

    Let's illustrate the above characteristics by creating a message producer that sends a message containing a first and last name to a queue. In turn a message consumer will read the message and transform it into a greeting. The code is very similar to the JMS Hello World example but contains a few key differences explained below.

    Tools used:
    • ActiveMQ 5.10
    • Maven 3

    The code is built and run using Maven. Specified below is the Maven POM file which contains the needed dependencies for Logback, JUnit and ActiveMQ.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>info.source4code</groupId>
    <artifactId>jms-activemq-point-to-point</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>JMS - Point-to-Point messaging using ActiveMQ</name>
    <url>http://www.source4code.info/2014/10/jms-point-to-point-messaging-example-activemq-maven.html</url>

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.6</java.version>

    <logback.version>1.1.2</logback.version>
    <slf4j.version>1.7.7</slf4j.version>
    <junit.version>4.12-beta-2</junit.version>
    <activemq.version>5.10.0</activemq.version>

    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
    </properties>

    <dependencies>
    <!-- Logging -->
    <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.version}</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
    </dependency>
    <!-- JUnit -->
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>${junit.version}</version>
    <scope>test</scope>
    </dependency>
    <!-- ActiveMQ -->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>${activemq.version}</version>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>${maven-compiler-plugin.version}</version>
    <configuration>
    <source>${java.version}</source>
    <target>${java.version}</target>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </project>

    The Producer class contains a constructor which creates a message producer and needed connection and session objects. The sendName() operation takes as input a first and last name which are set on a TextMessage which in turn is sent to the queue set on the message producer.
    package info.source4code.jms.activemq.ptp;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Producer {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(Producer.class);

    private String clientId;
    private Connection connection;
    private Session session;
    private MessageProducer messageProducer;

    public void create(String clientId, String queueName) throws JMSException {
    this.clientId = clientId;

    // create a Connection Factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_BROKER_URL);

    // create a Connection
    connection = connectionFactory.createConnection();
    connection.setClientID(clientId);

    // create a Session
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // create the Queue to which messages will be sent
    Queue queue = session.createQueue(queueName);

    // create a MessageProducer for sending messages
    messageProducer = session.createProducer(queue);
    }

    public void closeConnection() throws JMSException {
    connection.close();
    }

    public void sendName(String firstName, String lastName) throws JMSException {
    String text = firstName + " " + lastName;

    // create a JMS TextMessage
    TextMessage textMessage = session.createTextMessage(text);

    // send the message to the queue destination
    messageProducer.send(textMessage);

    LOGGER.debug(clientId + ": sent message with text='{}'", text);
    }
    }

    The Consumer class contains a constructor which creates a message consumer and needed connection and session objects. A key difference with the JMS Hello World example is that the Session object is created with the Session.CLIENT_ACKNOWLEDGE parameter which requires a client to explicitly acknowledge a consumed message by calling the message's acknowledge() method.

    The getGreeting() operation reads a message from the queue and creates a greeting which is returned. Aside from the timeout parameter an additional acknowledge parameter is passed which is used to determine whether the received message should be acknowledged or not.
    package info.source4code.jms.activemq.ptp;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Consumer {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(Consumer.class);

    private static String NO_GREETING = "no greeting";

    private String clientId;
    private Connection connection;
    private Session session;
    private MessageConsumer messageConsumer;

    public void create(String clientId, String queueName) throws JMSException {
    this.clientId = clientId;

    // create a Connection Factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_BROKER_URL);

    // create a Connection
    connection = connectionFactory.createConnection();
    connection.setClientID(clientId);

    // create a Session
    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    // create the Queue from which messages will be received
    Queue queue = session.createQueue(queueName);

    // create a MessageConsumer for receiving messages
    messageConsumer = session.createConsumer(queue);

    // start the connection in order to receive messages
    connection.start();
    }

    public void closeConnection() throws JMSException {
    connection.close();
    }

    public String getGreeting(int timeout, boolean acknowledge)
    throws JMSException {

    String greeting = NO_GREETING;

    // read a message from the queue destination
    Message message = messageConsumer.receive(timeout);

    // check if a message was received
    if (message != null) {
    // cast the message to the correct type
    TextMessage textMessage = (TextMessage) message;

    // retrieve the message content
    String text = textMessage.getText();
    LOGGER.debug(clientId + ": received message with text='{}'", text);

    if (acknowledge) {
    // acknowledge the successful processing of the message
    message.acknowledge();
    LOGGER.debug(clientId + ": message acknowledged");
    } else {
    LOGGER.debug(clientId + ": message not acknowledged");
    }

    // create greeting
    greeting = "Hello " + text + "!";
    } else {
    LOGGER.debug(clientId + ": no message received");
    }

    LOGGER.info("greeting={}", greeting);
    return greeting;
    }
    }

    The below JUnit test class will be used to illustrate the PTP messaging characteristics mentioned at the beginning of this post. The testGreeting() test case verifies the correct working of the getGreeting() method of the Consumer class.

    The testOnlyOneConsumer() test case will verify that a message is read by only one consumer. The first consumer will receive the greeting and the second consumer will receive nothing.

    The testNoTimingDependencies() test case illustrates that the consumer can successfully receive a message even if that consumer is created after the message was sent.

    Finally the testAcknowledgeProcessing() test case will verify that a message is not removed by the JMS provider in case it was not acknowledged by the consumer. In order to simulate this we first call the getGreeting() method with the acknowledge parameter set to false. Then the getGreeting() method is called a second time and as the first call did not acknowledge the message it is still available on the queue.
    package info.source4code.jms.activemq.ptp;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.fail;

    import javax.jms.JMSException;
    import javax.naming.NamingException;

    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class ConsumerTest {

    private static Producer producerPointToPoint, producerOnlyOneConsumer,
    producerNoTimingDependencies, producerAcknowledgeProcessing;
    private static Consumer consumerPointToPoint, consumer1OnlyOneConsumer,
    consumer2OnlyOneConsumer, consumerNoTimingDependencies,
    consumer1AcknowledgeProcessing, consumer2AcknowledgeProcessing;

    @BeforeClass
    public static void setUpBeforeClass() throws JMSException, NamingException {
    producerPointToPoint = new Producer();
    producerPointToPoint.create("producer-pointtopoint", "pointtopoint.q");

    producerOnlyOneConsumer = new Producer();
    producerOnlyOneConsumer.create("producer-onlyoneconsumer",
    "onlyoneconsumer.q");

    producerNoTimingDependencies = new Producer();
    producerNoTimingDependencies.create("producer-notimingdependencies",
    "notimingdependencies.q");

    producerAcknowledgeProcessing = new Producer();
    producerAcknowledgeProcessing.create("producer-acknowledgeprocessing",
    "acknowledgeprocessing.q");

    consumerPointToPoint = new Consumer();
    consumerPointToPoint.create("consumer-pointtopoint", "pointtopoint.q");

    consumer1OnlyOneConsumer = new Consumer();
    consumer1OnlyOneConsumer.create("consumer1-onlyoneconsumer",
    "onlyoneconsumer.q");

    consumer2OnlyOneConsumer = new Consumer();
    consumer2OnlyOneConsumer.create("consumer2-onlyoneconsumer",
    "onlyoneconsumer.q");

    // consumerNoTimingDependencies

    consumer1AcknowledgeProcessing = new Consumer();
    consumer1AcknowledgeProcessing.create(
    "consumer1-acknowledgeprocessing", "acknowledgeprocessing.q");

    consumer2AcknowledgeProcessing = new Consumer();
    consumer2AcknowledgeProcessing.create(
    "consumer2-acknowledgeprocessing", "acknowledgeprocessing.q");
    }

    @AfterClass
    public static void tearDownAfterClass() throws JMSException {
    producerPointToPoint.closeConnection();
    producerOnlyOneConsumer.closeConnection();
    producerNoTimingDependencies.closeConnection();
    producerAcknowledgeProcessing.closeConnection();

    consumerPointToPoint.closeConnection();
    consumer1OnlyOneConsumer.closeConnection();
    consumer2OnlyOneConsumer.closeConnection();
    consumerNoTimingDependencies.closeConnection();
    // consumer1AcknowledgeProcessing
    consumer2AcknowledgeProcessing.closeConnection();
    }

    @Test
    public void testGetGreeting() {
    try {
    producerPointToPoint.sendName("Frodo", "Baggins");

    String greeting = consumerPointToPoint.getGreeting(1000, true);
    assertEquals("Hello Frodo Baggins!", greeting);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }

    @Test
    public void testOnlyOneConsumer() throws InterruptedException {
    try {
    producerOnlyOneConsumer.sendName("Legolas", "Greenleaf");

    String greeting1 = consumer1OnlyOneConsumer.getGreeting(1000, true);
    assertEquals("Hello Legolas Greenleaf!", greeting1);

    Thread.sleep(1000);

    String greeting2 = consumer2OnlyOneConsumer.getGreeting(1000, true);
    // each message has only one consumer
    assertEquals("no greeting", greeting2);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }

    @Test
    public void testNoTimingDependencies() {
    try {
    producerNoTimingDependencies.sendName("Samwise", "Gamgee");
    // a receiver can fetch the message whether or not it was running
    // when the client sent the message
    consumerNoTimingDependencies = new Consumer();
    consumerNoTimingDependencies.create(
    "consumer-notimingdependencies", "notimingdependencies.q");

    String greeting = consumerNoTimingDependencies.getGreeting(1000,
    true);
    assertEquals("Hello Samwise Gamgee!", greeting);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }

    @Test
    public void testAcknowledgeProcessing() throws InterruptedException {
    try {
    producerAcknowledgeProcessing.sendName("Gandalf", "the Grey");

    // consume the message without an acknowledgment
    String greeting1 = consumer1AcknowledgeProcessing.getGreeting(1000,
    false);
    assertEquals("Hello Gandalf the Grey!", greeting1);

    // close the MessageConsumer so the broker knows there is no
    // acknowledgment
    consumer1AcknowledgeProcessing.closeConnection();

    String greeting2 = consumer2AcknowledgeProcessing.getGreeting(1000,
    true);
    assertEquals("Hello Gandalf the Grey!", greeting2);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }
    }

    Make sure a default ActiveMQ message broker is up and running, open a command prompt and execute following Maven command:
    mvn test

    This will trigger Maven to run the above test cases which should result in the following log statements.
    20:53:01.069 DEBUG [main][Producer]
    producer-onlyoneconsumer: sent message with text='Legolas Greenleaf'
    20:53:01.076 DEBUG [main][Consumer]
    consumer1-onlyoneconsumer: received message with text='Legolas Greenleaf'
    20:53:01.077 DEBUG [main][Consumer]
    consumer1-onlyoneconsumer: message acknowledged
    20:53:01.077 INFO [main][Consumer]
    greeting=Hello Legolas Greenleaf!
    20:53:03.078 DEBUG [main][Consumer]
    consumer2-onlyoneconsumer: no message received
    20:53:03.078 INFO [main][Consumer]
    greeting=no greeting
    20:53:03.110 DEBUG [main][Producer]
    producer-notimingdependencies: sent message with text='Samwise Gamgee'
    20:53:03.133 DEBUG [main][Consumer]
    consumer-notimingdependencies: received message with text='Samwise Gamgee'
    20:53:03.133 DEBUG [main][Consumer]
    consumer-notimingdependencies: message acknowledged
    20:53:03.134 INFO [main][Consumer]
    greeting=Hello Samwise Gamgee!
    20:53:03.181 DEBUG [main][Producer]
    producer-acknowledgeprocessing: sent message with text='Gandalf the Grey'
    20:53:03.181 DEBUG [main][Consumer]
    consumer1-acknowledgeprocessing: received message with text='Gandalf the Grey'
    20:53:03.181 DEBUG [main][Consumer]
    consumer1-acknowledgeprocessing: message not acknowledged
    20:53:03.182 INFO [main][Consumer]
    greeting=Hello Gandalf the Grey!
    20:53:03.189 DEBUG [main][Consumer]
    consumer2-acknowledgeprocessing: received message with text='Gandalf the Grey'
    20:53:03.189 DEBUG [main][Consumer]
    consumer2-acknowledgeprocessing: message acknowledged
    20:53:03.189 INFO [main][Consumer]
    greeting=Hello Gandalf the Grey!
    20:53:03.217 DEBUG [main][Producer]
    producer-pointtopoint: sent message with text='Frodo Baggins'
    20:53:03.217 DEBUG [main][Consumer]
    consumer-pointtopoint: received message with text='Frodo Baggins'
    20:53:03.217 DEBUG [main][Consumer]
    consumer-pointtopoint: message acknowledged
    20:53:03.218 INFO [main][Consumer]
    greeting=Hello Frodo Baggins!
    Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.758 sec

    Results :

    Tests run: 4, Failures: 0, Errors: 0, Skipped: 0


    github icon
    If you would like to run the above code sample you can download the full source code and their corresponding JUnit test cases here.

    This concludes the JMS point-to-point example using ActiveMQ. If you found this post helpful or have any questions or remarks, please leave a comment.

    Thursday, October 16, 2014

    JMS - Hello World using ActiveMQ

    jms logo
    The Java Message Service (JMS) API is a Java Message Oriented Middleware (MOM) API for sending messages between two or more clients. It is a Java API that allows applications to create, send, receive, and read messages. The JMS API enables communication that is loosely coupled, asynchronous and reliable. The current version of the JMS specification is version 1.1. The following post introduces the basic JMS concepts and illustrates them with a JMS Hello World example using ActiveMQ and Maven.


    To use JMS, one must have a JMS provider that can manage the sessions, queues and topics. Some examples of known JMS providers are Apache ActiveMQ, WebSphere MQ from IBM or SonicMQ from Aurea Software. Starting from Java EE version 1.4, a JMS provider has to be contained in all Java EE application servers.

    The JMS API Programming Model

    jms api programming model

    The basic building blocks of the JMS API programming model are shown above. At the top we have the ConnectionFactory object which is the object a client uses to create a connection to a JMS provider. A connection factory encapsulates a set of connection configuration parameters like for example the broker URL. A connection factory is a JMS administered object that is typically created by an administrator and later used by JMS clients.

    When you have a ConnectionFactory object, you can use it to create a connection. A Connection object encapsulates a virtual connection with a JMS provider. For example, a connection could represent an open TCP/IP socket between a client and a provider service daemon. Before an application completes, it must close any connections that were created. Failure to close a connection can cause resources not to be released by the JMS provider.
    Closing a connection also closes its sessions and their message producers/message consumers.
    A session is a single-threaded context for producing and consuming messages. A session provides a transactional context with which to group a set of sends and receives into an atomic unit of work. Session objects are created on top of connections.
    As mentioned above, it is important to note that everything from a session down is single-threaded!
    A MessageProducer is an object that is created by a session and used for sending messages to a destination. You use a Session object to create a message producer for a destination.
    It is possible to create an unidentified producer by specifying a null Destination as argument to the createProducer() method. When sending a message, overload the send method with the needed destination as the first parameter.
    A MessageConsumer is an object that is created by a session and used for receiving messages sent to a destination. After you have created a message consumer it becomes active, and you can use it to receive messages. Message delivery does not begin until you start the connection you created by calling its start() method.
    Remember to always to call the start() method on the Connection object in order to receive messages!
    A Destination is the object a client uses to specify the target of messages it produces and the source of messages it consumes. In the point-to-point messaging domain, destinations are called queues. In the publish/subscribe messaging domain, destinations are called topics.

    For more detailed information please check the JMS API programming model chapter of the Java EE 6 tutorial.

    ActiveMQ Example

    Let's illustrate the above by creating a message producer that sends a message containing a first and last name to a Hello World queue. In turn a message consumer will read the message and transform it into a greeting. The example uses Maven and assumes a default ActiveMQ message broker is up and running.

    Tools used:
    • ActiveMQ 5.10
    • Maven 3

    First let's look at the below Maven POM file which contains the needed dependencies for Logback, JUnit and ActiveMQ.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>info.source4code</groupId>
    <artifactId>jms-activemq-helloworld</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>JMS - Hello World using ActiveMQ</name>
    <url>http://www.source4code.info/2014/10/jms-hello-world-using-activemq.html</url>

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.6</java.version>

    <logback.version>1.1.2</logback.version>
    <slf4j.version>1.7.7</slf4j.version>
    <junit.version>4.12-beta-2</junit.version>
    <activemq.version>5.10.0</activemq.version>

    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
    </properties>

    <dependencies>
    <!-- Logging -->
    <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.version}</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
    </dependency>
    <!-- JUnit -->
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>${junit.version}</version>
    <scope>test</scope>
    </dependency>
    <!-- ActiveMQ -->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>${activemq.version}</version>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>${maven-compiler-plugin.version}</version>
    <configuration>
    <source>${java.version}</source>
    <target>${java.version}</target>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </project>

    The Producer class below illustrates how to use the JMS API programming model. The create() method will first create an instance of the ConnectionFactory which is in turn used to create a connection to ActiveMQ using the default broker URL. Using the Connection instance, a Session is created which is used to create a Destination and MessageProducer instance. The destination type in the below example is a queue.

    The class also contains a close() method which allows to correctly release the resources at the JMS provider. The depending Session and MessageProducer objects are automatically closed when calling this method.

    The sendName() method takes as input a first and last name and concatenates them into a single string. Using the session a JMS TextMessage is created on which the string is set. Using the message producer the message is sent to the JMS provider.
    package info.source4code.jms;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Producer {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(Producer.class);

    private Connection connection;
    private Session session;
    private MessageProducer messageProducer;

    public void create(String destinationName) throws JMSException {

    // create a Connection Factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_BROKER_URL);

    // create a Connection
    connection = connectionFactory.createConnection();

    // create a Session
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // create the Destination to which messages will be sent
    Destination destination = session.createQueue(destinationName);

    // create a Message Producer for sending messages
    messageProducer = session.createProducer(destination);
    }

    public void close() throws JMSException {
    connection.close();
    }

    public void sendName(String firstName, String lastName) throws JMSException {

    String text = firstName + " " + lastName;

    // create a JMS TextMessage
    TextMessage textMessage = session.createTextMessage(text);

    // send the message to the queue destination
    messageProducer.send(textMessage);

    LOGGER.debug("producer sent message with text='{}'", text);
    }
    }

    For receiving messages, a Consumer class is defined which has the same create() and close() methods as the above Producer. The main difference is that in the case of a message consumer the connection is started.

    The getGreeting() method will receive the next message from the destination that was configured on the message consumer. A timeout parameter is passed to the receive() method in order to avoid waiting for an indefinite amount of time in case no message is present on the destination. As a result a check is needed to see if the receive() method returned a message or null.

    If a message was received it is cast to a TextMessage and the received text is converted into a greeting that is returned. In case the message was null a default "no greeting" is returned.
    package info.source4code.jms;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Consumer {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(Consumer.class);

    private static String NO_GREETING = "no greeting";

    private Connection connection;
    private Session session;
    private MessageConsumer messageConsumer;

    public void create(String destinationName) throws JMSException {

    // create a Connection Factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_BROKER_URL);

    // create a Connection
    connection = connectionFactory.createConnection();

    // create a Session
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // create the Destination from which messages will be received
    Destination destination = session.createQueue(destinationName);

    // create a Message Consumer for receiving messages
    messageConsumer = session.createConsumer(destination);

    // start the connection in order to receive messages
    connection.start();
    }

    public void close() throws JMSException {
    connection.close();
    }

    public String getGreeting(int timeout) throws JMSException {

    String greeting = NO_GREETING;

    // read a message from the queue destination
    Message message = messageConsumer.receive(timeout);

    // check if a message was received
    if (message != null) {
    // cast the message to the correct type
    TextMessage textMessage = (TextMessage) message;

    // retrieve the message content
    String text = textMessage.getText();
    LOGGER.debug("consumer received message with text='{}'", text);

    // create greeting
    greeting = "Hello " + text + "!";
    } else {
    LOGGER.debug("consumer received no message");
    }

    LOGGER.info("greeting={}", greeting);
    return greeting;
    }
    }

    In order to test above classes, below JUnit test class is created which contains two test cases. The first is a testGetGreeting() test case in which the producer is used to send a first and last name. Using the consumer the sent message is read and converted into a greeting. The second testNoGreeting() test case verifies the correct working of reading a destination when it contains no messages and as such "no greeting" is returned.
    package info.source4code.jms;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.fail;

    import javax.jms.JMSException;

    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class ProducerTest {

    private static Producer producer;
    private static Consumer consumer;

    @BeforeClass
    public static void setUpBeforeClass() throws JMSException {
    producer = new Producer();
    producer.create("helloworld.q");

    consumer = new Consumer();
    consumer.create("helloworld.q");
    }

    @AfterClass
    public static void tearDownAfterClass() throws JMSException {
    producer.close();
    consumer.close();
    }

    @Test
    public void testGetGreeting() {
    try {
    producer.sendName("John", "Doe");

    String greeting = consumer.getGreeting(1000);
    assertEquals("Hello John Doe!", greeting);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }

    @Test
    public void testNoGreeting() {
    try {
    String greeting = consumer.getGreeting(1000);
    assertEquals("no greeting", greeting);

    } catch (JMSException e) {
    fail("a JMS Exception occurred");
    }
    }
    }

    Make sure a default ActiveMQ message broker is up and running, open a command prompt and execute following Maven command:
    mvn test

    This will trigger Maven to run the above test case and will result in the following log statements.
    13:38:19.844 DEBUG [main][Consumer]
    consumer received no message
    13:38:19.858 INFO [main][Consumer]
    greeting=no greeting
    13:38:19.911 DEBUG [main][Producer]
    producer sent message with text='John Doe'
    13:38:19.912 DEBUG [main][Consumer]
    consumer received message with text='John Doe'
    13:38:19.912 INFO [main][Consumer]
    greeting=Hello John Doe!
    Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.579 sec


    github icon
    If you would like to run the above code sample you can download the full source code and their corresponding JUnit test cases here.

    This concludes the JMS Hello World example using ActiveMQ. Note that the code also contains a UnidentifiedProducer class and corresponding JUnit test class which illustrates the overloading of the send() method with the needed destination. If you found this post helpful or have any questions or remarks, please leave a comment.

    Saturday, January 18, 2014

    JMS - Priority using ActiveMQ and Maven

    Priority levels are a powerful instrument on JMS messages which allow building robust applications where for example peak traffic will not block important messages (set with a higher priority) from getting through. The following post explains the basics of JMS priority and illustrates them with a code sample using ActiveMQ and Maven.

    Setting Message Priority Levels

    Message priority levels can be used to instruct the JMS provider to deliver urgent messages first. The message’s priority is contained in the JMSPriority header. You can set the priority level in either of two ways:
    1. You can use the setPriority() method of the MessageProducer interface to set the priority level for all messages sent by that producer. For example, the following call sets a priority level of '7' for a producer:
    producer.setPriority(7);
    1. You can use the long form of the send() or the publish() method to set the priority level for a specific message. The third argument sets the priority level. For example, the following send call sets the priority level for message to '3':
    producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 0);
    Setting the priority directly on the JMS Message using the setJMSPriority() method of the Message interface does not work as in that case the priority of the producer is taken!
    The ten levels of priority range from 0 (lowest) to 9 (highest). If you do not specify a priority level, the default level is 4.
    A JMS provider tries to deliver higher-priority messages before lower-priority ones but depending on the setup and configuration it will not always deliver messages in exact order of priority. This last point is an import one to note when designing your application.
    For example on ActiveMQ, once you hit a situation where consumers are slow, or producers are just significantly faster, you'll observe that the cache will fill up (possibly with lower priority messages) while higher priority messages get stuck on disk and are not available until they're paged in. In this case, you can make a decision to tradeoff optimized message dispatching for priority enforcement. You can disable the cache, message expiration check, and lower you consumer prefetch to 1 to ensure getting the high priority messages from the store ahead of lower priority messages. However this sort of tradeoff can have significant performance implications, so always test your scenarios thoroughly.

    ActiveMQ Priority Example

    Let's illustrate the above by creating a simple producer with two different send() methods. The first method will send a message with the default priority level and the second method will accept an additional parameter specifying the priority to be set on the message. The example uses Maven and assumes a default ActiveMQ message broker is up and running.

    Tools used:
    • ActiveMQ 5.10
    • Maven 3

    First let's look at the below Maven POM file which contains the needed dependencies for Logback, JUnit and ActiveMQ.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>info.source4code</groupId>
    <artifactId>jms-activemq-priority</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>JMS - Priority using ActiveMQ</name>
    <url>http://www.source4code.info/2014/01/jms-priority-using-activemq.html</url>

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.6</java.version>

    <slf4j.version>1.7.7</slf4j.version>
    <ch.qos.logback.version>1.1.2</ch.qos.logback.version>
    <junit.version>4.12-beta-1</junit.version>
    <activemq.version>5.10.0</activemq.version>

    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
    </properties>

    <dependencies>
    <!-- Logging -->
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
    </dependency>
    <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${ch.qos.logback.version}</version>
    </dependency>
    <!-- JUnit -->
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>${junit.version}</version>
    </dependency>
    <!-- ActiveMQ -->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>${activemq.version}</version>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>${maven-compiler-plugin.version}</version>
    <configuration>
    <source>${java.version}</source>
    <target>${java.version}</target>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </project>

    Next is the Producer class which contains the two send() methods: one which applies default priority and one which applies a custom priority. The class also contains two methods for opening/closing a connection to the message broker as well as a method for creating the message producer.
    package info.source4code.jms.priority;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Producer {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(Producer.class);

    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public void openConnection() throws JMSException {
    // Create a new connection factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_BROKER_URL);
    connection = connectionFactory.createConnection();
    }

    public void closeConnection() throws JMSException {
    connection.close();
    }

    public void createProducer(String queue) throws JMSException {
    // Create a session for sending messages
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Create the destination to which messages will be sent
    Destination destination = session.createQueue(queue);

    // Create a MessageProducer for sending the messages
    producer = session.createProducer(destination);
    }

    public void send(String text) throws JMSException {
    TextMessage message = session.createTextMessage(text);
    producer.send(message);

    LOGGER.info("{} sent with default priority(=4)", text);
    }

    public void send(String text, int priority) throws JMSException {
    TextMessage message = session.createTextMessage(text);
    // Note: setting the priority directly on the JMS Message does not work
    // as in that case the priority of the producer is taken
    producer.send(message, DeliveryMode.PERSISTENT, priority, 0);

    LOGGER.info("{} sent with priority={}", text, priority);
    }
    }

    For receiving the messages a Consumer class with again two methods for opening/closing a connection to the message broker is shown below. In addition a method to create a message consumer for a specific queue and a method to receive a single message are available on the class.
    package info.source4code.jms.priority;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Consumer {

    private static final Logger LOGGER = LoggerFactory
    .getLogger(Consumer.class);

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public void openConnection() throws JMSException {
    // Create a new connection factory
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_BROKER_URL);
    connection = connectionFactory.createConnection();
    }

    public void closeConnection() throws JMSException {
    connection.close();
    }

    public void createConsumer(String queue) throws JMSException {
    // Create a session for receiving messages
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Create the destination from which messages will be received
    Destination destination = session.createQueue(queue);

    // Create a MessageConsumer for receiving messages
    consumer = session.createConsumer(destination);

    // Start the connection in order to receive messages
    connection.start();
    }

    public String receive(int timeout) throws JMSException {
    // Read a message from the destination
    Message message = consumer.receive(timeout);

    // Cast the message to the correct type
    TextMessage input = (TextMessage) message;

    // Retrieve the message content
    String text = input.getText();
    LOGGER.info("{} received", text);

    return text;
    }
    }

    Last, a JUnit test class in which a first testSend() test case will send three messages with default priority to a 'priority.q' queue and then verifies if the messages are read in first in, first out (FIFO) order. And then a second testSendWithPriority() test case which will send three messages with custom priorities where the last message gets the highest priority and then verifies if the message are read in last in, first out (LIFO) order.
    package info.source4code.jms.priority;

    import static org.junit.Assert.assertEquals;

    import javax.jms.JMSException;

    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class ProducerTest {

    private static Consumer consumer;
    private static Producer producer;

    private static String QUEUE = "priority.q";

    @BeforeClass
    public static void setUpBeforeClass() throws JMSException {
    producer = new Producer();
    producer.openConnection();
    producer.createProducer(QUEUE);

    consumer = new Consumer();
    consumer.openConnection();
    consumer.createConsumer(QUEUE);
    }

    @AfterClass
    public static void tearDownAfterClass() throws JMSException {
    producer.closeConnection();
    consumer.closeConnection();
    }

    @Test
    public void testSend() throws JMSException, InterruptedException {
    producer.send("message1");
    producer.send("message2");
    producer.send("message3");

    Thread.sleep(1000);

    // Messages should be received FIFO as priority is the same for all
    assertEquals("message1", consumer.receive(5000));
    assertEquals("message2", consumer.receive(5000));
    assertEquals("message3", consumer.receive(5000));
    }

    @Test
    public void testSendWithPriority() throws JMSException,
    InterruptedException {
    producer.send("message1", 1);
    producer.send("message2", 2);
    producer.send("message3", 3);

    Thread.sleep(1000);

    // Messages should be received LIFO as priority=1 is lower than
    // priority=3
    assertEquals("message3", consumer.receive(5000));
    assertEquals("message2", consumer.receive(5000));
    assertEquals("message1", consumer.receive(5000));
    }
    }

    Make sure a default ActiveMQ message broker is up and running, open a command prompt and execute following Maven command:
    mvn test

    This will trigger Maven to run the above test case and will result in the following log statements. Even though 'message1' was sent first in both test cases, in the first test it is received first whereas in the second test it is received last because of the different assigned priority.

    23:24:30.126 [main] INFO i.source4code.jms.priority.Producer -
    message1 sent with default priority(=4)
    23:24:30.152 [main] INFO i.source4code.jms.priority.Producer -
    message2 sent with default priority(=4)
    23:24:30.175 [main] INFO i.source4code.jms.priority.Producer -
    message3 sent with default priority(=4)
    23:24:31.177 [main] INFO i.source4code.jms.priority.Consumer -
    message1 received
    23:24:31.177 [main] INFO i.source4code.jms.priority.Consumer -
    message2 received
    23:24:31.178 [main] INFO i.source4code.jms.priority.Consumer -
    message3 received
    23:24:31.203 [main] INFO i.source4code.jms.priority.Producer -
    message1 sent with priority=1
    23:24:31.226 [main] INFO i.source4code.jms.priority.Producer -
    message2 sent with priority=2
    23:24:31.253 [main] INFO i.source4code.jms.priority.Producer -
    message3 sent with priority=3
    23:24:32.253 [main] INFO i.source4code.jms.priority.Consumer -
    message3 received
    23:24:32.254 [main] INFO i.source4code.jms.priority.Consumer -
    message2 received
    23:24:32.254 [main] INFO i.source4code.jms.priority.Consumer -
    message1 received

    By browsing the 'priority.q' using the ActiveMQ console we can verify the JMSPriority header set on the messages sent by the above test cases. The testSend() test case will create three messages a shown below, each with priority set to 4.

    activemq message browser no priority

    The second testSendWithPriority() test case results in three messages, each with a different priority ranging from 1 till 3.

    activemq message browser with priority


    github icon
    If you would like to run the above code sample you can download the full source code and their corresponding JUnit test cases here.

    This concludes the priority using ActiveMQ example. If you found this post helpful or have any questions or remarks, please leave a comment.