001package org.opengion.plugin.daemon;
002
003import java.io.File;
004import java.util.Date;
005import java.util.Locale;                                                                                        // 7.2.9.4 (2020/11/20)
006
007import javax.jms.JMSException;
008import javax.jms.Message;
009import javax.jms.MessageListener;
010import javax.jms.TextMessage;
011
012// import org.opengion.fukurou.util.BizUtil;
013import org.opengion.fukurou.business.BizUtil;
014import org.opengion.fukurou.queue.QueueInfo;
015import org.opengion.fukurou.queue.QueueReceive;
016import org.opengion.fukurou.queue.QueueReceiveFactory;
017import org.opengion.fukurou.util.HybsTimerTask;
018import org.opengion.fukurou.util.StringUtil;
019import org.opengion.hayabusa.common.HybsSystem;
020import org.opengion.hayabusa.common.HybsSystemException;
021import org.opengion.hayabusa.queue.DBAccessQueue;
022
023/**
024 * メッセージキュー受信 メッセージキューの受信処理を行います。
025 *
026 * @og.group メッセージ連携
027 *
028 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
029 *
030 * @version 5.0
031 * @author oota
032 * @since JDK7
033 *
034 */
035public class Daemon_QueueReceive extends HybsTimerTask {
036        /** このプログラムのVERSION文字列を設定します。   {@value} */
037        private static final String VERSION = "7.2.9.4 (2020/11/20)" ;
038
039        private int loopCnt ;
040        private QueueReceive queueReceive ;
041
042        private static final int LOOP_COUNTER = 24;
043        private static final char FPSC = File.pathSeparatorChar ;                                       // 7.2.9.4 (2020/11/20) システムに依存するパス区切り文字
044
045        private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY");
046        private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY");
047        private final String MQ_QUEUE_TYPE;
048        private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL");
049        private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER");
050
051        private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
052        private final String USER_ID   = "CYYYYY";
053        private final String PG_ID;
054        private final String DMN_NAME  = "QueueReceiveDMN";
055        private final DBAccessQueue dbAccessQueue;
056
057        private final String REAL_PATH = HybsSystem.sys("REAL_PATH");                           // 7.2.9.4 (2020/11/20)
058
059        /**
060         * コンストラクター
061         * 初期処理を行います。
062         *
063         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
064         */
065        public Daemon_QueueReceive() {
066                super();
067
068                // パラメータの設定
069                // 7.2.9.4 (2020/11/20) PMD:Avoid if (x != y) ..; else ..;
070                if(StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
071                        throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
072                }else {
073//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
074                        MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
075                        PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
076                }
077
078                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
079
080//              // パラメータの設定
081//              if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) {
082////                    MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase();
083//                      MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN );    // 7.2.9.4 (2020/11/20)
084//                      PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10);
085//              }else {
086//                      throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい");
087//              }
088//
089//              dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
090        }
091
092        /**
093         * 初期処理 MQサーバに接続します。
094         *
095         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する
096         */
097        @Override
098        public void initDaemon() {
099                // 開始ログO
100                final StringBuilder errMsg = new StringBuilder();
101                if (MQ_QUEUE_TYPE == null) {
102                        errMsg.append("MQ_QUEUE_TYPE");
103                }
104                if (MQ_QURUE_SERVER_URL == null) {
105                        errMsg.append(" MQ_QUEUE_SERVER_URL");
106                }
107
108                if (errMsg.length() > 0) {
109                        errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。");
110                        throw new HybsSystemException(errMsg.toString());
111                }
112
113//              final String queueType = MQ_QUEUE_TYPE.toUpperCase();
114                final String queueType = MQ_QUEUE_TYPE.toUpperCase( Locale.JAPAN );     // 7.2.9.4 (2020/11/20)
115
116                // 開始ログ
117                System.out.println("MQキュータイプ:" + queueType);
118                System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL);
119
120                queueReceive = QueueReceiveFactory.newQueueReceive(queueType);
121
122                queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
123        }
124
125        /**
126         * 開始処理 タイマータスクのデーモン処理の開始ポイントです。
127         */
128        @Override
129        protected void startDaemon() {
130                if (loopCnt % LOOP_COUNTER == 0) {
131                        loopCnt = 1;
132                        System.out.println();
133                        System.out.print(toString() + " " + new Date() + " ");
134                } else {
135                        // 対象 キュー名(グループ名)とbizlogic名の取得処理
136                        final String[][] ge67vals = dbAccessQueue.setlectGE67();
137                        // キュー情報登録チェック
138                        if (ge67vals.length == 0) {
139                                final String errMsg = "GE67にキュー情報が登録されていません。";
140                                throw new RuntimeException(errMsg);
141                        }
142                        // MQとSQSで処理を分岐
143                        // MQ:指定キューIDからキューメッセージを取得
144                        // SQS:キューメッセージを取得してからキューID(グループID)を取得
145                        switch (MQ_QUEUE_TYPE) {
146                                case "MQ":
147                                        processMq(ge67vals);
148                                        break;
149                                case "SQS":
150                                        processSqs(ge67vals);
151                                        break;
152                                default:
153                                        final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE;
154                                        throw new RuntimeException(errMsg);
155                        }
156
157                        loopCnt++;
158                }
159        }
160
161        /**
162         * MQ用の処理
163         * GE67に登録されているキューIDの、
164         * メッセージキューを取得して処理を行います。
165         *
166         * @param ge67vals GE67の配列データ
167         */
168        private void processMq(final String[][] ge67vals) {
169                boolean listenerMode = false;
170
171                if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) {
172                        listenerMode = true;
173                }
174
175                if(listenerMode) {
176                        // リスナーの初期化
177                        queueReceive.closeListener();
178                }
179
180                // ge67のキューリスト分繰り返します
181                for (int row = 0; row < ge67vals.length; row++) {
182                        final String queueId = ge67vals[row][0];
183                        final String bizLogicId = ge67vals[row][1];
184
185                        if(listenerMode) {
186                                // リスナーを設定して、動的な受信処理(MQ専用)
187                                final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId);
188                                queueReceive.setListener(queueId, listener);
189                        }else {
190                                // 1件の受信処理
191                                final QueueInfo queueInfo = queueReceive.receive(queueId);
192                                if (queueInfo != null) {
193                                        processMessage(queueId, bizLogicId, queueInfo.getMessage());
194                                        // 1件処理を行ったら処理を終了します。
195                                        break;
196                                }
197                        }
198                }
199        }
200
201        /**
202         * SQS用の処理
203         * SQSはグループIDを指定して、キューを取得することはできず、
204         * 任意のキューを1つ取得してから、
205         * 判定処理を行います。
206         * GE67に登録されていないグループIDのキューが取得された場合は、
207         * GE68にエラーレコードを登録します。
208         *
209         * @param ge67vals GE67の配列データ
210         */
211        private void processSqs(final String[][] ge67vals) {
212                // 下記はSQSの場合(キューを1件取得して処理)
213                final QueueInfo queueInfo = queueReceive.receive(null);
214
215                // キューが未取得の場合
216                if(queueInfo == null) {
217                        return;
218                }
219
220                // 受信したキューを処理
221                final String groupId = queueInfo.getSqsFifoGroupId();
222                Boolean existsFlg = false;
223                // valsにグループIDのレコードが存在するか検索
224                for (int row = 0; row < ge67vals.length; row++) {
225                        final String queueId = ge67vals[row][0];
226
227                        if (groupId != null && groupId.equals(queueId)) {
228                                // 該当レコードあり
229                                final String bizLogicId = ge67vals[row][1];
230                                processMessage(queueId, bizLogicId, queueInfo.getMessage());
231
232                                existsFlg = true;
233                                break;
234                        }
235                }
236
237                if (!existsFlg) {
238                        // 該当groupIdの未登録エラー
239                        // 処理番号生成
240                        final String syoriNo = dbAccessQueue.generateSyoriNo();
241                        dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage());
242                        dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。");
243                }
244        }
245
246        /**
247         * キャンセル処理
248         * タイマータスクのデーモン処理の終了ポイントです。
249         *
250         * @return キャンセルできれば、true
251         */
252        @Override
253        public boolean cancel() {
254                if (queueReceive != null) {
255                        queueReceive.close();
256                }
257
258                return super.cancel();
259        }
260
261        /**
262         * メッセージの処理
263         *  受信したメッセージをbizLogicに渡して、
264         *  処理を実行します。
265         *
266         * @param queueId キューID
267         * @param bizLogicId ビズロジックID
268         * @param msgText 受信メッセージ
269         */
270        private void processMessage(final String queueId, final String bizLogicId, final String msgText) {
271                String syoriNo = "";
272                try {
273                        // 処理番号生成
274                        syoriNo = dbAccessQueue.generateSyoriNo();
275
276                        // 管理テーブル登録
277                        dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText);
278
279                        // bizLogicの処理を実行
280                        callActBizLogic(SYSTEM_ID, bizLogicId, msgText);
281
282                        // 管理テーブル更新(完了)
283                        dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END);
284
285                } catch (Throwable te) {
286                        // bizLogicでのエラーはログの未出力して、処理を継続します。
287                        // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。
288                        String errMessage = null;
289                        if (te.getCause() != null) {
290                                // causeが設定されている場合のエラー情報
291                                errMessage = te.getCause().getMessage();
292                        } else {
293                                // causeが未設定の場合のエラー情報
294                                errMessage = te.getMessage();
295                        }
296                        System.out.println(errMessage);
297                        try {
298                                // エラーテーブルに登録
299                                dbAccessQueue.updateGE68Error(syoriNo, errMessage);
300                        } catch (Exception e) {
301                                // ここでのエラーはスルーします。
302                                System.out.println("管理テーブル登録エラー:" + e.getMessage());
303                        }
304                }
305        }
306
307        /**
308         * bizLogic処理の呼び出し
309         * 必要なパス情報をリソースから取得して、
310         * BizUtil.actBizLogicにパス情報を渡すことで、
311         * bizLogicの処理を行います。
312         *
313         * @og.rev 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
314         *
315         * @param systemId  システムID
316         * @param logicName ロジックファイル名
317         * @param msgText   メッセージ
318         * @throws Throwable エラー情報
319         */
320        private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
321                // 対象 クラスパスの生成
322                // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
323                // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
324                // bizLogicTag.javaのコードを移植
325                final String classDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_CLASS_PATH" );            // bizの下のパス
326                final String webIinf  = REAL_PATH + "WEB-INF" + File.separator ;
327
328                final StringBuilder sb = new StringBuilder().append('.').append(FPSC);
329
330                final File lib = new File( webIinf + "lib");
331                final File[] libFiles = lib.listFiles();
332                if( libFiles != null ) {
333                        // 7.2.9.4 (2020/11/20) PMD:This for loop can be replaced by a foreach loop
334                        for( final File file : libFiles ) {
335                                sb.append( file.getAbsolutePath() ).append(FPSC);
336                        }
337//                      for (int i = 0; i < libFiles.length; i++) {
338//                              sb.append( libFiles[i].getAbsolutePath() ).append(FPSC);
339//                      }
340                }
341
342                // 上記で生成したクラスパスをclassPathに格納
343                final String classPath =
344                        sb.append( webIinf ).append( "classes" ).append(FPSC)
345                          .append( classDir ).append(FPSC)              // bizの下のパス
346                          .toString();
347
348                // ソースパス情報の生成
349                final String  srcDir        = REAL_PATH + HybsSystem.sys( "BIZLOGIC_SRC_PATH" );
350                final boolean isAutoCompile = HybsSystem.sysBool( "BIZLOGIC_AUTO_COMPILE" );
351                final boolean isHotDeploy   = HybsSystem.sysBool( "BIZLOGIC_HOT_DEPLOY" );
352
353                // bizLogicに渡すパラメータ
354                final String[] keys = new String[] { "message" };
355                final String[] vals = new String[] { msgText };
356
357                // bizLogic処理の実行
358                BizUtil.actBizLogic( srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals );
359        }
360
361//      7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している
362//      private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable {
363//              // 対象 クラスパスの生成
364//              // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。
365//              // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。
366//              // bizLogicTag.javaのコードを移植
367//              final StringBuilder sb = new StringBuilder();
368//              sb.append('.').append(File.pathSeparatorChar);
369//              final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib");
370//              final File[] libFiles = lib.listFiles();
371//              for (int i = 0; i < libFiles.length; i++) {
372//                      sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar);
373//              }
374//              sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar);
375//              // bizの下のパス
376//              sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar);
377//              // 上記で生成したクラスパスをclassPathに格納
378//              final String classPath = sb.toString();
379//
380//              // ソースパス情報の生成
381//              final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH");
382//              final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH");
383//              final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE");
384//              final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY");
385//
386//              // bizLogicに渡すパラメータ
387//              final String[] keys = new String[] { "message" };
388//              final String[] vals = new String[] { msgText };
389//
390//              // bizLogic処理の実行
391//              BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals);
392//      }
393
394        /**
395         * 受信処理リスナー用のインナークラス
396         * QueueReceiveリスナークラス リスナー用のクラスです。
397         *  MQに設定することで、メッセージが受信されると、
398         * onMessageメソッドが実行されます。
399         *
400         * @og.rev 7.2.9.4 (2020/11/20) private final 追加
401         */
402//      class QueueReceiveListener implements MessageListener {
403        private final class QueueReceiveListener implements MessageListener {
404//              private String queueId = "";
405//              private String bizLogicId = "";
406                private final String queueId ;
407                private final String bizLogicId ;
408
409                /**
410                 * コンストラクター 初期処理を行います。
411                 *
412                 * @param quId  キューID
413                 * @param bizId ビズロジックID
414                 */
415                public QueueReceiveListener(final String quId, final String bizId) {
416                        queueId    = quId;
417                        bizLogicId = bizId;
418                }
419
420                /**
421                 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。
422                 *
423                 * @param message 受信メッセージ
424                 */
425                @Override
426                public void onMessage(final Message message) {
427                        // 要求番号 : ここでは使用していません。
428                        final String ykno = "";
429
430                        // メッセージ受信
431                        final TextMessage msg = (TextMessage) message;
432                        String msgText = "";
433
434                        try {
435                                // キューサーバのメッセージを取得
436                                msgText = msg.getText();
437
438                                // メーッセージの受信応答を返します。
439                                msg.acknowledge();
440
441                                processMessage(queueId, bizLogicId, msgText);
442
443                        } catch (JMSException jmse) {
444                                try {
445                                        // 管理テーブル更新
446                                        // 管理テーブル更新(エラー)
447                                        dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR);
448                                } catch (Exception e) {
449                                        // ここでのエラーはスルーします。
450                                        System.out.println("管理テーブル登録エラー:" + e.getMessage());
451                                }
452
453                                throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage());
454                        }
455                }
456        }
457}