Эх сурвалжийг харах

feat(message): message-temp

huey 1 жил өмнө
parent
commit
996cb62fd6

+ 0 - 4
src/main/java/com/dragon/tj/portal/common/util/TestUtil.java

@@ -1,4 +0,0 @@
-package com.dragon.tj.portal.common.util;
-
-public class TestUtil {
-}

+ 46 - 31
src/main/java/com/dragon/tj/portal/component/message/KafkaInitialConfiguration.java

@@ -1,8 +1,15 @@
 package com.dragon.tj.portal.component.message;
 
+import cn.hutool.core.collection.CollUtil;
+import com.dragon.tj.portal.service.AppInfoService;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.kafka.core.KafkaAdmin;
+
+import java.util.List;
 
 /**
  * @author huey China.
@@ -12,43 +19,51 @@ import org.springframework.context.annotation.Configuration;
 @Configuration
 public class KafkaInitialConfiguration {
 
-    public static String sseTopic = "sseTopic";
-
     /**
-    *
-     @Bean
-     public KafkaAdmin.NewTopics topics456() {
-     return new NewTopics(
-     TopicBuilder.name("defaultBoth")
-     .build(),
-     TopicBuilder.name("defaultPart")
-     .replicas(1)
-     .build(),
-     TopicBuilder.name("defaultRepl")
-     .partitions(3)
-     .build());
-     }
-
-     ————————————————
-     版权声明:本文为CSDN博主「Doker 多克」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
-     原文链接:https://blog.csdn.net/leesinbad/article/details/129889030
-    *
-    */
+     * 信息管理
+     */
+    public static final String sseTopic = "sseTopic";
+
     /**
-     * 创建一个名为 sseTopic 的Topic并设置分区数为8,分区副本数为2
+     * @author huey China.
+     * @Description : 注入信息及不同系统级别的topic
+     * @Date Created in 2023/7/11 16:03
      */
     @Bean
-    public NewTopic initialTopic() {
+    public KafkaAdmin.NewTopics messageTopics() {
+//        List<String> allAppSysCodes = appInfoService.getAllAppSysCodes();
+        NewTopic base = TopicBuilder.name(sseTopic).partitions(1).replicas(1).build();
 
-        return new NewTopic(sseTopic, 1, (short) 1);
+        NewTopic[] newTopics = null;
 
+//        if (CollUtil.isNotEmpty(allAppSysCodes)) {
+//            newTopics = new NewTopic[allAppSysCodes.size() + 1];
+//            for (int i = 0; i < allAppSysCodes.size(); i++) {
+//                String sysCode = allAppSysCodes.get(i);
+//                newTopics[i] = TopicBuilder.name(sseTopic + "-" + sysCode).partitions(1).replicas(1).build();
+//            }
+//            newTopics[allAppSysCodes.size()] = base;
+//        } else {
+//            newTopics = new NewTopic[]{base};
+//        }
+        newTopics = new NewTopic[]{base};
+        //信息相关
+        KafkaAdmin.NewTopics infoBase = new KafkaAdmin.NewTopics(newTopics);
+        return infoBase;
     }
 
-    // 如果要修改分区数,只需修改配置值重启项目即可
-    // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
-    @Bean
-    public NewTopic updateTopic() {
-        return new NewTopic(sseTopic, 1, (short) 1);
-
-    }
+//    @Bean
+//    public NewTopic initialTopic() {
+//
+//        return new NewTopic(sseTopic, 1, (short) 1);
+//
+//    }
+//
+//    // 如果要修改分区数,只需修改配置值重启项目即可
+//    // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
+//    @Bean
+//    public NewTopic updateTopic() {
+//        return new NewTopic(sseTopic, 1, (short) 1);
+//
+//    }
 }

+ 38 - 1
src/main/java/com/dragon/tj/portal/component/message/MessageConsumer.java

