|
@@ -4,8 +4,10 @@ import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.hhwy.qbeqsjy.common.CommonUtil;
|
|
|
import com.hhwy.qbeqsjy.common.Constants;
|
|
|
+import com.hhwy.qbeqsjy.common.TimeTool;
|
|
|
import com.hhwy.qbeqsjy.dao.CtrlDao;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
@@ -87,39 +89,37 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
|
|
|
for (ConsumerRecord<Integer, String> record : records) {
|
|
|
JSONObject message = JSONObject.parseObject(record.value());
|
|
|
// log.info(message.toJSONString());
|
|
|
- // 解析[UNICTRL]节点
|
|
|
+ // 解析[ORIFIELD]节点,该节点数据格式与布控资源表有关,资源表不同,数据格式不同
|
|
|
+ JSONObject oriField = message.getJSONObject("ORIFIELD");
|
|
|
+ String baseStationId = oriField.getString("BASE_STATION_ID");
|
|
|
+ // 过滤基站ID为空的数据
|
|
|
+ if (StringUtils.isBlank(baseStationId)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 查询当前任务对应的布控区域信息,进行范围比对
|
|
|
+
|
|
|
+
|
|
|
+ String lon = oriField.getString("LONGITUDE");
|
|
|
+ String lat = oriField.getString("LATITUDE");
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // 解析[UNICTRL]节点,该节点数据格式保持一致,与布控资源表无关
|
|
|
JSONObject uniCtrl = message.getJSONObject("UNICTRL");
|
|
|
- String clueId = uniCtrl.getString("U_CLUEID"); // 样例:120000_59275350793392128_D201005_13502082832
|
|
|
+ // 样例:clueId = 120000_59275350793392128_D201005_13502082832
|
|
|
+ String clueId = uniCtrl.getString("U_CLUEID");
|
|
|
String[] clueIdStrArr = clueId.split("_");
|
|
|
String taskId = clueIdStrArr[Constants.NUM_1];
|
|
|
String clueType = clueIdStrArr[Constants.NUM_2];
|
|
|
String clue = clueIdStrArr[Constants.NUM_3];
|
|
|
String warningInfoJSONStr = message.toJSONString();
|
|
|
String resourceId = uniCtrl.getString("U_RESID");
|
|
|
+ String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
|
|
|
Object[] warningInfo = new Object[]{taskId, clue, clueType, warningInfoJSONStr, createTime};
|
|
|
warningInfoList.add(warningInfo);
|
|
|
|
|
|
|
|
|
- // 解析[ORIFIELD]节点
|
|
|
- JSONObject oriField = message.getJSONObject("ORIFIELD");
|
|
|
- // 获取配置文件中数据资源编码对应的字段信息(以JSON字符串形式存储)
|
|
|
- String fieldJSONStr = CommonUtil.getConfValueByKey(resourceId);
|
|
|
-
|
|
|
-
|
|
|
- //====================== 1.过滤基站号为空的数据 ======================
|
|
|
-// if (StringUtils.isNotBlank(baseStationId)) {
|
|
|
-// JSONObject msg = new JSONObject();
|
|
|
-// msg.put("clue", clue);
|
|
|
-// msg.put("longitude", longitude);
|
|
|
-// msg.put("latitude", latitude);
|
|
|
-// msg.put("baseStationId", baseStationId);
|
|
|
-// msg.put("captureTime", captureTime);
|
|
|
-// consumeCount++;
|
|
|
-// hitMsgList.add(message);
|
|
|
-// //====================== 2.过滤掉经纬度不在指定区域范围的数据 - to do ======================
|
|
|
-
|
|
|
-
|
|
|
-// }
|
|
|
}
|
|
|
//====================== 3.符合条件的预警数据批量入库 - to do ======================
|
|
|
ctrlDao.batchInsertWarningInfo(warningInfoList);
|