Consumer消费者实战

2024-02-11 SpringBootKafka中间件

消费者从Broker获取数据通常采用的是"pull"(拉取)模式。在这种模式下,消费者通过主动向Broker发送请求来获取数据。

# 为什么使用pull模式?

首先,pull模式能够根据消费者的消费能力进行调整,比如有三台机器,A机器配置高处理能力强,另外两台弱一些,在消费完本次数据后就主动向Broker“要”数据,实现机器消费能力自适应;

如果使用broker主动push的方式,当消息量非常大时,消费者可能会无法及时处理这些推送过来的消息。这样会导致消息堆积,影响系统的稳定性和性能。而使用pull模式,消费者可以根据自身的处理能力合理地控制数据的获取速度,避免消息的积压。

使用push模式等于是Broker向消费者发送消息,Broker没法知道消费者对之前消息的处理情况,也就无法实现自适应功能。

此外,如果broker没有数据可供消费者获取时,消费者可以通过配置超时时间来进行阻塞等待。当有新的消息到达时,消费者会立即获取到这些消息并进行处理。

Kafka消费者两种模式

# SpringBoot关闭Kafka日志

logback.xml

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

application.yml

logging:
  config: classpath:logback.xml

# 消费者配置

#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id

#为true则自动提交偏移量
enable.auto.commit

#自动提交offset周期
auto.commit.interval.ms

#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset

#序列化器
key.deserializer

JavaApi配置如下

private Properties getKafkaConfigProperties() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, xk857ProducerMqService.getEndpoint());
    // 消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
    props.put("group.id", "xk857-g1");

    // 开启自动提交offset
    props.put("enable.auto.commit", "true");

    // 自动提交offset延迟时间
    props.put("auto.commit.interval.ms", "1000");

    // 反序列化
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
}

# 消费者消费消息

@Test
public void simpleConsumerTest(){
    Properties props = getKafkaConfigProperties();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // 订阅topic主题
    consumer.subscribe(Arrays.asList(TopicEnums.GE_TUI_MESSAGE.getValue()));

    while (true) {
        //拉取时间控制,阻塞超时时间
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
        }
    }
}