Tuesday, March 24, 2009

Introduction to Message-oriented Middleware (MOM) and Java Messaging Service (JMS) Using Apache ActiveMQ


package com.ndung;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldProducer {
public static void main(String[] args) throws JMSException {
//Instantiate connection factory, specific to the vendor
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.MQ");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
// Create a messages
String text = "Hello world! From: ndung";
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message
System.out.println("Message Sent");
producer.send(message);
// Clean up
session.close();
connection.close();
}
}


package com.ndung;

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldConsumer {
public static void main(String[] args) throws JMSException {
//Instantiate connection factory, specific to the vendor
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.MQ");
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}
consumer.close();
session.close();
connection.close();
}
}

In the previous post, we have been introduced into EAI (Enterprise Application Integration) and a flash on Middleware. There are two fundamentally different types of middleware based on the approach used by the middleware to transfer the data between the distributed software applications. They are Remote Procedure Calls (RPC) based middleware and Message-oriented Middleware (MOM).
1. RPC. Software application that uses RPC based middleware to transfer data to another software application has to wait until the latter application is done processing the data. Thus, with this type of middleware, the communication proceeds in a lock step, synchronized manner, and the communicating processes are tightly coupled to one another. Examples of such middleware include Java RMI, CORBA, etc.
2. MOM. Is best described as a category of software for communication in an loosely-coupled, reliable, scalable, enabled asynchronous communication amongst distributed applications or systems.


JMS is a spesification that defines a set of interfaces and associated semantics, which allow applications written in Java to access the services of any JMS compliant Message MOM product. There are plenty of compliant MOM products available in market, including MQSeries from IBM, SonicMQ from Progress, Sun Java Message Queue, even Apache ActiveMQ, and many more.
The players in JMS:
1. Connections and Connection Factories
2. Sessions
3. Destinations
We should try first JMS First Impression using ActiveMQ in here. I'm using Netbeans 6.5 as IDE and prepare library: activemq-all-5.2.0.jar and jms.jar.

There are two different types of MOM: point-to-point and publish-and-subscribe.
1. Point-to-Point messaging style. In this model, a MOM is used by two applications to communicate with each other, often as an asynchronous replacement for remote procedure calls (RPC).
2. Publish-and-Subscribe messaging style. In this model multiple applications connect to the MOM as their publishers, which are producers of messages, or subscribers, which are consumers of messages. An important point of difference between the two styles is that a a point-to-point system is typically either a one-to-one system, which means one message sender talking to one message receiver, or it is a a many-to-one system, which means more than one senders are talking to one receiver. On the other hand, publish-and-subscribe systems are typically many-to-many systems, which means that there could be one or more publishers talking to one or more subscribers at a time.

Simple PTP (Point-to-Point) Application and Pub/Sub (Publisher and Subscriber) Application
We will enhance sample above by setting up activemq broker first and using JNDI. Configuring JNDI web application in Tomcat container can read in here. After we extract apache-activemq-5.2.0.rar that we have downloaded, edit activemq.xml that placed in folder conf. Delete all "multicast" in this configuration file. After that, enter folder bin, and run activemq. In the same project with above create packages named com.ndung.ptp and com.ndung.ps. Create new two java classes named SimpleQueueSender.java and SimpleQueueReceiver.java in com.ndung.ptp package and create new two java classes named SimpleTopicPublisher.java and SimpleTopicSubscriber in com.ndung.ps package.


package com.ndung.ptp;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleQueueSender {
public static void main(String[] args) throws NamingException, JMSException {
String queueName = "PTP.ACTIVE.MQ";
//setting JNDI configuration, differents between vendors.
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);
ConnectionFactory queueConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
QueueConnection queueConnection = (QueueConnection)
queueConnectionFactory.createConnection();
QueueSession queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueSender queueSender = queueSession.createSender(queue);
queueConnection.start();
TextMessage textMessage = queueSession.createTextMessage();
textMessage.setText("Hello World");
queueSender.send(textMessage);
queueSession.close();
queueConnection.close();
}
}


package com.ndung.ptp;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleQueueReceiver {
public static void main(String[] args) throws JMSException, NamingException {
String queueName = "PTP.ACTIVE.MQ";
//setting JNDI configuration, differents between vendors.
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);

ConnectionFactory queueConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
QueueConnection queueConnection = (QueueConnection)
queueConnectionFactory.createConnection();
QueueSession queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();
TextMessage textMessage = (TextMessage) queueReceiver.receive();
System.out.println(textMessage);
queueSession.close();
queueConnection.close();
}
}


package com.ndung.ps;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleTopicPublisher {
public static void main(String[] args) throws Exception {
try {
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);
ConnectionFactory myConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
// Use myConnectionFactory to get a Topic connection
TopicConnection myConnection = (TopicConnection)
myConnectionFactory.createConnection();
// Use myConnection to create a Topic session
TopicSession mySession = myConnection.createTopicSession(false, 1);
// Use mySession to get the Topic
Topic myTopic = mySession.createTopic("PS.ACTIVE.MQ");
// Use mySession to create a publisher for myTopic
TopicPublisher myPublisher = mySession.createPublisher(myTopic);
// Start the connection
myConnection.start();
// Create the HelloWorld message
TextMessage m = mySession.createTextMessage();
m.setText("Hello World");
// Use myPublisher to publish the message
myPublisher.publish(m);
// Done.
// Need to clean up
mySession.close();
myConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}


package com.ndung.ps;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SimpleTopicSubscriber {
public static void main(String[] args) {
try {
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
Context jndiContext = new InitialContext(props);
ConnectionFactory myConnectionFactory = (ConnectionFactory)
jndiContext.lookup("ConnectionFactory");
// Use myConnectionFactory to get a Topic connection
TopicConnection myConnection = (TopicConnection)
myConnectionFactory.createConnection();
// Use myConnection to create a Topic session
TopicSession mySession = myConnection.createTopicSession(false, 1);
// Use mySession to get the Topic
Topic myTopic = mySession.createTopic("PS.ACTIVE.MQ");
// Use mySession to create a subscriber
TopicSubscriber mySubscriber = mySession.createSubscriber(myTopic);
// Start the connection
myConnection.start();
// Wait for the Hello World message
// Use the receiver and wait forever until the message arrives
TextMessage m = (TextMessage) mySubscriber.receive();
// Display the message
System.out.println("Received the message: " + m.getText());
// Done.
// Need to clean up
mySession.close();
myConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

To know the differences between PTP application and Pub/Sub application:
1. Run two instance of SimpleQueueReceiver and then run one instance of SimpleQueueSender. One of SimpleQueueReceiver got message from SimpleQueueSender and the other one didn't.
2. Run two instance of SimpleTopicSubscriber and then run one instance of SimpleTopicPublisher. We can look both of SimpleTopicSubsriber got message from SimpleTopicPublisher.
We can monitor message queue by running http://localhost:8161/admin in web browser. More better coding style in the next post.

327 comments:

«Oldest   ‹Older   401 – 327 of 327
«Oldest ‹Older   401 – 327 of 327   Newer› Newest»