ITお絵かき修行

3歩歩いても忘れないために

RabbitMQ tutorial - RPC Pattern in Java

RabbitMQのチュートリアル
Client/ServerスタイルのRPCパターン。

【お題】
f:id:hhhhhskw:20140928160046j:plain
RabbitMQ - RabbitMQ tutorial - Remote procedure call (RPC)
・リクエスト時に「correlationId」を指定しRPC通信におけるClient/Server間の通信を紐付ける。
・リクエスト時に「reply_to」を指定しレスポンス返却時に使用するキュー名を設定する。


【実行環境】
Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60


【前提】
・RabbitMQはインストール済
コンパイルする手順は省く


【手順】
1.RPCClientクラスとRPCServerクラスを作成

●RPCClientクラス
※Server側へ数字(『30』固定)を送信する。同期通信とする。

package no6;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

import java.io.IOException;
import java.util.UUID;

public class RPCClient {

	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;

	/**
	 * 接続に必要な準備
	 * @throws Exception
	 */
	public RPCClient() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		connection = factory.newConnection();
		channel = connection.createChannel();

		replyQueueName = channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}

	/**
	 * RPC呼び出し
	 * @param message
	 * @return
	 * @throws Exception
	 */
	public String call(String message) throws Exception {
		String response = null;
		String corrId = UUID.randomUUID().toString();

		BasicProperties props = new BasicProperties.Builder()
			.correlationId(corrId)
			.replyTo(replyQueueName)
			.build();

		// 呼び出し
		channel.basicPublish("", requestQueueName, props, message.getBytes());

		// 処理結果を待ちうける
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			if (delivery.getProperties().getCorrelationId().equals(corrId)) {
				response = new String(delivery.getBody(), "UTF-8");
				break;
			}
		}

		return response;
	}

	/**
	 * クローズ処理
	 */
	public void close() {
		try {
			connection.close();
		} catch (IOException e) {
			e.printStackTrace(System.out);
		}
	}

	/**
	 * mainメソッド
	 * @param argv
	 */
	public static void main(String[] argv) {
		RPCClient fibonacciRpc = null;
		String response = null;
		try {
			fibonacciRpc = new RPCClient();

			System.out.println(" [x] Requesting fib(30)");
			response = fibonacciRpc.call("30");
			System.out.println(" [.] Got '" + response + "'");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (fibonacciRpc != null) {
				try {
					fibonacciRpc.close();
				} catch (Exception e) {
					// nop
				}
			}
		}
	}

}


●RPCServerクラス
※CL側から送信された数字より、フィボナッチ数を計算して返却する。

package no6;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

	private static final String RPC_QUEUE_NAME = "rpc_queue";

	/**
	 * フィボナッチ数を得る
	 * @param n 計算対象の数字
	 */
	private static int fib(int n) {
		if (n == 0){
			return 0;
		}
		if (n == 1){
			return 1;
		}
		return fib(n - 1) + fib(n - 2);
	}

	/**
	 * mainメソッド
	 * @param argv
	 */
	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");

			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

			channel.basicQos(1);

			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

			System.out.println(" [x] Awaiting RPC requests");

			// メッセージを待ち受ける
			while (true) {
				String response = null;

				QueueingConsumer.Delivery delivery = consumer.nextDelivery();

				BasicProperties props = delivery.getProperties();
				BasicProperties replyProps = new BasicProperties.Builder()
						.correlationId(props.getCorrelationId()).build();

				try {
					// メッセージから計算対象の数字を得る
					String message = new String(delivery.getBody(), "UTF-8");
					int n = Integer.parseInt(message);
					System.out.println(" [.] fib(" + message + ")");

					// フィボナッチ数を計算する
					response = "" + fib(n);
				} catch (Exception e) {
					System.out.println(" [.] " + e.toString());
					response = "";
				} finally {
					// 計算結果を送信する
					channel.basicPublish("", props.getReplyTo(), replyProps,
							response.getBytes("UTF-8"));
					channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
							false);
				}
			}
		} catch (Exception e) {
			e.printStackTrace(System.out);
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception e) {
					// nop
				}
			}
		}
	}
}



2.RPCClientを1窓、RPCServerを1窓で実行すると、Client側で計算結果が得られる。

●RPCClient

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no6/RPCClient
 [x] Requesting fib(30)
 [.] Got '832040'


●RPCServer

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl
ient.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no6/RPCServer
 [x] Awaiting RPC requests
 [.] fib(30)



【さいごに】
・MOMについてこんな記事を見つけた。奥が深そう…
IT検証ラボ - 新規格MOM、AMQPやSTOMP対応、プロダクトの性能はJMSと同等以上:ITpro