|
@@ -1,13 +1,19 @@
|
|
|
package com.hhwy.qbeqsjy.kafka.service;
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.hhwy.qbeqsjy.common.Constants;
|
|
|
+import com.hhwy.qbeqsjy.common.GeoUtils;
|
|
|
+import com.hhwy.qbeqsjy.common.SnowflakeIdGenerator;
|
|
|
import com.hhwy.qbeqsjy.common.TimeTool;
|
|
|
import com.hhwy.qbeqsjy.dao.CtrlDao;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
+import org.locationtech.jts.geom.Coordinate;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.ApplicationListener;
|
|
|
import org.springframework.context.event.ContextRefreshedEvent;
|
|
@@ -18,6 +24,7 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* 布控命中数据处理
|
|
@@ -29,10 +36,14 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
|
|
|
|
|
|
@Value("${topic.name}")
|
|
|
private String topic;
|
|
|
+ @Value("${clue.code.phone}")
|
|
|
+ private String clueCodePhone;
|
|
|
+ @Value("${clue.code.idCard}")
|
|
|
+ private String clueCodeIdCard;
|
|
|
@Resource
|
|
|
private CtrlDao ctrlDao;
|
|
|
- //默认发送1000条消息
|
|
|
- public static final int messageNumToSend = 1000;
|
|
|
+ private SnowflakeIdGenerator generator = new SnowflakeIdGenerator(1);
|
|
|
+
|
|
|
|
|
|
// IOC容器刷新完成后就会发布ContextRefreshedEvent事件
|
|
|
@Override
|
|
@@ -78,7 +89,7 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
|
|
|
while (true) {
|
|
|
// 间隔30秒拉取一次数据
|
|
|
ConsumerRecords<Integer, String> records = consumer.poll(Constants.WAIT_TIME);
|
|
|
- // 数据解析、入库
|
|
|
+ // 数据解析、处理、入库
|
|
|
if (records != null && records.count() > 0) {
|
|
|
for (ConsumerRecord<Integer, String> record : records) {
|
|
|
JSONObject message = JSONObject.parseObject(record.value());
|
|
@@ -89,22 +100,64 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
|
|
|
String warningInfoJSONStr = oriField.toJSONString();
|
|
|
// 解析[UNICTRL]节点,该节点数据为统一格式,与布控资源表无关
|
|
|
JSONObject uniCtrl = message.getJSONObject("UNICTRL");
|
|
|
- // 样例:clueId = 120000_59275350793392128_D201005_13502082832
|
|
|
- String clueId = uniCtrl.getString("U_CLUEID");
|
|
|
+ String clueId = uniCtrl.getString("U_CLUEID"); // 样例:clueId:120000_59275350793392128_D201005_13502082832
|
|
|
String[] clueIdStrArr = clueId.split("_");
|
|
|
String taskId = clueIdStrArr[Constants.NUM_1];
|
|
|
String clueType = clueIdStrArr[Constants.NUM_2];
|
|
|
String clue = clueIdStrArr[Constants.NUM_3];
|
|
|
+ String resourceId = uniCtrl.getString("U_RESID");
|
|
|
+ // 如果线索类型是手机号
|
|
|
+ if(clueCodePhone.equals(clueType) && "134B102".equals(resourceId)){
|
|
|
+ String baseStationId = oriField.getString("BASE_STATION_ID");
|
|
|
+ // 1. ========================= 过滤基站号为空的数据 =========================
|
|
|
+ if(StringUtils.isBlank(baseStationId)){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ List<Map<String, Object>> ctrlAreaInfoList = ctrlDao.queryCtrlAreaInfo(taskId);
|
|
|
+ if(ctrlAreaInfoList.size() > 0){
|
|
|
+ String lon = oriField.getString("LONGITUDE");
|
|
|
+ String lat = oriField.getString("LATITUDE");
|
|
|
+ // 2. ========================= 过滤不在布控区域范围的数据 =========================
|
|
|
+ // 不采用原始预警数据中的"截获时间",而使用当前时间作为"截获时间"
|
|
|
+ String captureTime = TimeTool.timeStampToDateString(TimeTool.getNowTimeStamp());
|
|
|
+ for (Map<String, Object> ctrlAreaInfo : ctrlAreaInfoList) {
|
|
|
+ Integer areaGeoType = (Integer) ctrlAreaInfo.get("area_geo_type");
|
|
|
+ JSONArray location = JSON.parseArray((String) ctrlAreaInfo.get("location"));
|
|
|
+ // ****** 说明:如果出现布控区域范围存在重叠,并且命中点位也恰巧出现在重叠区域的极端情况,目前则只保存一份数据
|
|
|
+ // 布控区域是圆形
|
|
|
+ if(Constants.NUM_0 == areaGeoType){
|
|
|
+ JSONObject loc = location.getJSONObject(0);
|
|
|
+ String centerLon = loc.getString("lon");
|
|
|
+ String centerLat = loc.getString("lat");
|
|
|
+ String radius = loc.getString("radius");
|
|
|
+ if(GeoUtils.isPointInCircle(Double.valueOf(lat), Double.valueOf(lon), Double.valueOf(centerLat), Double.valueOf(centerLon), Double.valueOf(radius))){
|
|
|
+ warningInfoList.add(new Object[]{taskId, clue, clueType, warningInfoJSONStr, captureTime});
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } else { // 布控区域是一般多边形
|
|
|
+ List<Coordinate> coordinates = new ArrayList<>();
|
|
|
+ for (Object obj : location) {
|
|
|
+ JSONObject loc = (JSONObject) obj;
|
|
|
+ String vertexLon = loc.getString("lon");
|
|
|
+ String vertexLat = loc.getString("lat");
|
|
|
+ coordinates.add(new Coordinate(Double.valueOf(vertexLon), Double.valueOf(vertexLat)));
|
|
|
+ }
|
|
|
+ if(GeoUtils.isPointInPolygon(coordinates, new Coordinate(Double.valueOf(lon), Double.valueOf(lat)))){
|
|
|
+ warningInfoList.add(new Object[]{taskId, clue, clueType, warningInfoJSONStr, captureTime});
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
// 原始预警数据中的"截获时间"
|
|
|
// String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
|
|
|
- // 不采用原始预警数据中的"截获时间",而使用当前时间作为"截获时间"
|
|
|
- String captureTime = TimeTool.timeStampToDateString(TimeTool.getNowTimeStamp());
|
|
|
- Object[] warningInfo = new Object[]{taskId, clue, clueType, warningInfoJSONStr, captureTime};
|
|
|
- warningInfoList.add(warningInfo);
|
|
|
+ // 生成预警信息编号
|
|
|
+ String warningMsgId = String.valueOf(generator.nextId());
|
|
|
}
|
|
|
- //====================== 预警数据批量入库 ======================
|
|
|
+ // ========================= 3.批量入库符合判断条件的预警数据 =========================
|
|
|
ctrlDao.batchInsertWarningInfo(warningInfoList);
|
|
|
- log.info("【QBEQSJYCTRL - 消费者服务】 当次消费的数据量:{}", warningInfoList.size());
|
|
|
+ log.info("【QBEQSJYCTRL - 消费者服务】 当次消费的数据量:{},符合条件的数据量:{}", records.count(), warningInfoList.size());
|
|
|
warningInfoList.clear();
|
|
|
}
|
|
|
}
|