Explorar o código

Merge branch 'mazq-kafkaExp-0314' into 'develop'

Mazq kafka exp 0314

See merge request dcuc/auth-service!15
黄资权 %!s(int64=4) %!d(string=hai) anos
pai
achega
5a2e62468b

+ 27 - 20
dcuc-auth-service/src/main/java/com/dragoninfo/dcuc/auth/msg/WorkFlowResultListener.java

@@ -13,7 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.List;
 import java.util.Map;
@@ -31,26 +30,34 @@ public class WorkFlowResultListener {
     private IServiceAuthBusiness serviceAuthBusiness;
 
     @KafkaListener(topics = "${dcuc.auth.service-auth.topic}",containerFactory = "kafkaListenerContainerFactory")
-    public void receiveMessage(ConsumerRecord<String, byte[]> record) throws IOException, ClassNotFoundException {
-        byte[] value = record.value();
-        ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(value));
-        MessageInfoDTO messageInfoDTO = (MessageInfoDTO) stream.readObject();
-        logger.info("--------收到数据,{}", JSON.toJSONString(messageInfoDTO));
-        List<Map<String, String>> infoSet = messageInfoDTO.getInfoSet();
-        if (CollectionUtils.isEmpty(infoSet)) {
-            return;
+    public void receiveMessage(ConsumerRecord<String, byte[]> record) {
+        try {
+            byte[] value = record.value();
+            if (null == value || value.length == 0) {
+                logger.info("receiveMessage value length: 0 or null");
+                return;
+            }
+            ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(value));
+            MessageInfoDTO messageInfoDTO = (MessageInfoDTO) stream.readObject();
+            logger.info("--------收到数据,{}", JSON.toJSONString(messageInfoDTO));
+            List<Map<String, String>> infoSet = messageInfoDTO.getInfoSet();
+            if (CollectionUtils.isEmpty(infoSet)) {
+                return;
+            }
+            Map<String, String> map = infoSet.get(0);
+            if (null == map) {
+                logger.info("info set is empty");
+                return;
+            }
+            WorkFlowResutlAcceptDTO dto = new WorkFlowResutlAcceptDTO();
+            dto.setApproveResult(WorkFlowStatusEnum.SUCCESS.getValue());
+            dto.setMessageId(map.get("messageId"));
+            dto.setProcessInstanceId(map.get("processInstanceId"));
+            logger.info("processDTO work flow id:{}, message id:{}", dto.getProcessInstanceId(), dto.getMessageId());
+            serviceAuthBusiness.dealAuthFlowResult(dto);
+        } catch (Exception e) {
+            logger.error("receiveMessage error.", e);
         }
-        Map<String, String> map = infoSet.get(0);
-        if(null == map){
-            logger.info("info set is empty");
-            return;
-        }
-        WorkFlowResutlAcceptDTO dto = new WorkFlowResutlAcceptDTO();
-        dto.setApproveResult(WorkFlowStatusEnum.SUCCESS.getValue());
-        dto.setMessageId(map.get("messageId"));
-        dto.setProcessInstanceId(map.get("processInstanceId"));
-        logger.info("processDTO work flow id:{}, message id:{}", dto.getProcessInstanceId(), dto.getMessageId());
-        serviceAuthBusiness.dealAuthFlowResult(dto);
     }
 
 }