Ver código fonte

feat(message): kafka创建及测试

huey 1 ano atrás
pai
commit
366a0ba895

+ 63 - 0
src/main/java/com/dragon/tj/portal/component/page/KafkaUtil.java

@@ -0,0 +1,63 @@
+package com.dragon.tj.portal.component.page;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @author huey China.
+ * @Description : kafka 工具
+ * @Date Created in 2023/7/21 14:43
+ */
+@Slf4j
+@Getter
+@Setter
+@Component
+public class KafkaUtil {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String server;
+
+    public void create(String topicName) throws ExecutionException, InterruptedException {
+        // 设置 Kafka 服务器的地址和端口号
+
+        int replicationFactor = 3;
+        int numPartitions = 5;
+
+        // 创建 AdminClient 配置
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
+
+        // 创建 AdminClient 实例
+        AdminClient adminClient = null;
+        try {
+            adminClient = AdminClient.create(props);
+            // 创建 NewTopic 对象
+            NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) replicationFactor);
+            // 使用 AdminClient 创建 topic
+            adminClient.createTopics(Collections.singletonList(newTopic));
+            log.info("Topic创建成功:" + topicName);
+        } catch (Exception e) {
+            log.error("Topic创建失败:" + e.getMessage());
+        }
+
+        ListTopicsResult topicsResult = adminClient.listTopics();
+        Set<String> topicNames = topicsResult.names().get();
+
+        // 打印所有 topic 的名称
+        System.out.println("已创建的 Topic 列表:" +topicNames);
+    }
+
+
+}