Saturday, January 18, 2014

JMS - Priority using ActiveMQ and Maven

Priority levels on JMS messages are a useful tool for building robust applications in which, for example, peak traffic does not block important messages (sent with a higher priority) from getting through. This post explains the basics of JMS priority and illustrates them with a code sample using ActiveMQ and Maven.

Setting Message Priority Levels

Message priority levels can be used to instruct the JMS provider to deliver urgent messages first. The message’s priority is contained in the JMSPriority header. You can set the priority level in either of two ways:
  1. You can use the setPriority() method of the MessageProducer interface to set the priority level for all messages sent by that producer. For example, the following call sets a priority level of '7' for a producer:
     producer.setPriority(7);
  2. You can use the long form of the send() or the publish() method to set the priority level for a specific message. The third argument sets the priority level. For example, the following send call sets the priority level of the message to '3':
     producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 0);
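Setting the priority directly on the JMS Message using the setJMSPriority() method of the Message interface does not work: the provider overwrites the JMSPriority header when the message is sent, using either the priority of the producer or the priority passed to the long form of send().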
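The overwrite behaviour described above can be pictured with a small broker-free model. This is only a sketch: ToyMessage and ToyProducer are hypothetical stand-ins invented for illustration, not JMS or ActiveMQ classes, and the sketch assumes nothing beyond the rule that the priority header is stamped at send time.

```java
import java.util.ArrayList;
import java.util.List;

class PriorityHeaderSketch {

    /** Hypothetical stand-in for a JMS message: a body plus a priority header. */
    static final class ToyMessage {
        final String body;
        int priority = 4; // JMS default priority

        ToyMessage(String body) {
            this.body = body;
        }
    }

    /** Hypothetical stand-in for a MessageProducer; this is not the JMS API. */
    static final class ToyProducer {
        private int defaultPriority = 4;
        final List<ToyMessage> sent = new ArrayList<ToyMessage>();

        void setPriority(int priority) {
            defaultPriority = checkRange(priority);
        }

        // Short form: the producer's priority overwrites the message header.
        void send(ToyMessage message) {
            send(message, defaultPriority);
        }

        // Long form: the explicit argument overwrites the message header.
        void send(ToyMessage message, int priority) {
            message.priority = checkRange(priority);
            sent.add(message);
        }

        private static int checkRange(int priority) {
            if (priority < 0 || priority > 9) {
                throw new IllegalArgumentException("priority must be 0..9: " + priority);
            }
            return priority;
        }
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();

        ToyMessage a = new ToyMessage("a");
        a.priority = 9; // set on the message itself: overwritten at send time
        producer.send(a);
        System.out.println(a.body + " -> " + a.priority); // a -> 4

        producer.setPriority(7);
        ToyMessage b = new ToyMessage("b");
        producer.send(b);
        System.out.println(b.body + " -> " + b.priority); // b -> 7

        ToyMessage c = new ToyMessage("c");
        producer.send(c, 3);
        System.out.println(c.body + " -> " + c.priority); // c -> 3
    }
}
```

In this model the value placed on the message before sending never survives, which is why the priority has to be given to the producer or to the send call.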
The ten levels of priority range from 0 (lowest) to 9 (highest). If you do not specify a priority level, the default level is 4.
A JMS provider tries to deliver higher-priority messages before lower-priority ones but, depending on the setup and configuration, it will not always deliver messages in exact order of priority. This last point is an important one to note when designing your application.
For example on ActiveMQ, once you hit a situation where consumers are slow, or producers are just significantly faster, you'll observe that the cache will fill up (possibly with lower priority messages) while higher priority messages get stuck on disk and are not available until they're paged in. In this case, you can decide to trade off optimized message dispatching for priority enforcement. You can disable the cache and the message expiration check, and lower your consumer prefetch to 1, to ensure that high priority messages are taken from the store ahead of lower priority messages. However, this sort of trade-off can have significant performance implications, so always test your scenarios thoroughly.
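To get a feel for why a smaller prefetch helps, here is a toy, broker-free model. It assumes Java 8 or later and covers only the prefetch part of the story (not the cache or the store). ToyBroker, Msg and run() are hypothetical names made up for this sketch, and the model assumes that messages already handed to the consumer's buffer are consumed in the order they were dispatched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

class PrefetchSketch {

    /** Hypothetical message: body, priority and arrival sequence number. */
    static final class Msg {
        final String body;
        final int priority;
        final long seq;

        Msg(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    /** Toy broker with a single slow consumer; not how ActiveMQ is implemented. */
    static final class ToyBroker {
        // Pending messages: highest priority first, arrival order within a priority.
        private final PriorityQueue<Msg> pending = new PriorityQueue<Msg>(11,
                (a, b) -> a.priority != b.priority
                        ? Integer.compare(b.priority, a.priority)
                        : Long.compare(a.seq, b.seq));
        // Messages already dispatched to the consumer, consumed in dispatch order.
        private final Deque<Msg> prefetchBuffer = new ArrayDeque<Msg>();
        private final int prefetch;
        private long nextSeq = 0;

        ToyBroker(int prefetch) {
            if (prefetch < 1) {
                throw new IllegalArgumentException("prefetch must be >= 1");
            }
            this.prefetch = prefetch;
        }

        void publish(String body, int priority) {
            pending.add(new Msg(body, priority, nextSeq++));
            dispatch();
        }

        // Top up the consumer's buffer until it holds 'prefetch' messages.
        private void dispatch() {
            while (prefetchBuffer.size() < prefetch && !pending.isEmpty()) {
                prefetchBuffer.add(pending.poll());
            }
        }

        // The consumer takes the next buffered message; returns null when empty.
        String receive() {
            Msg next = prefetchBuffer.poll();
            dispatch();
            return next == null ? null : next.body;
        }
    }

    /** Four low priority messages arrive, then one urgent one, then the consumer drains. */
    static List<String> run(int prefetch) {
        ToyBroker broker = new ToyBroker(prefetch);
        for (int i = 1; i <= 4; i++) {
            broker.publish("low" + i, 1);
        }
        broker.publish("urgent", 9);

        List<String> received = new ArrayList<String>();
        String body;
        while ((body = broker.receive()) != null) {
            received.add(body);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("prefetch=10: " + run(10)); // prefetch=10: [low1, low2, low3, low4, urgent]
        System.out.println("prefetch=1:  " + run(1));  // prefetch=1:  [low1, urgent, low2, low3, low4]
    }
}
```

With a large prefetch the low priority messages are already sitting in the consumer's buffer when the urgent one arrives, so it is received last. With a prefetch of 1 the model keeps the backlog on the broker side, where it can still be sorted by priority.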

ActiveMQ Priority Example

Let's illustrate the above by creating a simple producer with two different send() methods. The first method will send a message with the default priority level and the second method will accept an additional parameter specifying the priority to be set on the message. The example uses Maven and assumes a default ActiveMQ message broker is up and running.

Tools used:
  • ActiveMQ 5.10
  • Maven 3

First let's look at the below Maven POM file which contains the needed dependencies for Logback, JUnit and ActiveMQ.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>info.source4code</groupId>
  <artifactId>jms-activemq-priority</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <name>JMS - Priority using ActiveMQ</name>
  <url>http://www.source4code.info/2014/01/jms-priority-using-activemq.html</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.6</java.version>

    <slf4j.version>1.7.7</slf4j.version>
    <ch.qos.logback.version>1.1.2</ch.qos.logback.version>
    <junit.version>4.12-beta-1</junit.version>
    <activemq.version>5.10.0</activemq.version>

    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
  </properties>

  <dependencies>
    <!-- Logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>${ch.qos.logback.version}</version>
    </dependency>
    <!-- JUnit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>
    <!-- ActiveMQ -->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>${activemq.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Next is the Producer class which contains the two send() methods: one which applies default priority and one which applies a custom priority. The class also contains two methods for opening/closing a connection to the message broker as well as a method for creating the message producer.
package info.source4code.jms.priority;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Producer.class);

    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public void openConnection() throws JMSException {
        // Create a new connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_BROKER_URL);
        connection = connectionFactory.createConnection();
    }

    public void closeConnection() throws JMSException {
        connection.close();
    }

    public void createProducer(String queue) throws JMSException {
        // Create a session for sending messages
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination to which messages will be sent
        Destination destination = session.createQueue(queue);

        // Create a MessageProducer for sending the messages
        producer = session.createProducer(destination);
    }

    public void send(String text) throws JMSException {
        TextMessage message = session.createTextMessage(text);
        // Short form of send(): the producer's priority is used (default 4)
        producer.send(message);

        LOGGER.info("{} sent with default priority(=4)", text);
    }

    public void send(String text, int priority) throws JMSException {
        TextMessage message = session.createTextMessage(text);
        // Note: setting the priority directly on the JMS Message does not
        // work, as the JMSPriority header is overwritten when the message is
        // sent. Pass the priority to the long form of send() instead
        // (arguments: message, delivery mode, priority, time to live).
        producer.send(message, DeliveryMode.PERSISTENT, priority, 0);

        LOGGER.info("{} sent with priority={}", text, priority);
    }
}

For receiving the messages, the Consumer class shown below again has two methods for opening/closing a connection to the message broker. In addition, it has a method to create a message consumer for a specific queue and a method to receive a single message.
package info.source4code.jms.priority;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Consumer.class);

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public void openConnection() throws JMSException {
        // Create a new connection factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_BROKER_URL);
        connection = connectionFactory.createConnection();
    }

    public void closeConnection() throws JMSException {
        connection.close();
    }

    public void createConsumer(String queue) throws JMSException {
        // Create a session for receiving messages
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination from which messages will be received
        Destination destination = session.createQueue(queue);

        // Create a MessageConsumer for receiving messages
        consumer = session.createConsumer(destination);

        // Start the connection in order to receive messages
        connection.start();
    }

    public String receive(int timeout) throws JMSException {
        // Read a message from the destination
        Message message = consumer.receive(timeout);

        // receive() returns null when the timeout expires without a message
        if (message == null) {
            LOGGER.info("no message received within {} ms", timeout);
            return null;
        }

        // Cast the message to the correct type
        TextMessage input = (TextMessage) message;

        // Retrieve the message content
        String text = input.getText();
        LOGGER.info("{} received", text);

        return text;
    }
}

Last is a JUnit test class with two test cases. The first, testSend(), sends three messages with default priority to a 'priority.q' queue and then verifies that the messages are read in first in, first out (FIFO) order. The second, testSendWithPriority(), sends three messages with custom priorities, where the last message gets the highest priority, and then verifies that the messages are read in last in, first out (LIFO) order.
package info.source4code.jms.priority;

import static org.junit.Assert.assertEquals;

import javax.jms.JMSException;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ProducerTest {

    private static Consumer consumer;
    private static Producer producer;

    private static final String QUEUE = "priority.q";

    @BeforeClass
    public static void setUpBeforeClass() throws JMSException {
        producer = new Producer();
        producer.openConnection();
        producer.createProducer(QUEUE);

        consumer = new Consumer();
        consumer.openConnection();
        consumer.createConsumer(QUEUE);
    }

    @AfterClass
    public static void tearDownAfterClass() throws JMSException {
        producer.closeConnection();
        consumer.closeConnection();
    }

    @Test
    public void testSend() throws JMSException, InterruptedException {
        producer.send("message1");
        producer.send("message2");
        producer.send("message3");

        // Wait until all three messages have been sent before receiving
        Thread.sleep(1000);

        // Messages should be received FIFO as priority is the same for all
        assertEquals("message1", consumer.receive(5000));
        assertEquals("message2", consumer.receive(5000));
        assertEquals("message3", consumer.receive(5000));
    }

    @Test
    public void testSendWithPriority() throws JMSException,
            InterruptedException {
        producer.send("message1", 1);
        producer.send("message2", 2);
        producer.send("message3", 3);

        // Wait until all three messages have been sent before receiving
        Thread.sleep(1000);

        // Messages should be received LIFO as the last message sent has the
        // highest priority (3) and the first message the lowest (1)
        assertEquals("message3", consumer.receive(5000));
        assertEquals("message2", consumer.receive(5000));
        assertEquals("message1", consumer.receive(5000));
    }
}
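The order the two test cases expect can also be reasoned about without a broker. The sketch below is only a model: deliveryOrder() is a hypothetical helper, and it assumes an idealized provider that always delivers higher priorities first and keeps arrival order within one priority. It assumes Java 8 or later and relies on Collections.sort being a stable sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PriorityOrderSketch {

    /**
     * Returns the bodies in idealized delivery order: highest priority first,
     * arrival order (FIFO) among messages of equal priority.
     */
    static List<String> deliveryOrder(List<String> bodies, List<Integer> priorities) {
        if (bodies.size() != priorities.size()) {
            throw new IllegalArgumentException("one priority per message is required");
        }

        // Sort the arrival positions rather than the bodies themselves.
        List<Integer> positions = new ArrayList<Integer>();
        for (int i = 0; i < bodies.size(); i++) {
            positions.add(i);
        }

        // Collections.sort is stable, so equal priorities keep arrival order.
        Collections.sort(positions,
                (a, b) -> Integer.compare(priorities.get(b), priorities.get(a)));

        List<String> ordered = new ArrayList<String>();
        for (int position : positions) {
            ordered.add(bodies.get(position));
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList("message1", "message2", "message3");

        // Same (default) priority for all: FIFO
        System.out.println(deliveryOrder(bodies, Arrays.asList(4, 4, 4)));
        // [message1, message2, message3]

        // Increasing priorities: the last message sent is delivered first
        System.out.println(deliveryOrder(bodies, Arrays.asList(1, 2, 3)));
        // [message3, message2, message1]
    }
}
```

A real provider only approximates this ordering, as noted earlier, so the model describes what the tests hope to see rather than a guarantee.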

Make sure a default ActiveMQ message broker is up and running, open a command prompt and execute the following Maven command:
mvn test

This will trigger Maven to run the above test case and will result in the following log statements. Even though 'message1' was sent first in both test cases, in the first test it is received first whereas in the second test it is received last because of the different assigned priority.

23:24:30.126 [main] INFO i.source4code.jms.priority.Producer -
message1 sent with default priority(=4)
23:24:30.152 [main] INFO i.source4code.jms.priority.Producer -
message2 sent with default priority(=4)
23:24:30.175 [main] INFO i.source4code.jms.priority.Producer -
message3 sent with default priority(=4)
23:24:31.177 [main] INFO i.source4code.jms.priority.Consumer -
message1 received
23:24:31.177 [main] INFO i.source4code.jms.priority.Consumer -
message2 received
23:24:31.178 [main] INFO i.source4code.jms.priority.Consumer -
message3 received
23:24:31.203 [main] INFO i.source4code.jms.priority.Producer -
message1 sent with priority=1
23:24:31.226 [main] INFO i.source4code.jms.priority.Producer -
message2 sent with priority=2
23:24:31.253 [main] INFO i.source4code.jms.priority.Producer -
message3 sent with priority=3
23:24:32.253 [main] INFO i.source4code.jms.priority.Consumer -
message3 received
23:24:32.254 [main] INFO i.source4code.jms.priority.Consumer -
message2 received
23:24:32.254 [main] INFO i.source4code.jms.priority.Consumer -
message1 received

By browsing the 'priority.q' queue using the ActiveMQ console we can verify the JMSPriority header set on the messages sent by the above test cases. The testSend() test case will create three messages as shown below, each with priority set to 4.

[Image: ActiveMQ message browser, no priority]

The second testSendWithPriority() test case results in three messages, each with a different priority ranging from 1 to 3.

[Image: ActiveMQ message browser, with priority]


If you would like to run the above code sample you can download the full source code and its corresponding JUnit test cases here.

This concludes the priority using ActiveMQ example. If you found this post helpful or have any questions or remarks, please leave a comment.
