RabbitMQ tutorial - Publish/Subscribe Pattern in Java
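A RabbitMQ tutorial.
This one covers the Publish/Subscribe pattern, in which the same message is sent to multiple receivers.
→ In RabbitMQ, the sender publishes to an Exchange, which stores a copy of the message in the queue prepared for each receiver; each receiver is then delivered the message from its own queue.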
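The per-receiver queue idea can be sketched without a broker. The following is only a simplified in-memory model, not how RabbitMQ is implemented: the class names FanoutSketch and FanoutExchange and the method bindNewQueue are hypothetical and exist purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class FanoutSketch {

    /** Hypothetical stand-in for a fanout exchange: it only remembers its bound queues. */
    static class FanoutExchange {
        private final List<Queue<String>> boundQueues = new ArrayList<Queue<String>>();

        /** Creates a private queue for one receiver and binds it to this exchange. */
        Queue<String> bindNewQueue() {
            Queue<String> queue = new ArrayDeque<String>();
            boundQueues.add(queue);
            return queue;
        }

        /** Stores the message in every bound queue (the same String is shared, not cloned). */
        void publish(String message) {
            for (Queue<String> queue : boundQueues) {
                queue.add(message);
            }
        }
    }

    public static void main(String[] args) {
        FanoutExchange logs = new FanoutExchange();
        Queue<String> consoleQueue = logs.bindNewQueue(); // first receiver
        Queue<String> fileQueue = logs.bindNewQueue();    // second receiver

        logs.publish("Hello World!");
        logs.publish("Hello World!");

        // Both receivers hold both messages in their own queue.
        System.out.println("console receiver got " + consoleQueue.size() + " message(s)");
        System.out.println("file receiver got " + fileQueue.size() + " message(s)");
    }
}
```

In this model the sender never touches a queue directly; it only knows the exchange, which mirrors how EmitLog publishes to the "logs" exchange.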
【Topic】
RabbitMQ - RabbitMQ tutorial - Publish/Subscribe
【Environment】
・Windows 7 64-bit
・RabbitMQ 3.5.5
・jdk1.7.0_60
【Prerequisites】
・RabbitMQ is already installed
・Compilation steps are omitted
【Steps】
1. Create the EmitLog and ReceiveLogs classes
● EmitLog class (Producer role)
package no3;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws IOException {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            // Declare a fanout exchange: it routes each message to every bound queue.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = getMessage(argv);
            // Publish to the exchange, not to a queue; fanout ignores the routing key ("").
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } finally {
            try {
                // Either may still be null if connecting failed, so check before closing.
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace(System.out);
            }
        }
    }

    // Builds the message from the command-line arguments, or uses a default.
    private static String getMessage(String[] strings) {
        if (strings.length < 1) {
            return "Hello World!";
        }
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
● ReceiveLogs class (Consumer role)
package no3;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws IOException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the same fanout exchange as the producer (declaring it twice is harmless).
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Let the server create a queue with a generated name for this receiver only.
        String queueName = channel.queueDeclare().getQueue();
        // Bind that queue to the exchange so it gets a copy of every message.
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // The second argument (true) enables automatic acknowledgement.
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            // Blocks until the next message arrives.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
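2. Run ReceiveLogs in two console windows (one of them redirects its output to a log file) and EmitLog in one window.
Every message sent by EmitLog is received by ReceiveLogs.
→ Unlike the previous tutorial, every running ReceiveLogs process receives the same messages, and the same number of them.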
● EmitLog class (Producer role)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>
● ReceiveLogs class (Consumer role, output to the console)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/ReceiveLogs
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
● ReceiveLogs class (Consumer role, output to a log file)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/ReceiveLogs > logs_from_rabbit.log
Contents of "logs_from_rabbit.log":
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
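
As a rough illustration of why both receivers above got all four messages, the sketch below compares per-receiver message counts under two delivery styles. It assumes that the previous tutorial's setup shares one queue among the receivers and dispatches round-robin; the class DeliveryComparison and its methods are hypothetical helpers and do not talk to RabbitMQ at all.

```java
import java.util.Arrays;

public class DeliveryComparison {

    /** One shared queue, round-robin dispatch: each message goes to exactly one receiver. */
    static int[] roundRobinCounts(int messages, int receivers) {
        int[] counts = new int[receivers];
        for (int i = 0; i < messages; i++) {
            counts[i % receivers]++;
        }
        return counts;
    }

    /** Fanout: every receiver has its own queue, so every receiver gets every message. */
    static int[] fanoutCounts(int messages, int receivers) {
        int[] counts = new int[receivers];
        for (int i = 0; i < messages; i++) {
            for (int r = 0; r < receivers; r++) {
                counts[r]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Four messages and two receivers, as in the run above.
        System.out.println("round-robin: " + Arrays.toString(roundRobinCounts(4, 2)));
        System.out.println("fanout:      " + Arrays.toString(fanoutCounts(4, 2)));
    }
}
```

With four messages and two receivers, the round-robin model gives each receiver two messages, while the fanout model gives each receiver all four, which matches the console and log file output of this run.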