RabbitMQ tutorial - RPC Pattern in Java
RabbitMQのチュートリアル。
Client/ServerスタイルのRPCパターン。
【お題】
RabbitMQ - RabbitMQ tutorial - Remote procedure call (RPC)
・リクエスト時に「correlationId」を指定しRPC通信におけるClient/Server間の通信を紐付ける。
・リクエスト時に「reply_to」を指定しレスポンス返却時に使用するキュー名を設定する。
【実行環境】
・Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60
【前提】
・RabbitMQはインストール済
・コンパイルする手順は省く
【手順】
1.RPCClientクラスとRPCServerクラスを作成
●RPCClientクラス
※Server側へ数字(『30』固定)を送信する。同期通信とする。
package no6; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.io.IOException; import java.util.UUID; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; /** * 接続に必要な準備 * @throws Exception */ public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } /** * RPC呼び出し * @param message * @return * @throws Exception */ public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); // 呼び出し channel.basicPublish("", requestQueueName, props, message.getBytes()); // 処理結果を待ちうける while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } /** * クローズ処理 */ public void close() { try { connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } /** * mainメソッド * @param argv */ public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (Exception e) { // nop } } } } }
●RPCServerクラス
※CL側から送信された数字より、フィボナッチ数を計算して返却する。
package no6; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; /** * フィボナッチ数を得る * @param n 計算対象の数字 */ private static int fib(int n) { if (n == 0){ return 0; } if (n == 1){ return 1; } return fib(n - 1) + fib(n - 2); } /** * mainメソッド * @param argv */ public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); // メッセージを待ち受ける while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder() .correlationId(props.getCorrelationId()).build(); try { // メッセージから計算対象の数字を得る String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); // フィボナッチ数を計算する response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { // 計算結果を送信する channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(System.out); } finally { if (connection != null) { try { connection.close(); } catch (Exception e) { // nop } } } } }
2.RPCClientを1窓、RPCServerを1窓で実行すると、Client側で計算結果が得られる。
●RPCClient
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no6/RPCClient [x] Requesting fib(30) [.] Got '832040'
●RPCServer
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no6/RPCServer [x] Awaiting RPC requests [.] fib(30)
【さいごに】
・MOMについてこんな記事を見つけた。奥が深そう…
IT検証ラボ - 新規格MOM、AMQPやSTOMP対応、プロダクトの性能はJMSと同等以上:ITpro