Browse Source

【情报二期】数据域服务 - kafka数据消费业务代码

fangtasyj 3 months ago
parent
commit
1248590953

+ 1 - 1
src/main/java/com/hhwy/qbeqsjy/dao/CtrlDao.java

@@ -71,7 +71,7 @@ public class CtrlDao {
      * @return
      */
     public boolean batchInsertWarningInfo(List<Object[]> warningInfoList){
-        String sql = "insert into t_ctrl_area_info (task_id, clue, warning_msg, create_time) values (?, ?, ?, ?)";
+        String sql = "insert into t_ctrl_area_info (task_id, clue, clue_type, warning_msg, create_time) values (?, ?, ?, ?)";
         try {
             int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, warningInfoList);
             return insertRows.length == warningInfoList.size();

+ 44 - 42
src/main/java/com/hhwy/qbeqsjy/kafka/service/DataProcessService.java → src/main/java/com/hhwy/qbeqsjy/kafka/service/ConsumeService.java

@@ -2,10 +2,10 @@ package com.hhwy.qbeqsjy.kafka.service;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.hhwy.qbeqsjy.common.CommonUtil;
 import com.hhwy.qbeqsjy.common.Constants;
-import com.hhwy.qbeqsjy.common.TimeTool;
+import com.hhwy.qbeqsjy.dao.CtrlDao;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -14,12 +14,13 @@ import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * 布控命中数据处理
@@ -27,10 +28,12 @@ import java.util.concurrent.Executors;
 @Slf4j
 @Service
 @EnableScheduling
-public class DataProcessService implements ApplicationListener<ContextRefreshedEvent> {
+public class ConsumeService implements ApplicationListener<ContextRefreshedEvent> {
 
     @Value("${topic.name}")
     private String topic;
+    @Resource
+    private CtrlDao ctrlDao;
     //默认发送1000条消息
     public static final int messageNumToSend = 1000;
 
@@ -73,56 +76,55 @@ public class DataProcessService implements ApplicationListener<ContextRefreshedE
      * @param topic
      */
     public void process(KafkaConsumer<Integer, String> consumer, String topic) {
-        JSONObject message;
-        JSONObject oriField;
-        List<String> messageList = new ArrayList<>();
+        List<Object[]> warningInfoList = new ArrayList<>();
         consumer.subscribe(Collections.singletonList(topic));
         while (true) {
             // 间隔30秒拉取一次数据
             ConsumerRecords<Integer, String> records = consumer.poll(Constants.WAIT_TIME);
             // 业务处理逻辑(解析、处理、入库)
             if (records != null && records.count() > 0) {
-                long consumeCount = 0L;
-                JSONArray hitMsgList = new JSONArray();
+                Date createTime = new Date();
                 for (ConsumerRecord<Integer, String> record : records) {
-                    message = JSONObject.parseObject(record.value());
-                    log.info(message.toJSONString());
-                    messageList.add(message.toJSONString());
-                    consumeCount++;
-                    oriField = message.getJSONObject("ORIFIELD");
-                    // 线索值
-                    String clue = oriField.getString("AUTH_ACCOUNT");
-                    // 命中时间
-                    String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
-                    // 经度
-                    String longitude = oriField.getString("LONGITUDE");
-                    // 维度
-                    String latitude = oriField.getString("LATITUDE");
-                    // 基站ID(4G)
-                    String baseStationId = oriField.getString("BASE_STATION_ID");
-                    //====================== 1.过滤基站号为空的数据 ======================
-                    if (StringUtils.isNotBlank(baseStationId)) {
-                        JSONObject msg = new JSONObject();
-                        msg.put("clue", clue);
-                        msg.put("longitude", longitude);
-                        msg.put("latitude", latitude);
-                        msg.put("baseStationId", baseStationId);
-                        msg.put("captureTime", captureTime);
-                        consumeCount++;
-                        hitMsgList.add(message);
-//                        //====================== 2.过滤掉经纬度不在指定区域范围的数据 - to do ======================
+                    JSONObject message = JSONObject.parseObject(record.value());
+//                    log.info(message.toJSONString());
+                    // 解析[UNICTRL]节点
+                    JSONObject uniCtrl = message.getJSONObject("UNICTRL");
+                    String clueId = uniCtrl.getString("U_CLUEID");   // 样例:120000_59275350793392128_D201005_13502082832
+                    String[] clueIdStrArr = clueId.split("_");
+                    String taskId = clueIdStrArr[Constants.NUM_1];
+                    String clueType = clueIdStrArr[Constants.NUM_2];
+                    String clue = clueIdStrArr[Constants.NUM_3];
+                    String warningInfoJSONStr = message.toJSONString();
+                    String resourceId = uniCtrl.getString("U_RESID");
+                    Object[] warningInfo = new Object[]{taskId, clue, clueType, warningInfoJSONStr, createTime};
+                    warningInfoList.add(warningInfo);
 
 
-//                    }
-                    }
+                    // 解析[ORIFIELD]节点
+                    JSONObject oriField = message.getJSONObject("ORIFIELD");
+                    // 获取配置文件中数据资源编码对应的字段信息(以JSON字符串形式存储)
+                    String fieldJSONStr = CommonUtil.getConfValueByKey(resourceId);
 
-                    //====================== 3.符合条件的预警数据批量入库 - to do ======================
 
+                    //====================== 1.过滤基站号为空的数据 ======================
+//                    if (StringUtils.isNotBlank(baseStationId)) {
+//                        JSONObject msg = new JSONObject();
+//                        msg.put("clue", clue);
+//                        msg.put("longitude", longitude);
+//                        msg.put("latitude", latitude);
+//                        msg.put("baseStationId", baseStationId);
+//                        msg.put("captureTime", captureTime);
+//                        consumeCount++;
+//                        hitMsgList.add(message);
+//                        //====================== 2.过滤掉经纬度不在指定区域范围的数据 - to do ======================
 
-                    log.info("【QBEQSJYCTRL - 消费者】 已消费数据量:{}", consumeCount);
-                    // 消费完一批数据后集合清零
-                    messageList.clear();
+
+//                    }
                 }
+                //====================== 3.符合条件的预警数据批量入库 - to do ======================
+                ctrlDao.batchInsertWarningInfo(warningInfoList);
+                log.info("【QBEQSJYCTRL - 消费者】 已消费数据量:{}", warningInfoList.size());
+                warningInfoList.clear();
             }
         }
     }

+ 10 - 5
src/main/java/com/hhwy/qbeqsjy/service/CtrlService.java

@@ -270,7 +270,8 @@ public class CtrlService {
     public AjaxResult doCtrl(HttpClient httpClient, Map<String, Object> frontParamsMap){
         String taskId = (String) frontParamsMap.get("taskId");
         String clue = (String) frontParamsMap.get("clue");
-        String clueIdOrigin = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clue);
+        String clueType = (String) frontParamsMap.get("clueType");
+        String clueIdOrigin = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clueType).concat("_").concat(clue);
         // 1.调用烽火布控接口,下发布控任务
         JSONObject resJSONObj = callFhUniCtrlService(httpClient, frontParamsMap);
         String statusCode = resJSONObj.getString("statuscode");
@@ -319,7 +320,8 @@ public class CtrlService {
         List<String> resourceIdList = (List<String>) frontParamsMap.get("resourceIdList");
         // 构建调用烽火布控接口所需的请求参数
         JSONObject params = new JSONObject();
-        String clueId = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clue);
+        // 样例:120000_59275350793392128_D201005_13502082832
+        String clueId = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clueType).concat("_").concat(clue);
         params.put("clue_id", clueId);
         params.put("from_areacode", fromAreaCode);
         params.put("to_areacode", JSONArray.parseArray(toAreaCode));
@@ -362,7 +364,8 @@ public class CtrlService {
     public String callFhUniCtrlStatusQueryService(HttpClient httpClient, Map<String, Object> frontParamsMap) {
         String taskId = (String) frontParamsMap.get("taskId");
         String clue = (String) frontParamsMap.get("clue");
-        String clueId = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clue);
+        String clueType = (String) frontParamsMap.get("clueType");
+        String clueId = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clueType).concat("_").concat(clue);
         JSONObject params = new JSONObject();
         params.put("clue_id", clueId);
         params.put("from_areacode", fromAreaCode);
@@ -406,7 +409,8 @@ public class CtrlService {
     public AjaxResult doStopCtrl(HttpClient httpClient, Map<String, Object> frontParamsMap){
         String taskId = (String) frontParamsMap.get("taskId");
         String clue = (String) frontParamsMap.get("clue");
-        String clueIdOrigin = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clue);
+        String clueType = (String) frontParamsMap.get("clueType");
+        String clueIdOrigin = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clueType).concat("_").concat(clue);
         // 调用烽火停控接口,停止布控
         JSONObject responseJSONObj = callFhUniCtrlStopService(httpClient, frontParamsMap);
         String statusCode = responseJSONObj.getString("statuscode");
@@ -449,7 +453,8 @@ public class CtrlService {
     public JSONObject callFhUniCtrlStopService(HttpClient httpClient, Map<String, Object> frontParamsMap) {
         String taskId = (String) frontParamsMap.get("taskId");
         String clue = (String) frontParamsMap.get("clue");
-        String clueId = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clue);
+        String clueType = (String) frontParamsMap.get("clueType");
+        String clueId = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clueType).concat("_").concat(clue);
         // 构建调用烽火停控接口所需的请求参数
         JSONObject params = new JSONObject();
         params.put("clue_id", clueId);

+ 2 - 2
src/main/java/com/hhwy/qbeqsjy/strategy/DataParseStrategy.java → src/main/java/com/hhwy/qbeqsjy/strategy/Strategy.java

@@ -6,11 +6,11 @@ import com.alibaba.fastjson.JSONObject;
 /**
  * 预警数据(JSON)解析策略接口
  */
-public interface DataParseStrategy {
+public interface Strategy {
 
     /**
      * 定义数据解析方法
      */
-    void doParse(JSONObject warningInfo);
+    JSONObject doParse(JSONObject warningInfo);
 
 }

+ 0 - 16
src/main/java/com/hhwy/qbeqsjy/strategy/impl/Auth.java

@@ -1,16 +0,0 @@
-package com.hhwy.qbeqsjy.strategy.impl;
-
-import com.alibaba.fastjson.JSONObject;
-import com.hhwy.qbeqsjy.strategy.DataParseStrategy;
-
-
-/**
- * AUTH表预警数据解析规则类
- */
-public class Auth implements DataParseStrategy {
-    @Override
-    public void doParse(JSONObject warningInfo) {
-
-
-    }
-}

+ 37 - 0
src/main/java/com/hhwy/qbeqsjy/strategy/impl/AuthStrategy.java

@@ -0,0 +1,37 @@
+package com.hhwy.qbeqsjy.strategy.impl;
+
+import com.alibaba.fastjson.JSONObject;
+import com.hhwy.qbeqsjy.common.Constants;
+import com.hhwy.qbeqsjy.common.TimeTool;
+import com.hhwy.qbeqsjy.strategy.Strategy;
+
+import java.util.Date;
+
+
+/**
+ * AUTH表预警数据解析规则类
+ */
+public class AuthStrategy implements Strategy {
+    @Override
+    public JSONObject doParse(JSONObject warningInfo) {
+        // 原始字段
+        JSONObject oriField = warningInfo.getJSONObject("ORIFIELD");
+        // 线索值
+//        String clue = oriField.getString("AUTH_ACCOUNT");
+        // 命中时间
+        String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
+        // 经度
+        String lon = oriField.getString("LONGITUDE");
+        // 维度
+        String lat = oriField.getString("LATITUDE");
+        // 基站ID(4G)
+        String baseStationId = oriField.getString("BASE_STATION_ID");
+
+        return null;
+
+
+
+
+
+    }
+}