jBatch(Batchlet & Chunk)+JPA2.1(Entity & JPQL)でデータ読み書き
JavaEE7で追加されたjBatch(JSR-352)とJPA2.1(JSR-338)を使用して、DBのデータを読み書きする。
【やること】
・jBatchのBatchlet方式とChunk方式を両方使ってバッチジョブを実行する。
・JPAのうち、SELECTはJPQL、INSERT,UPDATE,DELETEはEntity方式を使用してデータの読み書きをする。
処理概要は下図の通り。
【環境】
●Java
・jdk1.8.0_25
・Oracle Enterprise Pack for Eclipse (12.1.3.3.1) ※中身はLuna SR1 (4.4.1)
・GlassFish Server Open Source Edition 4.1
●JPA
・Eclipselink 2.5.2-RC1 ※GlassFish組み込み
●DB・JDBC
・PostgreSQL 9.0
・JDBC41 Postgresql Driver, Version 9.3-1102
●ツール
・pgAdminⅢ
【準備】
・JDBCをGlassfishのクラスパスが通る場所(libフォルダ)に置いた。
・srcフォルダ配下にMETA-INFフォルダを作り、その中に「batch-jobs」フォルダを作った。
・GlassfishにてJNDIデータソースを設定済。
[参考]GlassfishでJNDIデータソースの設定方法その1 - しんさんの出張所 はてな編
・データ読み書き用のテーブルを作成済。
CREATE TABLE employees ( empno integer NOT NULL, ename character varying(10), yomi character varying(20), job character varying(8), mgr integer, hiredate date, sal character varying(7), comm character varying(7), deptno integer, CONSTRAINT pk_emp PRIMARY KEY (empno) ) WITH ( OIDS=FALSE );
【資産 - 定義体】
1.emp-batch.xml
・バッチジョブの実行順序を記述
2.persistence.xml
・JPA定義+DB接続先情報
・"eclipselink.logging.level.sql"を"FINE"とすると生成されたSQLが見える
【資産 - ソースファイル】
1.InsertEmpInfoBatchlet.java
・データ入力役
・従業員番号2001番・太郎くんの情報をINSERTする。 ※入力データはハードコーディングした。
2.EmpInfoReader.java
・データ読み込み役
・従業員番号2001番・太郎くんの情報をSELECTする。
・1件取得する場合はTypedQuery#getSingleResultを使用する。
3.EmpInfoProcessor.java
・データ加工役
・SELECTしたデータに数値を設定する。入力データに意味はない
4.EmpInfoWriter.java
・データ書き込み役
・EmpInfoProcessorで加工したデータを適用し、UPDATEする。
5.DeleteEmpInfoBatchlet.java
・データ消去役
・従業員番号2001番・太郎くんの情報をDELETEする。
6.BatchServlet.java
・バッチジョブを実行する。
作成したプロジェクトのトップページは以下。
hhhhhskw/EmpInfoBatch · GitHub
【実行ログ】
2015-02-11T15:07:09.847+0900|情報: EmpInfoBatch was successfully deployed in 5,035 milliseconds. 2015-02-11T15:07:26.098+0900|情報: id = 50 2015-02-11T15:07:26.271+0900|情報: InsertEmpInfoBatchlet#process 2015-02-11T15:07:26.393+0900|普通: INSERT INTO employee (empno, comm, deptno, ename, hiredate, job, mgr, sal, yomi) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) bind => [9 parameters bound] 2015-02-11T15:07:26.493+0900|情報: JTS5014: Recoverable JTS instance, serverId = [100] 2015-02-11T15:07:26.572+0900|情報: EmpInfoReader#open 2015-02-11T15:07:26.574+0900|情報: EmpInfoWriter#open 2015-02-11T15:07:26.578+0900|情報: EmpInfoReader#readItem 2015-02-11T15:07:26.770+0900|情報: EmpInfoProcessor#processItem 2015-02-11T15:07:26.772+0900|情報: EmpInfoWriter#writeItems 2015-02-11T15:07:26.772+0900|情報: persistEmployee 2015-02-11T15:07:26.772+0900|情報: EmpInfoReader#checkpointInfo 2015-02-11T15:07:26.783+0900|情報: EmpInfoWriter#checkpointInfo 2015-02-11T15:07:26.791+0900|普通: UPDATE employee SET comm = ? WHERE (empno = ?) bind => [2 parameters bound] 2015-02-11T15:07:26.823+0900|情報: EmpInfoReader#readItem 2015-02-11T15:07:26.824+0900|情報: EmpInfoReader#checkpointInfo 2015-02-11T15:07:26.835+0900|情報: EmpInfoWriter#checkpointInfo 2015-02-11T15:07:26.842+0900|情報: EmpInfoReader#close 2015-02-11T15:07:26.843+0900|情報: EmpInfoWriter#close 2015-02-11T15:07:26.886+0900|情報: DeleteEmpInfoBatchlet#process#begin 2015-02-11T15:07:26.906+0900|普通: UPDATE employee SET comm = ?, deptno = ?, ename = ?, hiredate = ?, job = ?, mgr = ?, sal = ?, yomi = ? WHERE (empno = ?) bind => [9 parameters bound] 2015-02-11T15:07:26.926+0900|普通: DELETE FROM employee WHERE (empno = ?) bind => [1 parameter bound] 2015-02-11T15:07:26.926+0900|情報: DeleteEmpInfoBatchlet#process#commit
【つまったところなど】
・バッチ処理で無限ループした時はItemReaderの実装をまず疑う。
→readItemメソッドの戻り値にnullが設定されるコードになってるか、チェックポイントをカウントする実装はおかしくないかetc
・JPQL使用時のコードでPSQLExceptionが発生する場合は、9割方文法が間違っている(体感)。
→SELECT句のカラム名に「*」を設定していないか、FROM句にEntityクラス名を指定しているかetc
・EclipseからGlassfishへ資産をデプロイできない場合は、EclipseからGlassfishサーバーを一旦削除して、Glassfishサーバーを再度作成する。もしくはドメインフォルダ配下に生成された一時フォルダ・ファイルを削除する。
・それでもデプロイできないときは、Eclipseと端末の再起動
・Glassfish起動時にポート1527が開いていません的例外が出たら、JDK付属のJavaDBを起動したうえで、Glassfishを起動する。
[2015-01-27T23:52:11.436+0900] [glassfish 4.1] [WARNING] [poolmgr.create_resource_error] [javax.enterprise.resource.resourceadapter.com.sun.enterprise.resource.allocator] [tid: _ThreadID=47 _ThreadName=admin-listener(5)] [timeMillis: 1422370331436] [levelValue: 900] [[ RAR5038:Unexpected exception while creating resource for pool DerbyPool. Exception : javax.resource.spi.ResourceAllocationException: Connection could not be allocated because: java.net.ConnectException: ポート1527のサーバーlocalhostへの接続中にメッセージConnection refused: connectでエラーが発生しました。]]
【参考文献・URL】
The Java EE 7 TutorialのjBatchの章をテキトーに訳した - kagamihogeの日記
jBatchでデータロードしてみる - kagamihogeの日記
jbatch (JSR352) - Chunk方式のStepを使ってみる - @lbtc_xxx lab
Java EE6を使う
GitHubに間違ってあげてしまった過去のコミットを削除してみる|めっとぼ
Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer’s SELECTION)
- 作者: Antonio Goncalves,日本オラクル株式会社,株式会社プロシステムエルオーシー
- 出版社/メーカー: 翔泳社
- 発売日: 2012/03/09
- メディア: 大型本
- 購入: 5人 クリック: 147回
- この商品を含むブログ (28件) を見る
aptを使ったアノテーション処理
apt(Annotation Processing Tools)を使用してソースコード内のアノテーションを読み込む。
JSR-175に準拠したcom.sun.*パッケージのMirrorAPIではなく、JSR-269に準拠したaptを使用する。
JEP-117にて、aptが内部で使用するクラスが、JavaSE6より追加されたjavax.annotation.processingもしくはjarax.lang.modelパッケージに含まれるクラスになった。
※MirrorAPIを使用したaptはJavaSE8より削除対象となった。
「参考」
JEP 117: Remove the Annotation-Processing Tool (apt)
Oracle Blogs 日本語のまとめ: [Java] Java 8's new Type Annotations
javax.annotation.processing インタフェース Processor
ツールは、「検出処理」を使って注釈プロセッサを見つけ出し、それらを実行すべきかどうかを決定します。ツールを構成することで、可能性のあるプロセッサのセットを制御できます。たとえば、JavaCompiler の場合、実行候補プロセッサのリストは、直接設定 することも、サービススタイル の検索で使用される検索パス を使って制御することもできます。
実際に注釈プロセッサを作ってみた。
javac呼び出し時に、作成した注釈プロセッサを指定し実行する。
実装自体はアノテーションが設定可能な場所に設定されているアノテーション名を出力するだけ。
【実行環境】
jdk1.7.0_60
【実装】
package annotation; import java.util.Set; import javax.annotation.processing.AbstractProcessor; import javax.annotation.processing.RoundEnvironment; import javax.annotation.processing.SupportedAnnotationTypes; import javax.annotation.processing.SupportedSourceVersion; import javax.lang.model.SourceVersion; import javax.lang.model.element.TypeElement; @SupportedSourceVersion(SourceVersion.RELEASE_7) // Javaのバージョン @SupportedAnnotationTypes({ "*" }) // 抽出対象とするアノテーションクラスの指定 public class SampleProcessor extends AbstractProcessor{ @Override public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) { for( TypeElement element : annotations ) { // TypeElementクラスがアノテーションの要素にあたる System.out.println( element.toString() ); } return true; } }
【実行コマンド】
javac -cp (クラスパス) -proc:only -processor (プロセッサのクラス名(完全修飾)) (javaファイル)
D:\workspace\workspace_annotation_20141120\ProcessorProject>set CP=./bin D:\workspace\workspace_annotation_20141120\ProcessorProject>javac -cp %CP% -proc:only -processor processor.SampleProcessor src\annotation\SampleAnnotated.java Sample D:\workspace\workspace_annotation_20141120\ProcessorProject>tree /f D:. │ ├─bin │ ├─annotation │ │ Sample.class │ │ SampleAnnotated.class │ │ │ └─processor │ SampleProcessor.class │ └─src ├─annotation │ Sample.java │ SampleAnnotated.java │ └─processor SampleProcessor.java D:\workspace\workspace_annotation_20141120\ProcessorProject>
今度は取得するアノテーションの種類、取得対象のソースファイルの指定など、入出力の
仕様を柔軟にしてみる。
【参考】
JDK 6 Java Compiler (javac)-related APIs & Developer Guides
javac - Java programming language compiler
Java SE 6 and JWSDP 2.0:6
「追記 2015/01/09」
Java8でも上記プログラムは正常に動作した。
※ただし@SupportedSourceVersionで指定するバージョンは「8」へ変更した。
ゆえにapt自体はJavaSE8であっても動作する。
サーブレット3.0のセキュリティ系アノテーションを試す
色々試してみたのでまとめる。試したのは以下のアノテーション。
@ServletSecurity @HttpConstraint @HttpMethodConstraint
【検証環境】
Tomcat7.0.56
【パターン1】特定ロールのユーザのみアクセスを許す場合
※ロール「admin-role」にアクセス許可を与える
@ServletSecurity(value=@HttpConstraint(rolesAllowed={"admin-role"})) @WebServlet(name="hogehoge", urlPatterns="/hogehoge") public class TestServlet extends HttpServlet { ・・・(中略)・・・ }
2.コンテナ側で認証の設定
「設定内容」
・全てのURLにBASIC認証を適用する。
・ユーザ「admin」と「sub」を用意。
・web.xml
<security-constraint> <web-resource-collection> <web-resource-name>admin page</web-resource-name> <url-pattern>/*</url-pattern> </web-resource-collection> <auth-constraint> <role-name>admin-role</role-name> </auth-constraint> <auth-constraint> <role-name>sub-role</role-name> </auth-constraint> <user-data-constraint> <transport-guarantee>NONE</transport-guarantee> </user-data-constraint> </security-constraint> <security-role> <role-name>admin-role</role-name> </security-role> <security-role> <role-name>sub-role</role-name> </security-role> <login-config> <auth-method>BASIC</auth-method> <realm-name>admin page</realm-name> </login-config>
<role rolename="admin-role"/> <role rolename="sub-role"/> <user username="admin" password="password" roles="admin-role"/> <user username="sub" password="password" roles="sub-role"/>
3.デプロイ&起動後、ブラウザで「http://localhost:8080/TestServlet/hogehoge」へアクセス。
結果は以下の通り。
●ユーザ「admin」でアクセス : アクセス可能 (HTTPステータスコード:200)
●ユーザ「sub」でアクセス : アクセス拒否 (HTTPステータスコード:403)
【パターン2】特定メソッドは認証処理なし、その他のメソッドを使ったアクセスは特定ロールのユーザのみ可能
※GETメソッドは認証処理なし、その他のメソッドを使ったアクセスはロール「admin-role」に属するユーザのみ可能、とする。
@ServletSecurity(value=@HttpConstraint(rolesAllowed={"admin-role"}), httpMethodConstraints = {@HttpMethodConstraint("GET")}) @WebServlet(name="hogehoge", urlPatterns="/hogehoge") public class TestServlet extends HttpServlet { ・・・(中略)・・・ }
2.コンテナ側の設定はパターン1の時と同じ
3.デプロイ&起動後、ブラウザで「http://localhost:8080/TestServlet/hogehoge」へアクセス。
結果は以下の通り。
「結果」
●アクセス可能 (HTTPステータスコード:200) ※ユーザ認証なし
4.認証処理の対象をPOSTメソッドへ変更する。
@ServletSecurity(value=@HttpConstraint(rolesAllowed={"admin-role"}), httpMethodConstraints = {@HttpMethodConstraint("POST")}) @WebServlet(name="hogehoge", urlPatterns="/hogehoge") public class TestServlet extends HttpServlet { ・・・(中略)・・・ }
5.デプロイ&起動後、ブラウザで「http://localhost:8080/TestServlet/hogehoge」へアクセス。
結果は以下の通り。
「結果」
●ユーザ「admin」でアクセス : アクセス可能 (HTTPステータスコード:200)
●ユーザ「sub」でアクセス : アクセス拒否 (HTTPステータスコード:403)
【パターン3】特定メソッドは認証処理なし、その他のメソッドを使ったアクセスは特定ロールのユーザのみ可能。ただし特定メソッドはアクセス不可とする
※POSTメソッドは認証処理なし、その他のメソッドを使ったアクセスはロール「admin-role」に属するユーザのみ可能、ただしHEADメソッドはアクセス不可となる。
1.適当なサーブレットクラスに以下のアノテーションを設定
@ServletSecurity(value=@HttpConstraint(rolesAllowed={"admin-role"}), httpMethodConstraints = {@HttpMethodConstraint("POST"), @HttpMethodConstraint(emptyRoleSemantic = javax.servlet.annotation.ServletSecurity.EmptyRoleSemantic.DENY, value = "HEAD")}) @WebServlet(name="hogehoge", urlPatterns="/hogehoge") public class TestServlet extends HttpServlet { ・・・(中略)・・・ }
2.コンテナ側の設定はパターン1の時と同じ
3.デプロイ&起動後、「http://localhost:8080/TestServlet/hogehoge」へアクセス。
結果は以下の通り。
「結果」
●[HEAD]アクセス拒否 (HTTPステータスコード:503 ※エラーメッセージ:Duplicate method name: HEAD)
●[POST]アクセス可能 (HTTPステータスコード:200)※ユーザ認証なし
●[GET・ロール:admin]アクセス可能 (HTTPステータスコード:200)※ユーザ認証あり(認証失敗すると401)
●[GET・ロール:sub] アクセス拒否 (HTTPステータスコード:403)※ユーザ認証あり(認証失敗すると401)
[備考]ちなみに「@HttpMethodConstraint」におけるアクセス許可の設定については、
「javax.servlet.annotation.ServletSecurity.EmptyRoleSemantic」の設定が優先される。
よって、下記の設定においてGETメソッドはアクセス拒否となる。
@ServletSecurity(value=@HttpConstraint(rolesAllowed={"admin-role"}), httpMethodConstraints = {@HttpMethodConstraint("GET"), @HttpMethodConstraint(emptyRoleSemantic = javax.servlet.annotation.ServletSecurity.EmptyRoleSemantic.DENY, value = "GET")}) @WebServlet(name="hogehoge", urlPatterns="/hogehoge") public class TestServlet extends HttpServlet { ・・・(中略)・・・ }
- 作者: 川崎克巳
- 出版社/メーカー: 秀和システム
- 発売日: 2012/03
- メディア: 単行本
- この商品を含むブログを見る
http-method-omissionを試す
サーブレット3.0の仕様から追加された「http-method-omission」について。
詳しくは下記ブログ参照。
New Security Features in Glassfish v3 (Java EE 6) - Part I (Yours Officially...Nithya Subramanian)
<security-constraint> <display-name>WebConstraint</display-name> <web-resource-collection> <web-resource-name>test</web-resource-name> <description/> <url-pattern>/test.jsp</url-pattern> <http-method-omission>GET</http-method-omission> </web-resource-collection> <auth-constraint> <description/> <role-name>dev</role-name> </auth-constraint> </security-constraint>
which means that the auth-constraint for the resource accessible by the url-pattern /test.jsp is applicable for all methods except GET.
認証対象のURLパターン「/test.jsp」に対して実行するHTTPメソッドのうち、
"<http-method-omission>"で指定したメソッド(※上記の例ではGETメソッド)については認証処理の対象外となる。
・・・合ってるか不安なので実際に試してみた。
【前提・目標?】
・全てのJSPファイルに対し、BASIC認証をかける設定とする
・<http-method-omission>の値をGET・POSTの状態にした時、
任意のJSPファイルへブラウザよりアクセスし、表示される内容を確認する。
【環境】
・Tomcat7.0.56
【検証】
1.web.xmlに、全てのJSPに対し、BASIC認証をかける設定を行う。
「web.xml」
※DTD・XMLスキーマ定義
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="serv" version="3.0">
※ルートタグ配下の末尾に追記。
<security-constraint> <web-resource-collection> <web-resource-name>admin page</web-resource-name> <url-pattern>*.jsp</url-pattern> </web-resource-collection> <auth-constraint> <role-name>admin</role-name> </auth-constraint> <user-data-constraint> <transport-guarantee>NONE</transport-guarantee> </user-data-constraint> </security-constraint> <security-role> <role-name>admin</role-name> </security-role> <login-config> <auth-method>BASIC</auth-method> <realm-name>admin page</realm-name> </login-config>
2.tomcat-user.xmlにユーザ・ロールの設定を行う。(権限は適当)
<role rolename="admin"/> <user username="admin" password="password" roles="admin"/>
3.適当なプロジェクト・JSPを作る
4.上記の状態で資産デプロイ・サーバ起動を行う
5.JSPへアクセスする。
→自分の場合は「http://localhost:8080/TestServlet/TestJSP.jsp」へアクセスした。
アクセスすると、BASIC認証のダイアログが表示された。
6."<http-method-omission>"にGETメソッドを指定する。
「web.xml」
<security-constraint> <web-resource-collection> <web-resource-name>admin page</web-resource-name> <url-pattern>*.jsp</url-pattern> <http-method-omission>GET</http-method-omission> </web-resource-collection>
7.JSPへアクセスする。
→アクセスすると、作成したJSPファイルの内容が表示された。
8."<http-method-omission>"にPOSTメソッドを指定する。
「web.xml」
<security-constraint> <web-resource-collection> <web-resource-name>admin page</web-resource-name> <url-pattern>*.jsp</url-pattern> <http-method-omission>POST</http-method-omission> </web-resource-collection>
9.JSPへアクセスする。
→アクセスすると、BASIC認証のダイアログが表示された。
【結論】
・"<http-method-omission>"に指定したHTTPメソッドは認証処理の対象外となる、であってた
- 作者: 宮川 幸久,ターゲット編集部
- 出版社/メーカー: 旺文社
- 発売日: 2011/11/23
- メディア: 単行本
- 購入: 3人 クリック: 10回
- この商品を含むブログを見る
LuhnアルゴリズムをJavaで書いた
カドカワ祭りで買った『プログラマの考え方がおもしろいほど身につく本』に載っていたLuhnアルゴリズムをJavaで書いてみた。
Amazon.co.jp: カドカワ祭り: Kindleストア: 【ピックアップ】ライトノベル, 【ピックアップ】文芸, 【ピックアップ】実用・ビジネス・専門書, 【ピックアップ】コミック, 【ピックアップ】新書 など
後でエロい人が書いたコード*1を見ると、自分の糞コードっぷりがよくわかったが、後学のために晒す。
【お題】
元の数字の各桁に対して、1桁おきにその数字を2倍する。そのあとで、各桁の数字を足していく(2倍した結果2桁になる数字があれれば、十の位と一の位をそれぞれ個別に足す)。その合計が10で割り切れる場合、識別記号は妥当なものであるとみなす。
public class SampleLuhn { /** * チェックサム値の取得 * @param total 各桁の総和 * @return チェックサム値 */ private static int getCheckSum(int total) { int checkSum = 0; if(total % 10 != 0){ checkSum = 10 - (total % 10); } return checkSum; } /** * 引数の値を2倍した値を返却する(ただし2桁になった場合は各桁の数値を足した値を返却する) * @param target 処理対象の数値 * @return 第一引数の値を2倍した値 */ private static int doubleDigitValue(int target) { int doubleDigit = target * 2; if(doubleDigit >= 10){ return 1 + doubleDigit % 10; }else { return doubleDigit; } } /** * 第一引数の数値(偶数桁)における各桁の総和を返却する * @param target 処理対象の数値 * @param length 処理対象の数値の桁数 * @param power 処理対象の数値の累乗 * @return 第一引数における各数字の総和 */ private static int evenSum(int target, int length, int power) { // 総和 int total = 0; for (int i = 1; i <= length; i++) { System.out.println(i + "桁目は" + (target / power)); if(i%2 == 0){ // 偶数桁読み込み時 total += target / power; }else { // 奇数桁読み込み時 int doubleVal= doubleDigitValue(target / power); total += doubleVal; } target %= power; power /= 10; } System.out.println("総和 : " + total); return total; } /** * 第一引数の数値(奇数桁)における各桁の総和を返却する * @param target 処理対象の数値 * @param length 処理対象の数値の桁数 * @param power 処理対象の数値の累乗 * @return 第一引数における各数字の総和 */ private static int oddSum(int target, int length, int power) { // 総和 int total = 0; for (int i = 1; i <= length; i++) { System.out.println(i + "桁目は" + (target / power)); if(i%2 == 0){ // 偶数桁読み込み時 int doubleVal= doubleDigitValue(target / power); total += doubleVal; }else { // 奇数桁読み込み時 total += target / power; } target %= power; power /= 10; } System.out.println("総和 : " + total); return total; } /** * チェックサム値の取得 * @param target * @return チェックサム値 */ public static int createLohnCheckSum(int target) { // 桁数 int length = String.valueOf(target).length(); // 桁数に対する累乗 int power = (int)Math.pow(10, length - 1); // 総和 int sum = 0; // 桁数の偶数・奇数で場合分け if(length % 2 == 0){ sum = evenSum(target, length, power); }else { sum = oddSum(target, length, power); } // チェックサム値取得 int checkSum = getCheckSum(sum); return checkSum; } /** * mainメソッド * @param args */ public static void main(String[] args) { // チェック対象の数値 int target = 12345; // チェックサム作成 int checkSum = createLohnCheckSum(target); System.out.println("checksum : " + checkSum); } }
ちなみに参照実装は下記。エレガントだった。
http://www.chriswareham.demon.co.uk/software/Luhn.java
プログラマの考え方がおもしろいほど身につく本 問題解決能力を鍛えよう! (アスキー書籍)
- 作者: V.AntonSpraul,角征典,高木正弘
- 出版社/メーカー: KADOKAWA / アスキー・メディアワークス
- 発売日: 2014/08/14
- メディア: Kindle版
- この商品を含むブログ (3件) を見る
RabbitMQ tutorial - RPC Pattern in Java
RabbitMQのチュートリアル。
Client/ServerスタイルのRPCパターン。
【お題】
RabbitMQ - RabbitMQ tutorial - Remote procedure call (RPC)
・リクエスト時に「correlationId」を指定しRPC通信におけるClient/Server間の通信を紐付ける。
・リクエスト時に「reply_to」を指定しレスポンス返却時に使用するキュー名を設定する。
【実行環境】
・Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60
【前提】
・RabbitMQはインストール済
・コンパイルする手順は省く
【手順】
1.RPCClientクラスとRPCServerクラスを作成
●RPCClientクラス
※Server側へ数字(『30』固定)を送信する。同期通信とする。
package no6; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.io.IOException; import java.util.UUID; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; /** * 接続に必要な準備 * @throws Exception */ public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } /** * RPC呼び出し * @param message * @return * @throws Exception */ public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); // 呼び出し channel.basicPublish("", requestQueueName, props, message.getBytes()); // 処理結果を待ちうける while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } /** * クローズ処理 */ public void close() { try { connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } /** * mainメソッド * @param argv */ public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (Exception e) { // nop } } } } }
●RPCServerクラス
※CL側から送信された数字より、フィボナッチ数を計算して返却する。
package no6; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; /** * フィボナッチ数を得る * @param n 計算対象の数字 */ private static int fib(int n) { if (n == 0){ return 0; } if (n == 1){ return 1; } return fib(n - 1) + fib(n - 2); } /** * mainメソッド * @param argv */ public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); // メッセージを待ち受ける while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder() .correlationId(props.getCorrelationId()).build(); try { // メッセージから計算対象の数字を得る String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); // フィボナッチ数を計算する response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { // 計算結果を送信する channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(System.out); } finally { if (connection != null) { try { connection.close(); } catch (Exception e) { // nop } } } } }
2.RPCClientを1窓、RPCServerを1窓で実行すると、Client側で計算結果が得られる。
●RPCClient
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-client.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no6/RPCClient [x] Requesting fib(30) [.] Got '832040'
●RPCServer
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no6/RPCServer [x] Awaiting RPC requests [.] fib(30)
【さいごに】
・MOMについてこんな記事を見つけた。奥が深そう…
IT検証ラボ - 新規格MOM、AMQPやSTOMP対応、プロダクトの性能はJMSと同等以上:ITpro
RabbitMQ tutorial - Topics Pattern in Java
RabbitMQのチュートリアル。
Publish/Subscribeパターンより派生した、メッセージの送信先を制御するRoutingパターンより、routingKeyに正規表現を使用してより細かい送信先制御を行うTopicsパターン。
【お題】
RabbitMQ - RabbitMQ tutorial - Topics
【実行環境】
・Windows7 64bit
・RabbitMQ3.5.5
・jdk1.7.0_60
【前提】
・RabbitMQはインストール済
・コンパイルする手順は省く
【手順】
1.EmitLogTopicクラスとReceiveLogsTopicクラスを作成
●EmitLogTopicクラス(Producer役)
package no5; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; /** * mainメソッド * * <dl> * <dt><span class="strong">第一引数:routingKey</span></dt> * <dd>ログレベルを指定する。書式:{facility}.{severity}</dd> * <br> * <dt><span class="strong">第二引数:message</span></dt> * <dd>ログ出力する文字列</dd> * </dl> * * @param argv */ public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Throwable th) { th.printStackTrace(System.out); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } } /** * routingKeyに設定する文字列をmainメソッドの引数より取得 */ private static String getRouting(String[] strings) { if (strings.length < 1) return "anonymous.info"; return strings[0]; } /** * ログメッセージとして設定する文字列をmainメソッドの引数より取得 */ private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
●ReceiveLogsTopicクラス(Consumer役)
package no5; import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } // 引数で指定されたログレベルのポリシーでバインドする for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // メッセージを待ち受ける while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } catch (Throwable th) { th.printStackTrace(System.out); } finally { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(System.out); } } } }
2.EmitLogTopicを1窓、ReceiveLogsTopicを2窓で実行する。
ReceiveLogsTopicは実行引数をそれぞれ以下のように設定する。
java -cp %CP% no5/ReceiveLogsTopic kern.*
java -cp %CP% no5/ReceiveLogsTopic *.critical
●EmitLogTopicクラス(Producer役)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "kern.critical" "A critical kernel error" [x] Sent 'kern.critical':'A critical kernel error' D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "kern.warn" "A warning kernel error" [x] Sent 'kern.warn':'A warning kernel error' D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/EmitLogTopic "cron.critical" "A critical cron error" [x] Sent 'cron.critical':'A critical cron error' D:\workspace\workspace_rabbitmq\Tutorial>
●ReceiveLogsTopicクラス(Consumer役・ログのポリシー:kern.*)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/ReceiveLogsTopic kern.* [*] Waiting for messages. To exit press CTRL+C [x] Received 'kern.critical':'A critical kernel error' [x] Received 'kern.warn':'A warning kernel error'
●ReceiveLogsTopicクラス(Consumer役・ログのポリシー: *.critical)
D:\workspace\workspace_rabbitmq\Tutorial>set CP=./bin;./lib/commons-io-1.2.jar;./lib/commons-cli-1.1.jar;./lib/rabbitmq-cl ient.jar D:\workspace\workspace_rabbitmq\Tutorial>java -cp %CP% no5/ReceiveLogsTopic *.critical [*] Waiting for messages. To exit press CTRL+C [x] Received 'kern.critical':'A critical kernel error' [x] Received 'cron.critical':'A critical cron error'