RabbitMQ tutorial - Routing Pattern in Java
RabbitMQのチュートリアル。
Publish/Subscribeパターンより派生した、メッセージの送信先を制御するRoutingパターン。
→Exchangeにて送信先を振り分ける際、「fanout」属性では送信先を制御できない。
そこで、送信先を設定できる「direct」属性を指定し、送信するメッセージにroutingKeyを設定する。
routingKeyを設定することにより、受信者ごとに受信するメッセージのポリシーを設定することができる。
【お題】
RabbitMQ - RabbitMQ tutorial - Routing
【実行環境】
・Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60
【前提】
・RabbitMQはインストール済
・コンパイルする手順は省く
【手順】
1.EmitLogDirectクラスとReceiveLogsDirectクラスを作成
●EmitLogDirectクラス(Producer役)
package no4; import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } catch (Throwable th) { th.printStackTrace(System.out); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } } /** * ログレベルに設定する文字列をmainメソッドの引数より取得 */ private static String getSeverity(String[] strings) { if (strings.length < 1) return "info"; return strings[0]; } /** * メッセージに設定する文字列をmainメソッドの引数より取得 */ private static String getMessage(String[] strings) { if (strings.length < 1) { return "Hello World!"; } return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) { return ""; } StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
●ReceiveLogsDirectクラス(Consumer役)
package no4; import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } // 引数で指定されたログレベルをバインドする for (String severity : argv) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // メッセージを待ち受ける while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } catch (Throwable th) { th.printStackTrace(System.out); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } } }
2.EmitLogDirectを1窓、ReceiveLogsDirectを2窓で実行する。
ReceiveLogsDirectは実行引数をそれぞれ以下のように設定する。
java -cp %CP% no4/ReceiveLogsDirect error
java -cp %CP% no4/ReceiveLogsDirect info warning error
●EmitLogDirectクラス(Producer役)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/EmitLogDirect info [x] Sent 'info':'info' D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/EmitLogDirect warning [x] Sent 'warning':'warning' D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/EmitLogDirect error [x] Sent 'error':'error' D:\workspace\workspace_rabbitmq\Tutorial>
●ReceiveLogsDirectクラス(Consumer役・「error」のみ受信する)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/ReceiveLogsDirect error [*] Waiting for messages. To exit press CTRL+C [x] Received 'error':'error'
●ReceiveLogsDirectクラス(Consumer役・「infp」「warning」「error」を受信する)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/ReceiveLogsDirect info warning error [*] Waiting for messages. To exit press CTRL+C [x] Received 'info':'info' [x] Received 'warning':'warning' [x] Received 'error':'error'
「routingKey」に従って送信先が選択されている。