Kafka消息丢失问题
要解决kafka消息丢失问题,需要从三个部分来考虑:
-
生产者保证消息发送成功。
-
kafka本身要保证消息存储成功。
-
消费者消费成功。
一、生产者:确保消息成功发送到kafka
生产者消息丢失的场景主要包括:异步发送未处理失败,网络波动导致未送达,ACK机制配置不当等。
一般使用如下方式来处理以上的失败场景:
1.1 使用带回调的异步发送
-
问题:生产者默认
producer.send(msg)为异步发送,未处理回调时,网络波动或Broker故障可能导致消息未送达且无感知。 -
解决方案:使用
producer.send(msg, callback)接口,通过回调函数处理发送结果(成功/失败)。失败时可记录日志、重试或存储至补偿队列(如数据库),后续手动重发。
producer.send(new ProducerRecord<>("topic", "key", "value"), (metadata, exception) -> {
if (exception == null) {
// 发送成功,记录元数据(分区、偏移量)
} else {
// 发送失败,处理异常(如重试、记录日志)
log.error("消息发送失败:{}", exception.getMessage());
}
});
1.2 配置acks=all,确保所有副本写入成功
-
问题:
acks(确认机制)决定生产者何时认为消息发送成功。默认acks=1(仅Leader写入成功),若Leader故障且未同步至Follower,消息会丢失。 -
解决方案:设置
acks=all(或-1),要求所有ISR(In-Sync Replicas,同步副本)均写入消息后,Broker才返回确认。这是最强的持久化保证,确保只要有一个ISR存活,消息就不会丢失。 -
注意:
acks=all需配合min.insync.replicas(最小同步副本数)使用(见Broker端配置)。
1.3 启用重试机制,应对临时故障
-
问题:网络抖动,
Leader选举等临时故障可能导致消息发送失败,需要自动重试。 -
解决方案:
-
设置
retries=100,确保临时故障时自动恢复。 -
设置
retry.backoff.ms=2000(重试间隔,默认100ms),避免频繁重试加重网络负担。
-
-
注意:重试可能导致消息重复(如网络恢复后,
Broker已接收但生产者未收到确认),需消费者端做幂等处理(见消费者端配置)。 -
启用幂等生产者,避免发送重复的消息。
-
设置
enable.idempotence=true -
原理:消息发送会携带
生产者ID和消息的序列号,Broker会记录每个生产者的序列号,如果收到重复序列号的消息,就拒绝写入。
-
二、kafka本身确保存储消息不丢失
Broker是kafka的消息存储节点,丢失消息的场景主要有:Leader故障未同步,副本数不足等。
2.1 设置合理的副本数
-
问题:副本数过少,比如副本数为1时,如果Broker故障,消息则会丢失
-
解决方案:设置副本数大于等于3个,并分布在不同的Broker中,确保单个节点故障时消息不会丢失。
2.2 配置min.insync.replicas 确保最小同步副本数
-
问题:
acks=all时,如果最小同步副本数为1(默认值),则Leader同步成功则返回,如果此时消息还未同步到副本,Leader发生故障,消息会丢失。 -
解决方案:设置最小同步副本数大于等于2,确保消息至少同步给两个副本,能保证一个副本故障后,仍有副本能保证消息不丢失。
2.3 强制每条消息刷盘(不推荐,影响性能,金融行业使用)
三、消费者,确保消息消费成功
消费者消费消息时,也会执行一些操作,如果操作不当,也会有丢失消息的情况,如:自动提交Offset过早,处理消息后未提交Offset,Rebalance导致Offset丢失等。
3.1 关闭自动提交Offset(enable.auto.commit=false)
-
问题:默认
enable.auto.commit=true(每5秒自动提交Offset),若消费者处理消息后未提交Offset就宕机,下次重启会重复消费(因为Offset未更新);若提交Offset后宕机,消息未处理则丢失。 -
解决方案:设置
enable.auto.commit=false(手动提交Offset),确保消息处理完成后再提交Offset,避免消息丢失。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息(如写入数据库)
processRecord(record);
// 手动提交Offset(同步提交,确保提交成功)
consumer.commitSync();
}
}
3.2 处理Rebalance,避免Offset丢失
-
问题:消费者组(Consumer Group)发生Rebalance(如消费者加入/退出、分区分配变化)时,若未及时提交Offset,可能导致消息丢失(如消费者处理了消息但未提交Offset,Rebalance后新消费者重新拉取该消息)。
-
解决方案: 使用ConsumerRebalanceListener监听Rebalance事件,在Rebalance前提交Offset(确保Offset已更新)。
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Rebalance前提交Offset
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Rebalance后,初始化分区分配(如从指定Offset开始消费)
}
});
3.3 实现消费幂等性,避免重复消费
-
问题:手动提交Offset时,若提交失败或网络波动,可能导致重复消费(如消费者处理了消息,但提交Offset时失败,下次重启会重新拉取该消息)。
-
解决方案:实现消费幂等性,确保同一消息多次消费不会产生副作用(如数据库插入时使用唯一键,避免重复插入)。
-
示例:
-
数据库场景:使用消息ID作为唯一键,插入时若冲突则忽略。
-
缓存场景:使用消息ID作为缓存键,若已存在则跳过处理。
-
四、总结:消息零丢失的核心配置
五、注意事项
性能和可靠性的平衡:确保可靠性的配置都会对性能有一些影响,如何配置,需要根据业务需求来判断,如:金融行业对可靠性要求极高,需要配置最高规格的可靠性。