Browse Source

新增 SSE 推送能力支持

hubin 9 tháng trước cách đây
mục cha
commit
5aea863332

+ 10 - 0
src/main/java/com/aizuda/boot/config/BootException.java

@@ -13,6 +13,8 @@ import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
 
+import java.io.IOException;
+
 /**
  * 自定义异常统一提示
  */
@@ -28,4 +30,12 @@ public class BootException {
     public ApiResult<String> handleBindException(FlowLongException e) {
         return ApiResult.failed(e.getMessage());
     }
+
+    @ExceptionHandler(IOException.class)
+    @ResponseStatus(HttpStatus.OK)
+    @ResponseBody
+    public ApiResult<String> handleIOException(IOException e) {
+        // 捕获 IO 异常,例如 sse 连接断开
+        return ApiResult.failed(e.getMessage());
+    }
 }

+ 5 - 0
src/main/java/com/aizuda/boot/modules/auth/service/impl/AuthServiceImpl.java

@@ -13,6 +13,7 @@ import com.aizuda.core.api.ApiAssert;
 import com.baomidou.kisso.common.encrypt.MD5Salt;
 import com.baomidou.kisso.enums.TokenOrigin;
 import com.baomidou.kisso.security.token.SSOToken;
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import jakarta.servlet.http.HttpServletRequest;
 import jakarta.servlet.http.HttpServletResponse;
@@ -53,6 +54,10 @@ public class AuthServiceImpl implements IAuthService {
         st.setIssuer(user.getUsername());
         st.setUserAgent(request);
         st.setOrigin(TokenOrigin.HTML5);
+        st.setData(new HashMap<>(){{
+            // 设置会话ID,用于区分客户端消息发送
+            put("sid", IdWorker.get32UUID());
+        }});
         // 登录信息
         Map<String, Object> loginInfo = new HashMap<>(4);
         loginInfo.put("token", st.getToken());

+ 41 - 0
src/main/java/com/aizuda/boot/modules/system/controller/SysSSEController.java

@@ -0,0 +1,41 @@
+package com.aizuda.boot.modules.system.controller;
+
+import com.aizuda.boot.modules.system.service.ISysSSEService;
+import com.aizuda.core.api.ApiController;
+import com.aizuda.service.web.UserSession;
+import com.baomidou.kisso.annotation.Permission;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.AllArgsConstructor;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+/**
+ * SSE消息推送 前端控制器
+ *
+ * @author 青苗
+ * @since 2024-09-05
+ */
+@Tag(name = "SSE消息推送")
+@RestController
+@AllArgsConstructor
+@RequestMapping("/sys/sse")
+public class SysSSEController extends ApiController {
+    private ISysSSEService sseService;
+
+    @Operation(summary = "连接")
+    @Permission("sys:sse:connect")
+    @RequestMapping("/connect")
+    public SseEmitter connect() {
+        return sseService.connect();
+    }
+
+    @Operation(summary = "测试发送数据给当前用户")
+    @Permission("sys:sse:testSend")
+    @RequestMapping("/test-send")
+    public boolean sendTest(String message) {
+        sseService.send(UserSession.getLoginInfo().getId(), "message", message);
+        return true;
+    }
+}

+ 31 - 0
src/main/java/com/aizuda/boot/modules/system/service/ISysSSEService.java

@@ -0,0 +1,31 @@
+/*
+ * 爱组搭,低代码组件化开发平台
+ * ------------------------------------------
+ * 受知识产权保护,请勿删除版权申明,开发平台不允许做非法网站,后果自负
+ */
+package com.aizuda.boot.modules.system.service;
+
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+/**
+ * SSE消息推送 服务类
+ *
+ * @author 青苗
+ * @since 2024-09-05
+ */
+public interface ISysSSEService {
+
+    /**
+     * SSE 连接
+     */
+    SseEmitter connect();
+
+    /**
+     * 指定用户ID SSE 推送消息
+     *
+     * @param userId    用户ID
+     * @param eventName 事件名称
+     * @param message   消息内容
+     */
+    void send(Long userId, String eventName, String message);
+}

+ 73 - 0
src/main/java/com/aizuda/boot/modules/system/service/impl/SysSSEServiceImpl.java

@@ -0,0 +1,73 @@
+/*
+ * 爱组搭,低代码组件化开发平台
+ * ------------------------------------------
+ * 受知识产权保护,请勿删除版权申明,开发平台不允许做非法网站,后果自负
+ */
+package com.aizuda.boot.modules.system.service.impl;
+
+import com.aizuda.boot.modules.system.service.ISysSSEService;
+import com.aizuda.service.web.UserSession;
+import org.springframework.stereotype.Service;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.springframework.http.MediaType.APPLICATION_JSON;
+
+/**
+ * SSE消息推送 服务实现类
+ *
+ * @author 青苗
+ * @since 2024-09-05
+ */
+@Service
+public class SysSSEServiceImpl implements ISysSSEService {
+    private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
+
+    @Override
+    public SseEmitter connect() {
+        UserSession userSession = UserSession.getLoginInfo();
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userSession.getId(), k -> new ConcurrentHashMap<>());
+
+        // 创建一个新的 SseEmitter 实例,默认30秒超时,设置为0L则永不超时
+        SseEmitter emitter = new SseEmitter(0L);
+
+        // 用户会话ID
+        String sid = userSession.getSid();
+
+        // 会话关联推送
+        emitters.put(sid, emitter);
+
+        // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的会话信息
+        emitter.onCompletion(() -> emitters.remove(sid));
+        emitter.onTimeout(() -> emitters.remove(sid));
+        emitter.onError((e) -> emitters.remove(sid));
+
+        try {
+            // 向客户端发送一条连接成功的事件
+            emitter.send(SseEmitter.event().comment("connected"));
+        } catch (IOException e) {
+            // 如果发送消息失败,则从映射表中移除 emitter
+            emitters.remove(sid);
+        }
+        return emitter;
+    }
+
+    @Override
+    public void send(Long userId, String eventName, String message) {
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+        if (emitters != null) {
+            for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
+                SseEmitter sseEmitter = entry.getValue();
+                try {
+                    sseEmitter.send(SseEmitter.event().name(eventName).data(message, APPLICATION_JSON));
+                } catch (Exception e) {
+                    emitters.remove(entry.getKey());
+                    sseEmitter.completeWithError(e);
+                }
+            }
+        }
+    }
+}