Saturday, January 18, 2014

JMS - Priority using ActiveMQ and Maven

Priority levels are a powerful instrument on JMS messages which allow building robust applications where for example peak traffic will not block important messages (set with a higher priority) from getting through. The following post explains the basics of JMS priority and illustrates them with a code sample using ActiveMQ and Maven.

Setting Message Priority Levels

Message priority levels can be used to instruct the JMS provider to deliver urgent messages first. The message’s priority is contained in the JMSPriority header. You can set the priority level in either of two ways:
  1. You can use the setPriority() method of the MessageProducer interface to set the priority level for all messages sent by that producer. For example, the following call sets a priority level of '7' for a producer:
producer.setPriority(7);
  1. You can use the long form of the send() or the publish() method to set the priority level for a specific message. The third argument sets the priority level. For example, the following send call sets the priority level for message to '3':
producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 0);
Setting the priority directly on the JMS Message using the setJMSPriority() method of the Message interface does not work as in that case the priority of the producer is taken!
The ten levels of priority range from 0 (lowest) to 9 (highest). If you do not specify a priority level, the default level is 4.
A JMS provider tries to deliver higher-priority messages before lower-priority ones but depending on the setup and configuration it will not always deliver messages in exact order of priority. This last point is an import one to note when designing your application.
For example on ActiveMQ, once you hit a situation where consumers are slow, or producers are just significantly faster, you'll observe that the cache will fill up (possibly with lower priority messages) while higher priority messages get stuck on disk and are not available until they're paged in. In this case, you can make a decision to tradeoff optimized message dispatching for priority enforcement. You can disable the cache, message expiration check, and lower you consumer prefetch to 1 to ensure getting the high priority messages from the store ahead of lower priority messages. However this sort of tradeoff can have significant performance implications, so always test your scenarios thoroughly.

ActiveMQ Priority Example

Let's illustrate the above by creating a simple producer with two different send() methods. The first method will send a message with the default priority level and the second method will accept an additional parameter specifying the priority to be set on the message. The example uses Maven and assumes a default ActiveMQ message broker is up and running.

Tools used:
  • ActiveMQ 5.10
  • Maven 3

First let's look at the below Maven POM file which contains the needed dependencies for Logback, JUnit and ActiveMQ.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>info.source4code</groupId>
<artifactId>jms-activemq-priority</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<name>JMS - Priority using ActiveMQ</name>
<url>http://www.source4code.info/2014/01/jms-priority-using-activemq.html</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.6</java.version>

<slf4j.version>1.7.7</slf4j.version>
<ch.qos.logback.version>1.1.2</ch.qos.logback.version>
<junit.version>4.12-beta-1</junit.version>
<activemq.version>5.10.0</activemq.version>

<maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
</properties>

<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${ch.qos.logback.version}</version>
</dependency>
<!-- JUnit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!-- ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

Next is the Producer class which contains the two send() methods: one which applies default priority and one which applies a custom priority. The class also contains two methods for opening/closing a connection to the message broker as well as a method for creating the message producer.
package info.source4code.jms.priority;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer {

private static final Logger LOGGER = LoggerFactory
.getLogger(Producer.class);

private Connection connection;
private Session session;
private MessageProducer producer;

public void openConnection() throws JMSException {
// Create a new connection factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
connection = connectionFactory.createConnection();
}

public void closeConnection() throws JMSException {
connection.close();
}

public void createProducer(String queue) throws JMSException {
// Create a session for sending messages
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the destination to which messages will be sent
Destination destination = session.createQueue(queue);

// Create a MessageProducer for sending the messages
producer = session.createProducer(destination);
}

public void send(String text) throws JMSException {
TextMessage message = session.createTextMessage(text);
producer.send(message);

LOGGER.info("{} sent with default priority(=4)", text);
}

public void send(String text, int priority) throws JMSException {
TextMessage message = session.createTextMessage(text);
// Note: setting the priority directly on the JMS Message does not work
// as in that case the priority of the producer is taken
producer.send(message, DeliveryMode.PERSISTENT, priority, 0);

LOGGER.info("{} sent with priority={}", text, priority);
}
}

