3.JMSの基礎3
2006.11.17 株式会社四次元データ 宮澤了祐
- 3.1. Publisher-Subscriberメッセージモデル
- 3.2. メッセージの受動的な受け取り
3.1.Publisher-Subscriberメッセージモデル
ここでは、一対多でメッセージングを行うPublisher-Subscriberモデルについて解説します。
Publisher-Subscriberモデルでは、まず送信者はJMSサーバ内部に作成されたトピック(Destinationオブジェクト)に対してメッセージを発行します。 トピックとは話題を意味する単語で、その名の通りメッセージを分類するオブジェクトです。 受信者側は予め、JMSサーバに対してそのトピックに送られたメッセージを購読(Subscribe)することを伝えておきます。 メッセージが発行されれば、購読を申し込んでいる全ての受信者にメッセージが送られます。全ての受信者がメッセージを受け取った際にはじめてメッセージはサーバから削除されます。
サンプルプログラムを用いて解説していきます。まずは送信側です。
import javax.jms.JMSException; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.activemq.ActiveMQConnectionFactory; public class Publisher { public static void main(String[] args){ TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); try { //Publisher-Subscriberモデル用のコネクションの作成 TopicConnection connection = factory.createTopicConnection(); TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); connection.start(); //Topicオブジェクトの作成 Topic topic = session.createTopic("TestTopic"); //Topicに対してメッセージを送るMessageProducerの作成 TopicPublisher publisher = session.createPublisher(topic); //メッセージの送信 TextMessage msg = session.createTextMessage("test message!!"); publisher.publish(msg); publisher.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
PTPのプログラムとほとんど違いはありません。
PTPと違う点は
ConnectionオブジェクトがTopicConnectionオブジェクトになっている。
Destinationオブジェクトがjavax.jms.Topicオブジェクトになっている。
MessagePuroducerがjavax.jms.Publisherオブジェクトになっている。
があります。
また、メッセージを送るメソッドもsend()メソッドがpubslish()メソッドになっています。
続いて受信側です。 複数の受信者を用意するためにRunnableなクラスを作成し、5つのSubscriberを用意します。
import javax.jms.JMSException; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.activemq.ActiveMQConnectionFactory; import org.activemq.ActiveMQConnection; public class Subscriber implements Runnable{ String name; public Subscriber(String name) { this.name = name; } public void run() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); try { TopicConnection connection = factory.createTopicConnection(); TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); connection.start(); //Topicオブジェクトの作成 Topic topic = session.createTopic("TestTopic"); //Topicを購読するMessageConsumerの設定 TopicSubscriber subscriber= session.createSubscriber(topic); //メッセージの受信 TextMessage msg = (TextMessage)subscriber.receive(); System.out.println("[" + this.name +"]" + msg.getText()); subscriber.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args){ Thread[] thread = new Thread[5]; for(int i=0; i<5; i++){ thread[i] = new Thread(new Subscriber("thread-" + i)); } for(int i=0; i<5; i++){ thread[i].start(); } } }
受信側のMessageConsumerはjavax.jms.Subscriberクラスを使用します。 メッセージを受信するメソッドはSubscriber#receive()です。
コンストラクタでスレッドの名前を設定し、メッセージを受信出来れば名前のあとにメッセージを出力しています。
[thread-0]test message!! [thread-1]test message!! [thread-2]test message!! [thread-3]test message!! [thread-4]test message!!
のように出力されれば成功です。(順番は入れ替わる可能性があります)
特定のTopicに対して購読を申し込む必要があるため、受信側を先に起動する必要があります。
3.2.メッセージの受動的な受け取り
先ほどまでの例では、MessageConsumer#receive()メソッドを用いてJMSサーバーまで動的にメッセージを受け取りに行っていました。 JMSではリスナを用いて受動的にメッセージを受け取ることが出来ます。
先ほどのSubscriberにリスナを追加してみましょう。
import javax.jms.MessageListener; public class Subscriber implements Runnable,MessageListener{ public void onMessage(Message message) { TextMessage msg = (TextMessage)message; try { System.out.println("[" + this.name +"]" + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } public void run(){ try{ ... topic = session.createTopic("TestTopic"); subscriber= session.createSubscriber(topic); subscriber.setMessageListener(this); connection.start(); ... } } }
Connectionが閉じられないように、解放処理は別で行います。
MessageListnerインターフェイスはメッセージを受信した際に呼ばれるonMessage()メソッドのみを持つインターフェイスです。
MessageConsumer#setMessageListener(MessageListener listener)メソッドで追加することが出来ます。