読者です 読者をやめる 読者になる 読者になる

ITお絵かき修行

3歩歩いても忘れないために

RabbitMQ tutorial - Topics Pattern in Java

RabbitMQのチュートリアル
Publish/Subscribeパターンより派生した、メッセージの送信先を制御するRoutingパターンより、routingKeyに正規表現を使用してより細かい送信先制御を行うTopicsパターン。


【お題】
f:id:hhhhhskw:20140928131504j:plain
RabbitMQ - RabbitMQ tutorial - Topics


【実行環境】
Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60


【前提】
・RabbitMQはインストール済
コンパイルする手順は省く


【手順】
1.EmitLogTopicクラスとReceiveLogsTopicクラスを作成

●EmitLogTopicクラス(Producer役)

package no5;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

	private static final String EXCHANGE_NAME = "topic_logs";

	/**
	 * mainメソッド
	 *
	 * <dl>
	 * <dt><span class="strong">第一引数:routingKey</span></dt>
	 * <dd>ログレベルを指定する。書式:{facility}.{severity}</dd>
	 * <br>
	 * <dt><span class="strong">第二引数:message</span></dt>
	 * <dd>ログ出力する文字列</dd>
	 * </dl>
	 *
	 * @param argv
	 */
	public static void main(String[] argv) {

		Connection connection = null;
		Channel channel = null;
		try {
	        ConnectionFactory factory = new ConnectionFactory();
	        factory.setHost("localhost");
	        connection = factory.newConnection();
	        channel = connection.createChannel();

	        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

	        String routingKey = getRouting(argv);
	        String message = getMessage(argv);

	        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
	        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

		} catch (Throwable th) {
			th.printStackTrace(System.out);
		} finally {
			try {
				channel.close();
				connection.close();
			} catch (IOException e) {
				e.printStackTrace(System.out);
			}
		}
	}

	/**
	 * routingKeyに設定する文字列をmainメソッドの引数より取得
	 */
	private static String getRouting(String[] strings) {
		if (strings.length < 1)
			return "anonymous.info";
		return strings[0];
	}

	/**
	 * ログメッセージとして設定する文字列をmainメソッドの引数より取得
	 */
	private static String getMessage(String[] strings) {
		if (strings.length < 2)
			return "Hello World!";
		return joinStrings(strings, " ", 1);
	}

	private static String joinStrings(String[] strings, String delimiter,
			int startIndex) {
		int length = strings.length;
		if (length == 0)
			return "";
		if (length < startIndex)
			return "";
		StringBuilder words = new StringBuilder(strings[startIndex]);
		for (int i = startIndex + 1; i < length; i++) {
			words.append(delimiter).append(strings[i]);
		}
		return words.toString();
	}
}


●ReceiveLogsTopicクラス(Consumer役)

package no5;

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopic {

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) {

		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");
			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.exchangeDeclare(EXCHANGE_NAME, "topic");
			String queueName = channel.queueDeclare().getQueue();

			if (argv.length < 1) {
				System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
				System.exit(1);
			}

			// 引数で指定されたログレベルのポリシーでバインドする
			for (String bindingKey : argv) {
				channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
			}

			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(queueName, true, consumer);

			// メッセージを待ち受ける
			while (true) {
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String message = new String(delivery.getBody());
				String routingKey = delivery.getEnvelope().getRoutingKey();

				System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
			}
		} catch (Throwable th) {
			th.printStackTrace(System.out);
		} finally {
			try {
				channel.close();
				connection.close();
			} catch (IOException e) {
				e.printStackTrace(System.out);
			}
		}
	}
}



2.EmitLogTopicを1窓、ReceiveLogsTopicを2窓で実行する。
  ReceiveLogsTopicは実行引数をそれぞれ以下のように設定する。

java -cp %CP% no5/ReceiveLogsTopic kern.*

java -cp %CP% no5/ReceiveLogsTopic *.critical


●EmitLogTopicクラス(Producer役)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl
ient.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "kern.critical" "A critical kernel error"
 [x] Sent 'kern.critical':'A critical kernel error'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "kern.warn" "A warning kernel error"
 [x] Sent 'kern.warn':'A warning kernel error'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "cron.critical" "A critical cron error"
 [x] Sent 'cron.critical':'A critical cron error'

D:\workspace\workspace_rabbitmq\Tutorial>


●ReceiveLogsTopicクラス(Consumer役・ログのポリシー:kern.*)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl
ient.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/ReceiveLogsTopic kern.*
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'kern.critical':'A critical kernel error'
 [x] Received 'kern.warn':'A warning kernel error'


●ReceiveLogsTopicクラス(Consumer役・ログのポリシー: *.critical)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl
ient.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/ReceiveLogsTopic *.critical
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'kern.critical':'A critical kernel error'
 [x] Received 'cron.critical':'A critical cron error'