For receiving the messages a Consumer class with again two methods for opening/closing a connection to the message broker is shown below. In addition a method to create a message consumer for a specific queue and a method to receive a single message are available on the class.
package info.source4code.jms.priority;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {

private static final Logger LOGGER = LoggerFactory
.getLogger(Consumer.class);

private Connection connection;
private Session session;
private MessageConsumer consumer;

public void openConnection() throws JMSException {
// Create a new connection factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
connection = connectionFactory.createConnection();
}

public void closeConnection() throws JMSException {
connection.close();
}

public void createConsumer(String queue) throws JMSException {
// Create a session for receiving messages
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Create the destination from which messages will be received
Destination destination = session.createQueue(queue);

// Create a MessageConsumer for receiving messages
consumer = session.createConsumer(destination);

// Start the connection in order to receive messages
connection.start();
}

public String receive(int timeout) throws JMSException {
// Read a message from the destination
Message message = consumer.receive(timeout);

// Cast the message to the correct type
TextMessage input = (TextMessage) message;

// Retrieve the message content
String text = input.getText();
LOGGER.info("{} received", text);

return text;
}
}

Last, a JUnit test class in which a first testSend() test case will send three messages with default priority to a 'priority.q' queue and then verifies if the messages are read in first in, first out (FIFO) order. And then a second testSendWithPriority() test case which will send three messages with custom priorities where the last message gets the highest priority and then verifies if the message are read in last in, first out (LIFO) order.
package info.source4code.jms.priority;

import static org.junit.Assert.assertEquals;

import javax.jms.JMSException;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ProducerTest {

private static Consumer consumer;
private static Producer producer;

private static String QUEUE = "priority.q";

@BeforeClass
public static void setUpBeforeClass() throws JMSException {
producer = new Producer();
producer.openConnection();
producer.createProducer(QUEUE);

consumer = new Consumer();
consumer.openConnection();
consumer.createConsumer(QUEUE);
}

@AfterClass
public static void tearDownAfterClass() throws JMSException {
producer.closeConnection();
consumer.closeConnection();
}

@Test
public void testSend() throws JMSException, InterruptedException {
producer.send("message1");
producer.send("message2");
producer.send("message3");

Thread.sleep(1000);

// Messages should be received FIFO as priority is the same for all
assertEquals("message1", consumer.receive(5000));
assertEquals("message2", consumer.receive(5000));
assertEquals("message3", consumer.receive(5000));
}

@Test
public void testSendWithPriority() throws JMSException,
InterruptedException {
producer.send("message1", 1);
producer.send("message2", 2);
producer.send("message3", 3);

Thread.sleep(1000);

// Messages should be received LIFO as priority=1 is lower than
// priority=3
assertEquals("message3", consumer.receive(5000));
assertEquals("message2", consumer.receive(5000));
assertEquals("message1", consumer.receive(5000));
}
}

Make sure a default ActiveMQ message broker is up and running, open a command prompt and execute following Maven command:
mvn test

This will trigger Maven to run the above test case and will result in the following log statements. Even though 'message1' was sent first in both test cases, in the first test it is received first whereas in the second test it is received last because of the different assigned priority.

23:24:30.126 [main] INFO i.source4code.jms.priority.Producer -
message1 sent with default priority(=4)
23:24:30.152 [main] INFO i.source4code.jms.priority.Producer -
message2 sent with default priority(=4)
23:24:30.175 [main] INFO i.source4code.jms.priority.Producer -
message3 sent with default priority(=4)
23:24:31.177 [main] INFO i.source4code.jms.priority.Consumer -
message1 received
23:24:31.177 [main] INFO i.source4code.jms.priority.Consumer -
message2 received
23:24:31.178 [main] INFO i.source4code.jms.priority.Consumer -
message3 received
23:24:31.203 [main] INFO i.source4code.jms.priority.Producer -
message1 sent with priority=1
23:24:31.226 [main] INFO i.source4code.jms.priority.Producer -
message2 sent with priority=2
23:24:31.253 [main] INFO i.source4code.jms.priority.Producer -
message3 sent with priority=3
23:24:32.253 [main] INFO i.source4code.jms.priority.Consumer -
message3 received
23:24:32.254 [main] INFO i.source4code.jms.priority.Consumer -
message2 received
23:24:32.254 [main] INFO i.source4code.jms.priority.Consumer -
message1 received

By browsing the 'priority.q' using the ActiveMQ console we can verify the JMSPriority header set on the messages sent by the above test cases. The testSend() test case will create three messages a shown below, each with priority set to 4.

activemq message browser no priority

The second testSendWithPriority() test case results in three messages, each with a different priority ranging from 1 till 3.

activemq message browser with priority


