Ver Fonte

【情报二期】数据域服务 - 对接华为Gaussdb,测试数据域消费topic数据批量入库无问题

peifj há 2 meses atrás
pai
commit
e50fefe899

+ 15 - 7
pom.xml

@@ -4,15 +4,11 @@
   <groupId>com.hhwy</groupId>
   <artifactId>QBEQSJY</artifactId>
 
-  <packaging>war</packaging>
+  <packaging>jar</packaging>
   <version>1.0-SNAPSHOT</version>
-  <name>QBEQSJY Maven Webapp</name>
+  <name>QBEQSJY</name>
   <url>http://maven.apache.org</url>
 
-  <build>
-    <finalName>QBEQSJY</finalName>
-  </build>
-
   <properties>
     <maven.compiler.source>8</maven.compiler.source>
     <maven.compiler.target>8</maven.compiler.target>
@@ -637,7 +633,19 @@
       <systemPath>${project.basedir}/lib/postgresql-42.7.5.jar</systemPath>
     </dependency>
 
-
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+        <version>3.4.4</version>
+        <configuration>
+          <mainClass>com.hhwy.QBEQSJYApplication</mainClass>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>

+ 5 - 0
src/main/java/com/hhwy/qbeqsjy/controller/CtrlController.java

@@ -3,12 +3,14 @@ package com.hhwy.qbeqsjy.controller;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.hhwy.qbeqsjy.common.Constants;
+import com.hhwy.qbeqsjy.dao.CtrlDao;
 import com.hhwy.qbeqsjy.domain.AjaxResult;
 import com.hhwy.qbeqsjy.dto.CtrlDTO;
 import com.hhwy.qbeqsjy.service.CtrlService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.web.bind.annotation.*;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -23,11 +25,14 @@ public class CtrlController {
 
     @Autowired
     private CtrlService ctrlService;
+    @Autowired
+    private JdbcTemplate gaussDBJdbcTemplate;
 
 
     @GetMapping("/test")
     public AjaxResult test(){
         log.info("测试接口");
+        log.info(gaussDBJdbcTemplate.queryForList("select * from t_user").toString());
         return AjaxResult.success("test......");
     }
 

+ 17 - 17
src/main/java/com/hhwy/qbeqsjy/dao/CtrlDao.java

@@ -20,7 +20,7 @@ public class CtrlDao {
     @Value("${clue.code.phone}")
     private String clueCodePhone;
     @Resource
-    private JdbcTemplate gaussDBJdbcTemplate;
+    private JdbcTemplate gaussdbJdbcTemplate;
 
 
     /**
@@ -31,7 +31,7 @@ public class CtrlDao {
     public boolean batchInsertCtrlErrorInfo(List<Object[]> ctrlErrorList){
         String sql = "insert into t_ctrl_error_info (task_id, clue, clue_type, end_ctrl_time, resource_id_list, area_id_list, create_time) values (?, ?, ?, ?, ?, ?, ?)";
         try {
-            int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, ctrlErrorList);
+            int[] insertRows = gaussdbJdbcTemplate.batchUpdate(sql, ctrlErrorList);
             return insertRows.length == ctrlErrorList.size();
         } catch (Exception e){
             log.error("【批量插入布控失败线索数据发生异常】 异常信息:", e);
@@ -48,7 +48,7 @@ public class CtrlDao {
     public boolean batchInsertStopCtrlErrorInfo(List<Object[]> stopCtrlErrorList){
         String sql = "insert into t_stop_ctrl_error_info (task_id, clue, clue_type, create_time) values (?, ?, ?, ?)";
         try {
-            int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, stopCtrlErrorList);
+            int[] insertRows = gaussdbJdbcTemplate.batchUpdate(sql, stopCtrlErrorList);
             return insertRows.length == stopCtrlErrorList.size();
         } catch (Exception e){
             log.error("【批量插入停控失败线索数据发生异常】 异常信息:", e);
@@ -66,7 +66,7 @@ public class CtrlDao {
     /*public boolean insertTaskInfo(Object[] taskInfo){
         String sql = "insert into t_ctrl_task_info (task_id, warning_msg_id, clue_person_name, clue_person_idCard, clue_person_phone, clue_person_type, clue_person_label, ctrl_level, task_obj_name, receive_unit_code, receive_unit_name, receive_dept_code, receive_dept_name, apply_user_name, apply_user_idCrad, apply_user_policeNo, apply_user_unit_code, apply_user_unit_name, business_source) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
         try {
-            int insertRows = gaussDBJdbcTemplate.update(sql, taskInfo);
+            int insertRows = jdbcTemplate.update(sql, taskInfo);
             return insertRows > 0;
         } catch (Exception e){
             log.error("【插入布控任务信息发生异常】 异常信息:", e);
@@ -83,7 +83,7 @@ public class CtrlDao {
     /*public boolean batchInsertCtrlAreaInfo(List<Object[]> ctrlAreaInfoList){
         String sql = "insert into t_ctrl_area_info (task_id, area_id, area_name, area_geo_type, location, create_time) values (?, ?, ?, ?, ?, ?)";
         try {
-            int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, ctrlAreaInfoList);
+            int[] insertRows = jdbcTemplate.batchUpdate(sql, ctrlAreaInfoList);
             return insertRows.length == ctrlAreaInfoList.size();
         } catch (Exception e){
             log.error("【批量插入布控任务区域信息发生异常】 异常信息:", e);
@@ -101,7 +101,7 @@ public class CtrlDao {
     public boolean batchInsertCtrlTaskInfo(List<Object[]> ctrlTaskInfoList){
         String sql = "insert into t_ctrl_task_info (rwbh, rwmc, rwlx, rwlxmc, rwdxlxmc, sqr_xm, sqr_sfzh, sqr_jh, sqr_dwbm, sqr_dwmc, bkqyxx, ryxm, zjlxdm, zjhm, hjd, sjh, yzzt, xszt, rylb, rybq, gkjb, gkjbmc, zrjz, zrmjid, zrmjxm, zrmjjh, zrmjlxfs, zrdwbm, zrbmbm, cjsj) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
         try {
-            int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, ctrlTaskInfoList);
+            int[] insertRows = gaussdbJdbcTemplate.batchUpdate(sql, ctrlTaskInfoList);
             return insertRows.length == ctrlTaskInfoList.size();
         } catch (Exception e){
             log.error("【批量插入布控任务相关信息发生异常】 异常信息:", e);
@@ -130,7 +130,7 @@ public class CtrlDao {
         sql.append(clue);
         sql.append("' limit 1");
         try {
-            ctrlTaskInfoList = gaussDBJdbcTemplate.queryForList(sql.toString());
+            ctrlTaskInfoList = gaussdbJdbcTemplate.queryForList(sql.toString());
         } catch (Exception e){
             log.error("【查询布控任务相关信息发生异常】 异常信息:", e);
         }
@@ -146,7 +146,7 @@ public class CtrlDao {
     public int batchInsertWarningInfo(List<Object[]> warningInfoList){
         String sql = "insert into t_ctrl_warning_info (rwbh, rwmc, rwlx, rwlxmc, rwdxlxmc, yjxxbh, yjry_xm, yjry_sfzh, yjry_sjh, yjxxsm, yjzt, yjztmc, yjsj, zrjz, zrmjid, zrmjxm, zrmjjh, zrmjlxfs, zrdwbm, zrdwmc, zrbmbm, zrbmmc, sqr_xm, sqr_sfzh, sqr_jh, sqr_dwbm, sqr_dwmc, yjry_lb, yjry_bq, gkjb, gkjbmc, ywly, cjsj, gxsj) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
         try {
-            int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, warningInfoList);
+            int[] insertRows = gaussdbJdbcTemplate.batchUpdate(sql, warningInfoList);
             return insertRows.length;
         } catch (Exception e){
             log.error("【批量插入布控任务预警结果数据发生异常】 异常信息:", e);
@@ -190,7 +190,7 @@ public class CtrlDao {
         sql.append(" offset ");
         sql.append((pageNum - Constants.NUM_1) * pageSize);
         try {
-            warningInfoList = gaussDBJdbcTemplate.queryForList(sql.toString());
+            warningInfoList = gaussdbJdbcTemplate.queryForList(sql.toString());
             log.info("【查询布控任务预警结果数据成功】 数据量:{}", warningInfoList.size());
         } catch (Exception e){
             log.error("【查询布控任务预警结果数据发生异常】 异常信息:", e);
@@ -205,9 +205,9 @@ public class CtrlDao {
      * @return
      */
     public Timestamp getStartQueryTimeBySysId(String sysId){
-        String sql = "select start_query_time from t_ctrl_warning_query_info where sysId = ?";
+        String sql = "select start_query_time from t_ctrl_warning_query_info where sys_id = ?";
         try {
-            return (Timestamp) gaussDBJdbcTemplate.queryForMap(sql, sysId).get("start_query_time");
+            return (Timestamp) gaussdbJdbcTemplate.queryForMap(sql, sysId).get("start_query_time");
         } catch (Exception e){
             log.error("【根据系统ID获取预警数据查询记录表中对应的起始查询时间发生异常】 异常信息:", e);
         }
@@ -222,7 +222,7 @@ public class CtrlDao {
     public Timestamp getNewMaxCreateTime(Timestamp startQueryTime){
         String sql = "select max(cjsj) max_cjsj from t_ctrl_warning_info where cjsj > ?";
         try {
-            return (Timestamp) gaussDBJdbcTemplate.queryForMap(sql, startQueryTime).get("max_cjsj");
+            return (Timestamp) gaussdbJdbcTemplate.queryForMap(sql, startQueryTime).get("max_cjsj");
         } catch (Exception e){
             log.error("【基于起始查询时间,计算当前预警数据表中的最大入库时间发生异常】 异常信息:", e);
         }
@@ -239,7 +239,7 @@ public class CtrlDao {
         List<Map<String, Object>> warningInfoList = new ArrayList<>();
         String sql = "select rwbh, rwmc, rwlx, rwlxmc, rwdxlxmc, yjxxbh, yjry_xm, yjry_sfzh, yjry_sjh, yjxxsm, yjzt, yjztmc, yjsj, zrjz, zrmjid, zrmjxm, zrmjjh, zrmjlxfs, zrdwbm, zrdwmc, zrbmbm, zrbmmc, sqr_xm, sqr_sfzh, sqr_jh, sqr_dwbm, sqr_dwmc, yjry_lb, yjry_bq, gkjb, gkjbmc, ywly, cjsj, gxsj from t_ctrl_warning_info where yjsj > ? and yjsj <= ? order by yjsj desc";
         try {
-            warningInfoList = gaussDBJdbcTemplate.queryForList(sql, startQueryTime, newMaxCreateTime);
+            warningInfoList = gaussdbJdbcTemplate.queryForList(sql, startQueryTime, newMaxCreateTime);
             log.info("【增量查询布控任务预警结果数据成功】 数据量:{}", warningInfoList.size());
         } catch (Exception e){
             log.error("【增量查询布控任务预警结果数据发生异常】 异常信息:", e);
@@ -253,9 +253,9 @@ public class CtrlDao {
      * @return
      */
     public boolean updateStartQueryTimeBySysId(Timestamp newMaxCreateTime, String sysId){
-        String sql = "update t_ctrl_warning_query_info set start_query_time = ? where where sysId = ?";
+        String sql = "update t_ctrl_warning_query_info set start_query_time = ? where where sys_id = ?";
         try {
-            int updateRows = gaussDBJdbcTemplate.update(sql, newMaxCreateTime, sysId);
+            int updateRows = gaussdbJdbcTemplate.update(sql, newMaxCreateTime, sysId);
             return updateRows > 0;
         } catch (Exception e){
             log.error("【根据系统ID更新预警数据查询记录表中对应的起始查询时间发生异常】 异常信息:", e);
@@ -272,7 +272,7 @@ public class CtrlDao {
     /*public boolean batchInsertCtrlClueInfo(List<Object[]> clueInfoList){
         String sql = "insert into t_ctrl_clue_info (rwbh, ryxm, zjlxdm, zjhm, hjd, sjh, xswjbh, xswjm, yzzt, yzztmc, xszt, rylb, rybq, gkjb, gkjbmc, cjsj) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
         try {
-            int[] insertRows = gaussDBJdbcTemplate.batchUpdate(sql, clueInfoList);
+            int[] insertRows = gaussdbJdbcTemplate.batchUpdate(sql, clueInfoList);
             return insertRows.length == clueInfoList.size();
         } catch (Exception e){
             log.error("【批量插入布控任务线索人员数据发生异常】 异常信息:", e);
@@ -290,7 +290,7 @@ public class CtrlDao {
         Map<String, Object> modelDictInfoMap = new HashMap<>();
         String sql = "select id, yjzdlxbm, yjzdlxmc, ejzdlxbm, ejzdlxmc, sanjzdlxbm, sanjzdlxmc, sjzdlxbm, sjzdlxmc, wjzdlxbm, wjzdlxmc, ljzdlxbm, ljzdlxmc, bz from t_model_dict_info where id = ?";
         try {
-            modelDictInfoMap = gaussDBJdbcTemplate.queryForMap(sql, resourceId);
+            modelDictInfoMap = gaussdbJdbcTemplate.queryForMap(sql, resourceId);
             log.info("【根据数据资源业务ID查询对应模型配置字典信息成功】");
         } catch (Exception e){
             log.error("【根据数据资源业务ID查询对应模型配置字典信息发生异常】 异常信息:", e);

+ 14 - 11
src/main/java/com/hhwy/qbeqsjy/kafka/service/ConsumeService.java

@@ -51,7 +51,7 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
         executor.execute(() -> {
             if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
                 try {
-//                    this.dataConsume();
+                    this.dataConsume();
                 } catch (Exception e) {
                     log.error("【QBEQSJYCTRL】 数据处理服务启动异常", e);
                 }
@@ -94,12 +94,15 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
                 for (ConsumerRecord<Integer, String> record : records) {
                     JSONObject message = JSONObject.parseObject(record.value());
                     JSONObject uniCtrl = message.getJSONObject("UNICTRL");
-                    /*String clueId = uniCtrl.getString("U_CLUEID");
-                    String[] clueIdStrArr = clueId.split("_");
-                    String rwbh = clueIdStrArr[Constants.NUM_1];
-                    String clueType = clueIdStrArr[Constants.NUM_2];
-                    String clue = clueIdStrArr[Constants.NUM_3];
-                    String resourceId = uniCtrl.getString("U_RESID");*/
+                    String rwbh = "59616388210888704";
+                    String clueType = "D201005";
+                    String clue = "18920718758";
+//                    String clueId = uniCtrl.getString("U_CLUEID");
+//                    String[] clueIdStrArr = clueId.split("_");
+//                    String rwbh = clueIdStrArr[Constants.NUM_1];
+//                    String clueType = clueIdStrArr[Constants.NUM_2];
+//                    String clue = clueIdStrArr[Constants.NUM_3];
+                    String resourceId = uniCtrl.getString("U_RESID");
                     // 每张布控资源表对应的命中数据JSON对象
                     JSONObject oriField = message.getJSONObject("ORIFIELD");
 //                    String yjxxsm = oriField.toJSONString();
@@ -112,7 +115,7 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
                     // 预警状态默认为0(未读)
                     Integer yjzt = 0;
                     // 根据clueId中的任务编号、线索值、线索类型查询对应的布控任务相关信息(包含任务信息、区域信息、线索信息)
-                    /*List<Map<String, Object>> ctrlTaskInfoList = ctrlDao.queryCtrlTaskInfo(rwbh, clue, clueType);
+                    List<Map<String, Object>> ctrlTaskInfoList = ctrlDao.queryCtrlTaskInfo(rwbh, clue, clueType);
                     if(ctrlTaskInfoList.size() > 0){
                         Map<String, Object> ctrlTaskInfo = ctrlTaskInfoList.get(0);
                         String rwmc = String.valueOf(ctrlTaskInfo.get("rwmc"));
@@ -201,12 +204,12 @@ public class ConsumeService implements ApplicationListener<ContextRefreshedEvent
                         } else {   // 如果是其他类布控资源,预警数据直接入库
                             warningInfoList.add(new Object[]{rwbh, rwmc, rwlx, rwlxmc, rwdxlxmc, yjxxbh, yjryXm, yjrySfzh, yjrySjh, yjxxsm.toString(), yjzt, Constants.YJZT_WD, yjsj, zrjz, zrmjid, zrmjxm, zrmjjh, zrmjlxfs, zrdwbm, CommonUtil.getConfValueByKey(zrdwbm), zrbmbm, CommonUtil.getConfValueByKey(zrbmbm), sqrXm, sqrSfzh, sqrJh, sqrDwbm, sqrDwmc, yjryLb, yjryBq, gkjb, gkjbmc, resourceId, yjsj, yjsj});
                         }
-                    }*/
+                    }
                 }
                 // ========================= 批量入库符合条件的预警数据
-                /*if(warningInfoList.size() > 0){
+                if(warningInfoList.size() > 0){
                     batchSaveNum = ctrlDao.batchInsertWarningInfo(warningInfoList);
-                }*/
+                }
                 log.info("【QBEQSJYCTRL - 单人预警消费者服务】 当次消费到的数据量:{},符合条件保存入库的数据量:{}", records.count(), batchSaveNum);
             }
         }

+ 11 - 9
src/main/resources/application.properties

@@ -7,15 +7,16 @@ server.tomcat.threads.max=800
 server.tomcat.threads.min-spare=100
 
 
-# \u6570\u636E\u5E93\u914D\u7F6E\uFF08\u540E\u7EED\u66FF\u6362GaussDB\uFF09
+# \u6570\u636E\u5E93\u914D\u7F6E
 spring.datasource.driver-class-name=org.postgresql.Driver
-spring.datasource.url=jdbc:postgresql://localhost:5432/qbeq?currentSchema=public
-spring.datasource.username=postgres
-spring.datasource.password=123456
+spring.datasource.url=jdbc:postgresql://130.0.39.3:25308/sjy_365?currentSchema=qbeq_sjy
+spring.datasource.username=sjy_kx_365
+spring.datasource.password=sjy365#$
+
+#spring.datasource.url=jdbc:postgresql://localhost:5432/qbeq?currentSchema=public
+#spring.datasource.username=postgres
+#spring.datasource.password=123456
 
-#spring.datasource.url=jdbc:postgresql://130.0.46.149:5432/hcacdb_pg?currentSchema=public
-#spring.datasource.username=hcacdb
-#spring.datasource.password=hcacdb
 
 # \u4E1A\u52A1\u914D\u7F6E
 # \u7EBF\u7D22\u7C7B\u578B\u7F16\u7801\u5217\u8868
@@ -63,7 +64,7 @@ fwkfpt.serviceId=673976555399020544
 
 # \u534E\u4E3A - kafka\u914D\u7F6E
 # kafka\u96C6\u7FA4\u8BA4\u8BC1\u914D\u7F6E\u6587\u4EF6\u5B58\u653E\u8DEF\u5F84
-#kafka.properties.path=\u670D\u52A1\u5668\u751F\u4EA7\u73AF\u5883\u5B58\u653E\u8DEF\u5F84
+#kafka.properties.path=/home/QBEQSJY/kafka/
 kafka.properties.path=C:\\Users\\Administrator\\Desktop\\pfj\\idea_project\\QBEQSJY\\src\\main\\resources\\kafka\\
 # \u4E1A\u52A1topic
 topic.name=UNICTRL_DATA_QBEQSJYCTRL_V1.0
@@ -7547,4 +7548,5 @@ KRL=\u5E93\u5C14\u52D2\u673A\u573A
 120225730000=\u84DF\u5DDE\u5206\u5C40\u76D8\u5C71\u98CE\u666F\u540D\u80DC\u533A\u6CBB\u5B89\u6D3E\u51FA\u6240
 120225750000=\u84DF\u5DDE\u5206\u5C40\u957F\u57CE\u6CBB\u5B89\u6D3E\u51FA\u6240
 120225760000=\u84DF\u5DDE\u5206\u5C40\u7EAA\u59D4\u529E\u516C\u5BA4
-120225780000=\u84DF\u5DDE\u5206\u5C40\u5DDE\u6CB3\u6E7E\u6D3E\u51FA\u6240
+120225780000=\u84DF\u5DDE\u5206\u5C40\u5DDE\u6CB3\u6E7E\u6D3E\u51FA\u6240
+