Browse Source

feat(消息): sse

huey 2 years ago
parent
commit
1a9793ec4f

+ 6 - 0
pom.xml

@@ -95,6 +95,12 @@
             <version>5.5.8</version>
         </dependency>
 
+        <!--guava-->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>11.0.2</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 1
src/main/java/com/dragon/tj/portal/controller/TestController.java → src/main/java/com/dragon/tj/portal/controller/Test2Controller.java

@@ -22,7 +22,7 @@ import org.springframework.web.bind.annotation.RestController;
  */
 @RestController
 @RequestMapping("/test")
-public class TestController {
+public class Test2Controller {
 
     @Autowired
     private TestService testService;

+ 59 - 0
src/main/java/com/dragon/tj/portal/message/SSEWebServer.java

@@ -0,0 +1,59 @@
+package com.dragon.tj.portal.message;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@CrossOrigin
+@RestController
+@RequestMapping("/sse")
+public class SSEWebServer {
+
+    private static Cache<String, LinkedBlockingDeque<SseEvent>> sseCache = CacheBuilder.newBuilder()
+            .maximumSize(1000)
+            .expireAfterWrite(60, TimeUnit.MINUTES)
+            .build();
+
+    /* *
+     * sse 连接服务
+     */
+    @GetMapping("/sseEvent/{userId}")
+    public SseEmitter push(@PathVariable("userId") String userId) {
+        return SseEmitterServer.connect(userId);
+    }
+
+    //IE 浏览器不支持SSE 采用轮训
+    @GetMapping("/sseEventIE/{userId}")
+    public ResponseEntity pushIe(@PathVariable("userId") String userId) {
+        if (StringUtils.isEmpty(userId)) {
+            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(" userId is Empty ! ");
+        }
+        log.info("IE 连接,userId = {} ", userId);
+        try {
+            SseEvent poll = Objects.requireNonNull(sseCache.getIfPresent(userId)).poll();
+            return poll == null ? ResponseEntity.status(HttpStatus.BAD_REQUEST).body("连接失败!") : ResponseEntity.ok().body(poll.getMsg());
+        } catch (Exception e) {
+            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
+        }
+    }
+
+    static boolean publicMsg(SseEvent event) {
+        LinkedBlockingDeque<SseEvent> ifPresent = sseCache.getIfPresent(event.getUserId());
+        if (ifPresent == null) {
+            sseCache.put(event.getUserId(), new LinkedBlockingDeque<SseEvent>());
+        }
+        log.info("添加到队列,userId:{} ", event.getUserId());
+        return Objects.requireNonNull(sseCache.getIfPresent(event.getUserId())).offer(event);
+    }
+}
+

+ 58 - 0
src/main/java/com/dragon/tj/portal/message/SseEmitterServer.java

@@ -0,0 +1,58 @@
+package com.dragon.tj.portal.message;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class SseEmitterServer {
+
+    private static AtomicInteger count = new AtomicInteger(0);
+
+    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
+
+    public static SseEmitter connect(String userId) {
+        SseEmitter sseEmitter = new SseEmitter(0L);
+        sseEmitter.onCompletion(() -> {
+            log.info("结束连接:{}", userId);
+            removeUser(userId);
+        });
+        sseEmitter.onError(throwable -> {
+            log.info("连接异常:{}", userId);
+            removeUser(userId);
+        });
+        sseEmitter.onTimeout(() -> {
+            log.info("连接超时:{}", userId);
+            removeUser(userId);
+        });
+        sseEmitterMap.put(userId, sseEmitter);
+        count.getAndIncrement();
+        log.info("创建新的sse连接,当前用户:{}", userId);
+        return sseEmitter;
+    }
+
+
+    public static void sendMessage(String userId, Object message) {
+        if (sseEmitterMap.containsKey(userId)) {
+            try {
+                sseEmitterMap.get(userId).send(message);
+                log.info("SSE 发送信息成功!id = {} , message: {} ", userId, message);
+            } catch (IOException e) {
+                log.error("[{}]推送异常:{}", userId, e.getMessage());
+                removeUser(userId);
+            }
+        } else {
+            log.warn("SSE 发送信息异常,用户不存在:id = {} ", userId);
+        }
+    }
+
+    private static void removeUser(String userId) {
+        sseEmitterMap.remove(userId);
+        count.getAndDecrement();
+    }
+}
+

+ 19 - 0
src/main/java/com/dragon/tj/portal/message/SseEvent.java

@@ -0,0 +1,19 @@
+package com.dragon.tj.portal.message;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.context.ApplicationEvent;
+
+@Getter
+@Setter
+@ToString
+public class SseEvent<T> extends ApplicationEvent {
+    private int code;
+    private String userId;
+    private T msg;
+
+    public SseEvent(Object source) {
+        super(source);
+    }
+}

+ 15 - 0
src/main/java/com/dragon/tj/portal/message/SseListener.java

@@ -0,0 +1,15 @@
+package com.dragon.tj.portal.message;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class SseListener implements ApplicationListener<SseEvent> {
+    @Override
+    public void onApplicationEvent(SseEvent event) {
+        SseEmitterServer.sendMessage(event.getUserId(), event.getMsg());
+        SSEWebServer.publicMsg(event);
+    }
+}

+ 27 - 0
src/main/java/com/dragon/tj/portal/message/TestController.java

@@ -0,0 +1,27 @@
+package com.dragon.tj.portal.message;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class TestController {
+
+    @Autowired
+    private ApplicationEventPublisher applicationEventPublisher;
+
+
+    @GetMapping("/test/{userId}/{message}")
+    public ResponseEntity test(@PathVariable("userId") String userId, @PathVariable("message") String message) {
+        SseEvent<String> sseEvent = new SseEvent<>(this);
+        sseEvent.setCode(200);
+        sseEvent.setMsg(message);
+        sseEvent.setUserId(userId);
+        applicationEventPublisher.publishEvent(sseEvent);
+        return ResponseEntity.ok().build();
+    }
+
+}

+ 5 - 1
src/main/resources/application-local.properties

@@ -3,4 +3,8 @@ spring.datasource.url=jdbc:mysql:///portal?useSSL=true&useUnicode=true&character
 spring.datasource.username=root
 spring.datasource.password=123456
 mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
-#log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
+#log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
+logging.level.com.dragon=info
+logging.level.org.springframework=warn
+logging.level.org.apache.http=warn
+logging.config=classpath:logback-dev.xml

+ 78 - 0
src/main/resources/logback-dev.xml

@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration debug="false">
+    <!-- 应用名称 -->
+    <property name="APP_NAME" value="partal-service"/>
+    <!--日志文件的保存路径,首先查找系统属性-Dlog.dir,如果存在就使用其;否则,在当前目录下创建名为logs目录做日志存放的目录 -->
+    <property name="LOG_HOME" value="${log.dir:-log}/${APP_NAME}"/>
+    <!-- 日志输出格式 -->
+    <property name="ENCODER_PATTERN"
+              value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{80} - %msg%n"/>
+    <contextName>${APP_NAME}</contextName>
+
+
+    <!-- 控制台日志:输出全部日志到控制台 -->
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <Pattern>${ENCODER_PATTERN}</Pattern>
+        </encoder>
+    </appender>
+
+    <!-- 文件日志:输出全部日志到文件 -->
+    <appender name="FILE"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_HOME}/output.%d{yyyy-MM-dd}.log.gz</fileNamePattern>
+            <maxHistory>15</maxHistory>
+        </rollingPolicy>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>${ENCODER_PATTERN}</pattern>
+        </encoder>
+    </appender>
+
+    <!-- 错误日志:用于将错误日志输出到独立文件 -->
+    <appender name="ERROR_FILE"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}.log.gz</fileNamePattern>
+            <maxHistory>30</maxHistory>
+        </rollingPolicy>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>${ENCODER_PATTERN}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>WARN</level>
+        </filter>
+    </appender>
+
+    <!-- 4. 最终的策略 -->
+    <!-- 4.1 开发环境:打印控制台 -->
+    <springProfile name="dev">
+        <root>
+            <level value="info"/>
+            <appender-ref ref="STDOUT"/>
+            <appender-ref ref="FILE"/>
+            <appender-ref ref="ERROR_FILE"/>
+        </root>
+    </springProfile>
+
+    <springProfile name="local">
+        <root>
+            <level value="info"/>
+            <appender-ref ref="STDOUT"/>
+            <appender-ref ref="FILE"/>
+            <appender-ref ref="ERROR_FILE"/>
+        </root>
+    </springProfile>
+
+
+    <!-- 4.2 生产环境:输出到文档 -->
+    <springProfile name="prod">
+        <root>
+            <level value="INFO"/>
+            <appender-ref ref="STDOUT"/>
+            <appender-ref ref="FILE"/>
+            <appender-ref ref="ERROR_FILE"/>
+        </root>
+    </springProfile>
+
+</configuration>

