ITお絵かき修行

3歩歩いても忘れないために

RabbitMQ tutorial - Routing Pattern in Java

RabbitMQのチュートリアル
Publish/Subscribeパターンより派生した、メッセージの送信先を制御するRoutingパターン。
→Exchangeにて送信先を振り分ける際、「fanout」属性では送信先を制御できない。
 そこで、送信先を設定できる「direct」属性を指定し、送信するメッセージにroutingKeyを設定する。
 routingKeyを設定することにより、受信者ごとに受信するメッセージのポリシーを設定することができる。

【お題】
f:id:hhhhhskw:20140928122756j:plain
RabbitMQ - RabbitMQ tutorial - Routing

【実行環境】
Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60


【前提】
・RabbitMQはインストール済
コンパイルする手順は省く


【手順】
1.EmitLogDirectクラスとReceiveLogsDirectクラスを作成

●EmitLogDirectクラス(Producer役)

package no4;

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogDirect {

	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) {

		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");
			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.exchangeDeclare(EXCHANGE_NAME, "direct");

			String severity = getSeverity(argv);
			String message = getMessage(argv);

			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
		} catch (Throwable th) {
			th.printStackTrace(System.out);
		} finally {
			try {
				channel.close();
				connection.close();
			} catch (IOException e) {
				e.printStackTrace(System.out);
			}
		}
	}

	/**
	 * ログレベルに設定する文字列をmainメソッドの引数より取得
	 */
	private static String getSeverity(String[] strings) {
		if (strings.length < 1)
			return "info";
		return strings[0];
	}

	/**
	 * メッセージに設定する文字列をmainメソッドの引数より取得
	 */
	private static String getMessage(String[] strings) {
		if (strings.length < 1) {
			return "Hello World!";
		}
		return joinStrings(strings, " ");
	}

	private static String joinStrings(String[] strings, String delimiter) {
		int length = strings.length;
		if (length == 0) {
			return "";
		}
		StringBuilder words = new StringBuilder(strings[0]);
		for (int i = 1; i < length; i++) {
			words.append(delimiter).append(strings[i]);
		}
		return words.toString();
	}

}

●ReceiveLogsDirectクラス(Consumer役)

package no4;

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {

	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) {

		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");
			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.exchangeDeclare(EXCHANGE_NAME, "direct");
			String queueName = channel.queueDeclare().getQueue();

			if (argv.length < 1) {
				System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
				System.exit(1);
			}

			// 引数で指定されたログレベルをバインドする
			for (String severity : argv) {
				channel.queueBind(queueName, EXCHANGE_NAME, severity);
			}

			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(queueName, true, consumer);

			// メッセージを待ち受ける
			while (true) {
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String message = new String(delivery.getBody());
				String routingKey = delivery.getEnvelope().getRoutingKey();

				System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
			}
		} catch (Throwable th) {
			th.printStackTrace(System.out);
		} finally {
			try {
				channel.close();
				connection.close();
			} catch (IOException e) {
				e.printStackTrace(System.out);
			}
		}
	}
}



2.EmitLogDirectを1窓、ReceiveLogsDirectを2窓で実行する。
  ReceiveLogsDirectは実行引数をそれぞれ以下のように設定する。

java -cp %CP% no4/ReceiveLogsDirect error

java -cp %CP% no4/ReceiveLogsDirect info warning error


●EmitLogDirectクラス(Producer役)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl
ient.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/EmitLogDirect info
 [x] Sent 'info':'info'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/EmitLogDirect warning
 [x] Sent 'warning':'warning'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/EmitLogDirect error
 [x] Sent 'error':'error'

D:\workspace\workspace_rabbitmq\Tutorial>


●ReceiveLogsDirectクラス(Consumer役・「error」のみ受信する)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl
ient.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/ReceiveLogsDirect error
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'error':'error'


●ReceiveLogsDirectクラス(Consumer役・「infp」「warning」「error」を受信する)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq
ient.jar

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no4/ReceiveLogsDirect info warning error
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'info':'info'
 [x] Received 'warning':'warning'
 [x] Received 'error':'error'


「routingKey」に従って送信先が選択されている。