Sfoglia il codice sorgente

feat(sse): sse验证及mapstruct

huey 2 anni fa
parent
commit
6a16f85b91

+ 5 - 0
pom.xml

@@ -134,6 +134,11 @@
             <artifactId>mapstruct-processor</artifactId>
             <version>${mapstruct.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 4 - 1
src/main/java/com/dragon/tj/portal/common/convert/MessageReqToItemConvert.java → src/main/java/com/dragon/tj/portal/common/convert/MessageReqConvert.java

@@ -3,11 +3,12 @@ package com.dragon.tj.portal.common.convert;
 
 import com.dragon.tj.portal.common.dto.message.MessageInfoItem;
 import com.dragon.tj.portal.common.dto.message.MessageInfoReq;
+import com.dragon.tj.portal.entity.MessageInfo;
 import org.mapstruct.Mapper;
 import org.mapstruct.MappingConstants;
 
 @Mapper(componentModel = MappingConstants.ComponentModel.SPRING)
-public interface MessageReqToItemConvert {
+public interface MessageReqConvert {
 
     /**
      * User转换UserDTO
@@ -18,4 +19,6 @@ public interface MessageReqToItemConvert {
      */
     MessageInfoItem reqToItem(MessageInfoReq messageInfoReq);
 
+    MessageInfo reqToInfo(MessageInfoReq messageInfoReq);
+
 }

+ 11 - 2
src/main/java/com/dragon/tj/portal/common/dto/message/MessageInfoReq.java

@@ -5,6 +5,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.hibernate.validator.constraints.Range;
 
+import javax.validation.constraints.NotNull;
 import java.util.Set;
 
 /**
@@ -51,6 +52,16 @@ public class MessageInfoReq {
      */
     private Integer version;
 
+
+    @NotNull(message = "毫秒时间戳 不能为空")
+    private Long t;
+
+    /**
+     * md5 (登录人标识+t)
+     */
+//    @NotBlank(message = "调用者身份验证标识 不能为空")
+    private String k;
+
     /**
      *
      *--------------非req参数
@@ -60,6 +71,4 @@ public class MessageInfoReq {
      */
     private String appId = "appId";
 
-    private Set<String> messageClientIds;
-
 }

+ 24 - 0
src/main/java/com/dragon/tj/portal/common/dto/message/MessageInfoSend.java

@@ -0,0 +1,24 @@
+package com.dragon.tj.portal.common.dto.message;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import org.hibernate.validator.constraints.Range;
+
+import javax.validation.constraints.NotNull;
+import java.util.Set;
+
+/**
+ * @author huey China.
+ * @Description :
+ * @Date Created in 2023/6/15 15:59
+ */
+@Getter
+@Setter
+public class MessageInfoSend {
+
+    private MessageInfoReq messageInfoReq;
+
+    private Set<String> clientIds;
+
+}

+ 34 - 0
src/main/java/com/dragon/tj/portal/common/enums/ScopeEnums.java

@@ -0,0 +1,34 @@
+package com.dragon.tj.portal.common.enums;
+
+import com.dragon.tj.portal.common.constants.BusinessConstants;
+
+/**
+ * @author huey China.
+ * @Description : 1部门 2 人员
+ * @Date Created in 2023/6/15 15:40
+ */
+public enum ScopeEnums {
+
+    SCOPE_DEPT(1, "部门"),
+    SCOPE_MEMBER(2, "人员");
+
+    private final Integer value;
+    private final String name;
+
+    ScopeEnums(Integer value, String name) {
+        this.value = value;
+        this.name = name;
+    }
+
+    public Integer value() {
+        return this.value;
+    }
+
+    public static ScopeEnums ofMessageType(Integer messageType) {
+        if (messageType.equals(BusinessConstants.DICT_ITEM_ID_2)) {
+            return SCOPE_MEMBER;
+        } else {
+            return SCOPE_MEMBER;
+        }
+    }
+}

+ 10 - 10
src/main/java/com/dragon/tj/portal/component/message/MessageConsumer.java

@@ -3,9 +3,10 @@ package com.dragon.tj.portal.component.message;
 import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.dragon.tj.portal.common.convert.MessageReqToItemConvert;
+import com.dragon.tj.portal.common.convert.MessageReqConvert;
 import com.dragon.tj.portal.common.dto.message.MessageInfoItem;
 import com.dragon.tj.portal.common.dto.message.MessageInfoReq;
+import com.dragon.tj.portal.common.dto.message.MessageInfoSend;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
@@ -30,7 +31,7 @@ public class MessageConsumer {
     private Map<String, OpenApiSseEmitter> sseEmitters;
 
     @Autowired
-    private MessageReqToItemConvert messageReqToItemConvert;
+    private MessageReqConvert messageReqConvert;
 
     /**
      * get client published message from kafka,process it according to the serverId
@@ -46,25 +47,24 @@ public class MessageConsumer {
             log.error("kafka msg is empty, no process");
             return;
         }
-        MessageInfoReq reqDTO;
+        MessageInfoSend reqDTO;
         try {
-            reqDTO = JSONObject.parseObject(msg, MessageInfoReq.class);
+            reqDTO = JSONObject.parseObject(msg, MessageInfoSend.class);
         } catch (Exception e) {
             log.error("parsing string to obj failed, msg={}, e={}", msg, e);
             return;
         }
+        Set<String> clientIds = reqDTO.getClientIds();
+        MessageInfoReq messageInfoReq = reqDTO.getMessageInfoReq();
 
-        Set<String> messageClientIds = reqDTO.getMessageClientIds();
-
-        for (String clientId : messageClientIds) {
+        for (String clientId : clientIds) {
             OpenApiSseEmitter emitter = sseEmitters.get(clientId);
-
             if (emitter == null) {
-                log.error("can't find the sseEmitter obj from sseEmitters, no process {}",clientId);
+                log.error("can't find the sseEmitter obj from sseEmitters, no process {}", clientId);
                 continue;
             }
             try {
-                MessageInfoItem messageInfoItem = messageReqToItemConvert.reqToItem(reqDTO);
+                MessageInfoItem messageInfoItem = messageReqConvert.reqToItem(messageInfoReq);
                 messageInfoItem.setClientId(clientId);
 
                 String sendMsg = JSON.toJSONString(messageInfoItem);

+ 60 - 6
src/main/java/com/dragon/tj/portal/service/impl/MessageInfoServiceImpl.java

@@ -1,19 +1,26 @@
 package com.dragon.tj.portal.service.impl;
 
+import cn.hutool.core.collection.CollUtil;
 import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.dragon.tj.portal.common.base.R;
 import com.dragon.tj.portal.common.constants.BusinessConstants;
 import com.dragon.tj.portal.common.dto.message.MessageInfoReq;
+import com.dragon.tj.portal.common.dto.message.MessageInfoSend;
+import com.dragon.tj.portal.common.enums.ScopeEnums;
 import com.dragon.tj.portal.component.message.KafkaInitialConfiguration;
 import com.dragon.tj.portal.component.message.MessageProducer;
 import com.dragon.tj.portal.entity.MessageInfo;
+import com.dragon.tj.portal.entity.MessageInfoScope;
 import com.dragon.tj.portal.mapper.MessageInfoMapper;
+import com.dragon.tj.portal.service.MessageInfoScopeService;
 import com.dragon.tj.portal.service.MessageInfoService;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -30,6 +37,12 @@ public class MessageInfoServiceImpl extends ServiceImpl<MessageInfoMapper, Messa
     @Autowired
     private MessageProducer producer;
 
+    @Autowired
+    private MessageInfoService messageInfoService;
+
+    @Autowired
+    private MessageInfoScopeService messageInfoScopeService;
+
     @Override
     public R push(MessageInfoReq messageInfoReq) {
         return R.ok(this.doMessage(messageInfoReq));
@@ -57,22 +70,63 @@ public class MessageInfoServiceImpl extends ServiceImpl<MessageInfoMapper, Messa
         Integer messageType = messageInfoReq.getMessageType();
         Set<String> scopeIds = messageInfoReq.getScopeIds();
 
-        //测试部门1
+        //登录人标识
+        Set<String> messageClientIds = Sets.newHashSet();
+
+        //测试联系人1
         if (messageType.equals(BusinessConstants.DICT_ITEM_ID_2) && scopeIds.contains("1")) {
-            //登录人标识
-            Set<String> messageClientIds = Sets.newHashSet();
             messageClientIds.add("c1");
             messageClientIds.add("c2");
-            messageInfoReq.setMessageClientIds(messageClientIds);
+        }
+        ScopeEnums scopeEnums = ScopeEnums.ofMessageType(messageType);
+        if (CollUtil.isNotEmpty(messageClientIds)) {
+
+            List<MessageInfoScope> insertList = Lists.newArrayList();
+            MessageInfoSend messageInfoSend = new MessageInfoSend();
+            messageInfoSend.setMessageInfoReq(messageInfoReq);
+            messageInfoSend.setClientIds(messageClientIds);
+            messageClientIds.forEach(e->{
+                MessageInfoScope messageInfoScope = new MessageInfoScope();
+                messageInfoScope.setScopeId(e);
+                messageInfoScope.setScopeType(scopeEnums.value());
+                //messageInfoScope.setScopeLevel("");
+                insertList.add(messageInfoScope);
+            });
+
 
-            isSend = producer.send(KafkaInitialConfiguration.sseTopic, JSON.toJSONString(messageInfoReq));
+            // 存储
+            boolean isSaveBatchSuccess = false;
+            try {
+            } catch (Exception e) {
+                log.warn("消息中心 saveBatch 存储失败 {}");
+            }
+
+            // 批量存储失败,则逐个存储
+            if (!isSaveBatchSuccess) {
+                try {
+
+                } catch (Exception e) {
+                    log.warn("消息中心 存储失败 {}");
+                }
+            }
+
+
+            isSend = producer.send(KafkaInitialConfiguration.sseTopic, JSON.toJSONString(messageInfoSend));
         }
         return isSend;
     }
 
 
+    /**
+     * @author huey China.
+     * @Description : md5 验证消息唯一性
+     * @Date Created in 2023/6/16 16:15
+     */
     private void validate(MessageInfoReq messageInfoReq) {
-
+//        String correctHash = DigestUtils.md5Hex("currentId" + messageInfoReq.getT());
+//        if (!correctHash.equalsIgnoreCase(messageInfoReq.getK())) {
+//            return;
+//        }
     }
 
 }