要解决kafka消息丢失问题,需要从三个部分来考虑:

  1. 生产者保证消息发送成功。

  2. kafka本身要保证消息存储成功。

  3. 消费者消费成功。

一、生产者:确保消息成功发送到kafka

生产者消息丢失的场景主要包括:异步发送未处理失败,网络波动导致未送达,ACK机制配置不当等。

一般使用如下方式来处理以上的失败场景:

1.1 使用带回调的异步发送

  • 问题​​:生产者默认producer.send(msg)为异步发送,未处理回调时,网络波动或Broker故障可能导致消息未送达且无感知。

  • 解决方案​​:使用producer.send(msg, callback)接口,通过回调函数处理发送结果(成功/失败)。失败时可记录日志、重试或存储至补偿队列(如数据库),后续手动重发。

producer.send(new ProducerRecord<>("topic", "key", "value"), (metadata, exception) -> {
    if (exception == null) {
        // 发送成功,记录元数据(分区、偏移量)
    } else {
        // 发送失败,处理异常(如重试、记录日志)
        log.error("消息发送失败:{}", exception.getMessage());
    }
});

1.2 配置acks=all,确保所有副本写入成功

  • 问题​​:acks(确认机制)决定生产者何时认为消息发送成功。默认acks=1(仅Leader写入成功),若Leader故障且未同步至Follower,消息会丢失。

  • 解决方案​​:设置acks=all(或-1),要求​​所有ISR(In-Sync Replicas,同步副本)​​均写入消息后,Broker才返回确认。这是​​最强的持久化保证​​,确保只要有一个ISR存活,消息就不会丢失。

  • ​​注意​​:acks=all需配合min.insync.replicas(最小同步副本数)使用(见Broker端配置)。

1.3 启用重试机制,应对临时故障

  • 问题:网络抖动,Leader选举等临时故障可能导致消息发送失败,需要自动重试。

  • 解决方案:

    • 设置retries=100,确保临时故障时自动恢复。

    • 设置retry.backoff.ms=2000(重试间隔,默认100ms),避免频繁重试加重网络负担。

  • ​注意​​:重试可能导致消息重复(如网络恢复后,Broker已接收但生产者未收到确认),需消费者端做​​幂等处理​​(见消费者端配置)。

  • 启用幂等生产者,避免发送重复的消息。

    • 设置enable.idempotence=true

    • 原理:消息发送会携带生产者ID消息的序列号,Broker会记录每个生产者的序列号,如果收到重复序列号的消息,就拒绝写入。

二、kafka本身确保存储消息不丢失

Broker是kafka的消息存储节点,丢失消息的场景主要有:Leader故障未同步,副本数不足等。

2.1 设置合理的副本数

  • 问题:副本数过少,比如副本数为1时,如果Broker故障,消息则会丢失

  • 解决方案:设置副本数大于等于3个,并分布在不同的Broker中,确保单个节点故障时消息不会丢失。

2.2 配置min.insync.replicas 确保最小同步副本数

  • 问题:acks=all时,如果最小同步副本数为1(默认值),则Leader同步成功则返回,如果此时消息还未同步到副本,Leader发生故障,消息会丢失。

  • 解决方案:设置最小同步副本数大于等于2,确保消息至少同步给两个副本,能保证一个副本故障后,仍有副本能保证消息不丢失。

2.3 强制每条消息刷盘(不推荐,影响性能,金融行业使用)

三、消费者,确保消息消费成功

消费者消费消息时,也会执行一些操作,如果操作不当,也会有丢失消息的情况,如:自动提交Offset过早,处理消息后未提交Offset,Rebalance导致Offset丢失等。

3.1 关闭自动提交Offset(enable.auto.commit=false

  • 问题:默认enable.auto.commit=true(每5秒自动提交Offset),若消费者处理消息后未提交Offset就宕机,下次重启会重复消费(因为Offset未更新);若提交Offset后宕机,消息未处理则丢失。

  • 解决方案​​:设置enable.auto.commit=false(手动提交Offset),确保​​消息处理完成后​​再提交Offset,避免消息丢失。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息(如写入数据库)
        processRecord(record);
        // 手动提交Offset(同步提交,确保提交成功)
        consumer.commitSync();
    }
}

3.2 处理Rebalance,避免Offset丢失

  • 问题​​:消费者组(Consumer Group)发生Rebalance(如消费者加入/退出、分区分配变化)时,若未及时提交Offset,可能导致消息丢失(如消费者处理了消息但未提交Offset,Rebalance后新消费者重新拉取该消息)。

  • 解决方案​​: 使用ConsumerRebalanceListener监听Rebalance事件,在Rebalance前提交Offset(确保Offset已更新)。

consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Rebalance前提交Offset
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Rebalance后,初始化分区分配(如从指定Offset开始消费)
    }
});

3.3 实现消费幂等性,避免重复消费​

  • 问题​​:手动提交Offset时,若提交失败或网络波动,可能导致重复消费(如消费者处理了消息,但提交Offset时失败,下次重启会重新拉取该消息)。

  • 解决方案​​:实现​​消费幂等性​​,确保同一消息多次消费不会产生副作用(如数据库插入时使用唯一键,避免重复插入)。

  • ​​示例​​:

    • 数据库场景:使用消息ID作为唯一键,插入时若冲突则忽略。

    • 缓存场景:使用消息ID作为缓存键,若已存在则跳过处理。

四、总结:消息零丢失的核心配置​

环节

核心配置

生产者

acks=all、enable.idempotence=true、retries=Integer.MAX_VALUE

kafka

replication.factor>=3、min.insync.replicas>=2、unclean.leader.election.enable=false

消费者

enable.auto.commit=false、手动提交Offset、消费幂等性

五、注意事项

性能和可靠性的平衡:确保可靠性的配置都会对性能有一些影响,如何配置,需要根据业务需求来判断,如:金融行业对可靠性要求极高,需要配置最高规格的可靠性。