@@ -40,7 +40,7 @@ public class MessageConsumer {
      * @param ack
      */
 //    @KafkaListener(topics = {"sseTopic"}, clientIdPrefix = "${spring.application.name}", id = "openapi-be-sse-connection-", idIsGroup = false, groupId = "openapi-be-sse-connection-" + "#{T(java.util.UUID).randomUUID()}")
-    @KafkaListener(topics = {"sseTopic"})
+    @KafkaListener(topics = KafkaInitialConfiguration.sseTopic)
     public void sseConnectionProcess(String msg, Acknowledgment ack) {
         log.info("get kafka msg from topic:{}, msg:{}", KafkaInitialConfiguration.sseTopic, msg);
         if (StrUtil.isEmpty(msg)) {
@@ -78,4 +78,41 @@ public class MessageConsumer {
         ack.acknowledge();
     }
 
+    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
+    public void sseConnectionProcess2(String msg, Acknowledgment ack) {
+        log.info("get kafka2 msg from topic:{}, msg:{}", KafkaInitialConfiguration.sseTopic, msg);
+        if (StrUtil.isEmpty(msg)) {
+            log.error("kafka msg is empty, no process");
+            return;
+        }
+        MessageInfoSend reqDTO;
+        try {
+            reqDTO = JSONObject.parseObject(msg, MessageInfoSend.class);
+        } catch (Exception e) {
+            log.error("parsing string to obj failed, msg={}, e={}", msg, e);
+            return;
+        }
+        Set<String> clientIds = reqDTO.getClientIds();
+        MessageInfoReq messageInfoReq = reqDTO.getMessageInfoReq();
+
+        for (String clientId : clientIds) {
+            OpenApiSseEmitter emitter = sseEmitters.get(clientId);
+            if (emitter == null) {
+                log.error("can't find the sseEmitter obj from sseEmitters, no process {}", clientId);
+                continue;
+            }
+            try {
+                MessageInfoItem messageInfoItem = messageReqConvert.reqToItem(messageInfoReq);
+                messageInfoItem.setClientId(clientId);
+
+                String sendMsg = JSON.toJSONString(messageInfoItem);
+                // 发送消息给客户端
+                log.info("send sse msg={} to clientId={}", sendMsg, clientId);
+                emitter.send(sendMsg, MediaType.APPLICATION_JSON);
+            } catch (IOException e) {
+                emitter.completeWithError(e);
+            }
+        }
+        ack.acknowledge();
+    }
 }

+ 9 - 8
src/main/java/com/dragon/tj/portal/component/message/SseController.java

@@ -2,7 +2,7 @@ package com.dragon.tj.portal.component.message;
 
 import com.alibaba.fastjson.JSON;
 import com.dragon.tj.portal.auth.model.LoginUser;
-import com.dragon.tj.portal.auth.service.TokenService;
+import com.dragon.tj.portal.auth.util.SecurityUtils;
 import com.dragon.tj.portal.common.base.R;
 import com.dragon.tj.portal.common.dto.message.MessageInfoReq;
 import com.dragon.tj.portal.service.MessageInfoService;
@@ -13,7 +13,6 @@ import org.springframework.web.bind.annotation.*;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import javax.annotation.Resource;
-import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.util.Map;
 
@@ -32,9 +31,6 @@ public class SseController {
     @Autowired
     private MessageInfoService messageInfoService;
 
-    @Autowired
-    private TokenService tokenService;
-
     @GetMapping("/subscribe/{clientId}")
     public SseEmitter subscribe(@PathVariable String clientId, HttpServletResponse response) {
 
@@ -51,12 +47,17 @@ public class SseController {
         return emitter;
     }
 
+    /**
+     * @author huey China.
+     * @Description : 信息发送
+     * @Date Created in 2023/7/11 15:14
+     */
     @PostMapping("/publish")
-    public R publish(@Validated @RequestBody MessageInfoReq messageInfoReq, HttpServletRequest request) {
+    public R publish(@Validated @RequestBody MessageInfoReq messageInfoReq) {
         log.info("sseController req param is {}", JSON.toJSONString(messageInfoReq));
-        LoginUser loginUser = tokenService.getLoginUser(request);
+        LoginUser loginUser = SecurityUtils.getLoginUser();
         log.info("sseController current people is {}", JSON.toJSONString(loginUser));
-        return messageInfoService.push(messageInfoReq,loginUser);
+        return messageInfoService.push(messageInfoReq, loginUser);
     }
 
 

+ 49 - 0
src/main/java/com/dragon/tj/portal/controller/MessageCenterController.java

@@ -0,0 +1,49 @@
+package com.dragon.tj.portal.controller;
+
+
+import com.alibaba.fastjson.JSON;
+import com.dragon.tj.portal.common.base.R;
+import com.dragon.tj.portal.common.dto.message.MessageInfoSend;
+import com.dragon.tj.portal.component.message.KafkaInitialConfiguration;
+import com.dragon.tj.portal.component.message.MessageProducer;
+import com.dragon.tj.portal.service.MessageInfoService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * <p>
+ * 消息对接
+ * </p>
+ *
+ * @author huey
+ * @since 2023-06-15
+ */
+@Slf4j
+@RestController
+@RequestMapping("/api/message")
+public class MessageCenterController {
+
+    @Autowired
+    private MessageInfoService messageInfoService;
+
+    @Autowired
+    private MessageProducer producer;
+
+    /**
+     * @author huey China.
+     * @Description : 消息提醒发送
+     * @Date Created in 2023/7/11 16:12
+     */
+    @PostMapping("/send")
+    public R publish(@Validated @RequestBody MessageInfoSend messageInfoSend) {
+        producer.send(KafkaInitialConfiguration.sseTopic + "-a1", JSON.toJSONString(messageInfoSend));
+        return R.ok();
+    }
+
+}
+

+ 10 - 1
src/main/java/com/dragon/tj/portal/controller/MessageInfoController.java

@@ -10,6 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.List;
+
 /**
  * <p>
  * 信息表 前端控制器
@@ -44,11 +46,18 @@ public class MessageInfoController {
     }
 
     /**
-     * 
+     * 阅
      */
     @GetMapping("read")
     public R read(Long id) {
         return R.ok(messageInfoService.read(id));
     }
+    /**
+     * 批阅
+     */
+    @GetMapping("reads")
+    public R reads(List<Long> ids) {
+        return R.ok(messageInfoService.reads(ids));
+    }
 }
 

+ 0 - 31
src/main/java/com/dragon/tj/portal/entity/Test.java

@@ -1,31 +0,0 @@
-package com.dragon.tj.portal.entity;
-
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import java.io.Serializable;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.experimental.Accessors;
-
-/**
- * <p>
- * 
- * </p>
- *
- * @author huey
- * @since 2023-06-11
- */
-@Getter
-@Setter
-@Accessors(chain = true)
-public class Test implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    @TableId(value = "id", type = IdType.AUTO)
-    private Integer id;
-
-    private String content;
-
-
-}

+ 0 - 18
src/main/java/com/dragon/tj/portal/mapper/TestMapper.java

@@ -1,18 +0,0 @@
-package com.dragon.tj.portal.mapper;
-
-import com.dragon.tj.portal.entity.Test;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import org.apache.ibatis.annotations.Mapper;
-
-/**
- * <p>
- *  Mapper 接口
- * </p>
- *
- * @author huey
- * @since 2023-06-11
- */
-@Mapper
-public interface TestMapper extends BaseMapper<Test> {
-
-}

+ 11 - 0
src/main/java/com/dragon/tj/portal/service/AppInfoService.java

@@ -0,0 +1,11 @@
+package com.dragon.tj.portal.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.dragon.tj.portal.entity.AppInfo;
+
+import java.util.List;
+
+public interface AppInfoService extends IService<AppInfo> {
+
+    List<String> getAllAppSysCodes();
+}

+ 4 - 0
src/main/java/com/dragon/tj/portal/service/MessageInfoService.java

@@ -8,6 +8,8 @@ import com.dragon.tj.portal.common.vo.message.MessageInfoVO;
 import com.dragon.tj.portal.entity.MessageInfo;
 import com.baomidou.mybatisplus.extension.service.IService;
 
+import java.util.List;
+
 /**
  * <p>
  * 信息表 服务类
@@ -25,4 +27,6 @@ public interface MessageInfoService extends IService<MessageInfo> {
     boolean update(MessageInfoParam messageInfoParam);
 
     boolean read(Long id);
+
+    boolean reads(List<Long> ids);
 }

+ 38 - 0
src/main/java/com/dragon/tj/portal/service/impl/AppInfoServiceImpl.java

@@ -0,0 +1,38 @@
+package com.dragon.tj.portal.service.impl;
+
+
+import cn.hutool.core.collection.CollUtil;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.dragon.tj.portal.common.enums.DelFlagEnum;
+import com.dragon.tj.portal.entity.AppInfo;
+import com.dragon.tj.portal.mapper.app.AppInfoMapper;
+import com.dragon.tj.portal.service.AppInfoService;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Service
+public class AppInfoServiceImpl extends ServiceImpl<AppInfoMapper, AppInfo> implements AppInfoService {
+
+
+    @Override
+    public List<String> getAllAppSysCodes() {
+        LambdaQueryWrapper<AppInfo> wrapper = Wrappers.lambdaQuery();
+        wrapper.eq(AppInfo::getDelFlag, DelFlagEnum.NO.value());
+        List<AppInfo> list = this.list(wrapper);
+        if (CollUtil.isEmpty(list)) {
+            return Lists.newArrayList();
+        } else {
+            Set<String> sysCodes = list.stream().filter(e -> StringUtils.isNotEmpty(e.getSystemNumber())).map(AppInfo::getSystemNumber).collect(Collectors.toSet());
+            return Lists.newArrayList(sysCodes);
+        }
+    }
+}
+
+

+ 14 - 0
src/main/java/com/dragon/tj/portal/service/impl/MessageInfoServiceImpl.java

@@ -2,6 +2,8 @@ package com.dragon.tj.portal.service.impl;
 
 import cn.hutool.core.collection.CollUtil;
 import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.dragon.tj.portal.auth.model.LoginUser;
 import com.dragon.tj.portal.auth.web.service.SysDeptService;
@@ -86,6 +88,18 @@ public class MessageInfoServiceImpl extends ServiceImpl<MessageInfoMapper, Messa
         return this.updateById(messageInfo);
     }
 
+    @Override
+    public boolean reads(List<Long> ids) {
+        List<MessageInfo> list = Lists.newArrayList();
+        ids.forEach(e->{
+            MessageInfo messageInfo = new MessageInfo();
+            messageInfo.setReadStatus(ReadStatusEnum.YES.value());
+            messageInfo.setId(e);
+
+        });
+        return this.updateBatchById(list);
+    }
+
 
     /**
      * @author huey China.

+ 2 - 0
src/main/resources/application-dev.properties

@@ -7,6 +7,8 @@ mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
 #log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
 
 ###########kafka#############
+#指定哪些appId能发topic,动态变更后,不能自动订阅需重启服务
+kafka.topics=sseTopic-a1,sseTopic-a2
 spring.kafka.bootstrap-servers=149.28.158.127:9092
 spring.kafka.producer.retries=2
 spring.kafka.producer.acks=1

+ 9 - 0
src/main/resources/application-local.properties

@@ -5,6 +5,8 @@ spring.datasource.password=123456
 mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
 #log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
 ###########kafka#############
+#指定哪些appId能发topic,动态变更后,不能自动订阅需重启服务
+kafka.topics=sseTopic-a1,sseTopic-a2
 spring.kafka.bootstrap-servers=149.28.158.127:9092
 spring.kafka.producer.retries=2
 spring.kafka.producer.acks=1
@@ -30,6 +32,13 @@ cas.target.url=http://localhost:8081/#/?token=
 cas.filter.url=/sso/login
 cas.failure.url=http://localhost:8081/#/401
 
+########## dcuc api ################
+client.dcuc.user.url=http://localhost:8086/dcuc/user/api/
+client.dcuc.auth.url=http://localhost:8086/dcuc/auth/api/
+
+########## path api ################
+dragon.file.path=D:/crayon/file
+
 logging.level.org.springframework.security=trace
 logging.level.org.jasig.cas=trace
 logging.level.org.apache.kafka=warn