Read first JMS First Impression in the previous post. A JMS message has three parts:
1. Header, consists of: JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, JMSReplyTo, JMSType, JMSRedelivered.
2. Properties (optional). If we need addition values besides header fields, we can set and create message properties.
3. Body (optional). JMS API defined five message body formats, also called message types, consists of: TextMessage, MapMessage, BytesMessage, StreamMessage, ObjectMessage, Message.
JMS Message delivery styles. JMS supports synchronous and asynchronous delivery messages:
1. Synchronous Delivery. For example:
A client can request the next message from a message consumer using one of its receive methods. There are several variations of receive that allow a client to poll or wait for the next message.
QueueReceiver receiver = null;
receiver = session.createReceiver(queue);
StreamMessage stockMessage;
stockMessage = (StreamMessage)receiver.receive();
In the above code fragment, the receiver will wait indefinitely for a message. Alternatively, we could have specified a timeout in milliseconds, such as:
// wait for 10 seconds only.
stockMessage = (StreamMessage)receiver.receive(10*1000);
Or, no wait at all:
// Don’t wait?
stockMessage = (StreamMessage)receiver.receiveNoWait();
2. Asynchronous Delivery
Instead of waiting/polling the message consumer for messages, a client can register a message listener with a message consumer. A message listener is an object that implements the MessageListener.
// asynchronous reader, register a message listener.
// listener will be called for each message that come into the queue
receiver.setMessageListener(listener);
At last, but not at least. We will make a Point-to-Point Simple Calculator Application. Client will write message in queue in XML format after got request from user and read synchronously a response queue from server with specific time out by using a selector. Server will read request queue asynchronously and write response in response queue. I'm using Netbeans 6.5, and Spring Framework. Also prepare the other library like jms.jar, dom4j.jar, and Apache ActiveMQ library. Create a new Java application named CalculatorMQ. Create new package named com.ndung.mq.util. In this package we will write all utility class that needed by JMS and MQ Vendor. The First class named MQConfig.java.
package com.ndung.mq.util;
public class MQConfig {
private String queueName;
private int timeout;
//getter & setter
}
The second class named MQWriter.java. This class will be used to write message in queue.
package com.ndung.mq.util;
import java.io.*;
import java.util.Map;
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;
public class MQWriter {
private ConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue queue;
private QueueSender queueSender;
private MQConfig config;
public MQWriter(MQConfig config) {
this.config = config;
}
public void init() {
openConnection();
}
public void terminate() {
try {
if (queueConnection != null) {
queueConnection.close();
queueConnection = null;
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public void openConnection(){
try {
Properties props = new Properties();
props.load(new FileInputStream(new File("src/jndi.properties")));
Context jndiContext = new InitialContext(props);
queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
queueConnection = (QueueConnection) queueConnectionFactory.createConnection();
queueConnection.start();
} catch (NamingException ne) {
ne.printStackTrace();
} catch (JMSException je) {
je.printStackTrace();
} catch (IOException ie) {
ie.printStackTrace();
}
}
public void write(String message, Map properties) throws JMSException {
queueSender = null;
queueSession = null;
try {
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = queueSession.createQueue(config.getQueueName());
queueSender = queueSession.createSender(queue);
TextMessage textMessage = queueSession.createTextMessage();
if (properties != null || properties.size() != 0) {
for (Object o : properties.keySet()) {
if (o instanceof String)
textMessage.setStringProperty(o.toString(), properties.get(o).toString());
if (o instanceof Integer)
textMessage.setIntProperty(o.toString(), Integer.parseInt(properties.get(o).toString()));
if (o instanceof Long)
textMessage.setLongProperty(o.toString(), Long.parseLong(properties.get(o).toString()));
}
}
textMessage.setText(message);
queueSender.send(textMessage);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (queueSender != null)
queueSender.close();
if (queueSession != null)
queueSession.close();
}
}
}
The third class named MQSyncReader.java. This class will read message in queue synchronously.
package com.ndung.mq.util;
import java.io.*;
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;
public class MQSyncReader {
private ConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue queue;
private QueueReceiver queueReceiver;
private MQConfig config;
public MQSyncReader(MQConfig config) {
this.config = config;
}
public void init() {
openConnection();
}
public void terminate() {
try {
if (queueSession != null)
queueSession.close();
if (queueConnection != null)
queueConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public void openConnection() {
try {
Properties props = new Properties();
props.load(new FileInputStream(new File("src/jndi.properties")));
Context jndiContext = new InitialContext(props);
queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
queueConnection = (QueueConnection) queueConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queueConnection.start();
} catch (NamingException ne) {
ne.printStackTrace();
} catch (JMSException je) {
je.printStackTrace();
} catch (IOException ie) {
ie.printStackTrace();
}
}
public String read(String selector) throws JMSException {
queueReceiver = null;
String out = "";
try {
queue = queueSession.createQueue(config.getQueueName());
if (selector != null || !selector.equals("")) {
queueReceiver = queueSession.createReceiver(queue, selector);
} else {
queueReceiver = queueSession.createReceiver(queue);
}
TextMessage textMessage = null;
if (config.getTimeout() != -1)
textMessage = (TextMessage) queueReceiver.receive(config.getTimeout());
else
textMessage = (TextMessage) queueReceiver.receive();
if (textMessage != null)
out = textMessage.getText();
} catch (JMSException je) {
je.printStackTrace();
} finally {
if (queueReceiver != null)
queueReceiver.close();
}
return out;
}
}
The third class named MQAsyncReader.java. This class will read message in queue asynchronously.
package com.ndung.mq.util;
import java.io.*;
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;
public class MQAsyncReader implements Runnable{
private ConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue queue;
private QueueReceiver queueReceiver;
private MQConfig config;
private MessageListener listener;
private volatile Thread thread = null;
public MQAsyncReader(MQConfig config, MessageListener listener) {
this.config = config;
this.listener = listener;
}
public void start(){
try{
Properties props = new Properties();
props.load(new FileInputStream(new File("src/jndi.properties")));
Context jndiContext = new InitialContext(props);
queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
queueConnection = (QueueConnection) queueConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queueConnection.start();
queue = queueSession.createQueue(config.getQueueName());
queueReceiver = queueSession.createReceiver(queue);
thread = new Thread(this);
thread.start();
}
catch(NamingException ne){
ne.printStackTrace();
}
catch(JMSException je){
je.printStackTrace();
} catch (IOException ie) {
ie.printStackTrace();
}
}
public void stop() throws JMSException {
thread.interrupt();
thread = null;
if (queueReceiver != null)
queueReceiver.close();
if (queueSession != null)
queueSession.close();
if (queueConnection != null)
queueConnection.close();
}
public void run() {
Thread thisThread = Thread.currentThread();
while (thisThread == thread) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
break;
}
try{
queueReceiver.setMessageListener(listener);
}
catch(JMSException ex){
ex.printStackTrace();
}
}
}
}
Then we will make client application. Create package named com.ndung.calc.client. Create two classes named Client.java and Main.java.
package com.ndung.calc.client;
import com.ndung.mq.util.MQSyncReader;
import com.ndung.mq.util.MQWriter;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
public class Client {
private MQWriter mQWriter;
private MQSyncReader mQSyncReader;
private String transid;
public Client(MQWriter mQWriter, MQSyncReader mQSyncReader){
this.mQWriter = mQWriter;
this.mQSyncReader = mQSyncReader;
}
public String execute(int arg1, String operator, int arg2) throws JMSException{
this.transid = String.valueOf(System.currentTimeMillis());
String message = "\n" +
"\n" + ";
""+arg1+" \n"+
""+operator+" \n"+
""+arg2+" \n"+
"
queuePullRequest(message);
String result = lookupFromQueue();
return result;
}
private void queuePullRequest(String message) throws JMSException {
Map map = new HashMap();
map.put("TransactionID", transid);
mQWriter.write(message, map);
}
private String lookupFromQueue() throws JMSException{
StringBuffer selector = new StringBuffer(1024);
selector.append("TransactionID = '").append(transid).append("'");
return mQSyncReader.read(selector.toString());
}
}
package com.ndung.calc.client;
import java.util.Scanner;
import javax.jms.JMSException;
import javax.naming.NamingException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class Main {
public static void main(String[] args) throws NamingException, JMSException {
Scanner scanner = new Scanner(System.in);
System.out.print("Please input the first arg: ");
int arg1 = scanner.nextInt();
System.out.print("Please input the operation, example: +,-,/,* : ");
String operator = scanner.next();
System.out.print("Please input the second arg: ");
int arg2 = scanner.nextInt();
ApplicationContext ctx = new FileSystemXmlApplicationContext("src/applicationContext.xml");
Client client = (Client) ctx.getBean("client");
System.out.println("The Result is = "+client.execute(arg1, operator, arg2));
}
}
After that we make server application. Create package named com.ndung.calc.server. First make a listener class that will be registered to catch for each message that listed in request queue. This class named MQHandler.java. Then make an interface named Processor.java, that used to process request. And then four classes that implements Processor. These classes represent each calculator operation that will be handled. Last, make a Main class for server application named Main.java.
package com.ndung.calc.server;
import com.ndung.mq.util.MQWriter;
import java.io.*;
import java.util.*;
import javax.jms.*;
import org.dom4j.*;
import org.dom4j.io.SAXReader;
public class MQHandler implements MessageListener{
Map<String, Processor> map;
MQWriter mQWriter;
public MQHandler(Map<String, Processor> map, MQWriter mqWriter){
this.map = map;
this.mQWriter = mqWriter;
}
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) msg;
String rqMsg = txtMsg.getText();
System.out.println(rqMsg);
String transID = txtMsg.getStringProperty("TransactionID");
Map properties = new HashMap();
properties.put("TransactionID", transID);
//process response message
InputStream in = new ByteArrayInputStream(rqMsg.getBytes());
SAXReader reader = new SAXReader();
Document document = reader.read(in);
Element root = document.getRootElement();
int arg1 = Integer.parseInt(root.elementText("arg1"));
int arg2 = Integer.parseInt(root.elementText("arg2"));
String operator = root.elementText("operator");
Processor processor = map.get(operator);
int result = 0;
if (processor!=null)
result = processor.process(arg1, arg2);
System.out.println("RESULT="+result);
mQWriter.write(String.valueOf(result), properties);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (DocumentException de){
de.printStackTrace();
}
}
}
package com.ndung.calc.server;
public interface Processor {
public int process(int arg0, int arg1);
}
package com.ndung.calc.server;
public class ProcessorAdd implements Processor{
public ProcessorAdd(){
}
public int process(int arg0, int arg1) {
int result = arg0 + arg1;
return result;
}
}
package com.ndung.calc.server;
import com.ndung.mq.util.MQAsyncReader;
import javax.jms.JMSException;
import javax.naming.NamingException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class Main {
private boolean active = true;
MQAsyncReader[] reader;
private int maxHandler = 1;
static ApplicationContext ctx = new FileSystemXmlApplicationContext("src/applicationContext.xml");
public void start() throws JMSException {
MQAsyncReader r = (MQAsyncReader) ctx.getBean("mqAsyncReader");
if (active) {
reader = new MQAsyncReader[maxHandler];
for (int i = 0; i < maxHandler; i++) {
reader[i] = r;
reader[i].start();
}
}
else {
System.out.println("start no active");
}
}
public synchronized void stop() throws JMSException {
for (int i = 0; i < reader.length; i++) {
reader[i].stop();
}
}
public static void main(String[] args) throws NamingException, JMSException{
final Main main = new Main();
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable() {
public void run() {
try {
main.stop();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}, "ShutdownHook"));
main.start();
}
}
Last make two configuration files in folder src. First named jndi.properties that used for JNDI configuration and the second named applicationContext.xml that used for Spring configuration file.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<bean id="mqAsyncReader" class="com.ndung.mq.util.MQAsyncReader">
<constructor-arg>
<ref local="reqConfig" />
</constructor-arg>
<constructor-arg>
<ref local="mqHandler" />
</constructor-arg>
</bean>
<bean id="reqConfig" class="com.ndung.mq.util.MQConfig">
<property name="queueName">
<value>Q.REQUEST</value>
</property>
</bean>
<bean id="respConfig" class="com.ndung.mq.util.MQConfig">
<property name="queueName">
<value>Q.RESPONSE</value>
</property>
<property name="timeout">
<value>20000</value>
</property>
</bean>
<bean id="mqHandler" class="com.ndung.calc.server.MQHandler">
<constructor-arg>
<map>
<entry key="+">
<bean class="com.ndung.calc.server.ProcessorAdd">
</bean>
</map>
</constructor-arg>
<constructor-arg>
<ref local="mqRespWriter" />
</constructor-arg>
</bean>
<bean id="mqReqWriter" class="com.ndung.mq.util.MQWriter" init-method="init"
destroy-method="terminate">
<constructor-arg>
<ref local="reqConfig" />
</constructor-arg>
</bean>
<bean id="mqRespSyncReader" class="com.ndung.mq.util.MQSyncReader" init-method="init"
destroy-method="terminate">
<constructor-arg>
<ref local="respConfig" />
</constructor-arg>
</bean>
<bean id="client" class="com.ndung.calc.client.Client">
<constructor-arg>
<ref local="mqReqWriter"/>
</constructor-arg>
<constructor-arg>
<ref local="mqRespSyncReader"/>
</constructor-arg>
</bean>
<bean id="mqRespWriter" class="com.ndung.mq.util.MQWriter" init-method="init"
destroy-method="terminate">
<constructor-arg>
<ref local="respConfig" />
</constructor-arg>
</bean>
</beans>
Run one instance of com.ndung.calc.server.Main, run and enter user inputs for several instance of com.ndung.calc.client.Main. Next post, introducing into MQ vendor that so many used in market and industry. It is MQSeries from IBM.
Read More..