RabbitMQ tutorial - Topics Pattern in Java
RabbitMQのチュートリアル。
Publish/Subscribeパターンより派生した、メッセージの送信先を制御するRoutingパターンより、routingKeyに正規表現を使用してより細かい送信先制御を行うTopicsパターン。
【お題】
RabbitMQ - RabbitMQ tutorial - Topics
【実行環境】
・Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60
【前提】
・RabbitMQはインストール済
・コンパイルする手順は省く
【手順】
1.EmitLogTopicクラスとReceiveLogsTopicクラスを作成
●EmitLogTopicクラス(Producer役)
package no5; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; /** * mainメソッド * * <dl> * <dt><span class="strong">第一引数:routingKey</span></dt> * <dd>ログレベルを指定する。書式:{facility}.{severity}</dd> * <br> * <dt><span class="strong">第二引数:message</span></dt> * <dd>ログ出力する文字列</dd> * </dl> * * @param argv */ public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Throwable th) { th.printStackTrace(System.out); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } } /** * routingKeyに設定する文字列をmainメソッドの引数より取得 */ private static String getRouting(String[] strings) { if (strings.length < 1) return "anonymous.info"; return strings[0]; } /** * ログメッセージとして設定する文字列をmainメソッドの引数より取得 */ private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
●ReceiveLogsTopicクラス(Consumer役)
package no5; import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } // 引数で指定されたログレベルのポリシーでバインドする for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // メッセージを待ち受ける while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } catch (Throwable th) { th.printStackTrace(System.out); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } } }
2.EmitLogTopicを1窓、ReceiveLogsTopicを2窓で実行する。
ReceiveLogsTopicは実行引数をそれぞれ以下のように設定する。
java -cp %CP% no5/ReceiveLogsTopic kern.*
java -cp %CP% no5/ReceiveLogsTopic *.critical
●EmitLogTopicクラス(Producer役)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "kern.critical" "A critical kernel error" [x] Sent 'kern.critical':'A critical kernel error' D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "kern.warn" "A warning kernel error" [x] Sent 'kern.warn':'A warning kernel error' D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "cron.critical" "A critical cron error" [x] Sent 'cron.critical':'A critical cron error' D:\workspace\workspace_rabbitmq\Tutorial>
●ReceiveLogsTopicクラス(Consumer役・ログのポリシー:kern.*)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/ReceiveLogsTopic kern.* [*] Waiting for messages. To exit press CTRL+C [x] Received 'kern.critical':'A critical kernel error' [x] Received 'kern.warn':'A warning kernel error'
●ReceiveLogsTopicクラス(Consumer役・ログのポリシー: *.critical)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/ReceiveLogsTopic *.critical [*] Waiting for messages. To exit press CTRL+C [x] Received 'kern.critical':'A critical kernel error' [x] Received 'cron.critical':'A critical cron error'