Bläddra i källkod

【情报二期】数据域服务 - 布控预警结果数据消费者服务开发

fangtasyj 3 månader sedan
förälder
incheckning
8c676c8832

+ 2 - 0
src/main/java/com/hhwy/qbeqsjy/common/Constants.java

@@ -11,6 +11,8 @@ import java.util.Locale;
  */
 public class Constants {
 
+    //网安AUTH协议表数据资源编码
+    public static final String TABLE_AUTH_RES_ID = "134B102";
     public static final String MULTI_THREAD_ENABLED_FLAG = "true";
     // 一次请求的最大等待时间
     public static final int WAIT_TIME = 30000;

+ 1 - 1
src/main/java/com/hhwy/qbeqsjy/dao/CtrlDao.java

@@ -114,7 +114,7 @@ public class CtrlDao {
      * @return
      */
     public boolean batchInsertWarningInfo(List<Object[]> warningInfoList){
-        String sql = "insert into t_ctrl_warning_info (task_id, clue, clue_type, warning_msg, capture_time) values (?, ?, ?, ?, ?)";
+        String sql = "insert into t_ctrl_warning_info (task_id, resource_id, clue, clue_type, warning_msg, capture_time) values (?, ?, ?, ?, ?, ?)";
         try {
             int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, warningInfoList);
             return insertRows.length == warningInfoList.size();

+ 19 - 24
src/main/java/com/hhwy/qbeqsjy/kafka/service/ConsumeService.java

@@ -89,52 +89,47 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
         while (true) {
             // 间隔30秒拉取一次数据
             ConsumerRecords<Integer, String> records = consumer.poll(Constants.WAIT_TIME);
-            // 数据解析、处理、入库
             if (records != null && records.count() > 0) {
                 for (ConsumerRecord<Integer, String> record : records) {
                     JSONObject message = JSONObject.parseObject(record.value());
-//                    log.info(message.toJSONString());
-                    // 获取[ORIFIELD]节点数据,该节点数据格式与布控资源表有关,表结构不同,数据格式不同
-                    JSONObject oriField = message.getJSONObject("ORIFIELD");
-                    // 考虑到[ORIFIELD]节点数据格式不统一,此处不进行解析,数据域统一入库,并以接口形式同步到用户域后,由前端业务针对不同的资源表进行定制化解析
-                    String warningInfoJSONStr = oriField.toJSONString();
-                    // 解析[UNICTRL]节点,该节点数据为统一格式,与布控资源表无关
                     JSONObject uniCtrl = message.getJSONObject("UNICTRL");
-                    String clueId = uniCtrl.getString("U_CLUEID");     // 样例:clueId:120000_59275350793392128_D201005_13502082832
+                    String clueId = uniCtrl.getString("U_CLUEID");
                     String[] clueIdStrArr = clueId.split("_");
                     String taskId = clueIdStrArr[Constants.NUM_1];
                     String clueType = clueIdStrArr[Constants.NUM_2];
                     String clue = clueIdStrArr[Constants.NUM_3];
                     String resourceId = uniCtrl.getString("U_RESID");
-                    // 如果线索类型是手机号
-                    if(clueCodePhone.equals(clueType) && "134B102".equals(resourceId)){
+                    // 每张布控资源表对应的命中数据JSON对象
+                    JSONObject oriField = message.getJSONObject("ORIFIELD");
+                    String warningInfoJSONStr = oriField.toJSONString();
+                    // 命中时间不采用原始预警数据中的【截获时间】,而采用该服务消费到当前数据时的自然时间作为命中时间
+                    String captureTime = TimeTool.timeStampToDateString(TimeTool.getNowTimeStamp());
+                    if(Constants.TABLE_AUTH_RES_ID.equals(resourceId)){
+                        // 1. ========================= 过滤基站号为空的数据
                         String baseStationId = oriField.getString("BASE_STATION_ID");
-                        // 1. ========================= 过滤基站号为空的数据 =========================
                         if(StringUtils.isBlank(baseStationId)){
                             continue;
                         }
+                        // 2. ========================= 过滤不在布控区域范围的数据
                         List<Map<String, Object>> ctrlAreaInfoList = ctrlDao.queryCtrlAreaInfo(taskId);
                         if(ctrlAreaInfoList.size() > 0){
                             String lon = oriField.getString("LONGITUDE");
                             String lat = oriField.getString("LATITUDE");
-                            // 2. ========================= 过滤不在布控区域范围的数据 =========================
-                            // 不采用原始预警数据中的"截获时间",而使用当前时间作为"截获时间"
-                            String captureTime = TimeTool.timeStampToDateString(TimeTool.getNowTimeStamp());
                             for (Map<String, Object> ctrlAreaInfo : ctrlAreaInfoList) {
                                 Integer areaGeoType = (Integer) ctrlAreaInfo.get("area_geo_type");
                                 JSONArray location = JSON.parseArray((String) ctrlAreaInfo.get("location"));
                                 // ****** 说明:如果出现布控区域范围存在重叠,并且命中点位也恰巧出现在重叠区域的极端情况,目前则只保存一份数据
-                                // 布控区域是圆形
+                                // 布控区域是圆形
                                 if(Constants.NUM_0 == areaGeoType){
                                     JSONObject loc = location.getJSONObject(0);
                                     String centerLon = loc.getString("lon");
                                     String centerLat = loc.getString("lat");
                                     String radius = loc.getString("radius");
                                     if(GeoUtils.isPointInCircle(Double.valueOf(lat), Double.valueOf(lon), Double.valueOf(centerLat), Double.valueOf(centerLon), Double.valueOf(radius))){
-                                        warningInfoList.add(new Object[]{taskId, clue, clueType, warningInfoJSONStr, captureTime});
+                                        warningInfoList.add(new Object[]{taskId, resourceId, clue, clueType, warningInfoJSONStr, captureTime});
                                         break;
                                     }
-                                } else {   // 布控区域是一般多边形
+                                } else {   // 布控区域是一般多边形
                                     List<Coordinate> coordinates = new ArrayList<>();
                                     for (Object obj : location) {
                                         JSONObject loc = (JSONObject) obj;
@@ -143,20 +138,20 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
                                         coordinates.add(new Coordinate(Double.valueOf(vertexLon), Double.valueOf(vertexLat)));
                                     }
                                     if(GeoUtils.isPointInPolygon(coordinates, new Coordinate(Double.valueOf(lon), Double.valueOf(lat)))){
-                                        warningInfoList.add(new Object[]{taskId, clue, clueType, warningInfoJSONStr, captureTime});
+                                        warningInfoList.add(new Object[]{taskId, resourceId, clue, clueType, warningInfoJSONStr, captureTime});
                                         break;
                                     }
                                 }
                             }
                         }
+                    } else {
+                        warningInfoList.add(new Object[]{taskId, resourceId, clue, clueType, warningInfoJSONStr, captureTime});
                     }
-                    // 原始预警数据中的"截获时间"
-//                    String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
-                    // 生成预警信息编号
-                    String warningMsgId = String.valueOf(generator.nextId());
                 }
-                // ========================= 3.批量入库符合判断条件的预警数据 =========================
-                ctrlDao.batchInsertWarningInfo(warningInfoList);
+                // ========================= 批量入库符合条件的预警数据
+                if(warningInfoList.size() > 0){
+                    ctrlDao.batchInsertWarningInfo(warningInfoList);
+                }
                 log.info("【QBEQSJYCTRL - 消费者服务】 当次消费的数据量:{},符合条件的数据量:{}", records.count(), warningInfoList.size());
                 warningInfoList.clear();
             }

+ 5 - 2
src/main/java/com/hhwy/qbeqsjy/strategy/Strategy.java

@@ -2,6 +2,9 @@ package com.hhwy.qbeqsjy.strategy;
 
 
 import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import java.util.List;
 
 /**
  * 预警数据(JSON)解析策略接口
@@ -9,8 +12,8 @@ import com.alibaba.fastjson.JSONObject;
 public interface Strategy {
 
     /**
-     * 定义数据解析方法
+     * 定义数据解析抽象方法
      */
-    JSONObject doParse(JSONObject warningInfo);
+    List<Object[]> doParse(ConsumerRecords<Integer, String> records, List<Object[]> warningInfoList);
 
 }

+ 99 - 0
src/main/java/com/hhwy/qbeqsjy/strategy/impl/AUTHStrategy.java

@@ -0,0 +1,99 @@
+package com.hhwy.qbeqsjy.strategy.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.hhwy.qbeqsjy.common.Constants;
+import com.hhwy.qbeqsjy.common.GeoUtils;
+import com.hhwy.qbeqsjy.common.TimeTool;
+import com.hhwy.qbeqsjy.dao.CtrlDao;
+import com.hhwy.qbeqsjy.strategy.Strategy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.locationtech.jts.geom.Coordinate;
+import org.springframework.stereotype.Component;
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * AUTH表预警数据解析规则类
+ */
+@Component
+public class AUTHStrategy implements Strategy {
+
+    @Resource
+    private CtrlDao ctrlDao;
+
+
+    /**
+     * 解析处理ConsumerRecords对象,将处理完的结果封装到List集合中,并返回
+     * @param records
+     * @param warningInfoList
+     * @return
+     */
+    @Override
+    public List<Object[]> doParse(ConsumerRecords<Integer, String> records, List<Object[]> warningInfoList) {
+        if (records != null && records.count() > 0) {
+            for (ConsumerRecord<Integer, String> record : records) {
+                JSONObject message = JSONObject.parseObject(record.value());
+                JSONObject uniCtrl = message.getJSONObject("UNICTRL");
+                String clueId = uniCtrl.getString("U_CLUEID");
+                String[] clueIdStrArr = clueId.split("_");
+                String taskId = clueIdStrArr[Constants.NUM_1];
+                String clueType = clueIdStrArr[Constants.NUM_2];
+                String clue = clueIdStrArr[Constants.NUM_3];
+                String resourceId = uniCtrl.getString("U_RESID");
+                JSONObject oriField = message.getJSONObject("ORIFIELD");
+                String warningInfoJSONStr = oriField.toJSONString();
+
+                // 1. ========================= 过滤基站号为空的数据
+                String baseStationId = oriField.getString("BASE_STATION_ID");
+                if(StringUtils.isBlank(baseStationId)){
+                    continue;
+                }
+
+                // 2. ========================= 过滤不在布控区域范围的数据
+                List<Map<String, Object>> ctrlAreaInfoList = ctrlDao.queryCtrlAreaInfo(taskId);
+                if(ctrlAreaInfoList.size() > 0){
+                    String lon = oriField.getString("LONGITUDE");
+                    String lat = oriField.getString("LATITUDE");
+                    // 命中时间不采用预警数据中的【截获时间】,而采用该服务消费到当前数据时的自然时间作为命中时间
+                    String captureTime = TimeTool.timeStampToDateString(TimeTool.getNowTimeStamp());
+                    for (Map<String, Object> ctrlAreaInfo : ctrlAreaInfoList) {
+                        Integer areaGeoType = (Integer) ctrlAreaInfo.get("area_geo_type");
+                        JSONArray location = JSON.parseArray((String) ctrlAreaInfo.get("location"));
+                        // ****** 说明:如果出现布控区域范围存在重叠,并且命中点位也恰巧出现在重叠区域的极端情况,目前则只保存一份数据
+                        // 布控区域是圆形时
+                        if(Constants.NUM_0 == areaGeoType){
+                            JSONObject loc = location.getJSONObject(0);
+                            String centerLon = loc.getString("lon");
+                            String centerLat = loc.getString("lat");
+                            String radius = loc.getString("radius");
+                            if(GeoUtils.isPointInCircle(Double.valueOf(lat), Double.valueOf(lon), Double.valueOf(centerLat), Double.valueOf(centerLon), Double.valueOf(radius))){
+                                warningInfoList.add(new Object[]{taskId, resourceId, clue, clueType, warningInfoJSONStr, captureTime});
+                                break;
+                            }
+                        } else {   // 布控区域是一般多边形时
+                            List<Coordinate> coordinates = new ArrayList<>();
+                            for (Object obj : location) {
+                                JSONObject loc = (JSONObject) obj;
+                                String vertexLon = loc.getString("lon");
+                                String vertexLat = loc.getString("lat");
+                                coordinates.add(new Coordinate(Double.valueOf(vertexLon), Double.valueOf(vertexLat)));
+                            }
+                            if(GeoUtils.isPointInPolygon(coordinates, new Coordinate(Double.valueOf(lon), Double.valueOf(lat)))){
+                                warningInfoList.add(new Object[]{taskId, resourceId, clue, clueType, warningInfoJSONStr, captureTime});
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return warningInfoList;
+    }
+}

+ 0 - 36
src/main/java/com/hhwy/qbeqsjy/strategy/impl/AuthStrategy.java

@@ -1,36 +0,0 @@
-package com.hhwy.qbeqsjy.strategy.impl;
-
-import com.alibaba.fastjson.JSONObject;
-import com.hhwy.qbeqsjy.common.Constants;
-import com.hhwy.qbeqsjy.common.TimeTool;
-import com.hhwy.qbeqsjy.strategy.Strategy;
-
-import java.util.Date;
-
-
-/**
- * AUTH表预警数据解析规则类
- */
-public class AuthStrategy implements Strategy {
-    @Override
-    public JSONObject doParse(JSONObject warningInfo) {
-        JSONObject oriField = warningInfo.getJSONObject("ORIFIELD");
-        // 线索值
-//        String clue = oriField.getString("SRC_AUTH_ACCOUNT");
-        // 命中时间
-        String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
-        // 经度
-        String lon = oriField.getString("LONGITUDE");
-        // 维度
-        String lat = oriField.getString("LATITUDE");
-        // 基站ID
-        String baseStationId = oriField.getString("BASE_STATION_ID");
-
-        return null;
-
-
-
-
-
-    }
-}

+ 0 - 3
src/main/java/com/hhwy/qbeqsjy/task/CtrlTask.java

@@ -1,6 +1,5 @@
 package com.hhwy.qbeqsjy.task;
 
-import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.hhwy.qbeqsjy.common.CommonUtil;
 import com.hhwy.qbeqsjy.common.Constants;
@@ -9,12 +8,10 @@ import com.hhwy.qbeqsjy.dao.CtrlDao;
 import com.hhwy.qbeqsjy.domain.AjaxResult;
 import com.hhwy.qbeqsjy.service.CtrlService;
 import lombok.extern.slf4j.Slf4j;
-
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import com.hhwy.qbeqsjy.common.HttpClient;
 
 

+ 3 - 2
src/main/resources/application.properties

@@ -57,8 +57,9 @@ stop.ctrl.txt.output.path=/home/qbeqsjy/stop_ctrl_error/
 # \u9884\u8B66\u6570\u636E\u67E5\u8BE2\u4E00\u6B21\u6700\u5927\u8FD4\u56DE\u6570\u636E\u6761\u6570\uFF0C\u9ED8\u8BA41000\u6761
 query.max.count=1000
 
-# \u5E03\u63A7\u8D44\u6E90\u5217\u8868
-res.auth.code=134B102
+# \u53EF\u5E03\u63A7\u8D44\u6E90\u5217\u8868
+# \u6570\u636E\u8D44\u6E90\u7F16\u7801=\u53EF\u89E3\u6790\u7684\u5B57\u6BB5
+134B102=BASE_STATION_ID,LONGITUDE,LATITUDE