+ 0 - 72
src/main/resources/logback-spring.xml

@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    小技巧: 在根pom里面设置统一存放路径,统一管理方便维护
-    <properties>
-        <log-path>/Users/lengleng</log-path>
-    </properties>
-    1. 其他模块加日志输出,直接copy本文件放在resources 目录即可
-    2. 注意修改 <property name="${log-path}/log.path" value=""/> 的value模块
--->
-<configuration debug="false" scan="false">
-	<property name="log.path" value="logs/${project.artifactId}"/>
-	<!-- 彩色日志格式 -->
-	<property name="CONSOLE_LOG_PATTERN"
-			  value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} [%tid] %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
-	<!-- 彩色日志依赖的渲染类 -->
-	<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
-	<conversionRule conversionWord="wex"
-					converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
-	<conversionRule conversionWord="wEx"
-					converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
-	<!-- Console log output -->
-	<appender name="console" class="com.sunacwy.frame.common.log.appender.LogbackConsoleAppender">
-		<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
-			<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
-				<pattern>${CONSOLE_LOG_PATTERN}</pattern>
-			</layout>
-		</encoder>
-	</appender>
-
-	<!-- Log file debug output -->
-	<appender name="debug" class="ch.qos.logback.core.rolling.RollingFileAppender">
-		<file>${log.path}/debug.log</file>
-		<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-			<fileNamePattern>${log.path}/%d{yyyy-MM, aux}/debug.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
-			<maxFileSize>50MB</maxFileSize>
-			<maxHistory>30</maxHistory>
-		</rollingPolicy>
-		<encoder>
-			<pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern>
-		</encoder>
-	</appender>
-
-	<!-- Log file error output -->
-	<appender name="error" class="ch.qos.logback.core.rolling.RollingFileAppender">
-		<file>${log.path}/error.log</file>
-		<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-			<fileNamePattern>${log.path}/%d{yyyy-MM}/error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
-			<maxFileSize>50MB</maxFileSize>
-			<maxHistory>30</maxHistory>
-		</rollingPolicy>
-		<encoder>
-			<pattern>%date [%thread] %-5level [%logger{50}] %file:%line - %msg%n</pattern>
-		</encoder>
-		<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-			<level>ERROR</level>
-		</filter>
-	</appender>
-
-	<logger name="org.activiti.engine.impl.db" level="DEBUG">
-		<appender-ref ref="debug"/>
-	</logger>
-
-	<!--nacos 心跳 INFO 屏蔽-->
-	<logger name="com.alibaba.nacos" level="OFF">
-		<appender-ref ref="error"/>
-	</logger>
-	<!-- Level: FATAL 0  ERROR 3  WARN 4  INFO 6  DEBUG 7 -->
-	<root level="INFO">
-		<appender-ref ref="console"/>
-		<appender-ref ref="debug"/>
-	</root>
-</configuration>