読者です 読者をやめる 読者になる 読者になる

ITお絵かき修行

3歩歩いても忘れないために

RabbitMQ tutorial - Publish/Subscribe Pattern in Java

RabbitMQのチュートリアル
複数の受信者に対して同一のメッセージを送信するPublish/Subscribeパターン。
→RabbitMQでは送信されたメッセージが受信者ごとに用意されたキューにそれぞれ格納され、配信される(Exchange)。

【お題】
f:id:hhhhhskw:20140927211104j:plain
RabbitMQ - RabbitMQ tutorial - Publish/Subscribe


【実行環境】
Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60


【前提】
・RabbitMQはインストール済
コンパイルする手順は省く


【手順】
1.EmitLogクラスとReceiveLogsクラスを作成

●EmitLogクラス(Producer役)

package no3;

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException {

		Connection connection = null;
		Channel channel = null;

		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");
			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

			String message = getMessage(argv);

			channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");

		} finally {

			try {
				channel.close();
				connection.close();
			} catch (IOException e) {
				e.printStackTrace(System.out);
			}
		}
	}

	private static String getMessage(String[] strings) {
		if (strings.length < 1) {
			return "Hello World!";
		}
		return joinStrings(strings, " ");
	}

	private static String joinStrings(String[] strings, String delimiter) {
		int length = strings.length;
		if (length == 0) {
			return "";
		}
		StringBuilder words = new StringBuilder(strings[0]);
		for (int i = 1; i < length; i++) {
			words.append(delimiter).append(strings[i]);
		}
		return words.toString();
	}
}


●ReceiveLogsクラス(Consumer役)

package no3;

import java.io.IOException;
import java.lang.InterruptedException;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws IOException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }
}


2.ReceiveLogsを2窓(※1つはログファイルにログ出力する)、Emitlogを1窓で実行すると、
  Emitlog側で送信されたメッセージがReceiveLogs側で受信される。
  →前回とは違い、ReceiveLogsを実行している全プロセスに対し、同一のメッセージが同じ数送信されている。

●EmitLogクラス(Producer役)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar
D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/EmitLog
 [x] Sent 'Hello World!'

D:\workspace\workspace_rabbitmq\Tutorial>

●ReceiveLogsクラス(Consumer役・コンソールに出力)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar
D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/ReceiveLogs
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'


●ReceiveLogsクラス(Consumer役・ログファイルに出力)

D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar
D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no3/ReceiveLogs > logs_from_rabbit.log

「logs_from_rabbit.log」

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'