Forráskód Böngészése

重构优化消息推送相关

hubin 9 hónapja
szülő
commit
cef124de17

+ 56 - 0
src/main/java/com/aizuda/boot/modules/common/MessageEvent.java

@@ -0,0 +1,56 @@
+package com.aizuda.boot.modules.common;
+
+import com.aizuda.core.bean.BeanConvert;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 消息事件对象
+ */
+@Getter
+@Setter
+public class MessageEvent implements BeanConvert, Serializable {
+    /**
+     * 创建人ID
+     */
+    protected Long createId;
+
+    /**
+     * 创建人
+     */
+    protected String createBy;
+
+    /**
+     * 标题
+     */
+    private String title;
+
+    /**
+     * 内容
+     */
+    private String content;
+
+    /**
+     * 类别 0,通知 1,消息 2,待办
+     */
+    private Integer category;
+
+    /**
+     * 业务ID
+     */
+    private Long businessId;
+
+    /**
+     * 业务类型
+     */
+    private String businessType;
+
+    /**
+     * 接收人ID列表
+     */
+    private List<Long> userIds;
+
+}

+ 29 - 0
src/main/java/com/aizuda/boot/modules/flw/flow/FlowTaskListener.java

@@ -1,20 +1,26 @@
 package com.aizuda.boot.modules.flw.flow;
 
+import com.aizuda.boot.modules.common.MessageEvent;
 import com.aizuda.boot.modules.flw.entity.ApprovalContent;
 import com.aizuda.boot.modules.flw.entity.FlwProcessApproval;
 import com.aizuda.boot.modules.flw.service.IFlwProcessApprovalService;
+import com.aizuda.boot.modules.system.entity.enums.BusinessType;
 import com.aizuda.bpm.engine.FlowLongEngine;
 import com.aizuda.bpm.engine.core.FlowCreator;
 import com.aizuda.bpm.engine.core.enums.*;
 import com.aizuda.bpm.engine.entity.FlwTask;
+import com.aizuda.bpm.engine.entity.FlwTaskActor;
 import com.aizuda.bpm.engine.listener.TaskListener;
 import com.aizuda.bpm.engine.model.NodeModel;
 import com.aizuda.bpm.engine.model.ProcessModel;
 import com.aizuda.common.toolkit.StringUtils;
 import com.aizuda.core.api.ApiAssert;
 import jakarta.annotation.Resource;
+import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
 @Component
@@ -23,6 +29,9 @@ public class FlowTaskListener implements TaskListener {
     private IFlwProcessApprovalService flwProcessApprovalService;
     @Resource
     private FlowLongEngine flowLongEngine;
+    @Resource
+    private ApplicationEventPublisher applicationEventPublisher;
+
 
     @Override
     public boolean notify(EventType eventType, Supplier<FlwTask> supplier, NodeModel nodeModel, FlowCreator flowCreator) {
@@ -39,6 +48,9 @@ public class FlowTaskListener implements TaskListener {
                         // 执行自动跳转逻辑
                         flowLongEngine.autoJumpTask(flwTask.getId(), flowCreator);
                     }
+                } else {
+                    // 推送消息
+                    this.sendMessage(flwTask, flowCreator);
                 }
             }
             // 创建任务直接跳过
@@ -181,4 +193,21 @@ public class FlowTaskListener implements TaskListener {
         }
         return type;
     }
+
+    public void sendMessage(FlwTask flwTask, FlowCreator flowCreator) {
+        Optional<List<FlwTaskActor>> taskActorsOptional = flowLongEngine.queryService().getActiveTaskActorsByTaskId(flwTask.getId());
+        if (taskActorsOptional.isPresent()) {
+            // 发送消息
+            MessageEvent messageEvent = new MessageEvent();
+            messageEvent.setTitle("待处理任务:" + flwTask.getTaskName());
+            messageEvent.setContent(messageEvent.getTitle());
+            messageEvent.setCreateId(Long.valueOf(flowCreator.getCreateId()));
+            messageEvent.setCreateBy(flowCreator.getCreateBy());
+            messageEvent.setCategory(2);
+            messageEvent.setBusinessId(flwTask.getId());
+            messageEvent.setBusinessType(BusinessType.flowTask.name());
+            messageEvent.setUserIds(taskActorsOptional.get().stream().map(t -> Long.valueOf(t.getActorId())).toList());
+            applicationEventPublisher.publishEvent(messageEvent);
+        }
+    }
 }

