Managing Topics with the Java API

2024-02-11 SpringBoot Kafka Middleware

# Summary

  • Create a topic: adminClient.createTopics()
  • List topics: adminClient.listTopics()
  • Delete a topic: adminClient.deleteTopics()
  • Describe a topic: adminClient.describeTopics()
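
Every snippet below follows the same pattern: build an AdminClient from a Properties object, call the admin method, and block on the returned KafkaFuture with get(). A minimal sketch of that pattern, assuming a broker at localhost:9092 (a placeholder address); since AdminClient is AutoCloseable, try-with-resources keeps the connection from leaking, which the test snippets below omit for brevity.

```java
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminClientSketch {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // Placeholder broker address; replace with your own bootstrap servers
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // AdminClient is AutoCloseable, so try-with-resources releases the connection
        try (AdminClient adminClient = AdminClient.create(properties)) {
            // Every admin call returns a *Result object wrapping KafkaFutures;
            // get() blocks until the broker responds or the request times out
            Set<String> topics = adminClient.listTopics().names().get();
            topics.forEach(System.out::println);
        }
    }
}
```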

# Creating a Topic

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@SpringBootTest
public class KafkaTest {

    /** Topic name used by the tests (placeholder, adjust to your environment) */
    private static final String TOPIC_NAME = "test";

    /** Build the Admin client */
    public static AdminClient initAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092");
        return AdminClient.create(properties);
    }

    @Test
    public void createTopic() {
        AdminClient adminClient = initAdminClient();
        // 2 partitions, 1 replica
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 2, (short) 1);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
        // Block on the future: success produces no error; creation failures and timeouts throw
        try {
            createTopicsResult.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Created new topic");
    }
}
```
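
If a new topic needs non-default settings, NewTopic also accepts a map of topic-level configs via its configs() method. The sketch below is another test method for the same class; the topic name and the 7-day retention value are illustrative, not taken from this article.

```java
@Test
public void createTopicWithConfigs() throws Exception {
    AdminClient adminClient = initAdminClient();

    // Topic-level overrides; the keys come from org.apache.kafka.common.config.TopicConfig
    Map<String, String> topicConfigs = new HashMap<>();
    topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "604800000");  // retain for 7 days
    topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);

    // "demo_configured_topic" is an illustrative name, not used elsewhere in this article
    NewTopic newTopic = new NewTopic("demo_configured_topic", 2, (short) 1).configs(topicConfigs);
    adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
}
```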

# Creating Multiple Topics

Scenario: the project manages its topics with an enum, and at startup it needs to check whether each topic exists and create the ones that do not.

```java
@ToString
@AllArgsConstructor
public enum TopicEnums {

    GE_TUI_MESSAGE("getui_msg", "Pushes messages directly to the Getui platform"),
    GE_TUI_PUSH_DELAY("ge_tui_push_delay", "Delayed messages, pushed to the Getui platform on a schedule");

    /** The MQ topic name */
    @Getter
    private final String value;
    /** Short description of the business purpose */
    private final String desc;
}
```

Approach: iterate over the enum and try to create each topic in turn; if a topic already exists the broker raises a TopicExistsException, which is logged and otherwise ignored.

```java
@Test
public void createTopic() {
    AdminClient adminClient = initAdminClient();

    // 1. Collect every topic name defined in the enum
    List<String> topicStrList = Arrays.stream(TopicEnums.values()).map(TopicEnums::getValue).collect(Collectors.toList());

    // 2. Create each topic
    for (String topicStr : topicStrList) {
        NewTopic newTopic = new NewTopic(topicStr, 2, (short) 1);
        try {
            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
            createTopicsResult.all().get();
            log.warn("Kafka topic created, topic name: {}", topicStr);
        } catch (InterruptedException | ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                log.info("Kafka topic already exists, topic name: {}", topicStr);
            } else {
                e.printStackTrace();
            }
        }
    }
    log.info("Kafka topic check/creation task finished");
}
```
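
An alternative to creating the topics one at a time is to submit them all in a single createTopics() call and then inspect each topic's individual future through CreateTopicsResult.values(), so an existing topic does not fail the whole batch. A sketch of that variant, written as another test method for the same class and reusing the enum and the partition/replica settings above:

```java
@Test
public void createTopicsInOneBatch() {
    AdminClient adminClient = initAdminClient();

    // Build one NewTopic per enum value and submit them in a single request
    List<NewTopic> newTopics = Arrays.stream(TopicEnums.values())
            .map(e -> new NewTopic(e.getValue(), 2, (short) 1))
            .collect(Collectors.toList());
    CreateTopicsResult result = adminClient.createTopics(newTopics);

    // values() exposes one future per topic, so each outcome can be handled separately
    result.values().forEach((topicName, future) -> {
        try {
            future.get();
            log.info("Kafka topic created, topic name: {}", topicName);
        } catch (InterruptedException | ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                log.info("Kafka topic already exists, topic name: {}", topicName);
            } else {
                log.error("Failed to create Kafka topic: {}", topicName, e);
            }
        }
    });
}
```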

# Operating on and Viewing Topics

# Listing All Topics

```java
@Test
public void listTopic() throws ExecutionException, InterruptedException {
    AdminClient adminClient = initAdminClient();
    ListTopicsResult listTopics = adminClient.listTopics(new ListTopicsOptions());
    Set<String> topics = listTopics.names().get();
    topics.forEach(topic -> log.warn("Kafka topic:{}", topic));
}
```
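
By default the listing excludes Kafka's internal topics such as __consumer_offsets. If those should be included as well, ListTopicsOptions has a listInternal flag; a small sketch as another test method:

```java
@Test
public void listTopicIncludingInternal() throws ExecutionException, InterruptedException {
    AdminClient adminClient = initAdminClient();
    // listInternal(true) also returns internal topics such as __consumer_offsets
    ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
    Set<String> topics = adminClient.listTopics(options).names().get();
    topics.forEach(topic -> log.warn("Kafka topic:{}", topic));
}
```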

# Deleting a Topic

```java
@Test
public void delTopicTest() {
    AdminClient adminClient = initAdminClient();
    DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList("test"));
    try {
        deleteTopicsResult.all().get();
    } catch (ExecutionException | InterruptedException e) {
        if (e.getCause() instanceof UnknownTopicOrPartitionException) {
            // The topic does not exist (it may already have been deleted); nothing else to do
            log.warn("Kafka topic does not exist, it may already have been deleted");
            return;
        }
        throw new RuntimeException(e);
    }
}
```
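
If you prefer to avoid the exception path entirely, you can check existence first with listTopics() and only delete topics the broker actually has. A sketch as another test method, using the enum values from above; note this is a check-then-act sequence, so a concurrent deletion could still surface the exception:

```java
@Test
public void deleteExistingTopicsOnly() throws ExecutionException, InterruptedException {
    AdminClient adminClient = initAdminClient();

    // Topics we want removed (taken from the enum) ...
    Set<String> toDelete = Arrays.stream(TopicEnums.values())
            .map(TopicEnums::getValue)
            .collect(Collectors.toCollection(HashSet::new));
    // ... intersected with the topics the broker actually has
    Set<String> existing = adminClient.listTopics().names().get();
    toDelete.retainAll(existing);

    if (!toDelete.isEmpty()) {
        adminClient.deleteTopics(toDelete).all().get();
        log.warn("Deleted Kafka topics: {}", toDelete);
    }
}
```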

# Viewing Topic Details

```java
@Test
public void getTopicInfo() throws Exception {
    // Build the AdminClient
    AdminClient adminClient = initAdminClient();
    // Describe the topic
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList("getui_msg"));
    // Fetch the topic descriptions
    Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
    // Iterate over the descriptions
    Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
    entries.forEach((entry) -> log.warn("name:{} , desc: {}", entry.getKey(), entry.getValue()));
}
```
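
TopicDescription also exposes the per-partition layout, which is usually the interesting part of a describe call: each TopicPartitionInfo (org.apache.kafka.common.TopicPartitionInfo) carries the partition id, the current leader, the replicas, and the in-sync replicas. A sketch as another test method:

```java
@Test
public void getTopicPartitionInfo() throws Exception {
    AdminClient adminClient = initAdminClient();
    Map<String, TopicDescription> descriptions =
            adminClient.describeTopics(Collections.singletonList("getui_msg")).all().get();

    descriptions.forEach((name, description) -> {
        // Each TopicPartitionInfo describes one partition: id, leader, replicas, ISR
        for (TopicPartitionInfo partition : description.partitions()) {
            log.warn("topic:{} partition:{} leader:{} replicas:{} isr:{}",
                    name,
                    partition.partition(),
                    partition.leader(),
                    partition.replicas(),
                    partition.isr());
        }
    });
}
```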

# Increasing the Partition Count

If messages in the topic carry a key (i.e., the key is not null), the target partition is computed from that key, so adding partitions changes the key-to-partition mapping and can break per-key message ordering.

```java
@Test
public void incrPartitionsTest() throws Exception {
    Map<String, NewPartitions> infoMap = new HashMap<>();
    // Raise the total partition count of TOPIC_NAME to 5
    NewPartitions newPartitions = NewPartitions.increaseTo(5);
    AdminClient adminClient = initAdminClient();
    infoMap.put(TOPIC_NAME, newPartitions);
    CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
    createPartitionsResult.all().get();
}
```

Note: in Kafka the partition count can only be increased, never decreased, because there is no well-defined way to handle the data already stored in partitions that would be removed.
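
Accordingly, asking NewPartitions.increaseTo() for a count that is not higher than the current one is rejected by the broker; the failure surfaces as the cause of the ExecutionException, typically (as far as I have seen) an InvalidPartitionsException. A hedged sketch of handling that case as another test method:

```java
@Test
public void decrPartitionsShouldFail() {
    AdminClient adminClient = initAdminClient();
    // TOPIC_NAME was raised to 5 partitions above; asking for 1 should be refused
    Map<String, NewPartitions> infoMap = new HashMap<>();
    infoMap.put(TOPIC_NAME, NewPartitions.increaseTo(1));
    try {
        adminClient.createPartitions(infoMap).all().get();
    } catch (InterruptedException | ExecutionException e) {
        // Expected path: the broker refuses to shrink the partition count
        if (e.getCause() instanceof InvalidPartitionsException) {
            log.warn("Partition count cannot be reduced: {}", e.getCause().getMessage());
        } else {
            throw new RuntimeException(e);
        }
    }
}
```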