github icon
If you would like to run the above code sample you can download the full source code and their corresponding JUnit test cases here.

This concludes the priority using ActiveMQ example. If you found this post helpful or have any questions or remarks, please leave a comment.

Monday, January 13, 2014

JMS - Message Structure Overview

The main goal of a JMS application is to produce and consume messages that can then be used by other software applications. This post details the basic structure of a JMS message which consists out of three parts: headers, properties and body. For a complete overview please check the Java Message Service Concepts chapter of The Java EE 6 Tutorial.


The JMS API defines the standard form of a JMS message, which should be portable across all JMS providers. The picture below illustrates the high level structure of a JMS message.
jms message structure

JMS Message Headers

The JMS message header part, contains a number of predefined fields that must be present in every JMS message. Most of the values in the header are set by the JMS provider (which overrides any client-set values) when the message is put on a JMS destination. The values are used by both clients and providers to identify and route messages. The table below lists the JMS message header fields, indicates how their values are set and describes the content of each header field.

Header FieldSet ByDescription
JMSDestinationsend or publish methodReturns a Destination object (a Topic or a Queue, or their temporary version) describing where the message was directed.
JMSDeliveryModesend or publish methodCan be DeliveryMode.NON_PERSISTENT or DeliveryMode.PERSISTENT; only persistent messages guarantee delivery in case of a crash of the brokers that transport it.
JMSExpirationsend or publish methodReturns a timestamp indicating the expiration time of the message; it can be 0 on a message without a defined expiration.
JMSPrioritysend or publish methodReturns a 0-9 integer value (higher is better) defining the priority for delivery. It is only a best-effort value.
JMSMessageIDsend or publish methodContains a generated ID for identifying a message, unique at least for the current broker. All generated IDs start with the prefix 'ID:', but you can override it with the corresponding setter.
JMSTimestampsend or publish methodReturns a long indicating the time of sending.
JMSCorrelationIDClientCan link a message with another, usually one that has been sent previously (typically used for a request/response scenario). For example, a reply can carry the ID of the original request message.
JMSReplyToClientIs a Destination object where replies should be sent, it can be null.
JMSTypeClientDefines a field for provider-specific or application-specific message types.
JMSRedeliveredJMS providerReturns a boolean indicating if the message is being delivered again after a delivery which was not acknowledge.

JMS Message Properties

You can create and set properties for JMS messages if you need values in addition to those provided by the header fields. Properties are optional and stored as standard Java name/value pairs. Property fields are most often used for message selection and filtering.

There are three kinds of message properties:
  1. Application-related properties: A Java application can assign application-related properties, which are set before the message is delivered.
  2. Provider-related properties: Every JMS provider can define proprietary properties that can be set either by the client or automatically by the provider. Provider-related properties are prefixed with 'JMS_' followed by the vendor name and the specific property name; for example: JMS_IBM_MsgType or JMS_SonicMQ_XQ.isMultipart
  3. Standard properties: These standardized properties are set by the JMS provider (if supported) when a message is sent. Standard property names start with 'JMSX'; for example: JMSXUserid or JMSXDeliveryCount.

JMS Message Body

The message body contains the main information that is being exchanged by the JMS message. The JMS API defines five message body formats, also called message types, which allow you to send and receive data in a number of forms. JMS specifies only the interface and does not specify the implementation. This approach allows for vendor-specific implementation and transportation of messages while using a common interface.

Message TypeBody Contains
TextMessageA java.lang.String object (for example, the contents of an XML file).
MapMessageA set of name-value pairs, with names as String objects and values as primitive types in the Java programming language. The entries can be accessed sequentially by enumerator or randomly by name. The order of the entries is undefined.
BytesMessageA stream of uninterpreted bytes. This message type is for literally encoding a body to match an existing message format.
StreamMessageA stream of primitive values in the Java programming language, filled and read sequentially.
ObjectMessageA Serializable object in the Java programming language.
Some JMS vendor implementations have added additional non-standard messages types; for example SonicMQ provides a MultipartMessage message type.
The JMS API provides methods for creating messages of each type and for filling in their contents. For example, to create and send a TextMessage, you might use the following statements:
TextMessage message = session.createTextMessage();
message.setText(msg_text); // msg_text is a String
producer.send(message);

At the consuming end, a message arrives as a generic Message object and must be cast to the appropriate message type. You can use one or more getter methods to extract the message contents. The following code fragment uses the getText() method:
Message m = consumer.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}


