|
@@ -0,0 +1,238 @@
|
|
|
+package org.ssssssss.example.datacheck.service;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.context.annotation.PropertySource;
|
|
|
+import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.ssssssss.example.datacheck.bean.CheckErrorTypeEnum;
|
|
|
+import org.ssssssss.example.datacheck.bean.DBTypeEnum;
|
|
|
+import org.ssssssss.example.datacheck.bean.JsonResult;
|
|
|
+import org.ssssssss.example.datacheck.common.CustomDateSource;
|
|
|
+import org.ssssssss.example.datacheck.common.DateUtils;
|
|
|
+import org.ssssssss.example.datacheck.dao.MDJFDao;
|
|
|
+import org.ssssssss.example.mdjf.common.CommonUtil;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.text.DecimalFormat;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+
|
|
|
+@Service
|
|
|
+// 加载配置文件
|
|
|
+@PropertySource(value = "serviceConfig.yml", encoding = "UTF-8") // 加载配置文件
|
|
|
+public class DataCheckService {
|
|
|
+
|
|
|
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataCheckService.class);
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ public MDJFDao mdjfDao;
|
|
|
+
|
|
|
+ private Map<String, String> dataSourcePasswordMap;
|
|
|
+
|
|
|
+
|
|
|
+ @Value("${queryTimeOut}")
|
|
|
+ private int queryTimeOutMin; //分钟
|
|
|
+
|
|
|
+ public JsonResult checkDBConnect(String dbid) {
|
|
|
+ JsonResult result = new JsonResult();
|
|
|
+
|
|
|
+ Map<String, Object> dbinfoMap = mdjfDao.findDBInfoById(dbid);
|
|
|
+
|
|
|
+ if(dbinfoMap == null ){
|
|
|
+ result.setCode(JsonResult.ERROR);
|
|
|
+ result.setData("id:"+dbid+":数据源DBInfo不存在");
|
|
|
+ result.setMsg("id:"+dbid+":数据源DBInfo不存在");
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ String DBNAME = dbinfoMap.get("DBNAME").toString();
|
|
|
+ String DBTYPE = dbinfoMap.get("DBTYPE").toString();
|
|
|
+ String url = dbinfoMap.get("URL").toString();
|
|
|
+ String PWD = dbinfoMap.get("PWD").toString();
|
|
|
+ String USERNAME = dbinfoMap.get("USERNAME").toString();
|
|
|
+
|
|
|
+
|
|
|
+ String driverClass = "";
|
|
|
+
|
|
|
+ if (DBTypeEnum.ORACLE.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.ORACLE.getInfo();
|
|
|
+ url = DBTypeEnum.ORACLE.getPre() + url;
|
|
|
+
|
|
|
+ } else if (DBTypeEnum.MYSQL.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.MYSQL.getInfo();
|
|
|
+ url = DBTypeEnum.MYSQL.getPre() + url;
|
|
|
+
|
|
|
+ } else if (DBTypeEnum.POSTGRESQL.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.POSTGRESQL.getInfo();
|
|
|
+ url = DBTypeEnum.POSTGRESQL.getPre() + url;
|
|
|
+
|
|
|
+ } else if (DBTypeEnum.GBASE.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.GBASE.getInfo();
|
|
|
+ url = DBTypeEnum.GBASE.getPre() + url;
|
|
|
+ } else {
|
|
|
+ //根据DBTYPE 未匹配到合适数据库驱动,请检查端口及驱动信息
|
|
|
+
|
|
|
+ result.setCode(JsonResult.ERROR);
|
|
|
+ result.setData("id:"+dbid+"DBTYPE:"+DBTYPE+"--"+CheckErrorTypeEnum.DRIVERERROR.getInfo());
|
|
|
+ result.setMsg("id:"+dbid+"DBTYPE:"+DBTYPE+"--"+CheckErrorTypeEnum.DRIVERERROR.getInfo());
|
|
|
+ mdjfDao.updateDBConnectResult("2",CheckErrorTypeEnum.DRIVERERROR.getInfo(),dbid);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ JdbcTemplate tmpJdbcTemplate = null;
|
|
|
+ tmpJdbcTemplate = CustomDateSource.customJdbcTemplate(url, driverClass, USERNAME, PWD);
|
|
|
+ Map<String, Object> connectSelectMap = tmpJdbcTemplate.queryForMap("select 1 from dual");
|
|
|
+
|
|
|
+ if(connectSelectMap == null || connectSelectMap.size() < 1){
|
|
|
+ result.setCode(JsonResult.ERROR);
|
|
|
+ result.setData("访问异常 :select 1 from dual --无结果返回");
|
|
|
+ result.setMsg("访问异常:select 1 from dual --无结果返回");
|
|
|
+ mdjfDao.updateDBConnectResult("2","访问异常:select 1 from dual --无结果返回",dbid);
|
|
|
+
|
|
|
+ }else{
|
|
|
+ result.setCode(JsonResult.SUCCESS);
|
|
|
+ result.setData("连接成功");
|
|
|
+ result.setMsg("连接成功");
|
|
|
+ mdjfDao.updateDBConnectResult("1","连接成功",dbid);
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+
|
|
|
+ }catch (Exception e){
|
|
|
+ result.setCode(JsonResult.ERROR);
|
|
|
+ result.setData(e.getMessage());
|
|
|
+ result.setMsg(e.getMessage());
|
|
|
+ mdjfDao.updateDBConnectResult("2",e.getMessage(),dbid);
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public void dataCheckScheduled(String flag) {
|
|
|
+ //防止连接数过多
|
|
|
+ Map<String, JdbcTemplate> dataSourceJdbcTemplate = new HashMap<String, JdbcTemplate>();
|
|
|
+
|
|
|
+ List<Map<String, Object>> needRunTask = new ArrayList<>();
|
|
|
+
|
|
|
+ if ("all".equals(flag)) {
|
|
|
+ needRunTask = mdjfDao.findAllRunTask();
|
|
|
+ } else if ("first".equals(flag)) {
|
|
|
+ needRunTask = mdjfDao.findFirstRunTask();
|
|
|
+ } else if (CommonUtil.isNotNullOrEmpty(flag)) {
|
|
|
+ needRunTask = mdjfDao.findTaskById(flag);
|
|
|
+ }
|
|
|
+
|
|
|
+ int successTaskNum = 0;
|
|
|
+
|
|
|
+ for (Map<String, Object> tmpTask : needRunTask) {
|
|
|
+ String DBID = tmpTask.get("DBID").toString();
|
|
|
+ String DBNAME = tmpTask.get("DBNAME").toString();
|
|
|
+ String DBTYPE = tmpTask.get("DBTYPE").toString();
|
|
|
+ String url = tmpTask.get("URL").toString();
|
|
|
+ String PWD = tmpTask.get("PWD").toString();
|
|
|
+ String USERNAME = tmpTask.get("USERNAME").toString();
|
|
|
+ String taskId = tmpTask.get("ID").toString();
|
|
|
+ String TBCNAME = tmpTask.get("TBCNAME").toString();
|
|
|
+ String TBENAME = tmpTask.get("TBENAME").toString();
|
|
|
+ String CJTYPE = tmpTask.get("CJTYPE").toString();
|
|
|
+ String CHECKFIELDS = tmpTask.get("CHECKFIELDS").toString();
|
|
|
+
|
|
|
+ String driverClass = "";
|
|
|
+
|
|
|
+ if (DBTypeEnum.ORACLE.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.ORACLE.getInfo();
|
|
|
+ url = DBTypeEnum.ORACLE.getPre() + url;
|
|
|
+
|
|
|
+ } else if (DBTypeEnum.MYSQL.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.MYSQL.getInfo();
|
|
|
+ url = DBTypeEnum.MYSQL.getPre() + url;
|
|
|
+
|
|
|
+ } else if (DBTypeEnum.POSTGRESQL.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.POSTGRESQL.getInfo();
|
|
|
+ url = DBTypeEnum.POSTGRESQL.getPre() + url;
|
|
|
+
|
|
|
+ } else if (DBTypeEnum.GBASE.getCode().equals(DBTYPE.trim())) {
|
|
|
+ driverClass = DBTypeEnum.GBASE.getInfo();
|
|
|
+ url = DBTypeEnum.GBASE.getPre() + url;
|
|
|
+ } else {
|
|
|
+ //根据DBTYPE 未匹配到合适数据库驱动,请检查端口及驱动信息
|
|
|
+ mdjfDao.saveCheckTaskLog(DBID, DBNAME, taskId, TBCNAME, TBENAME, null, null, null, CheckErrorTypeEnum.DRIVERERROR.getCode(), CheckErrorTypeEnum.DRIVERERROR.getInfo());
|
|
|
+// LASTCHECKTIME,LASTCHECKSTATUS,LASTCHECKSTATUSMSG,LASTCHECKROWS,LASTVALIDROWS,LASTWANZHENG,tableTaskID
|
|
|
+ mdjfDao.updateCheckTableTask(DateUtils.getNowDate(), CheckErrorTypeEnum.DRIVERERROR.getCode(), CheckErrorTypeEnum.DRIVERERROR.getInfo(), null, null, null, taskId);
|
|
|
+
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ //数据正常,开始检测
|
|
|
+ try { //捕获数据库处理异常,并写入日志
|
|
|
+ long startTime = System.currentTimeMillis() / 1000;
|
|
|
+
|
|
|
+ // 控制 JdbcTemplate总连接数
|
|
|
+ JdbcTemplate tmpJdbcTemplate = null;
|
|
|
+ if (dataSourceJdbcTemplate.containsKey(DBID)) {
|
|
|
+ tmpJdbcTemplate = dataSourceJdbcTemplate.get(DBID);
|
|
|
+ } else {
|
|
|
+ tmpJdbcTemplate = CustomDateSource.customJdbcTemplate(url, driverClass, USERNAME, PWD);
|
|
|
+ dataSourceJdbcTemplate.put(DBID, tmpJdbcTemplate);
|
|
|
+ }
|
|
|
+ tmpJdbcTemplate.setQueryTimeout(queryTimeOutMin * 60);
|
|
|
+
|
|
|
+
|
|
|
+ //全量计算---oracle 其他未适配
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.setLength(0); //使用前清空原内容
|
|
|
+ sb.append("SELECT COUNT(1) COUNT, SUM( CASE WHEN 1=1 ");
|
|
|
+
|
|
|
+ //计算有效行数--条件过滤
|
|
|
+ for (String fieldTmp : CHECKFIELDS.split(",")) {
|
|
|
+ if (CommonUtil.isNotNullOrEmpty(fieldTmp)) {
|
|
|
+ sb.append(" AND " + fieldTmp + " IS NOT NULL AND TRIM(" + fieldTmp + ") IS NOT NULL ");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sb.append(" THEN 1 ELSE 0 END ) AS VALIDROWS ");
|
|
|
+ //计算所有字段空值率---暂时无用
|
|
|
+ int countFields = 0;
|
|
|
+ for (String fieldTmp : CHECKFIELDS.split(",")) {
|
|
|
+ if (CommonUtil.isNotNullOrEmpty(fieldTmp)) {
|
|
|
+ countFields++;
|
|
|
+ sb.append(" ,COUNT(NVL2(TRIM(" + fieldTmp + "), 1, NULL)) ");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ sb.append(" from ");
|
|
|
+ sb.append(TBENAME);
|
|
|
+// 0 抽样1000条 1全量采集
|
|
|
+ if ("0".equals(CJTYPE)) {
|
|
|
+ sb.append(" where ROWNUM <=1000 ");
|
|
|
+ }
|
|
|
+ LOGGER.info("任务表ID:" + taskId + " 表名:" + TBENAME + "执行统计SQL:{}", sb.toString());
|
|
|
+ Map<String, Object> resultCountMap = tmpJdbcTemplate.queryForMap(sb.toString());
|
|
|
+
|
|
|
+ int CHECKROWS = ((BigDecimal) resultCountMap.get("COUNT")).intValue();
|
|
|
+ int VALIDROWS = ((BigDecimal) resultCountMap.get("VALIDROWS")).intValue();
|
|
|
+
|
|
|
+ DecimalFormat df = new DecimalFormat("0.00%"); // 格式化为百分比
|
|
|
+ String WANZHENG = df.format((double) VALIDROWS / CHECKROWS);
|
|
|
+
|
|
|
+// 写入检测日志
|
|
|
+// 更新最新状态信息
|
|
|
+ mdjfDao.saveCheckTaskLog(DBID, DBNAME, taskId, TBCNAME, TBENAME, String.valueOf(CHECKROWS), String.valueOf(VALIDROWS), WANZHENG, CheckErrorTypeEnum.SUCCESS.getCode(), CheckErrorTypeEnum.SUCCESS.getInfo());
|
|
|
+// LASTCHECKTIME,LASTCHECKSTATUS,LASTCHECKSTATUSMSG,LASTCHECKROWS,LASTVALIDROWS,LASTWANZHENG,tableTaskID
|
|
|
+ mdjfDao.updateCheckTableTask(DateUtils.getNowDate(), CheckErrorTypeEnum.SUCCESS.getCode(), CheckErrorTypeEnum.SUCCESS.getInfo(), String.valueOf(CHECKROWS), String.valueOf(VALIDROWS), WANZHENG, taskId);
|
|
|
+ successTaskNum++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOGGER.error("异常信息:{},表任务ID:{},目标连接信息:{},用户名:{},用户密码:{},目标表:{}", e.getMessage(), taskId, url, USERNAME, "*******", TBENAME, e);
|
|
|
+ mdjfDao.saveCheckTaskLog(DBID, DBNAME, taskId, TBCNAME, TBENAME, null, null, null, CheckErrorTypeEnum.SQLERROR.getCode(),CheckErrorTypeEnum.SQLERROR.getInfo()+" : "+ e.getMessage());
|
|
|
+ mdjfDao.updateCheckTableTask(DateUtils.getNowDate(), CheckErrorTypeEnum.SQLERROR.getCode() , CheckErrorTypeEnum.SQLERROR.getInfo()+" : "+ e.getMessage(), null, null, null, taskId);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOGGER.info("本次监测完成 ,共计:{} 个任务 ,成功:{} 个任务 ,异常:{} 个任务 ",needRunTask.size(),successTaskNum,needRunTask.size()-successTaskNum);
|
|
|
+ }
|
|
|
+}
|