Wednesday, March 25, 2009

JMS Quick Start Using Apache ActiveMQ

Read JMS First Impression in the previous post first. A JMS message has three parts:
1. Header, which consists of: JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, JMSReplyTo, JMSType, JMSRedelivered.
2. Properties (optional). If we need additional values besides the header fields, we can create and set message properties.
3. Body (optional). The JMS API defines five message body formats, also called message types: TextMessage, MapMessage, BytesMessage, StreamMessage, and ObjectMessage. A plain Message has a header and properties but no body.

JMS message delivery styles. JMS supports synchronous and asynchronous message delivery:
1. Synchronous Delivery
A client can request the next message from a message consumer using one of its receive methods. There are several variations of receive that allow a client to poll or wait for the next message. For example:


QueueReceiver receiver = null;
receiver = session.createReceiver(queue);
StreamMessage stockMessage;
stockMessage = (StreamMessage)receiver.receive();

In the above code fragment, the receiver will wait indefinitely for a message. Alternatively, we could have specified a timeout in milliseconds, such as:

// wait for 10 seconds only.
stockMessage = (StreamMessage)receiver.receive(10*1000);

Or, no wait at all:

// Don't wait; returns null if no message is available.
stockMessage = (StreamMessage)receiver.receiveNoWait();
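
The three receive variants differ only in how long the caller is willing to block. As a rough analogy in plain JDK code (this is not the JMS API; the class and method names below are made up for illustration), a java.util.concurrent.BlockingQueue offers the same three choices:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy only: a BlockingQueue stands in for a JMS queue.
class ReceiveStyles {

    // Like receive(): blocks until a message is available.
    static String receive(BlockingQueue<String> queue) throws InterruptedException {
        return queue.take();
    }

    // Like receive(timeout): waits up to timeoutMillis, then returns null.
    static String receive(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Like receiveNoWait(): returns immediately, null if nothing is queued.
    static String receiveNoWait(BlockingQueue<String> queue) {
        return queue.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.put("stock quote");
        System.out.println(receive(queue));        // the queued message
        System.out.println(receive(queue, 100));   // null once the wait expires
        System.out.println(receiveNoWait(queue));  // null straight away
    }
}
```

The timed variant is usually the safest default for a client that must answer a user, which is why the calculator client later in this post reads its response with a timeout.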

2. Asynchronous Delivery
Instead of polling the message consumer or waiting on it for messages, a client can register a message listener with a message consumer. A message listener is an object that implements the MessageListener interface.

// asynchronous reader, register a message listener.
// the listener will be called for each message that comes into the queue
receiver.setMessageListener(listener);
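
To make the callback shape concrete, here is a minimal plain-JDK sketch. It is only an analogy: Listener and Consumer below are hypothetical stand-ins for javax.jms.MessageListener and a message consumer, and a real provider would call the listener from its own delivery thread.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK analogy only: shows the callback shape, not the JMS API.
class ListenerSketch {

    // Stands in for javax.jms.MessageListener.
    interface Listener {
        void onMessage(String message);
    }

    // Stands in for a consumer: it pushes each arriving message to the listener.
    static class Consumer {
        private Listener listener;

        void setMessageListener(Listener listener) {
            this.listener = listener;
        }

        // Called by the "provider" whenever a message arrives.
        void deliver(String message) {
            if (listener != null) {
                listener.onMessage(message);
            }
        }
    }

    public static void main(String[] args) {
        final List<String> seen = new ArrayList<String>();
        Consumer consumer = new Consumer();
        // Register once; after that the client never asks for messages itself.
        consumer.setMessageListener(new Listener() {
            public void onMessage(String message) {
                seen.add(message);
            }
        });
        consumer.deliver("first");
        consumer.deliver("second");
        System.out.println(seen);
    }
}
```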

Last but not least, we will build a simple point-to-point calculator application. After getting a request from the user, the client writes an XML message to the request queue and then reads the response queue synchronously, with a timeout, using a selector. The server reads the request queue asynchronously and writes its answer to the response queue. I'm using NetBeans 6.5 and the Spring Framework. Also prepare the other libraries: jms.jar, dom4j.jar, and the Apache ActiveMQ library. Create a new Java application named CalculatorMQ, then create a package named com.ndung.mq.util. This package holds all the utility classes needed for JMS and the MQ vendor. The first class is named MQConfig.java.

package com.ndung.mq.util;

public class MQConfig {
private String queueName;
private int timeout;

//getter & setter
}


The second class is named MQWriter.java. It is used to write messages to a queue.

package com.ndung.mq.util;

import java.io.*;
import java.util.Map;
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class MQWriter {

private ConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue queue;
private QueueSender queueSender;
private MQConfig config;

public MQWriter(MQConfig config) {
this.config = config;
}

public void init() {
openConnection();
}

public void terminate() {
try {
if (queueConnection != null) {
queueConnection.close();
queueConnection = null;
}
} catch (JMSException e) {
e.printStackTrace();
}
}

public void openConnection(){
try {
Properties props = new Properties();
props.load(new FileInputStream(new File("src/jndi.properties")));
Context jndiContext = new InitialContext(props);
queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
queueConnection = (QueueConnection) queueConnectionFactory.createConnection();
queueConnection.start();
} catch (NamingException ne) {
ne.printStackTrace();
} catch (JMSException je) {
je.printStackTrace();
} catch (IOException ie) {
ie.printStackTrace();
}
}

public void write(String message, Map properties) throws JMSException {
queueSender = null;
queueSession = null;
try {
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = queueSession.createQueue(config.getQueueName());
queueSender = queueSession.createSender(queue);
TextMessage textMessage = queueSession.createTextMessage();
// Skip the loop when no properties were supplied.
if (properties != null && !properties.isEmpty()) {
for (Object o : properties.keySet()) {
// The key is the property name; choose the setter from the type of the value.
Object value = properties.get(o);
if (value instanceof String)
textMessage.setStringProperty(o.toString(), (String) value);
else if (value instanceof Integer)
textMessage.setIntProperty(o.toString(), (Integer) value);
else if (value instanceof Long)
textMessage.setLongProperty(o.toString(), (Long) value);
}
}
textMessage.setText(message);
queueSender.send(textMessage);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (queueSender != null)
queueSender.close();
if (queueSession != null)
queueSession.close();
}
}
}

The third class is named MQSyncReader.java. It reads messages from a queue synchronously.

package com.ndung.mq.util;

import java.io.*;
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class MQSyncReader {

private ConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue queue;
private QueueReceiver queueReceiver;
private MQConfig config;

public MQSyncReader(MQConfig config) {
this.config = config;
}

public void init() {
openConnection();
}

public void terminate() {
try {
if (queueSession != null)
queueSession.close();
if (queueConnection != null)
queueConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void openConnection() {
try {
Properties props = new Properties();
props.load(new FileInputStream(new File("src/jndi.properties")));
Context jndiContext = new InitialContext(props);

queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
queueConnection = (QueueConnection) queueConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queueConnection.start();
} catch (NamingException ne) {
ne.printStackTrace();
} catch (JMSException je) {
je.printStackTrace();
} catch (IOException ie) {
ie.printStackTrace();
}
}

public String read(String selector) throws JMSException {
queueReceiver = null;
String out = "";
try {
queue = queueSession.createQueue(config.getQueueName());
if (selector != null && !selector.equals("")) {
queueReceiver = queueSession.createReceiver(queue, selector);
} else {
queueReceiver = queueSession.createReceiver(queue);
}
TextMessage textMessage = null;
if (config.getTimeout() != -1)
textMessage = (TextMessage) queueReceiver.receive(config.getTimeout());
else
textMessage = (TextMessage) queueReceiver.receive();
if (textMessage != null)
out = textMessage.getText();
} catch (JMSException je) {
je.printStackTrace();
} finally {
if (queueReceiver != null)
queueReceiver.close();
}
return out;
}
}

The fourth class is named MQAsyncReader.java. It reads messages from a queue asynchronously.

package com.ndung.mq.util;

import java.io.*;
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class MQAsyncReader implements Runnable{

private ConnectionFactory queueConnectionFactory;
private QueueConnection queueConnection;
private QueueSession queueSession;
private Queue queue;
private QueueReceiver queueReceiver;
private MQConfig config;
private MessageListener listener;
private volatile Thread thread = null;

public MQAsyncReader(MQConfig config, MessageListener listener) {
this.config = config;
this.listener = listener;
}

public void start(){
try{
Properties props = new Properties();
props.load(new FileInputStream(new File("src/jndi.properties")));
Context jndiContext = new InitialContext(props);

queueConnectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
queueConnection = (QueueConnection) queueConnectionFactory.createConnection();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queueConnection.start();
queue = queueSession.createQueue(config.getQueueName());
queueReceiver = queueSession.createReceiver(queue);
thread = new Thread(this);
thread.start();
}
catch(NamingException ne){
ne.printStackTrace();
}
catch(JMSException je){
je.printStackTrace();
} catch (IOException ie) {
ie.printStackTrace();
}
}

public void stop() throws JMSException {
// start() may have failed before the thread was created, so guard against null.
Thread t = thread;
thread = null;
if (t != null)
t.interrupt();
if (queueReceiver != null)
queueReceiver.close();
if (queueSession != null)
queueSession.close();
if (queueConnection != null)
queueConnection.close();
}

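public void run() {
// Register the listener once; the JMS provider then delivers each message to it.
try {
queueReceiver.setMessageListener(listener);
} catch (JMSException ex) {
ex.printStackTrace();
return;
}
// Keep this thread alive until stop() clears the reference and interrupts it.
Thread thisThread = Thread.currentThread();
while (thisThread == thread) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
break;
}
}
}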
}

Then we will make the client application. Create a package named com.ndung.calc.client with two classes named Client.java and Main.java.

package com.ndung.calc.client;

import com.ndung.mq.util.MQSyncReader;
import com.ndung.mq.util.MQWriter;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;

public class Client {
private MQWriter mQWriter;
private MQSyncReader mQSyncReader;
private String transid;

public Client(MQWriter mQWriter, MQSyncReader mQSyncReader){
this.mQWriter = mQWriter;
this.mQSyncReader = mQSyncReader;
}

public String execute(int arg1, String operator, int arg2) throws JMSException{
this.transid = String.valueOf(System.currentTimeMillis());
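// Element names match what MQHandler reads; the XML declaration and the root
// name "calc" are assumptions (the server accepts any root element).
String message = "<?xml version=\"1.0\"?>\n" +
"<calc>\n" +
"<arg1>"+arg1+"</arg1>\n"+
"<operator>"+operator+"</operator>\n"+
"<arg2>"+arg2+"</arg2>\n"+
"</calc>";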
queuePullRequest(message);
String result = lookupFromQueue();
return result;
}

private void queuePullRequest(String message) throws JMSException {
Map map = new HashMap();
map.put("TransactionID", transid);
mQWriter.write(message, map);
}

private String lookupFromQueue() throws JMSException{
StringBuffer selector = new StringBuffer(1024);
selector.append("TransactionID = '").append(transid).append("'");
return mQSyncReader.read(selector.toString());
}
}
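
The selector built in lookupFromQueue compares the TransactionID property with a string literal, so it only matches when the writer stored that property as a String. JMS selectors use SQL-92 style string literals, in which a single quote inside the value has to be doubled. The helper below is a hypothetical sketch of that quoting, not part of the original classes:

```java
// Hypothetical helper, not part of the original classes.
class SelectorBuilder {

    // Builds "name = 'value'", doubling any single quote inside the value
    // as the SQL-92 style selector syntax requires for string literals.
    static String equalsString(String name, String value) {
        return name + " = '" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        System.out.println(equalsString("TransactionID", "1237950000000"));
        System.out.println(equalsString("Customer", "O'Brien"));
    }
}
```

With a millisecond timestamp as the transaction ID there is nothing to escape, but the same helper stays safe if the ID ever comes from user input.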


package com.ndung.calc.client;

import java.util.Scanner;
import javax.jms.JMSException;
import javax.naming.NamingException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class Main {
public static void main(String[] args) throws NamingException, JMSException {
Scanner scanner = new Scanner(System.in);
System.out.print("Please input the first arg: ");
int arg1 = scanner.nextInt();
System.out.print("Please input the operation, example: +,-,/,* : ");
String operator = scanner.next();
System.out.print("Please input the second arg: ");
int arg2 = scanner.nextInt();

ApplicationContext ctx = new FileSystemXmlApplicationContext("src/applicationContext.xml");
Client client = (Client) ctx.getBean("client");
System.out.println("The Result is = "+client.execute(arg1, operator, arg2));
}
}

After that we make the server application. Create a package named com.ndung.calc.server. First make a listener class, named MQHandler.java, that is registered to receive each message that arrives in the request queue. Then make an interface named Processor.java that is used to process a request, followed by four classes that implement Processor, one for each calculator operation to be handled (only ProcessorAdd is listed here). Last, make a main class for the server application named Main.java.

package com.ndung.calc.server;

import com.ndung.mq.util.MQWriter;
import java.io.*;
import java.util.*;
import javax.jms.*;
import org.dom4j.*;
import org.dom4j.io.SAXReader;

public class MQHandler implements MessageListener{

Map<String, Processor> map;
MQWriter mQWriter;

public MQHandler(Map<String, Processor> map, MQWriter mqWriter){
this.map = map;
this.mQWriter = mqWriter;
}

public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) msg;
String rqMsg = txtMsg.getText();
System.out.println(rqMsg);
String transID = txtMsg.getStringProperty("TransactionID");
Map properties = new HashMap();
properties.put("TransactionID", transID);
//process response message
InputStream in = new ByteArrayInputStream(rqMsg.getBytes());
SAXReader reader = new SAXReader();
Document document = reader.read(in);
Element root = document.getRootElement();
int arg1 = Integer.parseInt(root.elementText("arg1"));
int arg2 = Integer.parseInt(root.elementText("arg2"));
String operator = root.elementText("operator");
Processor processor = map.get(operator);
int result = 0;
if (processor!=null)
result = processor.process(arg1, arg2);
System.out.println("RESULT="+result);
mQWriter.write(String.valueOf(result), properties);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (DocumentException de){
de.printStackTrace();
}
}
}
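
If dom4j is not at hand, the same three values can be pulled out with the DOM parser that ships with the JDK. This is a sketch of a swapped-in technique, not what MQHandler above does; the class name RequestParser and the root element name "calc" are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

// Sketch using the JDK's DOM parser in place of dom4j.
class RequestParser {

    // Returns the text of the first descendant element with the given tag name.
    // Throws NullPointerException if the element is missing.
    static String text(Element root, String tag) {
        return root.getElementsByTagName(tag).item(0).getTextContent();
    }

    // Parses the request and returns {arg1, operator, arg2} as strings.
    static String[] parse(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Element root = doc.getDocumentElement();
        return new String[] {
            text(root, "arg1"), text(root, "operator"), text(root, "arg2")
        };
    }

    public static void main(String[] args) throws Exception {
        // The root element name "calc" is an assumption.
        String xml = "<calc><arg1>2</arg1><operator>+</operator><arg2>3</arg2></calc>";
        String[] parts = parse(xml);
        System.out.println(parts[0] + " " + parts[1] + " " + parts[2]);
    }
}
```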


package com.ndung.calc.server;

public interface Processor {
public int process(int arg0, int arg1);
}


package com.ndung.calc.server;

public class ProcessorAdd implements Processor{

public ProcessorAdd(){
}

public int process(int arg0, int arg1) {
int result = arg0 + arg1;
return result;
}
}
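
The remaining operations follow the same pattern as ProcessorAdd. As a compact sketch, and only as one possible shape, the four operations can be written as lambdas in a single map. The Processor interface is repeated inside the block so that it compiles on its own, and ProcessorSketch and defaultProcessors are names invented for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch; Processor is repeated here so the block compiles alone.
class ProcessorSketch {

    interface Processor {
        int process(int arg0, int arg1);
    }

    // Builds an operator-to-processor map like the one MQHandler receives.
    static Map<String, Processor> defaultProcessors() {
        Map<String, Processor> map = new HashMap<String, Processor>();
        map.put("+", (a, b) -> a + b);
        map.put("-", (a, b) -> a - b);
        map.put("*", (a, b) -> a * b);
        // Integer division; a zero divisor throws ArithmeticException.
        map.put("/", (a, b) -> a / b);
        return map;
    }

    public static void main(String[] args) {
        Map<String, Processor> map = defaultProcessors();
        System.out.println(map.get("*").process(6, 7)); // prints 42
    }
}
```

In the Spring configuration used in this post, each operation would instead be its own class registered as another entry in the mqHandler map.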


package com.ndung.calc.server;

import com.ndung.mq.util.MQAsyncReader;
import javax.jms.JMSException;
import javax.naming.NamingException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class Main {
private boolean active = true;
MQAsyncReader[] reader;
private int maxHandler = 1;
static ApplicationContext ctx = new FileSystemXmlApplicationContext("src/applicationContext.xml");

public void start() throws JMSException {
MQAsyncReader r = (MQAsyncReader) ctx.getBean("mqAsyncReader");
if (active) {
reader = new MQAsyncReader[maxHandler];
for (int i = 0; i < maxHandler; i++) {
reader[i] = r;
reader[i].start();
}
}
else {
System.out.println("start no active");
}
}

public synchronized void stop() throws JMSException {
for (int i = 0; i < reader.length; i++) {
reader[i].stop();
}
}

public static void main(String[] args) throws NamingException, JMSException{
final Main main = new Main();
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable() {
public void run() {
try {
main.stop();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}, "ShutdownHook"));
main.start();
}
}

Last, make two configuration files in the src folder: jndi.properties for the JNDI configuration and applicationContext.xml for the Spring configuration.

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616


<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<bean id="mqAsyncReader" class="com.ndung.mq.util.MQAsyncReader">
<constructor-arg>
<ref local="reqConfig" />
</constructor-arg>
<constructor-arg>
<ref local="mqHandler" />
</constructor-arg>
</bean>
<bean id="reqConfig" class="com.ndung.mq.util.MQConfig">
<property name="queueName">
<value>Q.REQUEST</value>
</property>
</bean>
<bean id="respConfig" class="com.ndung.mq.util.MQConfig">
<property name="queueName">
<value>Q.RESPONSE</value>
</property>
<property name="timeout">
<value>20000</value>
</property>
</bean>
<bean id="mqHandler" class="com.ndung.calc.server.MQHandler">
<constructor-arg>
<map>
<entry key="+">
<bean class="com.ndung.calc.server.ProcessorAdd">
</bean>
</entry>
</map>
</constructor-arg>
<constructor-arg>
<ref local="mqRespWriter" />
</constructor-arg>
</bean>
<bean id="mqReqWriter" class="com.ndung.mq.util.MQWriter" init-method="init"
destroy-method="terminate">
<constructor-arg>
<ref local="reqConfig" />
</constructor-arg>
</bean>
<bean id="mqRespSyncReader" class="com.ndung.mq.util.MQSyncReader" init-method="init"
destroy-method="terminate">
<constructor-arg>
<ref local="respConfig" />
</constructor-arg>
</bean>
<bean id="client" class="com.ndung.calc.client.Client">
<constructor-arg>
<ref local="mqReqWriter"/>
</constructor-arg>
<constructor-arg>
<ref local="mqRespSyncReader"/>
</constructor-arg>
</bean>
<bean id="mqRespWriter" class="com.ndung.mq.util.MQWriter" init-method="init"
destroy-method="terminate">
<constructor-arg>
<ref local="respConfig" />
</constructor-arg>
</bean>
</beans>

Run one instance of com.ndung.calc.server.Main, then run several instances of com.ndung.calc.client.Main and enter the user inputs. The next post introduces an MQ vendor that is widely used in the market and industry: MQSeries from IBM.


Tuesday, March 24, 2009

Introduction to Message-oriented Middleware (MOM) and Java Messaging Service (JMS) Using Apache ActiveMQ


package com.ndung;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldProducer {
public static void main(String[] args) throws JMSException {
//Instantiate connection factory, specific to the vendor
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.MQ");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
// Create a message
String text = "Hello world! From: ndung";
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message, then report success
producer.send(message);
System.out.println("Message Sent");
// Clean up
session.close();
connection.close();
}
}


package com.ndung;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldConsumer {
public static void main(String[] args) throws JMSException {
//Instantiate connection factory, specific to the vendor
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.MQ");
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}
consumer.close();
session.close();
connection.close();
}
}

In the previous post, we were introduced to EAI (Enterprise Application Integration) and took a quick look at middleware. There are two fundamentally different types of middleware, distinguished by the approach the middleware uses to transfer data between distributed software applications: Remote Procedure Call (RPC) based middleware and Message-oriented Middleware (MOM).
1. RPC. A software application that uses RPC-based middleware to transfer data to another application has to wait until the latter is done processing the data. With this type of middleware, communication proceeds in a lock-step, synchronized manner, and the communicating processes are tightly coupled to one another. Examples include Java RMI and CORBA.
2. MOM. This is best described as a category of software that enables loosely coupled, reliable, scalable, asynchronous communication among distributed applications or systems.


JMS is a specification that defines a set of interfaces and associated semantics that allow applications written in Java to access the services of any JMS-compliant MOM product. Plenty of compliant MOM products are available on the market, including MQSeries from IBM, SonicMQ from Progress, Sun Java Message Queue, Apache ActiveMQ, and many more.
The players in JMS:
1. Connections and Connection Factories
2. Sessions
3. Destinations
Let's first get a JMS first impression using ActiveMQ. I'm using NetBeans 6.5 as the IDE, with these libraries prepared: activemq-all-5.2.0.jar and jms.jar.

There are two different messaging styles in MOM: point-to-point and publish-and-subscribe.
1. Point-to-Point messaging style. In this model, a MOM is used by two applications to communicate with each other, often as an asynchronous replacement for remote procedure calls (RPC).
2. Publish-and-Subscribe messaging style. In this model, multiple applications connect to the MOM as publishers, which produce messages, or as subscribers, which consume messages.
An important difference between the two styles is that a point-to-point system is typically either one-to-one, meaning one message sender talks to one message receiver, or many-to-one, meaning more than one sender talks to one receiver. Publish-and-subscribe systems, on the other hand, are typically many-to-many, meaning one or more publishers can talk to one or more subscribers at a time.
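
The difference in delivery can be sketched with a small in-memory simulation. This is an illustration only, with made-up class and method names. A real broker also deals with persistence, acknowledgement, and its own dispatch policy, and the round-robin choice below is an assumption made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory illustration only, not a broker.
class DeliverySketch {

    // Point-to-point: each message goes to exactly one receiver (round-robin here).
    static List<List<String>> sendToQueue(List<String> messages, int receivers) {
        List<List<String>> inboxes = emptyInboxes(receivers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % receivers).add(messages.get(i));
        }
        return inboxes;
    }

    // Publish-and-subscribe: every subscriber gets a copy of every message.
    static List<List<String>> publishToTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<List<String>>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<String>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2");
        System.out.println(sendToQueue(messages, 2));    // [[m1], [m2]]
        System.out.println(publishToTopic(messages, 2)); // [[m1, m2], [m1, m2]]
    }
}
```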

Simple PTP (Point-to-Point) Application and Pub/Sub (Publisher and Subscriber) Application
We will enhance the sample above by first setting up an ActiveMQ broker and using JNDI. Configuring JNDI for a web application in the Tomcat container is described here. After extracting the apache-activemq-5.2.0.rar that we downloaded, edit activemq.xml in the conf folder and delete every "multicast" entry in this configuration file. After that, go to the bin folder and run activemq. In the same project as above, create packages named com.ndung.ptp and com.ndung.ps. Create two new Java classes named SimpleQueueSender.java and SimpleQueueReceiver.java in the com.ndung.ptp package, and two new Java classes named SimpleTopicPublisher.java and SimpleTopicSubscriber.java in the com.ndung.ps package.


package com.ndung.ptp;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleQueueSender {
public static void main(String[] args) throws NamingException, JMSException {
String queueName = "PTP.ACTIVE.MQ";
// Set up the JNDI configuration; this differs between vendors.
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);
ConnectionFactory queueConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
QueueConnection queueConnection = (QueueConnection)
queueConnectionFactory.createConnection();
QueueSession queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueSender queueSender = queueSession.createSender(queue);
queueConnection.start();
TextMessage textMessage = queueSession.createTextMessage();
textMessage.setText("Hello World");
queueSender.send(textMessage);
queueSession.close();
queueConnection.close();
}
}


package com.ndung.ptp;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleQueueReceiver {
public static void main(String[] args) throws JMSException, NamingException {
String queueName = "PTP.ACTIVE.MQ";
// Set up the JNDI configuration; this differs between vendors.
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);

ConnectionFactory queueConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
QueueConnection queueConnection = (QueueConnection)
queueConnectionFactory.createConnection();
QueueSession queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();
TextMessage textMessage = (TextMessage) queueReceiver.receive();
// Print the message body rather than the message object itself.
System.out.println(textMessage.getText());
queueSession.close();
queueConnection.close();
}
}


package com.ndung.ps;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleTopicPublisher {
public static void main(String[] args) throws Exception {
try {
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);
ConnectionFactory myConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
// Use myConnectionFactory to get a Topic connection
TopicConnection myConnection = (TopicConnection)
myConnectionFactory.createConnection();
// Use myConnection to create a Topic session
TopicSession mySession = myConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// Use mySession to get the Topic
Topic myTopic = mySession.createTopic("PS.ACTIVE.MQ");
// Use mySession to create a publisher for myTopic
TopicPublisher myPublisher = mySession.createPublisher(myTopic);
// Start the connection
myConnection.start();
// Create the HelloWorld message
TextMessage m = mySession.createTextMessage();
m.setText("Hello World");
// Use myPublisher to publish the message
myPublisher.publish(m);
// Done.
// Need to clean up
mySession.close();
myConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}


package com.ndung.ps;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleTopicSubscriber {
public static void main(String[] args) {
try {
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);
ConnectionFactory myConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
// Use myConnectionFactory to get a Topic connection
TopicConnection myConnection = (TopicConnection)
myConnectionFactory.createConnection();
// Use myConnection to create a Topic session
TopicSession mySession = myConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// Use mySession to get the Topic
Topic myTopic = mySession.createTopic("PS.ACTIVE.MQ");
// Use mySession to create a subscriber
TopicSubscriber mySubscriber = mySession.createSubscriber(myTopic);
// Start the connection
myConnection.start();
// Wait for the Hello World message
// Use the receiver and wait forever until the message arrives
TextMessage m = (TextMessage) mySubscriber.receive();
// Display the message
System.out.println("Received the message: " + m.getText());
// Done.
// Need to clean up
mySession.close();
myConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

To see the difference between the PTP application and the Pub/Sub application:
1. Run two instances of SimpleQueueReceiver and then one instance of SimpleQueueSender. Only one of the SimpleQueueReceiver instances gets the message from SimpleQueueSender; the other does not.
2. Run two instances of SimpleTopicSubscriber and then one instance of SimpleTopicPublisher. Both SimpleTopicSubscriber instances get the message from SimpleTopicPublisher.
We can monitor the message queues by opening http://localhost:8161/admin in a web browser. A better coding style follows in the next post.


Wednesday, March 11, 2009

Introduction to EAI

Enterprise Application Integration (EAI):

Imagine that a large company or project has more than one application:
1. These applications run on different platforms, for example Windows, Linux, Sun Solaris, OS/400, and so on.
2. The applications must be able to communicate with each other. A common case is one application needing data from another.
3. Each application uses a different message format. For example, one application uses EDI, another uses XML, another uses the ISO 8583 format, and so on.
4. Each application uses a different communication mechanism. For example, one application can use MQ, another TCP/IP, another a web service, another only HTTP, and so on.



An EAI architecture must accommodate every transaction that needs coordination between one application and another. It must have the following characteristics:
1. A standard message format.
2. Middleware as communication channel.

Middleware must be able to handle the following aspects:
1. Make sure each message arrives at its destination.
2. Convert messages from its standard message format to another application's message format and back.
3. Coordinate transaction.
4. Monitoring and logging.



EAI architecture components:
1. Adapter. A first EAI implementation is usually built with an adapter program for each application. The adapter converts messages from the application's message format to the middleware's standard message format and back.
2. Message router. The middleware must also have a message router, a program that validates each message and routes it to the appropriate destination.
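
The two components can be sketched in a few lines of Java. Everything in this example is made up for illustration: the "key=value;key=value" application format, the map used as the standard message, the "type" field, and the destination names are all assumptions, not taken from any EAI product.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; formats, field names and destinations are made up.
class EaiSketch {

    // Adapter: converts an application-specific "key=value;key=value" string
    // into the standard message, represented here as a simple map.
    static Map<String, String> toStandard(String appMessage) {
        Map<String, String> standard = new HashMap<String, String>();
        for (String pair : appMessage.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                standard.put(kv[0].trim(), kv[1].trim());
            }
        }
        return standard;
    }

    // Router: validates the standard message and picks a destination.
    static String route(Map<String, String> standard) {
        String type = standard.get("type");
        if (type == null) {
            throw new IllegalArgumentException("missing 'type' field");
        }
        return "payment".equals(type) ? "BILLING.SYSTEM" : "DEFAULT.SYSTEM";
    }

    public static void main(String[] args) {
        Map<String, String> msg = toStandard("type=payment;amount=1000");
        System.out.println(route(msg));
    }
}
```

Because every adapter targets the same standard format, adding a new application means writing one new adapter rather than one converter per existing application.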

Next material:
EAI Technology. EAI products as hub-and-spoke. Example of EAI products.
Taken from this material link.


Tuesday, October 07, 2008

(ISO 8583 + JPOS + Log4J) in Java Socket Programming

SECTION 1. INTRODUCTION
Chapter 1.1. ISO 8583
You can read its explanation first in Wikipedia.
Chapter 1.2. JPOS
JPOS is an open source framework for ISO 8583. The source code and library are free, but the documentation is commercial. We can download it here.
Chapter 1.3. Log4J
Log4J is an open source logging library. We can download the library here and first read the mini e-book about it written by Mr. Endy here.
Chapter 1.4. Java Socket Programming
There's no explanation here. We just need basic knowledge of Java network programming and Java thread programming, so let's start coding.

SECTION 2. CODING
First we prepare everything we need. This time we will use NetBeans as the IDE. Besides the JPOS framework and the Log4J library mentioned above, we still need:
1. Xerces-Java XML parser library (xercesImpl.jar)
2. Spring framework
3. Jakarta Apache Commons Logging library

Chapter 2.1. ISO 8583 + JPOS + Log4J
In NetBeans, create a new Java application named MyApp. Create a package named com.ndung.iso8583. The JPOS framework needs a packager to determine which ISO 8583 version is used. This time we will use the 1987 version of ISO 8583. First download iso87ascii.xml and place it in that package. Then create a packager factory class named PackagerFactory.java.


package com.ndung.iso8583;

import java.io.InputStream;

import org.jpos.iso.ISOException;
import org.jpos.iso.ISOPackager;
import org.jpos.iso.packager.GenericPackager;

public class PackagerFactory {
public static ISOPackager getPackager() {
ISOPackager packager = null;
try {
String filename = "iso87ascii.xml";
InputStream is = PackagerFactory.class.getResourceAsStream(filename);
packager = new GenericPackager(is);
}
catch (ISOException e) {
e.printStackTrace();
}
return packager;
}
}

Then create a class named MessageHandler.java that handles received messages. First download iso87asciiProperties.xml, which is used to translate the bits (fields) of an ISO message into a String that can be read easily.


package com.ndung.iso8583;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.ISOPackager;

public class MessageHandler {
private static ISOPackager packager = PackagerFactory.getPackager();
private Logger logger = Logger.getLogger( getClass() );
public String process(ISOMsg isomsg) throws Exception {
logger.info("ISO Message MTI is "+isomsg.getMTI());
logger.info("Is ISO message an incoming message? "+isomsg.isIncoming());
logger.info("Is ISO message an outgoing message? "+isomsg.isOutgoing());
logger.info("Is ISO message a request message? "+isomsg.isRequest());
logger.info("Is ISO message a response message? "+isomsg.isResponse());
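// Load the field names once instead of re-reading the XML file for every field.
Properties fieldNames = loadXMLProperties();
String message = "";
// Field 0 is the MTI; data fields run up to 128 inclusive.
for (int i=0;i<=128;i++){
if (isomsg.hasField(i)){
message += fieldNames.getProperty(Integer.toString(i))+"="+
isomsg.getValue(i)+"\n";
}
}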
logger.info(message);
return message;
}

public ISOMsg unpackRequest(String message) throws ISOException, Exception {
ISOMsg isoMsg = new ISOMsg();
isoMsg.setPackager(packager);
isoMsg.unpack(message.getBytes());
isoMsg.dump(System.out, " ");
return isoMsg ;
}

public String packResponse(ISOMsg message) throws ISOException, Exception {
message.dump(System.out, " ");
return new String( message.pack() ) ;
}

public Properties loadXMLProperties(){
Properties prop = new Properties();
try{
FileInputStream input=new FileInputStream("iso87asciiProperties.xml");
prop.loadFromXML(input);
input.close();
}
catch(IOException e){
e.printStackTrace();
}
return prop;
}
}
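To see what loadXMLProperties() expects, here is a minimal sketch that uses only the JDK. The entries in SAMPLE_XML are hypothetical (the real iso87asciiProperties.xml may use different labels), and FieldNamesDemo, load, and describe are illustrative names that are not part of MyApp. It shows the XML layout that Properties.loadFromXML reads and how a field number can be mapped to a readable label, with a fallback when no label is defined.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

final class FieldNamesDemo {
    // Hypothetical content: the real iso87asciiProperties.xml may differ.
    static final String SAMPLE_XML =
          "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
        + "<!DOCTYPE properties SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n"
        + "<properties>\n"
        + "  <entry key=\"2\">Primary Account Number</entry>\n"
        + "  <entry key=\"3\">Processing Code</entry>\n"
        + "  <entry key=\"4\">Transaction Amount</entry>\n"
        + "</properties>\n";

    /** Parses an XML properties document held in a String. */
    static Properties load(String xml) {
        Properties props = new Properties();
        try (InputStream in =
                 new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))) {
            props.loadFromXML(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Formats one field as "label=value", falling back to "Field n" if no label exists. */
    static String describe(Properties names, int field, String value) {
        return names.getProperty(Integer.toString(field), "Field " + field) + "=" + value;
    }

    public static void main(String[] args) {
        Properties names = load(SAMPLE_XML);
        System.out.println(describe(names, 3, "011000"));  // Processing Code=011000
        System.out.println(describe(names, 11, "000001")); // Field 11=000001
    }
}
```

The fallback label avoids printing "null=..." for a field that has no entry in the properties file.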

Chapter 2.2. Java Socket Programming + Log4J
Create a package named com.ndung.socket and then create four classes named ServerConfig.java, SocketServerHandlerFactory.java, SocketServerHandler.java, and SocketConnectionServer.java. Before that, create log4j.properties in the default package as the logging configuration.

# Category Configuration
log4j.rootLogger=INFO,Konsole,Roll
# Console Appender Configuration
log4j.appender.Konsole=org.apache.log4j.ConsoleAppender
log4j.appender.Konsole.layout=org.apache.log4j.PatternLayout
# Date Format based on ISO-8601 : %d
log4j.appender.Konsole.layout.ConversionPattern=%d [%t] %5p %c - %m%n
# Roll Appender Configuration
log4j.appender.Roll=org.apache.log4j.RollingFileAppender
log4j.appender.Roll.File=/home/ndung/NetBeansProjects/MyApp/log/myApp.log
log4j.appender.Roll.MaxFileSize=10KB
log4j.appender.Roll.MaxBackupIndex=2
log4j.appender.Roll.layout=org.apache.log4j.PatternLayout
# Date Format based on ISO-8601 : %d
log4j.appender.Roll.layout.ConversionPattern=%d [%t] %p (%F:%L) - %m%n


package com.ndung.socket;
public class ServerConfig {
private int port;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}


package com.ndung.socket;

import com.ndung.iso8583.MessageHandler;
import java.io.IOException;
import java.net.Socket;

public class SocketServerHandlerFactory {
private MessageHandler messageHandler;
public SocketServerHandlerFactory(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
public SocketServerHandler createHandler(Socket socket) throws IOException {
return new SocketServerHandler(socket, messageHandler);
}
}


package com.ndung.socket;

import com.ndung.iso8583.MessageHandler;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import org.apache.log4j.Logger;
import org.jpos.iso.ISOMsg;

public class SocketServerHandler extends Thread{
private Logger logger = Logger.getLogger( getClass() );
private Socket serverSocket ;
private BufferedReader inFromClient;
private DataOutputStream outToClient;
private MessageHandler messageHandler;
private String datafromClient;

public SocketServerHandler(Socket socket, MessageHandler messageHandler) throws IOException {
super("SocketHandler (" + socket.getInetAddress().getHostAddress() + ")");
this.serverSocket = socket ;
this.messageHandler = messageHandler;
this.inFromClient = new BufferedReader(new InputStreamReader(serverSocket.getInputStream()));
this.outToClient = new DataOutputStream(serverSocket.getOutputStream());
}
@Override
public void run() {
try {
logger.info("Server is ready...");
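logger.info("There is a client connected...");
// Send the greeting once per connection, not once per request.
outToClient.writeBytes("InfoServer version 0.1\n");
// readLine() returns null when the client closes the connection.
while ((datafromClient = inFromClient.readLine()) != null) {
logger.info("Data From Client : "+datafromClient);
ISOMsg isomsg = messageHandler.unpackRequest(datafromClient);
// A blank line marks the end of the response so a client can tell when to stop reading.
outToClient.writeBytes(messageHandler.process(isomsg) + "\n");
}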
}
catch (IOException ioe) {
logger.error("error: " + ioe);
}
catch (Exception e) {
logger.error("error: " + e);
}
finally {
try {
if (inFromClient != null) inFromClient.close();
if (outToClient != null) outToClient.close();
if (serverSocket != null) serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}


package com.ndung.socket;

import java.io.IOException;
import java.net.ServerSocket;

public class SocketConnectionServer {
private ServerConfig config;
private SocketServerHandlerFactory handlerFactory;
// volatile so the accept thread sees the change made by stop()
private volatile boolean stop;
public SocketConnectionServer(ServerConfig config, SocketServerHandlerFactory handlerFactory) {
this.config = config;
this.handlerFactory = handlerFactory;
}
public void start() throws IOException {
stop = false;
final ServerSocket serverSocket = new ServerSocket(config.getPort());
new Thread(new Runnable() {
public void run() {
while (!stop) {
try {
handlerFactory.createHandler(serverSocket.accept()).start();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}).start();
}
public void stop() {
stop = true;
}
}
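One thing to note about stop() above: setting the flag does not wake up a thread that is already blocked in accept(). The following is a minimal sketch, using only the JDK, of one common way to handle that: closing the ServerSocket so the blocked accept() throws and the loop can exit. StoppableServerDemo, its echo handler, and roundTrip are hypothetical names for illustration; they are not part of MyApp, and the echo reply stands in for the ISO message handling.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class StoppableServerDemo {
    private final ServerSocket serverSocket;
    private volatile boolean stopped;

    StoppableServerDemo(int port) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 = any free port
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    /** Accepts clients on a background thread, one handler thread per connection. */
    void start() {
        new Thread(() -> {
            while (!stopped) {
                try {
                    Socket client = serverSocket.accept();
                    new Thread(() -> echo(client)).start();
                } catch (IOException e) {
                    // After stop() the exception is expected; otherwise report it.
                    if (!stopped) e.printStackTrace();
                }
            }
        }, "acceptor").start();
    }

    /** Sets the flag and closes the socket so a blocked accept() returns. */
    void stop() throws IOException {
        stopped = true;
        serverSocket.close();
    }

    /** Replies "ECHO <line>" for each line until the client disconnects. */
    private static void echo(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(s.getInputStream(), StandardCharsets.US_ASCII));
             Writer out = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.US_ASCII)) {
            String line;
            while ((line = in.readLine()) != null) {
                out.write("ECHO " + line + "\n");
                out.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts a server, sends one line over loopback, returns the reply, stops the server. */
    static String roundTrip(String line) {
        try {
            StoppableServerDemo server = new StoppableServerDemo(0);
            server.start();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.port());
                 Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.US_ASCII);
                 BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII))) {
                out.write(line + "\n");
                out.flush();
                return in.readLine();
            } finally {
                server.stop();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

The same idea could be applied to SocketConnectionServer by keeping the ServerSocket in a field and closing it inside stop().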

Create a socket server class named MyServer.java. This class is also the main class of our application. Then create a socket client class named MyClient.java. Before that, create applicationContext.xml in the default package as the Spring configuration that injects the dependencies used by our main class.

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
"http://www.springframework.org/dtd/spring-beans.dtd">

<beans>
<bean id="socketConnectionServer" class="com.ndung.socket.SocketConnectionServer">
<constructor-arg>
<ref local="config" />
</constructor-arg>
<constructor-arg>
<ref local="socketServerHandlerFactory" />
</constructor-arg>
</bean>

<bean id="config" class="com.ndung.socket.ServerConfig">
<property name="port">
<value>50000</value>
</property>
</bean>

<bean id="socketServerHandlerFactory" class="com.ndung.socket.SocketServerHandlerFactory">
<constructor-arg>
<ref local="messageHandler" />
</constructor-arg>
</bean>

<bean id="messageHandler" class="com.ndung.iso8583.MessageHandler">
</bean>

</beans>


package com.ndung.socket;

import java.io.IOException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MyServer {

public static void main(String[] args) throws IOException {
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
SocketConnectionServer server = (SocketConnectionServer) ctx.getBean("socketConnectionServer");
server.start();
}
}


package com.ndung.socket;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import org.apache.log4j.Logger;

public class MyClient {
private final int MY_PORT=50000;
private final String TargetHost = "localhost";
private final String QUIT = "QUIT";
private Logger logger = Logger.getLogger( getClass() );
public MyClient() {
try {
BufferedReader inFromUser = new BufferedReader(new InputStreamReader(System.in));
Socket clientSocket = new Socket(TargetHost, MY_PORT);
DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
logger.info(inFromServer.readLine());
boolean isQuit = false;
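while (!isQuit) {
System.out.print("Your data : ");
String cmd = inFromUser.readLine();
// Stop on end of input or QUIT instead of sending QUIT to the server as an ISO message.
if (cmd == null || cmd.toUpperCase().equals(QUIT)) {
isQuit = true;
continue;
}
outToServer.writeBytes(cmd.toUpperCase() + "\n");
// Read the response until a blank line arrives or the server closes the connection.
String message = inFromServer.readLine();
while (message != null && message.length() > 0) {
logger.info("From Server: " + message);
message = inFromServer.readLine();
}
// A null line means the server closed the connection, so stop asking for input.
if (message == null) isQuit = true;
}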
outToServer.close();
inFromServer.close();
clientSocket.close();
}
catch (IOException ioe) {
logger.error("Error:" + ioe);
}
catch (Exception e) {
logger.error("Error:" + e);
}
}
public static void main(String[] args) {
new MyClient();
}
}

Run the application first, that is, run the main class (MyServer.java). Then run as many clients as we want, that is, run MyClient.java two or more times. In the console of one of the clients, enter some input data, namely an ISO message string entered as a single line. For example:
0210723A00010A80840018593600141001099999011000000010000000100702153300000119153310061007065656561006090102240000000901360020100236C0102240000000
Look at the consoles of both applications, MyServer and MyClient. What do you see?
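As a hint for reading the sample message, here is a minimal sketch that uses only the JDK and assumes the ASCII layout implied by iso87ascii.xml: a 4-character MTI followed by a primary bitmap written as 16 hexadecimal characters. BitmapDemo and presentFields are illustrative names, not part of JPOS or MyApp. It lists which fields the bitmap of the message above marks as present.

```java
import java.util.ArrayList;
import java.util.List;

final class BitmapDemo {
    /** Returns the field numbers flagged in a hexadecimal bitmap (bit 1 is the leftmost bit). */
    static List<Integer> presentFields(String hexBitmap) {
        List<Integer> fields = new ArrayList<>();
        for (int i = 0; i < hexBitmap.length(); i++) {
            int nibble = Character.digit(hexBitmap.charAt(i), 16);
            if (nibble < 0) {
                throw new IllegalArgumentException("Not a hex digit: " + hexBitmap.charAt(i));
            }
            // Each hex digit covers four consecutive fields, most significant bit first.
            for (int bit = 0; bit < 4; bit++) {
                if ((nibble & (8 >> bit)) != 0) {
                    fields.add(i * 4 + bit + 1);
                }
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // First 20 characters of the sample message: MTI + primary bitmap.
        String header = "0210723A00010A808400";
        String mti = header.substring(0, 4);
        String bitmap = header.substring(4, 20);
        System.out.println("MTI = " + mti);
        // prints: Fields = [2, 3, 4, 7, 11, 12, 13, 15, 32, 37, 39, 41, 49, 54]
        System.out.println("Fields = " + presentFields(bitmap));
    }
}
```

Field 1 is not in the list, which means this message has no secondary bitmap, so the field data starts right after the 16 bitmap characters.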
Btw, Happy Eid Mubarak...
