Browse Source

【情报二期】数据域服务 - 布控任务下发接口功能调整;更新配置文件

peifj 3 months ago
parent
commit
ab7f74163d

+ 7 - 0
pom.xml

@@ -42,6 +42,13 @@
       <version>3.5.1</version>
     </dependency>-->
 
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-validation</artifactId>
+      <version>2.5.15</version>
+    </dependency>
+
+
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter</artifactId>

+ 1 - 0
src/main/java/com/hhwy/qbeqsjy/common/Constants.java

@@ -266,6 +266,7 @@ public class Constants {
     public static final Double D_6 = 6.0d;
     public static final Double D_0 = 0.0d;
     public static final long L_0 = 0L;
+    public static final long NUMBER_5000L = 5000L;
     public static final long NUMBER_3000L = 3000L;
     public static final long NUMBER_2000L = 2000L;
     public static final int NUM_7776000 = 7776000;

+ 45 - 29
src/main/java/com/hhwy/qbeqsjy/controller/CtrlController.java

@@ -2,67 +2,83 @@ package com.hhwy.qbeqsjy.controller;
 
 import com.hhwy.qbeqsjy.common.Constants;
 import com.hhwy.qbeqsjy.domain.AjaxResult;
-import com.hhwy.qbeqsjy.dto.BusinessDTO;
+import com.hhwy.qbeqsjy.dto.CtrlDTO;
+import com.hhwy.qbeqsjy.dto.SubmitDTO;
 import com.hhwy.qbeqsjy.service.CtrlService;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
 
 @Slf4j
 @RestController
-@RequestMapping("/api/v1/ctrl")
+@RequestMapping("/api/v1/")
 public class CtrlController {
 
     @Value("${clue.type.list}")
     private String accountTypeList;
-    @Value("${sys.id.list}")
-    private String sysIdList;
     @Autowired
     private CtrlService ctrlService;
 
 
     /**
-     * 下发布控任务
-     * @param businessDTO
+     * 下发布控任务操作
+     * @param ctrlDTO
      * @return
      */
-    @PostMapping("/submitCtrlTask")
-    public AjaxResult submitCtrlTask(@RequestBody BusinessDTO businessDTO){
-        String taskId = businessDTO.getTaskId();
-        String account = businessDTO.getAccount();
-        String accountType = businessDTO.getAccountType();
-        String sysId = businessDTO.getSysId();
-        if(StringUtils.isBlank(taskId) || StringUtils.isBlank(account) || StringUtils.isBlank(accountType) || StringUtils.isBlank(sysId)){
-            return AjaxResult.error(Constants.PARAMS_EMPTY_MSG);
+    @PostMapping("/ctrl")
+    public AjaxResult ctrl(@Validated @RequestBody CtrlDTO ctrlDTO){
+        try {
+            ctrlService.ctrl(ctrlDTO);
+            return AjaxResult.success();
+        }catch (Exception e){
+            log.error("任务布控发生异常", e);
         }
-        if(!accountTypeList.contains(accountType) || !sysIdList.contains(sysId)){
+        return AjaxResult.error(Constants.SERVER_ERROR_MSG);
+    }
+
+
+    /**
+     * 下发布控任务
+     * @param submitDTO
+     * @return
+     */
+    @PostMapping("/submit")
+    public AjaxResult submit(@Validated @RequestBody SubmitDTO submitDTO){
+        String accountType = submitDTO.getAccountType();
+        if(!accountTypeList.contains(accountType)){
             return AjaxResult.error(Constants.PARAMS_ILLEGAL_MSG);
         }
-        return ctrlService.ctrl(businessDTO);
+        try{
+            return ctrlService.submitCtrl(submitDTO);
+        } catch (Exception e){
+            log.error("任务布控发生异常", e);
+        }
+        return AjaxResult.error(Constants.SERVER_ERROR_MSG);
+
     }
 
 
     /**
      * 停止布控任务
-     * @param businessDTO
+     * @param submitDTO
      * @return
      */
-    @PostMapping("/stopCtrlTask")
-    public AjaxResult stopCtrlTask(@RequestBody BusinessDTO businessDTO){
-        String taskId = businessDTO.getTaskId();
-        String account = businessDTO.getAccount();
-        String accountType = businessDTO.getAccountType();
-        String sysId = businessDTO.getSysId();
-        if(StringUtils.isBlank(taskId) || StringUtils.isBlank(account) || StringUtils.isBlank(accountType) || StringUtils.isBlank(sysId)){
-            return AjaxResult.error(Constants.PARAMS_EMPTY_MSG);
-        }
-        if(!accountTypeList.contains(accountType) || !sysIdList.contains(sysId)){
+    @PostMapping("/stop")
+    public AjaxResult stop(@Validated @RequestBody SubmitDTO submitDTO){
+        String accountType = submitDTO.getAccountType();
+        if(!accountTypeList.contains(accountType)){
             return AjaxResult.error(Constants.PARAMS_ILLEGAL_MSG);
         }
-        return ctrlService.stopCtrl(businessDTO);
+        try{
+            return ctrlService.stopCtrl(submitDTO);
+        } catch (Exception e){
+            log.error("任务停控发生异常", e);
+        }
+        return AjaxResult.error(Constants.SERVER_ERROR_MSG);
+
     }
 
 

+ 26 - 0
src/main/java/com/hhwy/qbeqsjy/dto/CtrlDTO.java

@@ -0,0 +1,26 @@
+package com.hhwy.qbeqsjy.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotBlank;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * 业务数据DTO对象
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class CtrlDTO {
+
+    // 任务编号
+    @NotBlank(message = "taskId不能为空")
+    private String taskId;
+    // 线索值集合   Map<线索类型编码, 线索值>
+    private List<Map<String, String>> acconutList;
+
+}

+ 10 - 5
src/main/java/com/hhwy/qbeqsjy/dto/BusinessDTO.java → src/main/java/com/hhwy/qbeqsjy/dto/SubmitDTO.java

@@ -3,6 +3,9 @@ package com.hhwy.qbeqsjy.dto;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotBlank;
+import java.util.List;
+import java.util.Map;
 
 
 /**
@@ -11,15 +14,18 @@ import lombok.NoArgsConstructor;
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
-public class BusinessDTO {
+public class SubmitDTO {
 
     // 任务编号
+    @NotBlank(message = "taskId不能为空")
     private String taskId;
     // 线索值
+    @NotBlank(message = "account不能为空")
     private String account;
     // 线索类型编码
+    @NotBlank(message = "accountType不能为空")
     private String accountType;
-    // 系统ID
+    // 业务系统ID
     private String sysId;
     // 开始布控时间
     private String startCtrlTime;
@@ -29,8 +35,7 @@ public class BusinessDTO {
     private String resourceIdList;
     // 布控区域编号集合
     private String areaIdList;
-
-
-
+    // 线索值集合   Map<线索类型编码, 线索值>
+    private List<Map<String, String>> acconutList;
 
 }

+ 120 - 100
src/main/java/com/hhwy/qbeqsjy/service/CtrlService.java

@@ -7,13 +7,17 @@ import com.hhwy.qbeqsjy.common.Constants;
 import com.hhwy.qbeqsjy.common.HttpClient;
 import com.hhwy.qbeqsjy.common.TimeTool;
 import com.hhwy.qbeqsjy.domain.AjaxResult;
-import com.hhwy.qbeqsjy.dto.BusinessDTO;
+import com.hhwy.qbeqsjy.dto.CtrlDTO;
+import com.hhwy.qbeqsjy.dto.SubmitDTO;
+import com.hhwy.qbeqsjy.task.CtrlTask;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.*;
 
 @Slf4j
 @Service
@@ -21,6 +25,8 @@ public class CtrlService {
 
     @Value("${from.areacode}")
     private String fromAreaCode;
+    @Value("${sys.id}")
+    private String sysId;
     @Value("${to.areacode}")
     private String toAreaCode;
     @Value("${to.sys}")
@@ -31,6 +37,8 @@ public class CtrlService {
     private String servVer;
     @Value("${sysuserid}")
     private String sysUserId;
+    @Value("${thread.num}")
+    private int threadNum;
     @Value("${uniCtrl.add.url}")
     private String uniCtrlAddUrl;
     @Value("${uniCtrl.del.url}")
@@ -40,112 +48,127 @@ public class CtrlService {
     private HttpClient httpClient = new HttpClient();
 
 
+    public void ctrl(CtrlDTO ctrlDTO) {
+        String taskId = ctrlDTO.getTaskId();
+        // 计算每一个线程需要布控的线索数量
+        List<Map<String, String>> acconutList = ctrlDTO.getAcconutList();
+        int accountsPerThread = acconutList.size() / threadNum;
+        // 创建线程池
+        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
+        // 创建任务集合
+//        List<Future<List<Map<String, String>>>> futureList = new ArrayList<>();
+        for (int i = 0; i < threadNum; i++) {
+            int startIndex = i * accountsPerThread;
+            int endIndex = (i == threadNum - 1) ? acconutList.size() : (i + 1) * accountsPerThread;
+            // 根据调用方提交的任务对应线索集合和每一个线程需处理线索集合中的起、止索引号创建对应的布控任务类
+            Callable<List<Map<String, String>>> task = new CtrlTask(taskId,i + 1, acconutList, startIndex, endIndex);
+            // 提交任务
+            executor.submit(task);
+//            Future<List<Map<String, String>>> future = executor.submit(task);
+//            futureList.add(future);
+        }
+        executor.shutdown();
+    }
+
+
     /**
      * 调用烽火【统一布控】接口,下发布控任务
-     * @param businessDTO
+     * @param submitDTO
      * @return
      */
-    public AjaxResult ctrl(BusinessDTO businessDTO){
-        String taskId = businessDTO.getTaskId();
-        String account = businessDTO.getAccount();
-        String accountType = businessDTO.getAccountType();
-        String clueIdSrc = fromAreaCode.concat(taskId).concat("_").concat(accountType).concat(account);
-        try{
-            // 1.调用布控接口,下发布控任务
-            JSONObject resJSONObj = doCtrl(httpClient, businessDTO);
-            String statusCode = resJSONObj.getString("statuscode");
-            String message = resJSONObj.getString("message");
-            if (Constants.CTRL_PROCESS_SUCESS.equals(statusCode)) {
-                // 2.布控请求处理成功(不代表实际布控成功,需查询线索布控状态信息)
-                String clueId = resJSONObj.getString("clue_id");
-                log.info("【布控请求处理成功】 clueId:{}", clueId);
-                // 3.查询线索实际布控状态
-                log.info("【查询线索布控状态】 clueId:{}", clueId);
-                String status = queryCtrlStatus(httpClient, businessDTO);
-                if(Constants.CTRL_SUCCESS.equals(status)){
-                    log.info("【线索布控状态: 成功】 clueId:{}", clueId);
-                    Map<String, Object> resultMap = new HashMap<>();
-                    resultMap.put("clueId", clueId);
-                    return AjaxResult.success(resultMap);
-                }else if(Constants.CTRL_TODO.equals(status)){
-                    log.info("【线索布控状态: 待布控】 clueId:{}", clueId);
-                    return AjaxResult.success(Constants.CTRL_TODO_MSG);
-                }else {
-                    log.info("【线索布控状态: 失败】 clueId: {}, 响应码:{}", clueId, status);
-                    return AjaxResult.error(Constants.CTRL_FAIL_MSG);
-                }
-            } else if (Constants.CTRL_EXIST.equals(statusCode)){
-                log.error("【线索已经在控,无需再次发起布控】 clueId:{}", clueIdSrc);
-                return AjaxResult.success(message);
-            } else {
-                // 如果布控请求处理失败,则直接返回异常信息,无需查询线索布控状态
-                log.error("【线索布控请求处理失败】 clueId:{}, 响应码:{}, 异常信息:{}", clueIdSrc, statusCode, message);
-                return AjaxResult.error(message);
+    public AjaxResult submitCtrl(SubmitDTO submitDTO){
+        String taskId = submitDTO.getTaskId();
+        String account = submitDTO.getAccount();
+        String accountType = submitDTO.getAccountType();
+        String clueIdOrigin = fromAreaCode.concat(taskId).concat("_").concat(accountType).concat(account);
+        // 1.调用布控接口,下发布控任务
+        JSONObject resJSONObj = doSubmitCtrl(httpClient, submitDTO);
+        String statusCode = resJSONObj.getString("statuscode");
+        String message = resJSONObj.getString("message");
+        if (Constants.CTRL_PROCESS_SUCESS.equals(statusCode)) {
+            // 2.布控请求处理成功(不代表实际布控成功,需查询线索布控状态信息)
+            String clueId = resJSONObj.getString("clue_id");
+            log.info("【布控请求处理成功】 clueId:{}", clueId);
+            // 3.查询线索实际布控状态
+            log.info("【查询线索布控状态】 clueId:{}", clueId);
+            String status = queryCtrlStatus(httpClient, submitDTO);
+            if(Constants.CTRL_SUCCESS.equals(status)){
+                log.info("【线索布控状态: 成功】 clueId:{}", clueId);
+                Map<String, Object> resultMap = new HashMap<>();
+                resultMap.put("clueId", clueId);
+                // 4.保存布控任务信息到业务库(烽火统一布控接口目前只支持单次调用)   --- todo
+
+                return AjaxResult.success(resultMap);   // code:200
+            }else if(Constants.CTRL_TODO.equals(status)){
+                log.info("【线索布控状态: 待布控】 clueId:{}", clueId);
+                return AjaxResult.warn(Constants.CTRL_TODO_MSG);   // code:601
+            }else {
+                log.info("【线索布控状态: 失败】 clueId: {}, 响应码:{}", clueId, status);
+                return AjaxResult.error(Constants.CTRL_FAIL_MSG);   // code:500
             }
-        } catch (Exception e){
-            log.error("【下发布控任务发生异常】 异常信息:", e);
+        } else if (Constants.CTRL_EXIST.equals(statusCode)){
+            log.error("【线索已经在控,无需再次发起布控】 clueId:{}", clueIdOrigin);
+            return AjaxResult.success(message);
+        } else {
+            // 如果布控请求处理失败,则直接返回异常信息,无需查询线索布控状态
+            log.error("【线索布控请求处理失败】 clueId:{}, 响应码:{}, 异常信息:{}", clueIdOrigin, statusCode, message);
+            return AjaxResult.error(message);
         }
-        return AjaxResult.error(Constants.SERVER_ERROR_MSG);
     }
 
 
     /**
      * 调用烽火【统一布控】接口,下发停控任务
-     * @param businessDTO
+     * @param submitDTO
      * @return
      */
-    public AjaxResult stopCtrl(BusinessDTO businessDTO){
-        String taskId = businessDTO.getTaskId();
-        String account = businessDTO.getAccount();
-        String accountType = businessDTO.getAccountType();
-        String clueIdSrc = fromAreaCode.concat(taskId).concat("_").concat(accountType).concat(account);
-        try{
-            // 调用停控接口
-            JSONObject responseJSONObj = doStopCtrl(httpClient, businessDTO);
-            String statusCode = responseJSONObj.getString("statuscode");
-            String message = responseJSONObj.getString("message");
-            if (Constants.CTRL_PROCESS_SUCESS.equals(statusCode)) {
-                String clueId = responseJSONObj.getString("clue_id");
-                log.info("【线索停控请求处理成功】 clueId:{}", clueId);
-                log.info("【查询线索停控状态】 clueId:{}", clueId);
-                String status = queryCtrlStatus(httpClient, businessDTO);
-                if(Constants.STOP_CTRL_SUCCESS.equals(status)){
-                    log.info("【线索停控状态: 成功】 clueId:{}", clueId);
-                    Map<String, Object> resultMap = new HashMap<>();
-                    resultMap.put("clueId", clueId);
-                    return AjaxResult.success(resultMap);
-                }else if(Constants.STOP_CTRL_TODO.equals(status)){
-                    log.info("【线索停控状态: 待停控】 clueId: {}", clueId);
-                    return AjaxResult.success(Constants.STOP_CTRL_TODO_MSG);
-                }else {
-                    log.info("【线索停控: 失败】 clueId: {}, 响应码:{}", clueId, status);
-                    return AjaxResult.error(Constants.STOP_CTRL_FAIL_MSG);
-                }
-            } else if (Constants.STOP_CTRL_EXIST.equals(statusCode)){
-                log.error("【线索已经停控,无需再次发起停控】 clueId:{}", clueIdSrc);
-                return AjaxResult.success(message);
-
-            } else {
-                // 如果停控请求处理失败,则直接返回异常信息,无需查询线索停控状态
-                log.error("【线索停控请求处理失败】 clueId:{}, 响应码:{}, 异常信息:{}", clueIdSrc, statusCode, message);
-                return AjaxResult.error(message);
+    public AjaxResult stopCtrl(SubmitDTO submitDTO){
+        String taskId = submitDTO.getTaskId();
+        String account = submitDTO.getAccount();
+        String accountType = submitDTO.getAccountType();
+        String clueIdOrigin = fromAreaCode.concat(taskId).concat("_").concat(accountType).concat(account);
+        // 调用停控接口
+        JSONObject responseJSONObj = doStopCtrl(httpClient, submitDTO);
+        String statusCode = responseJSONObj.getString("statuscode");
+        String message = responseJSONObj.getString("message");
+        if (Constants.CTRL_PROCESS_SUCESS.equals(statusCode)) {
+            String clueId = responseJSONObj.getString("clue_id");
+            log.info("【线索停控请求处理成功】 clueId:{}", clueId);
+            log.info("【查询线索停控状态】 clueId:{}", clueId);
+            String status = queryCtrlStatus(httpClient, submitDTO);
+            if(Constants.STOP_CTRL_SUCCESS.equals(status)){
+                log.info("【线索停控状态: 成功】 clueId:{}", clueId);
+                Map<String, Object> resultMap = new HashMap<>();
+                resultMap.put("clueId", clueId);
+                return AjaxResult.success(resultMap);
+            }else if(Constants.STOP_CTRL_TODO.equals(status)){
+                log.info("【线索停控状态: 待停控】 clueId: {}", clueId);
+                return AjaxResult.warn(Constants.STOP_CTRL_TODO_MSG);
+            }else {
+                log.info("【线索停控: 失败】 clueId: {}, 响应码:{}", clueId, status);
+                return AjaxResult.error(Constants.STOP_CTRL_FAIL_MSG);
             }
-        } catch (Exception e){
-            log.error("【下发停控任务发生异常】 异常信息:", e);
+        } else if (Constants.STOP_CTRL_EXIST.equals(statusCode)){
+            log.error("【线索已经停控,无需再次发起停控】 clueId:{}", clueIdOrigin);
+            return AjaxResult.success(message);
+
+        } else {
+            // 如果停控请求处理失败,则直接返回异常信息,无需查询线索停控状态
+            log.error("【线索停控请求处理失败】 clueId:{}, 响应码:{}, 异常信息:{}", clueIdOrigin, statusCode, message);
+            return AjaxResult.error(message);
         }
-        return AjaxResult.error(Constants.SERVER_ERROR_MSG);
     }
 
 
     /**
      * 调用烽火【统一布控】接口, 下发布控请求 - 布控失败时,重试3次
      * @param httpClient
-     * @param businessDTO
+     * @param submitDTO
      * @return
      */
-    public JSONObject doCtrl(HttpClient httpClient, BusinessDTO businessDTO) {
+    public JSONObject doSubmitCtrl(HttpClient httpClient, SubmitDTO submitDTO) {
         // 构建请求参数
-        String resourceIdList = businessDTO.getResourceIdList();
+        String resourceIdList = submitDTO.getResourceIdList();
         JSONObject params = new JSONObject();
         // 将配置文件中的数据资源业务ID转换为烽火盘古平台数据资源编码
         if(StringUtils.isNotBlank(resourceIdList) && resourceIdList.split(",").length > 0){
@@ -162,12 +185,11 @@ public class CtrlService {
         } else {
             throw new RuntimeException("待布控数据资源列表为空,布控任务下发失败");
         }
-        String taskId = businessDTO.getTaskId();
-        String account = businessDTO.getAccount();
-        String accountType = businessDTO.getAccountType();
+        String taskId = submitDTO.getTaskId();
+        String account = submitDTO.getAccount();
+        String accountType = submitDTO.getAccountType();
         String clueId = fromAreaCode.concat(taskId).concat("_").concat(accountType).concat(account);
-        String sysId = businessDTO.getSysId();
-        String endCtrlTime = businessDTO.getEndCtrlTime();
+        String endCtrlTime = submitDTO.getEndCtrlTime();
 
         params.put("clue_id", clueId);
         params.put("from_areacode", fromAreaCode);
@@ -204,15 +226,14 @@ public class CtrlService {
     /**
      * 调用烽火【统一布控】接口, 下发停控请求 - 布控失败时,重试3次
      * @param httpClient
-     * @param businessDTO
+     * @param submitDTO
      * @return
      */
-    public JSONObject doStopCtrl(HttpClient httpClient, BusinessDTO businessDTO) {
+    public JSONObject doStopCtrl(HttpClient httpClient, SubmitDTO submitDTO) {
         JSONObject params = new JSONObject();
-        String taskId = businessDTO.getTaskId();
-        String account = businessDTO.getAccount();
-        String accountType = businessDTO.getAccountType();
-        String sysId = businessDTO.getSysId();
+        String taskId = submitDTO.getTaskId();
+        String account = submitDTO.getAccount();
+        String accountType = submitDTO.getAccountType();
         String clueId = fromAreaCode.concat(taskId).concat("_").concat(accountType).concat(account);
         params.put("clue_id", clueId);
         params.put("from_areacode", fromAreaCode);
@@ -239,15 +260,14 @@ public class CtrlService {
     /**
      * 调用烽火【统一布控】接口, 查询线索布/停控状态 - 查询失败时,重试3次
      * @param httpClient
-     * @param businessDTO 需要查询状态的布控内容
+     * @param submitDTO 需要查询状态的布控内容
      * @return String 布/停控状态
      */
-    public String queryCtrlStatus(HttpClient httpClient, BusinessDTO businessDTO) {
+    public String queryCtrlStatus(HttpClient httpClient, SubmitDTO submitDTO) {
         JSONObject params = new JSONObject();
-        String taskId = businessDTO.getTaskId();
-        String account = businessDTO.getAccount();
-        String accountType = businessDTO.getAccountType();
-        String sysId = businessDTO.getSysId();
+        String taskId = submitDTO.getTaskId();
+        String account = submitDTO.getAccount();
+        String accountType = submitDTO.getAccountType();
         String clueId = fromAreaCode.concat(taskId).concat("_").concat(accountType).concat(account);
         params.put("clue_id", clueId);
         params.put("from_areacode", fromAreaCode);

+ 76 - 0
src/main/java/com/hhwy/qbeqsjy/task/CtrlTask.java

@@ -0,0 +1,76 @@
+package com.hhwy.qbeqsjy.task;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.hhwy.qbeqsjy.common.CommonUtil;
+import com.hhwy.qbeqsjy.common.Constants;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import java.util.*;
+import java.util.concurrent.Callable;
+import com.hhwy.qbeqsjy.common.HttpClient;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * 布控任务类
+ */
+@Slf4j
+public class CtrlTask implements Callable<List<Map<String, String>>> {
+
+    // 任务ID
+    private String taskId;
+    // 任务执行批次号
+    private int batchNum;
+    // 当前任务需处理的全量线索集合
+    private List<Map<String, String>> completeAccountList;
+    // 每一个线程需处理的全量线索集合中的起始索引
+    private int startIndex;
+    // 每一个线程需处理的全量线索集合中的结束索引
+    private int endIndex;
+    private static final String QBEQ_CTRL_URL = CommonUtil.getConfValueByKey("qbeqCtrl.add.url");
+
+    public CtrlTask(String taskId, int batchNum, List<Map<String, String>> completeAccountList, int startIndex, int endIndex) {
+        this.taskId = taskId;
+        this.batchNum = batchNum;
+        this.completeAccountList = completeAccountList;
+        this.startIndex = startIndex;
+        this.endIndex = endIndex;
+    }
+
+    // 任务行为
+    @Override
+    public List<Map<String, String>> call() {
+        HttpClient httpClient = new HttpClient();
+        List<Map<String, String>> ctrlErrorList = new ArrayList<>();
+        /*List<Map<String, String>> elementList = completeAccountList.subList(startIndex, endIndex);
+        for (int i = 0; i < elementList.size(); i++) {
+            Map<String, String> elementMap = elementList.get(i);
+            if(!elementMap.isEmpty()){
+                String account = elementMap.get("account");
+                String response = httpClient.postResource(QBEQ_CTRL_URL, JSON.toJSONString(elementMap), null);
+                JSONObject result = JSON.parseObject(response);
+                int code = result.getIntValue("code");
+                if(code == 500){
+                    // 记录布控失败的线索,等待当前布控任务执行结束后再统一入库,后续扫表继续布控
+                    ctrlErrorList.add(elementMap);
+                } else {
+                    JSONObject data = result.getJSONObject("data");
+                    if(data == null){
+                        String msg = result.getString("msg");
+                        log.info("任务ID:{},线索值:{},布控服务返回信息:{}", this.taskId, account, msg);
+                    } else {
+                        String clueId = data.getString("clueId");
+                        log.info("任务ID:{},线索值:{},线索ID:{}", this.taskId, account, clueId);
+                    }
+                }
+            }
+        }
+        log.info("--- > 任务ID:{},第{}批次线索集合布控完成", this.taskId, this.batchNum);*/
+        log.info("模拟任务执行......");
+        CommonUtil.threadSleep(Constants.NUMBER_5000L);
+        log.info("--- > 任务ID:{},第{}批次线索集合布控完成", this.taskId, this.batchNum);
+        return ctrlErrorList;
+    }
+
+}

+ 10 - 5
src/main/resources/application.properties

@@ -16,8 +16,8 @@ spring.datasource.password=123
 # \u7EBF\u7D22\u7C7B\u578B\u7F16\u7801\u5217\u8868 - \u624B\u673A\u53F7,\u8EAB\u4EFD\u8BC1\u53F7\uFF08\u591A\u503C\u9017\u53F7\u5206\u9694\uFF09
 clue.type.list=D201005,D310111
 # \u70FD\u706B - \u3010\u7EDF\u4E00\u5E03\u63A7\u3011\u670D\u52A1\u914D\u7F6E
-# \u6388\u6743\u7CFB\u7EDFID\u5217\u8868\uFF08\u591A\u503C\u9017\u53F7\u5206\u9694\uFF09
-sys.id.list=QBEQSJYCTRL
+# \u6388\u6743\u4E1A\u52A1\u7CFB\u7EDFID
+sys.id=QBEQSJYCTRL
 # \u8BF7\u6C42\u5730\u5E02
 from.areacode=120000
 # \u76EE\u7684\u5730\u5E02
@@ -30,16 +30,21 @@ end.ctrl.time=-1
 serv.ver=2.1
 # \u8B66\u5458ID
 sysuserid=fenghuo
-# \u4E0B\u53D1\u5E03\u63A7\u63A5\u53E3url
+# \u3010\u7EDF\u4E00\u5E03\u63A7\u3011\u4E0B\u53D1\u5E03\u63A7\u63A5\u53E3url
 uniCtrl.add.url=http://130.0.46.141:8090/lokiRest/uniCtrl/monitor/add
-# \u505C\u6B62\u5E03\u63A7\u63A5\u53E3url
+# \u3010\u7EDF\u4E00\u5E03\u63A7\u3011\u505C\u6B62\u5E03\u63A7\u63A5\u53E3url
 uniCtrl.del.url=http://130.0.46.141:8090/lokiRest/uniCtrl/monitor/del
-# \u5E03\u63A7\u7ED3\u679C\u67E5\u8BE2\u63A5\u53E3url
+# \u3010\u7EDF\u4E00\u5E03\u63A7\u3011\u5E03\u63A7\u7ED3\u679C\u67E5\u8BE2\u63A5\u53E3url
 uniCtrl.query.url=http://130.0.46.141:8090/lokiRest/unictrl/query/status
+# \u3010\u60C5\u62A5\u4E8C\u671F\u3011\u4E0B\u53D1\u5E03\u63A7\u63A5\u53E3url - \u539F\u751F\u63A5\u53E3\u5305\u88C5\u7248\u672C
+qbeqCtrl.add.url=http://localhost:2535/qbeqsjy/api/v1/ctrl/submit
 # \u534E\u4E3A - kafka\u914D\u7F6E
 # \u4E1A\u52A1topic
 topic.name=UNICTRL_DATA_QBEQSJYCTRL_V1.0
 
+# \u4EFB\u52A1\u6267\u884C\u7EBF\u7A0B\u6570\u91CF
+thread.num=3
+
 # \u5E03\u63A7\u8D44\u6E90\u5217\u8868
 # \u4E1A\u52A1ID\uFF1A\u70FD\u706B\u76D8\u53E4\u5E73\u53F0\u6570\u636E\u8D44\u6E90\u6807\u8BC6\u7B26
 1=134B102