|
@@ -1,13 +1,10 @@
|
|
|
package com.hhwy.qbeqsjy.kafka.service;
|
|
|
|
|
|
-import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.hhwy.qbeqsjy.common.CommonUtil;
|
|
|
import com.hhwy.qbeqsjy.common.Constants;
|
|
|
import com.hhwy.qbeqsjy.common.TimeTool;
|
|
|
import com.hhwy.qbeqsjy.dao.CtrlDao;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
@@ -16,12 +13,10 @@ import org.springframework.context.ApplicationListener;
|
|
|
import org.springframework.context.event.ContextRefreshedEvent;
|
|
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
import javax.annotation.Resource;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.Date;
|
|
|
import java.util.List;
|
|
|
|
|
|
/**
|
|
@@ -61,19 +56,19 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
|
|
|
log.info("kafka集群已开启安全模式");
|
|
|
AuthService.securityPrepare();
|
|
|
} catch (IOException e) {
|
|
|
- log.error("kafka集群安全认证失败!异常信息 : {}.", e);
|
|
|
+ log.error("kafka集群安全认证失败,异常信息 : {}.", e);
|
|
|
return;
|
|
|
}
|
|
|
log.info("kafka集群安全认证成功");
|
|
|
}
|
|
|
AuthService consumerAuthService = new AuthService(topic);
|
|
|
- log.info("【QBEQSJYCTRL】 消费者服务已启动");
|
|
|
+ log.info("【QBEQSJYCTRL - 消费者服务】 服务已启动");
|
|
|
process(consumerAuthService.consumer, topic);
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * 消费目标topic中预警结果数据
|
|
|
+ * 消费目标topic中的预警结果数据,处理后批量入库
|
|
|
* @param consumer
|
|
|
* @param topic
|
|
|
*/
|
|
@@ -83,29 +78,16 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
|
|
|
while (true) {
|
|
|
// 间隔30秒拉取一次数据
|
|
|
ConsumerRecords<Integer, String> records = consumer.poll(Constants.WAIT_TIME);
|
|
|
- // 业务处理逻辑(解析、处理、入库)
|
|
|
+ // 数据解析、入库
|
|
|
if (records != null && records.count() > 0) {
|
|
|
- Date createTime = new Date();
|
|
|
for (ConsumerRecord<Integer, String> record : records) {
|
|
|
JSONObject message = JSONObject.parseObject(record.value());
|
|
|
// log.info(message.toJSONString());
|
|
|
- // 解析[ORIFIELD]节点,该节点数据格式与布控资源表有关,资源表不同,数据格式不同
|
|
|
+ // 获取[ORIFIELD]节点数据,该节点数据格式与布控资源表有关,表结构不同,数据格式不同
|
|
|
JSONObject oriField = message.getJSONObject("ORIFIELD");
|
|
|
- String baseStationId = oriField.getString("BASE_STATION_ID");
|
|
|
- // 过滤基站ID为空的数据
|
|
|
- if (StringUtils.isBlank(baseStationId)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- // 查询当前任务对应的布控区域信息,进行范围比对
|
|
|
-
|
|
|
-
|
|
|
- String lon = oriField.getString("LONGITUDE");
|
|
|
- String lat = oriField.getString("LATITUDE");
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- // 解析[UNICTRL]节点,该节点数据格式保持一致,与布控资源表无关
|
|
|
+ // 考虑到[ORIFIELD]节点数据格式不统一,此处不进行解析,数据域统一入库,并以接口形式同步到用户域后,由前端业务针对不同的资源表进行定制化解析
|
|
|
+ String warningInfoJSONStr = oriField.toJSONString();
|
|
|
+ // 解析[UNICTRL]节点,该节点数据为统一格式,与布控资源表无关
|
|
|
JSONObject uniCtrl = message.getJSONObject("UNICTRL");
|
|
|
// 样例:clueId = 120000_59275350793392128_D201005_13502082832
|
|
|
String clueId = uniCtrl.getString("U_CLUEID");
|
|
@@ -113,17 +95,16 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
|
|
|
String taskId = clueIdStrArr[Constants.NUM_1];
|
|
|
String clueType = clueIdStrArr[Constants.NUM_2];
|
|
|
String clue = clueIdStrArr[Constants.NUM_3];
|
|
|
- String warningInfoJSONStr = message.toJSONString();
|
|
|
- String resourceId = uniCtrl.getString("U_RESID");
|
|
|
- String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
|
|
|
- Object[] warningInfo = new Object[]{taskId, clue, clueType, warningInfoJSONStr, createTime};
|
|
|
+ // 原始预警数据中的"截获时间"
|
|
|
+// String captureTime = TimeTool.timeStampToDateString(Long.valueOf(oriField.getString("CAPTURE_TIME")));
|
|
|
+ // 不采用原始预警数据中的"截获时间",而使用当前时间作为"截获时间"
|
|
|
+ String captureTime = TimeTool.timeStampToDateString(TimeTool.getNowTimeStamp());
|
|
|
+ Object[] warningInfo = new Object[]{taskId, clue, clueType, warningInfoJSONStr, captureTime};
|
|
|
warningInfoList.add(warningInfo);
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
- //====================== 3.符合条件的预警数据批量入库 - to do ======================
|
|
|
+ //====================== 预警数据批量入库 ======================
|
|
|
ctrlDao.batchInsertWarningInfo(warningInfoList);
|
|
|
- log.info("【QBEQSJYCTRL - 消费者】 已消费数据量:{}", warningInfoList.size());
|
|
|
+ log.info("【QBEQSJYCTRL - 消费者服务】 当次消费的数据量:{}", warningInfoList.size());
|
|
|
warningInfoList.clear();
|
|
|
}
|
|
|
}
|