|
@@ -42,8 +42,10 @@ public class CtrlService {
|
|
|
private String multiThreadEnabled;
|
|
|
@Value("${thread.num}")
|
|
|
private int threadNum;
|
|
|
- @Value("${txt.output.path}")
|
|
|
- private String txtOutputPath;
|
|
|
+ @Value("${ctrl.txt.output.path}")
|
|
|
+ private String ctrlTxtOutputPath;
|
|
|
+ @Value("${stop.ctrl.txt.output.path}")
|
|
|
+ private String stopCtrlTxtOutputPath;
|
|
|
@Resource
|
|
|
private CtrlDao ctrlDao;
|
|
|
@Value("${uniCtrl.add.url}")
|
|
@@ -54,65 +56,185 @@ public class CtrlService {
|
|
|
private String uniCtrlQueryUrl;
|
|
|
|
|
|
|
|
|
- public void processCtrl(CtrlDTO ctrlDTO) throws IOException {
|
|
|
+ /**
|
|
|
+ * 处理布控任务
|
|
|
+ * @param ctrlDTO
|
|
|
+ */
|
|
|
+ public void processCtrl(CtrlDTO ctrlDTO) {
|
|
|
String taskId = ctrlDTO.getTaskId();
|
|
|
String endCtrlTime = ctrlDTO.getEndCtrlTime();
|
|
|
List<Map<String, Object>> clueList = ctrlDTO.getClueList();
|
|
|
List<String> resourceIdList = ctrlDTO.getResourceIdList();
|
|
|
List<String> areaIdList = ctrlDTO.getAreaIdList();
|
|
|
- // 默认单线程处理任务
|
|
|
+ // 默认只开启一个子线程处理任务
|
|
|
if(!Constants.MULTI_THREAD_ENABLED_FLAG.equals(multiThreadEnabled)){
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(Constants.NUM_1);
|
|
|
+ executorService.execute(() -> {
|
|
|
+ Date createTime = new Date();
|
|
|
+ List<Object[]> ctrlErrorList = new ArrayList<>();
|
|
|
+ int ctrlNum = 0;
|
|
|
+ HttpClient httpClient = new HttpClient();
|
|
|
+ for (Map<String, Object> clueMap : clueList) {
|
|
|
+ if(!clueMap.isEmpty()){
|
|
|
+ // 重新组装前端传入的参数
|
|
|
+ String clue = String.valueOf(clueMap.get("clue"));
|
|
|
+ String clueType = String.valueOf(clueMap.get("clueType"));
|
|
|
+ clueMap.put("taskId", taskId);
|
|
|
+ clueMap.put("endCtrlTime", endCtrlTime);
|
|
|
+ clueMap.put("resourceIdList", resourceIdList);
|
|
|
+ clueMap.put("areaIdList", areaIdList);
|
|
|
+ AjaxResult result = doCtrl(httpClient, clueMap);
|
|
|
+ int code = (int) result.get("code");
|
|
|
+ if(code == 500){
|
|
|
+ // 记录布控失败的线索信息,等待当前任务布控操作执行结束后再批量入库,后续通过设置定时任务按照记录的布控失败线索信息继续实施布控
|
|
|
+ // 存在操作延迟,但保证数据最终一致性
|
|
|
+ Object[] errorData = new Object[]{taskId, clue, clueType, endCtrlTime, resourceIdList, areaIdList, createTime};
|
|
|
+ ctrlErrorList.add(errorData);
|
|
|
+ } else {
|
|
|
+ JSONObject data = (JSONObject) result.get("data");
|
|
|
+ if(data == null){
|
|
|
+ String msg = (String) result.get("msg");
|
|
|
+ log.info("任务ID:{},线索值:{},布控服务返回信息:{}", taskId, clue, msg);
|
|
|
+ } else {
|
|
|
+ String clueId = data.getString("clueId");
|
|
|
+ log.info("任务ID:{},线索值:{},线索ID:{}", taskId, clue, clueId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ctrlNum++;
|
|
|
+ }
|
|
|
+ // 每布控5000条线索,当前线程睡眠30s,减轻【统一布控】服务端压力
|
|
|
+ if(ctrlNum == Constants.NUM_5000){
|
|
|
+ CommonUtil.threadSleep(Constants.NUM_30000);
|
|
|
+ ctrlNum = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("--- > 任务ID:{},线索集合布控完成", taskId);
|
|
|
+ if(ctrlErrorList.size() > 0){
|
|
|
+ log.info("--- > 任务ID:{},线索集合布控失败的线索数量:{},布控失败数据开始执行批量入库操作...", taskId, ctrlErrorList.size());
|
|
|
+ boolean optFlag = ctrlDao.batchInsertCtrlErrorInfo(ctrlErrorList);
|
|
|
+ if(optFlag){
|
|
|
+ log.info("--- > 任务ID:{},线索集合布控失败数据批量入库成功,数据量:{}", taskId, ctrlErrorList.size());
|
|
|
+ } else {
|
|
|
+ log.error("--- > 任务ID:{},线索集合布控失败数据批量入库失败,现以.txt文件形式记录到服务器指定路径,后续可手动录入目标表", taskId);
|
|
|
+ StringBuffer sbRow = new StringBuffer();
|
|
|
+ BufferedWriter bw = null;
|
|
|
+ try {
|
|
|
+ bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(ctrlTxtOutputPath + taskId + "_" + TimeTool.getNowTimeStamp() + ".txt"), "UTF-8"));
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("--- > 任务ID:{},创建目标文件失败,异常信息:{}", taskId, e.getMessage());
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ // 字段信息:taskId, clue, clueType, endCtrlTime, resourceIdList, areaIdList, createTime
|
|
|
+ int dataRows = 0;
|
|
|
+ for (Object[] item : ctrlErrorList) {
|
|
|
+ sbRow.append(item[0]);
|
|
|
+ sbRow.append("\t");
|
|
|
+ sbRow.append(item[1]);
|
|
|
+ sbRow.append("\t");
|
|
|
+ sbRow.append(item[2]);
|
|
|
+ sbRow.append("\t");
|
|
|
+ sbRow.append(item[3]);
|
|
|
+ sbRow.append("\t");
|
|
|
+ sbRow.append(item[4]);
|
|
|
+ sbRow.append("\t");
|
|
|
+ sbRow.append(item[5]);
|
|
|
+ sbRow.append("\t");
|
|
|
+ sbRow.append(item[6]);
|
|
|
+ sbRow.append("\r\n");
|
|
|
+ dataRows++;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ bw.write(sbRow.toString());
|
|
|
+ bw.flush();
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("--- > 任务ID:{},线索集合布控失败数据写入目标文件失败,异常信息:{}", taskId, e.getMessage());
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ log.info("--- > 任务ID:{},线索集合布控失败数据已记录到目标文件,数据行数:{}", taskId, dataRows);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ executorService.shutdown();
|
|
|
+ } else { // 开启多线程处理任务
|
|
|
+ int accountsPerThread = clueList.size() / threadNum;
|
|
|
+ ExecutorService executor = Executors.newFixedThreadPool(threadNum);
|
|
|
+ for (int i = 0; i < threadNum; i++) {
|
|
|
+ int startIndex = i * accountsPerThread;
|
|
|
+ int endIndex = (i == threadNum - 1) ? clueList.size() : (i + 1) * accountsPerThread;
|
|
|
+ // 根据调用方提交的任务对应线索集合和每一个线程需处理线索集合中的起、止索引号创建对应的布控任务类
|
|
|
+ Callable<List<Map<String, String>>> task = new CtrlTask(taskId,i + 1, endCtrlTime, clueList, startIndex, endIndex, resourceIdList, areaIdList);
|
|
|
+ // 提交任务
|
|
|
+ executor.submit(task);
|
|
|
+ }
|
|
|
+ executor.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理停控任务
|
|
|
+ * @param taskId
|
|
|
+ * @param clueList
|
|
|
+ */
|
|
|
+ public void processStopCtrl(String taskId, JSONArray clueList) {
|
|
|
+ // 停控任务只开启一个子线程处理即可
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(Constants.NUM_1);
|
|
|
+ executorService.execute(() -> {
|
|
|
Date createTime = new Date();
|
|
|
- List<Object[]> ctrlErrorList = new ArrayList<>();
|
|
|
- int ctrlNum = 0;
|
|
|
+ List<Object[]> stopCtrlErrorList = new ArrayList<>();
|
|
|
+ int stopCtrlNum = 0;
|
|
|
HttpClient httpClient = new HttpClient();
|
|
|
- for (Map<String, Object> clueMap : clueList) {
|
|
|
- if(!clueMap.isEmpty()){
|
|
|
+ for (Object item : clueList) {
|
|
|
+ JSONObject clueObj = (JSONObject) item;
|
|
|
+ if(!clueObj.isEmpty()){
|
|
|
// 重新组装前端传入的参数
|
|
|
- String clue = String.valueOf(clueMap.get("clue"));
|
|
|
- String clueType = String.valueOf(clueMap.get("clueType"));
|
|
|
- clueMap.put("taskId", taskId);
|
|
|
- clueMap.put("endCtrlTime", endCtrlTime);
|
|
|
- clueMap.put("resourceIdList", resourceIdList);
|
|
|
- clueMap.put("areaIdList", areaIdList);
|
|
|
- AjaxResult result = doCtrl(httpClient, clueMap);
|
|
|
+ String clue = String.valueOf(clueObj.get("clue"));
|
|
|
+ String clueType = String.valueOf(clueObj.get("clueType"));
|
|
|
+ clueObj.put("taskId", taskId);
|
|
|
+ AjaxResult result = doStopCtrl(httpClient, clueObj);
|
|
|
int code = (int) result.get("code");
|
|
|
if(code == 500){
|
|
|
- // 记录布控失败的线索信息,等待当前任务布控操作执行结束后再批量入库,后续通过设置定时任务按照记录的布控失败线索信息继续实施布控
|
|
|
+ // 记录停控失败的线索信息,等待当前任务停控操作执行结束后再批量入库,后续通过设置定时任务按照记录的停控失败线索信息继续实施停控
|
|
|
// 存在操作延迟,但保证数据最终一致性
|
|
|
- Object[] errorData = new Object[]{taskId, clue, clueType, endCtrlTime, resourceIdList, areaIdList, createTime};
|
|
|
- ctrlErrorList.add(errorData);
|
|
|
+ Object[] errorData = new Object[]{taskId, clue, clueType, createTime};
|
|
|
+ stopCtrlErrorList.add(errorData);
|
|
|
} else {
|
|
|
JSONObject data = (JSONObject) result.get("data");
|
|
|
if(data == null){
|
|
|
String msg = (String) result.get("msg");
|
|
|
- log.info("任务ID:{},线索值:{},布控服务返回信息:{}", taskId, clue, msg);
|
|
|
+ log.info("任务ID:{},线索值:{},停控服务返回信息:{}", taskId, clue, msg);
|
|
|
} else {
|
|
|
String clueId = data.getString("clueId");
|
|
|
log.info("任务ID:{},线索值:{},线索ID:{}", taskId, clue, clueId);
|
|
|
}
|
|
|
}
|
|
|
- ctrlNum++;
|
|
|
+ stopCtrlNum++;
|
|
|
}
|
|
|
- // 每布控5000条线索,当前线程睡眠30s,减轻【统一布控】服务端压力
|
|
|
- if(ctrlNum == Constants.NUM_5000){
|
|
|
+ // 每停控5000条线索,当前线程睡眠30s,减轻【统一布控】服务端压力
|
|
|
+ if(stopCtrlNum == Constants.NUM_5000){
|
|
|
CommonUtil.threadSleep(Constants.NUM_30000);
|
|
|
- ctrlNum = 0;
|
|
|
+ stopCtrlNum = 0;
|
|
|
}
|
|
|
}
|
|
|
- log.info("--- > 任务ID:{},线索集合布控完成", taskId);
|
|
|
- if(ctrlErrorList.size() > 0){
|
|
|
- log.info("--- > 任务ID:{},线索集合布控失败的线索数量:{},布控失败数据开始执行批量入库操作...", taskId, ctrlErrorList.size());
|
|
|
- boolean optFlag = ctrlDao.batchInsertCtrlErrorInfo(ctrlErrorList);
|
|
|
+ log.info("--- > 任务ID:{},线索集合停控完成", taskId);
|
|
|
+ if(stopCtrlErrorList.size() > 0){
|
|
|
+ log.info("--- > 任务ID:{},线索集合停控失败的线索数量:{},停控失败数据开始执行批量入库操作...", taskId, stopCtrlErrorList.size());
|
|
|
+ boolean optFlag = ctrlDao.batchInsertStopCtrlErrorInfo(stopCtrlErrorList);
|
|
|
if(optFlag){
|
|
|
- log.info("--- > 任务ID:{},线索集合布控失败数据批量入库成功,数据量:{}", taskId, ctrlErrorList.size());
|
|
|
+ log.info("--- > 任务ID:{},线索集合停控失败数据批量入库成功,数据量:{}", taskId, stopCtrlErrorList.size());
|
|
|
} else {
|
|
|
- log.error("--- > 任务ID:{},线索集合布控失败数据批量入库失败,现以.txt文件形式记录到服务器指定路径,后续可手动录入目标表", taskId);
|
|
|
+ log.error("--- > 任务ID:{},线索集合停控失败数据批量入库失败,现以.txt文件形式记录到服务器指定路径,后续可手动录入目标表", taskId);
|
|
|
StringBuffer sbRow = new StringBuffer();
|
|
|
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(txtOutputPath + taskId + "_" + TimeTool.getNowTimeStamp() + ".txt"), "UTF-8"));
|
|
|
- // 字段信息:taskId, clue, clueType, endCtrlTime, resourceIdList, areaIdList, createTime
|
|
|
+ BufferedWriter bw = null;
|
|
|
+ try {
|
|
|
+ bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(stopCtrlTxtOutputPath + taskId + "_" + TimeTool.getNowTimeStamp() + ".txt"), "UTF-8"));
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("--- > 任务ID:{},创建目标文件失败,异常信息:{}", taskId, e.getMessage());
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ // 字段信息:taskId, clue, clueType, createTime
|
|
|
int dataRows = 0;
|
|
|
- for (Object[] item : ctrlErrorList) {
|
|
|
+ for (Object[] item : stopCtrlErrorList) {
|
|
|
sbRow.append(item[0]);
|
|
|
sbRow.append("\t");
|
|
|
sbRow.append(item[1]);
|
|
@@ -120,33 +242,21 @@ public class CtrlService {
|
|
|
sbRow.append(item[2]);
|
|
|
sbRow.append("\t");
|
|
|
sbRow.append(item[3]);
|
|
|
- sbRow.append("\t");
|
|
|
- sbRow.append(item[4]);
|
|
|
- sbRow.append("\t");
|
|
|
- sbRow.append(item[5]);
|
|
|
- sbRow.append("\t");
|
|
|
- sbRow.append(item[6]);
|
|
|
sbRow.append("\r\n");
|
|
|
dataRows++;
|
|
|
}
|
|
|
- bw.write(sbRow.toString());
|
|
|
- bw.flush();
|
|
|
- log.info("--- > 任务ID:{},线索集合布控失败数据已记录到目标文件,数据行数:{}", taskId, dataRows);
|
|
|
+ try {
|
|
|
+ bw.write(sbRow.toString());
|
|
|
+ bw.flush();
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("--- > 任务ID:{},线索集合停控失败数据写入目标文件失败,异常信息:{}", taskId, e.getMessage());
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ log.info("--- > 任务ID:{},线索集合停控失败数据已记录到目标文件,数据行数:{}", taskId, dataRows);
|
|
|
}
|
|
|
}
|
|
|
- } else { // 开启多线程处理任务
|
|
|
- int accountsPerThread = clueList.size() / threadNum;
|
|
|
- ExecutorService executor = Executors.newFixedThreadPool(threadNum);
|
|
|
- for (int i = 0; i < threadNum; i++) {
|
|
|
- int startIndex = i * accountsPerThread;
|
|
|
- int endIndex = (i == threadNum - 1) ? clueList.size() : (i + 1) * accountsPerThread;
|
|
|
- // 根据调用方提交的任务对应线索集合和每一个线程需处理线索集合中的起、止索引号创建对应的布控任务类
|
|
|
- Callable<List<Map<String, String>>> task = new CtrlTask(taskId,i + 1, endCtrlTime, clueList, startIndex, endIndex, resourceIdList, areaIdList);
|
|
|
- // 提交任务
|
|
|
- executor.submit(task);
|
|
|
- }
|
|
|
- executor.shutdown();
|
|
|
- }
|
|
|
+ });
|
|
|
+ executorService.shutdown();
|
|
|
}
|
|
|
|
|
|
|
|
@@ -308,22 +418,22 @@ public class CtrlService {
|
|
|
log.info("【线索停控状态: 成功】 clueId:{}", clueId);
|
|
|
Map<String, Object> resultMap = new HashMap<>();
|
|
|
resultMap.put("clueId", clueId);
|
|
|
- return AjaxResult.success(resultMap);
|
|
|
+ return AjaxResult.success(resultMap); // code:200
|
|
|
}else if(Constants.STOP_CTRL_TODO.equals(status)){
|
|
|
log.info("【线索停控状态: 待停控】 clueId: {}", clueId);
|
|
|
- return AjaxResult.warn(Constants.STOP_CTRL_TODO_MSG);
|
|
|
+ return AjaxResult.warn(Constants.STOP_CTRL_TODO_MSG); // code:601
|
|
|
}else {
|
|
|
log.info("【线索停控: 失败】 clueId: {}, 响应码:{}", clueId, status);
|
|
|
- return AjaxResult.error(Constants.STOP_CTRL_FAIL_MSG);
|
|
|
+ return AjaxResult.error(Constants.STOP_CTRL_FAIL_MSG); // code:500
|
|
|
}
|
|
|
} else if (Constants.STOP_CTRL_EXIST.equals(statusCode)){
|
|
|
log.error("【线索已经停控,无需再次发起停控】 clueId:{}", clueIdOrigin);
|
|
|
- return AjaxResult.success(message);
|
|
|
+ return AjaxResult.success(message); // code:200
|
|
|
|
|
|
} else {
|
|
|
// 如果停控请求处理失败,则直接返回异常信息,无需查询线索停控状态
|
|
|
log.error("【线索停控请求处理失败】 clueId:{}, 响应码:{}, 异常信息:{}", clueIdOrigin, statusCode, message);
|
|
|
- return AjaxResult.error(message);
|
|
|
+ return AjaxResult.error(message); // code:500
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -338,6 +448,7 @@ public class CtrlService {
|
|
|
String taskId = (String) frontParamsMap.get("taskId");
|
|
|
String clue = (String) frontParamsMap.get("clue");
|
|
|
String clueId = fromAreaCode.concat("_").concat(taskId).concat("_").concat(clue);
|
|
|
+ // 构建调用烽火停控接口所需的请求参数
|
|
|
JSONObject params = new JSONObject();
|
|
|
params.put("clue_id", clueId);
|
|
|
params.put("from_areacode", fromAreaCode);
|