|
@@ -1,7 +1,11 @@
|
|
|
package com.dragon.tj.portal.component.message;
|
|
|
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.dragon.tj.portal.common.convert.MessageReqToItemConvert;
|
|
|
+import com.dragon.tj.portal.common.dto.message.MessageInfoItem;
|
|
|
+import com.dragon.tj.portal.common.dto.message.MessageInfoReq;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.http.MediaType;
|
|
@@ -11,6 +15,7 @@ import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
/**
|
|
|
* @author huey China.
|
|
@@ -19,11 +24,14 @@ import java.util.Map;
|
|
|
*/
|
|
|
@Component
|
|
|
@Slf4j
|
|
|
-public class Consumer {
|
|
|
+public class MessageConsumer {
|
|
|
|
|
|
@Autowired
|
|
|
private Map<String, OpenApiSseEmitter> sseEmitters;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private MessageReqToItemConvert messageReqToItemConvert;
|
|
|
+
|
|
|
/**
|
|
|
* get client published message from kafka,process it according to the serverId
|
|
|
*
|
|
@@ -33,34 +41,41 @@ public class Consumer {
|
|
|
// @KafkaListener(topics = {"sseTopic"}, clientIdPrefix = "${spring.application.name}", id = "openapi-be-sse-connection-", idIsGroup = false, groupId = "openapi-be-sse-connection-" + "#{T(java.util.UUID).randomUUID()}")
|
|
|
@KafkaListener(topics = {"sseTopic"})
|
|
|
public void sseConnectionProcess(String msg, Acknowledgment ack) {
|
|
|
-// log.info("get kafka msg from topic:{}, msg:{}", customProperty.getKafka().getTopics().getSseConnection().getTopic(), msg);
|
|
|
log.info("get kafka msg from topic:{}, msg:{}", KafkaInitialConfiguration.sseTopic, msg);
|
|
|
if (StrUtil.isEmpty(msg)) {
|
|
|
log.error("kafka msg is empty, no process");
|
|
|
return;
|
|
|
}
|
|
|
- SsePublishReqDto reqDto;
|
|
|
+ MessageInfoReq reqDTO;
|
|
|
try {
|
|
|
- reqDto = JSONObject.parseObject(msg, SsePublishReqDto.class);
|
|
|
+ reqDTO = JSONObject.parseObject(msg, MessageInfoReq.class);
|
|
|
} catch (Exception e) {
|
|
|
log.error("parsing string to obj failed, msg={}, e={}", msg, e);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- OpenApiSseEmitter emitter = sseEmitters.get(reqDto.getClientId());
|
|
|
- if (emitter == null) {
|
|
|
- log.error("can't find the sseEmitter obj from sseEmitters, no process");
|
|
|
- return;
|
|
|
- }
|
|
|
+ Set<String> messageClientIds = reqDTO.getMessageClientIds();
|
|
|
|
|
|
- try {
|
|
|
- // 发送消息给客户端
|
|
|
- log.info("send sse msg={} to clientId={}", reqDto.getMessage(), reqDto.getClientId());
|
|
|
- emitter.send(reqDto.getMessage(), MediaType.APPLICATION_JSON);
|
|
|
- ack.acknowledge();
|
|
|
- } catch (IOException e) {
|
|
|
- emitter.completeWithError(e);
|
|
|
+ for (String clientId : messageClientIds) {
|
|
|
+ OpenApiSseEmitter emitter = sseEmitters.get(clientId);
|
|
|
+
|
|
|
+ if (emitter == null) {
|
|
|
+ log.error("can't find the sseEmitter obj from sseEmitters, no process {}",clientId);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ MessageInfoItem messageInfoItem = messageReqToItemConvert.reqToItem(reqDTO);
|
|
|
+ messageInfoItem.setClientId(clientId);
|
|
|
+
|
|
|
+ String sendMsg = JSON.toJSONString(messageInfoItem);
|
|
|
+ // 发送消息给客户端
|
|
|
+ log.info("send sse msg={} to clientId={}", sendMsg, clientId);
|
|
|
+ emitter.send(sendMsg, MediaType.APPLICATION_JSON);
|
|
|
+ } catch (IOException e) {
|
|
|
+ emitter.completeWithError(e);
|
|
|
+ }
|
|
|
}
|
|
|
+ ack.acknowledge();
|
|
|
}
|
|
|
|
|
|
}
|