This concludes the overview of the JMS message structure. If you found this post helpful or have any questions or remarks, please leave a comment.

Saturday, January 11, 2014

JMS - Install HermesJMS on Windows

hermesjms logo
HermesJMS is an extensible console that helps you interact with JMS providers making it simple to publish and edit messages, browse or search queues and topics, copy messages around and delete them. Following tutorial shows how to install HermesJMS on Windows and start/stop the console.


The tutorial assumes a Java runtime environment (JRE) has been installed and configured on your computer. If not please check the following post on how to install and configure a JRE.

First thing to do is to download the HermesJMS installer Java archive (JAR). Go the the HermesJMS download page and click on the Sourceforge link in the Downloading and Webstarting section. This will redirect to the Sourceforge page, click on the 'hermes-installer-X.XX.jar' link and download the JAR file. At the time of writing the latest stable release was hermes-installer-1.14.jar.

Open a console window and navigate to the location of the downloaded 'hermes-installer-1.14.jar' and execute the following command to start the HermesJMS installer:
java -jar hermes-installer-1.14.jar
start hermesjms installer

A new window will open to start the installation of HermesJMS as shown below:

hermesjms installer welcome

Click Next twice and then select the 'I accept the terms of this license agreement' radio button:

hermesjms installer license

Click Next and change the default installation path (if needed). From now on we will refer to this directory as: [hermesjms_install_dir].

hermesjms installer installation path

Click Next twice and review the installation settings. If the settings are correct click Next to start the installation. Click Previous in case changes to the installation setting are needed. The progress of the installation will be shown as illustrated below.

hermesjms installer progress

Once the Pack installation progress bar mentions mentions '[Finished]', click on Quit. Open a console window and navigate to the [hermesjms_install_dir]. Change to the 'bin' subdirectory and execute the following command to start HermesJMS:
hermes.bat
hermesjms start command

The HermesJMS console should open as shown below:

hermesjms gui

An alternative way of starting HermesJMS is by creating a clickable shortcut. In order to do this navigate to the [hermesjms_install_dir]\bin directory using the Windows explorer. Right click on the 'hermes.bat' file and select Send to > Desktop(create shortcut).

hermesjms create shortcut

The result will be a shortcut icon on the desktop that can be used to start HermesJMS. In order to quit HermesJMS simply select File > Exit from the console top menu.

exit hermesjms gui


This concludes the basic installation of the HermesJMS. If you found this post helpful or have any questions or remarks, please leave a comment.

Friday, January 10, 2014

JMS - Install ActiveMQ on Windows

activemq logo
Apache ActiveMQ is an open source message broker (JMS provider) written in Java, together with a full JMS client. Java Message Service (JMS) is an application programming interface (API) for sending messages between two or more clients. Following tutorial shows how to install ActiveMQ and perform a start/stop of the installed instance on Windows.


First thing to do is to download the ActiveMQ binaries. Go the the ActiveMQ download page and click on the latest stable release link in the Latest Releases section. Then in the Getting the Binary Distributions section click on the download link for your operating system. This will redirect to a mirrors page, click on the first link to download the binaries. At the time of writing the latest stable release was apache-activemq-5.10.0-bin.zip.

Extract the binaries archive downloaded in the previous step. The extracted root directory should contain a number of files and subdirectories as shown below. From now on we will refer to this directory as: [activemq_install_dir].

activemq install directory

Open a console window and navigate to [activemq_install_dir]. Change to the 'bin' subdirectory and execute the following command to start ActiveMQ:
activemq start

By default ActiveMQ will generate a number of log statements at start-up as shown below:

activemq start command

One of the logs will mention 'ActiveMQ WebConsole available at http://0.0.0.0:8161/'. This means that ActiveMQ was successfully started. Open the ActiveMQ WebConsole URL in a browser by entering http://localhost:8161/ and following page should be displayed:

activemq web console

Click on the Manage ActiveMQ broker link and enter following default credentials: User name="admin" and Password="admin". A welcome page will be displayed that shows some statistics on the ActiveMQ broker:

manage activemq web console

In order to stop ActiveMQ, press CTRL+C in the console in which it is running. Then type "Y" when prompted to 'Terminate batch job' followed by ENTER. The console will return to the prompt as shown below and ActiveMQ is stopped.

activemq stop command


This concludes setting up and configuring ActiveMQ. If you found this post helpful or have any questions or remarks, please leave a comment.