+ 5 - 5
src/main/java/com/aizuda/boot/modules/system/controller/SysMessageController.java

@@ -2,12 +2,12 @@ package com.aizuda.boot.modules.system.controller;
 
 import com.aizuda.boot.modules.system.entity.SysMessage;
 import com.aizuda.boot.modules.system.entity.vo.InformMessageVO;
+import com.aizuda.boot.modules.system.entity.vo.SysMessageVO;
 import com.aizuda.boot.modules.system.service.ISysMessageService;
 import com.aizuda.core.api.ApiController;
 import com.aizuda.core.api.PageParam;
 import com.aizuda.core.validation.Create;
 import com.aizuda.core.validation.Update;
-import com.aizuda.service.web.UserSession;
 import com.baomidou.kisso.annotation.Permission;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import io.swagger.v3.oas.annotations.Operation;
@@ -42,15 +42,15 @@ public class SysMessageController extends ApiController {
     @Operation(summary = "我的分页列表")
     @Permission("sys:message:page-my")
     @PostMapping("/page-my")
-    public Page<SysMessage> pageMy(@RequestBody PageParam<SysMessage> dto) {
+    public Page<SysMessageVO> pageMy(@RequestBody PageParam<SysMessage> dto) {
         return sysMessageService.pageMy(dto.page(), dto.getData());
     }
 
     @Operation(summary = "一键已读")
     @Permission("sys:message:page")
     @PostMapping("/read")
-    public boolean read() {
-        return sysMessageService.readAll(UserSession.getLoginInfo());
+    public boolean read(@RequestParam(required = false) Long id) {
+        return sysMessageService.read(id);
     }
 
     @Operation(summary = "告知消息")
@@ -64,7 +64,7 @@ public class SysMessageController extends ApiController {
     @Permission("sys:message:get")
     @GetMapping("/get")
     public SysMessage get(@RequestParam Long id) {
-        return sysMessageService.getViewedById(id);
+        return sysMessageService.getById(id);
     }
 
     @Operation(summary = "根据 id 修改信息")

+ 9 - 34
src/main/java/com/aizuda/boot/modules/system/entity/SysMessage.java

@@ -29,34 +29,24 @@ import java.util.Date;
 @TableName("sys_message")
 public class SysMessage  extends SuperEntity {
 
-	/**
-	 * 创建人ID
-	 */
+	@Schema(description = "创建人ID")
 	@TableField(fill = FieldFill.INSERT)
 	protected Long createId;
 
-	/**
-	 * 创建人
-	 */
+	@Schema(description = "创建人")
 	@TableField(fill = FieldFill.INSERT, condition = SqlCondition.LIKE)
 	protected String createBy;
 
-	/**
-	 * 创建时间
-	 */
+	@Schema(description = "创建时间")
 	@JsonFormat(pattern = ApiConstants.DATE_MM)
 	@TableField(fill = FieldFill.INSERT)
 	protected Date createTime;
 
-	/**
-	 * 修改人
-	 */
+	@Schema(description = "修改人")
 	@TableField(fill = FieldFill.UPDATE)
 	protected String updateBy;
 
-	/**
-	 * 修改时间
-	 */
+	@Schema(description = "修改时间")
 	@JsonFormat(pattern = ApiConstants.DATE_MM)
 	@TableField(fill = FieldFill.UPDATE)
 	protected Date updateTime;
@@ -75,27 +65,12 @@ public class SysMessage  extends SuperEntity {
 	@PositiveOrZero
 	private Integer category;
 
-	@Schema(description = "接收人ID")
+	@Schema(description = "业务ID")
 	@PositiveOrZero
-	private Long userId;
+	private Long businessId;
 
-	@Schema(description = "接收人")
+	@Schema(description = "业务类型")
 	@Size(max = 50)
-	private String username;
-
-	@Schema(description = "已查看 0,否 1,是")
-	@PositiveOrZero
-	private Integer viewed;
-
-	@Schema(description = "发送状态 0,未发送 1,成功 2,失败")
-	@PositiveOrZero
-	private Integer sendStatus;
-
-	@Schema(description = "发送失败原因")
-	@Size(max = 255)
-	private String sendFailure;
-
-	@Schema(description = "发送时间")
-	private Date sendTime;
+	private String businessType;
 
 }

+ 52 - 0
src/main/java/com/aizuda/boot/modules/system/entity/SysMessageReceiver.java

@@ -0,0 +1,52 @@
+package com.aizuda.boot.modules.system.entity;
+
+import com.aizuda.core.bean.SuperEntity;
+import com.aizuda.core.validation.Create;
+import com.baomidou.mybatisplus.annotation.TableName;
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.PositiveOrZero;
+import jakarta.validation.constraints.Size;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Date;
+
+/**
+ * 消息接收人表
+ *
+ * @author 青苗
+ * @since 2024-09-13
+ */
+@Getter
+@Setter
+@Schema(name = "SysMessageReceiver", description = "消息接收人表")
+@TableName("sys_message_receiver")
+public class SysMessageReceiver extends SuperEntity {
+
+	@Schema(description = "消息ID")
+	@NotNull(groups = Create.class)
+	@PositiveOrZero
+	private Long messageId;
+
+	@Schema(description = "接收人ID")
+	@NotNull(groups = Create.class)
+	@PositiveOrZero
+	private Long userId;
+
+	@Schema(description = "已查看 0,否 1,是")
+	@PositiveOrZero
+	private Integer viewed;
+
+	@Schema(description = "发送状态 0,未发送 1,成功 2,失败")
+	@PositiveOrZero
+	private Integer sendStatus;
+
+	@Schema(description = "发送失败原因")
+	@Size(max = 255)
+	private String sendFailure;
+
+	@Schema(description = "发送时间")
+	private Date sendTime;
+
+}

+ 19 - 0
src/main/java/com/aizuda/boot/modules/system/entity/enums/BusinessType.java

@@ -0,0 +1,19 @@
+package com.aizuda.boot.modules.system.entity.enums;
+
+/**
+ * 消息业务类型类
+ *
+ * @author 青苗
+ * @since 2024-09-13
+ */
+public enum BusinessType {
+    /**
+     * 流程任务
+     */
+    flowTask,
+    /**
+     * 其它
+     */
+    other
+
+}

+ 31 - 0
src/main/java/com/aizuda/boot/modules/system/entity/vo/SysMessageVO.java

@@ -0,0 +1,31 @@
+package com.aizuda.boot.modules.system.entity.vo;
+
+import com.aizuda.boot.modules.system.entity.SysMessage;
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.PositiveOrZero;
+import jakarta.validation.constraints.Size;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Date;
+
+@Getter
+@Setter
+public class SysMessageVO extends SysMessage {
+
+    @Schema(description = "已查看 0,否 1,是")
+    @PositiveOrZero
+    private Integer viewed;
+
+    @Schema(description = "发送状态 0,未发送 1,成功 2,失败")
+    @PositiveOrZero
+    private Integer sendStatus;
+
+    @Schema(description = "发送失败原因")
+    @Size(max = 255)
+    private String sendFailure;
+
+    @Schema(description = "发送时间")
+    private Date sendTime;
+
+}

+ 4 - 0
src/main/java/com/aizuda/boot/modules/system/mapper/SysMessageMapper.java

@@ -1,7 +1,10 @@
 package com.aizuda.boot.modules.system.mapper;
 
 import com.aizuda.boot.modules.system.entity.SysMessage;
+import com.aizuda.boot.modules.system.entity.vo.SysMessageVO;
 import com.aizuda.service.mapper.CrudMapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.ibatis.annotations.Param;
 
 /**
  * <p>
@@ -13,4 +16,5 @@ import com.aizuda.service.mapper.CrudMapper;
  */
 public interface SysMessageMapper extends CrudMapper<SysMessage> {
 
+    Page<SysMessageVO> selectPageVOByUserId(Page<SysMessage> page, @Param("userId") Long userId, @Param("m") SysMessage sysMessage);
 }

+ 16 - 0
src/main/java/com/aizuda/boot/modules/system/mapper/SysMessageReceiverMapper.java

@@ -0,0 +1,16 @@
+package com.aizuda.boot.modules.system.mapper;
+
+import com.aizuda.boot.modules.system.entity.SysMessageReceiver;
+import com.aizuda.service.mapper.CrudMapper;
+
+/**
+ * <p>
+ * 消息接收人表 Mapper 接口
+ * </p>
+ *
+ * @author 青苗
+ * @since 2024-09-13
+ */
+public interface SysMessageReceiverMapper extends CrudMapper<SysMessageReceiver> {
+
+}

+ 21 - 0
src/main/java/com/aizuda/boot/modules/system/service/ISysMessageReceiverService.java

@@ -0,0 +1,21 @@
+package com.aizuda.boot.modules.system.service;
+
+import com.aizuda.boot.modules.system.entity.SysMessageReceiver;
+import com.aizuda.service.service.IBaseService;
+
+/**
+ * 消息接收人表 服务类
+ *
+ * @author 青苗
+ * @since 2024-09-13
+ */
+public interface ISysMessageReceiverService extends IBaseService<SysMessageReceiver> {
+
+    /**
+     * 更新已读
+     *
+     * @param messageId 消息ID
+     * @param userId    用户ID
+     */
+    boolean updateViewed(Long messageId, Long userId);
+}

+ 3 - 4
src/main/java/com/aizuda/boot/modules/system/service/ISysMessageService.java

@@ -2,6 +2,7 @@ package com.aizuda.boot.modules.system.service;
 
 import com.aizuda.boot.modules.system.entity.SysMessage;
 import com.aizuda.boot.modules.system.entity.vo.InformMessageVO;
+import com.aizuda.boot.modules.system.entity.vo.SysMessageVO;
 import com.aizuda.service.service.IBaseService;
 import com.aizuda.service.web.UserSession;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -19,9 +20,7 @@ public interface ISysMessageService extends IBaseService<SysMessage> {
     /**
      * 我的消息分页列表
      */
-    Page<SysMessage> pageMy(Page<SysMessage> page, SysMessage sysMessage);
-
-    SysMessage getViewedById(Long id);
+    Page<SysMessageVO> pageMy(Page<SysMessage> page, SysMessage sysMessage);
 
     /**
      * 告知消息
@@ -31,6 +30,6 @@ public interface ISysMessageService extends IBaseService<SysMessage> {
     /**
      * 一键已读
      */
-    boolean readAll(UserSession userSession);
+    boolean read(Long id);
 
 }

+ 1 - 1
src/main/java/com/aizuda/boot/modules/system/service/ISysSSEService.java

@@ -27,5 +27,5 @@ public interface ISysSSEService {
      * @param eventName 事件名称
      * @param message   消息内容
      */
-    void send(Long userId, String eventName, String message);
+    boolean send(Long userId, String eventName, String message);
 }

+ 26 - 0
src/main/java/com/aizuda/boot/modules/system/service/impl/SysMessageReceiverServiceImpl.java

@@ -0,0 +1,26 @@
+package com.aizuda.boot.modules.system.service.impl;
+
+import com.aizuda.boot.modules.system.entity.SysMessageReceiver;
+import com.aizuda.boot.modules.system.mapper.SysMessageReceiverMapper;
+import com.aizuda.boot.modules.system.service.ISysMessageReceiverService;
+import com.aizuda.service.service.BaseServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+ * 消息接收人表 服务实现类
+ *
+ * @author 青苗
+ * @since 2024-09-13
+ */
+@Service
+public class SysMessageReceiverServiceImpl extends BaseServiceImpl<SysMessageReceiverMapper, SysMessageReceiver> implements ISysMessageReceiverService {
+
+    @Override
+    public boolean updateViewed(Long messageId, Long userId) {
+        lambdaUpdate().set(SysMessageReceiver::getViewed, 1)
+                .eq(null != messageId, SysMessageReceiver::getId, messageId)
+                .eq(SysMessageReceiver::getUserId, userId)
+                .update();
+        return true;
+    }
+}

+ 46 - 30
src/main/java/com/aizuda/boot/modules/system/service/impl/SysMessageServiceImpl.java

@@ -1,18 +1,25 @@
 package com.aizuda.boot.modules.system.service.impl;
 
+import com.aizuda.boot.modules.common.MessageEvent;
 import com.aizuda.boot.modules.system.entity.SysMessage;
+import com.aizuda.boot.modules.system.entity.SysMessageReceiver;
 import com.aizuda.boot.modules.system.entity.vo.InformMessageVO;
+import com.aizuda.boot.modules.system.entity.vo.SysMessageVO;
 import com.aizuda.boot.modules.system.mapper.SysMessageMapper;
+import com.aizuda.boot.modules.system.service.ISysMessageReceiverService;
 import com.aizuda.boot.modules.system.service.ISysMessageService;
+import com.aizuda.boot.modules.system.service.ISysSSEService;
 import com.aizuda.core.api.ApiAssert;
 import com.aizuda.service.service.BaseServiceImpl;
 import com.aizuda.service.web.UserSession;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.AllArgsConstructor;
+import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Service;
 
-import java.util.Objects;
+import java.util.Date;
 
 /**
  * 系统消息表 服务实现类
@@ -21,7 +28,10 @@ import java.util.Objects;
  * @since 2023-10-03
  */
 @Service
+@AllArgsConstructor
 public class SysMessageServiceImpl extends BaseServiceImpl<SysMessageMapper, SysMessage> implements ISysMessageService {
+    private ISysMessageReceiverService messageReceiverService;
+    private ISysSSEService sseService;
 
     @Override
     public Page<SysMessage> page(Page<SysMessage> page, SysMessage sysMessage) {
@@ -30,15 +40,9 @@ public class SysMessageServiceImpl extends BaseServiceImpl<SysMessageMapper, Sys
     }
 
     @Override
-    public Page<SysMessage> pageMy(Page<SysMessage> page, SysMessage sysMessage) {
-        LambdaQueryWrapper<SysMessage> lqw = Wrappers.lambdaQuery();
+    public Page<SysMessageVO> pageMy(Page<SysMessage> page, SysMessage sysMessage) {
         UserSession userSession = UserSession.getLoginInfo();
-        if(null == sysMessage) {
-            sysMessage = new SysMessage();
-        }
-        sysMessage.setUserId(userSession.getId());
-        lqw.setEntity(sysMessage);
-        return super.page(page, lqw);
+        return baseMapper.selectPageVOByUserId(page, userSession.getId(), sysMessage);
     }
 
     @Override
@@ -47,22 +51,6 @@ public class SysMessageServiceImpl extends BaseServiceImpl<SysMessageMapper, Sys
         return super.updateById(sysMessage);
     }
 
-    @Override
-    public SysMessage getViewedById(Long id) {
-        SysMessage sysMessage = this.getById(id);
-        if (null != sysMessage && Objects.equals(sysMessage.getViewed(), 0)) {
-            UserSession userSession = UserSession.getLoginInfo();
-            if (Objects.equals(sysMessage.getUserId(), userSession.getId())) {
-                // 接收人查看,更新已读状态
-                SysMessage temp = new SysMessage();
-                temp.setViewed(1);
-                temp.setId(id);
-                super.updateById(temp);
-            }
-        }
-        return sysMessage;
-    }
-
     @Override
     public InformMessageVO getInformByUser() {
         UserSession userSession = UserSession.getLoginInfo();
@@ -85,13 +73,41 @@ public class SysMessageServiceImpl extends BaseServiceImpl<SysMessageMapper, Sys
     }
 
     public Page<SysMessage> pageCategoryByUser(Long userId, Integer category) {
-        return super.page(Page.of(1, 5), Wrappers.<SysMessage>lambdaQuery().select(SysMessage::getId, SysMessage::getTitle,
-                        SysMessage::getContent, SysMessage::getCreateTime).eq(SysMessage::getCategory, category)
-                .eq(SysMessage::getUserId, userId).eq(SysMessage::getViewed, 0).orderByDesc(SysMessage::getCreateTime));
+        return super.page(Page.of(1, 5));
     }
 
     @Override
-    public boolean readAll(UserSession userSession) {
-        return lambdaUpdate().set(SysMessage::getViewed, 1).eq(SysMessage::getUserId, userSession.getId()).update();
+    public boolean read(Long id) {
+        UserSession userSession = UserSession.getLoginInfo();
+        return messageReceiverService.updateViewed(id, userSession.getId());
+    }
+
+    /**
+     * 消息处理监听器
+     *
+     * @param event 消息事件
+     */
+    @EventListener
+    public void onMessageEvent(MessageEvent event) {
+        // 保存消息
+        SysMessage message = event.convert(SysMessage.class);
+        message.setCreateTime(new Date());
+        if (super.save(message)) {
+            // 保存消息接收者列表
+            messageReceiverService.saveBatch(event.getUserIds().stream().map(userId -> {
+                SysMessageReceiver receiver = new SysMessageReceiver();
+                receiver.setMessageId(message.getId());
+                receiver.setUserId(userId);
+                // 发送消息
+                if (sseService.send(userId, event.getTitle(), event.getContent())) {
+                    receiver.setSendStatus(1);
+                } else {
+                    receiver.setSendStatus(2);
+                    receiver.setSendFailure("未上线");
+                }
+                receiver.setSendTime(new Date());
+                return receiver;
+            }).toList());
+        }
     }
 }

+ 7 - 1
src/main/java/com/aizuda/boot/modules/system/service/impl/SysSSEServiceImpl.java

@@ -56,18 +56,24 @@ public class SysSSEServiceImpl implements ISysSSEService {
     }
 
     @Override
-    public void send(Long userId, String eventName, String message) {
+    public boolean send(Long userId, String eventName, String message) {
+        boolean result = false;
         Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
         if (emitters != null) {
             for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
                 SseEmitter sseEmitter = entry.getValue();
                 try {
                     sseEmitter.send(SseEmitter.event().name(eventName).data(message, APPLICATION_JSON));
+                    if (!result) {
+                        // 只要推送一次成功认为成功
+                        result = true;
+                    }
                 } catch (Exception e) {
                     emitters.remove(entry.getKey());
                     sseEmitter.completeWithError(e);
                 }
             }
         }
+        return result;
     }
 }

+ 17 - 0
src/main/resources/mapper/SysMessageMapper.xml

@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.aizuda.boot.modules.system.mapper.SysMessageMapper">
+
+    <select id="selectPageVOByUserId" resultType="com.aizuda.boot.modules.system.entity.vo.SysMessageVO">
+        SELECT s.*,r.viewed,r.send_status,r.send_failure,r.send_time FROM sys_message s
+        LEFT JOIN sys_message_receiver r ON r.message_id = s.id AND r.user_id = #{userId}
+        <where>
+            <if test="m.title != null">
+                AND s.title LIKE CONCAT('%',#{m.title},'%')
+            </if>
+            <if test="m.category != null">
+                AND s.category = #{m.category}
+            </if>
+        </where>
+    </select>
+</mapper>