001package org.opengion.plugin.daemon;
002
003import java.util.Date;
004
005import javax.jms.QueueSession;
006
007// import org.hsqldb.lib.StringUtil;
008import org.opengion.fukurou.util.StringUtil;                                            // 7.0.6.0 (2019/10/07)
009import org.opengion.fukurou.queue.QueueInfo;
010import org.opengion.fukurou.queue.QueueSend;
011import org.opengion.fukurou.queue.QueueSendFactory;
012import org.opengion.fukurou.util.HybsTimerTask;
013import org.opengion.hayabusa.common.HybsSystem;
014import org.opengion.hayabusa.queue.DBAccessQueue;
015
016/**
017 * メッセージキュー送信
018 * メッセージキュー送信テーブルを監視して、
019 * 送信処理を行います。
020 * 
021 * @og.group メッセージ連携
022 *
023 * @og.rev 5.10.15.0 (2019/08/30) 新規作成
024 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動
025 * 
026 * @version 5.0
027 * @author oota
028 * @since JDK7
029 *
030 */
031public class Daemon_QueueSend extends HybsTimerTask {
032        private int loopCnt = 0;
033        private static final int LOOP_COUNTER = 24;
034        private QueueSend queueSend;
035
036        private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
037        private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" );
038        private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" );
039        private final String USER_ID = "CYYYYY";
040        private final String PG_ID = "DMN_QueSnd";
041        private final String DMN_NAME = "QueueReceiveDMN";
042        private final DBAccessQueue dbAccessQueue;
043        
044        /**
045         * コンストラクター
046         * 初期処理を行います。
047         */
048        public Daemon_QueueSend(){
049                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
050        }
051        /**
052         * 開始処理
053         * タイマータスクのデーモン処理の開始ポイントです。
054         */
055        @Override
056        protected void startDaemon() {
057                if (loopCnt % LOOP_COUNTER == 0) {
058                        loopCnt = 1;
059                        System.out.println();
060                        System.out.println(toString() + " " + new Date() + "");
061                } else {
062                        // メッセージキュー送信管理テーブルから、送信対象のレコードを取得
063                        final String[][] vals = dbAccessQueue.selectGE65();
064                        
065                        // 取得データ分の繰り返し処理を実行する
066                        for(int i = 0; i  < vals.length; i++) {
067                                final String[] record = vals[i];
068                                
069                                // GE65から取得した値を変数に格納
070                                final String ykno =  record[0];
071                                final String queueId = record[1];
072                                final String message = record[2];
073                                final String dedupliId = record[3];
074                                final String queSyu = record[4];
075                                final String jmsUrl = record[5];
076                                
077                                final String queueType = queSyu.toUpperCase();
078                                queueSend = QueueSendFactory.newQueueSend(queueType);
079        
080                                // 接続処理
081                                queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
082                                
083                                // メッセージ送信管理テーブルから取得したデータを送信実装予定
084                                final QueueInfo queueInfo = new QueueInfo();
085                                
086                                // 応答確認種別
087                                if("MQ".equals(queueType)){
088                                        // MQメッセージサーバ指定時
089                                        queueInfo.setMqTransacted(false);
090                                        queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
091                                        // キュー名
092                                        queueInfo.setMqQueueName(queueId);
093                                }else if("SQS".equals(queueType)){
094                                        // SQSメッセージサーバ指定時
095                                        // グループID
096                                        queueInfo.setSqsFifoGroupId(queueId);
097                                        if(!StringUtil.isEmpty(dedupliId)) {
098                                                // 重複排除ID
099                                                // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる)
100                                                queueInfo.setSqsFifoDedupliId(dedupliId);
101                                        }
102                                }
103                        
104                                // メッセージ
105                                queueInfo.setMessage(message);
106                        
107                                // 完了フラグを処理中:2に更新
108                                dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS);
109                                
110                                // メッセージ送信処理
111                                try{
112                                        queueSend.sendMessage(queueInfo);
113                                        
114                                        // 完了フラグを完了:3に更新
115                                        dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END);
116                                        
117                                }catch(Exception e) {
118                                        // 完了フラグをエラー:4に更新して、エラー情報を登録
119                                        dbAccessQueue.updateGE66Error(ykno, e.getMessage());
120                                } 
121                        }
122                        
123                        // クローズ処理
124                        queueSend.close();
125                        
126                        loopCnt++;
127                }
128        }
129}