Kafka 消息可靠性:如何保证消息不丢失?¶
消息丢失可能发生在三个环节,需要分别保障:
flowchart LR
subgraph 生产者端
A["acks 配置\n重试机制\n幂等生产者"]
end
subgraph Broker 端
B["副本同步\nISR 机制\n持久化刷盘"]
end
subgraph 消费者端
C["手动提交 offset\n幂等消费\n异常处理"]
end
A -->|消息写入| B -->|消息拉取| C
1. 生产者端:acks 参数¶
| acks 值 | 含义 | 可靠性 | 性能 | 适用场景 |
|---|---|---|---|---|
0 |
不等待任何确认 | 最低(可能丢消息) | 最高 | 日志收集(允许少量丢失) |
1 |
等待 Leader 写入确认 | 中等(Leader 宕机可能丢) | 中等 | 一般业务 |
-1/all |
等待所有 ISR 副本确认 | 最高(不丢消息) | 最低 | 金融、订单等核心业务 |
ISR(In-Sync Replicas):与 Leader 保持同步的副本集合。落后太多的副本会被踢出 ISR。 为什么 acks=all 不是默认值:acks=all 需要等待所有 ISR 副本确认,延迟更高。大多数业务场景可以接受极小概率的消息丢失(Leader 宕机的概率很低),用 acks=1 换取更低延迟。
2. Broker 端:副本同步¶
sequenceDiagram
participant P as 生产者
participant L as Leader 副本
participant F1 as Follower 1(ISR)
participant F2 as Follower 2(ISR)
P->>L: 发送消息(acks=all)
L->>F1: 同步消息
L->>F2: 同步消息
F1-->>L: 确认写入
F2-->>L: 确认写入
L-->>P: 返回成功(所有 ISR 已确认)
3. 消费者端:手动提交 Offset¶
// ❌ 错误做法:自动提交(可能在消息处理失败后提交,导致消息丢失)
// 原因:自动提交是定时提交,不管处理是否成功
props.put("enable.auto.commit", "true");
// ✅ 正确做法:手动提交,处理成功后再提交
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
process(record); // 业务处理
} catch (Exception e) {
// 处理失败,不提交 offset,下次重新消费
log.error("处理失败", e);
continue;
}
}
// 所有消息处理成功后,手动提交
consumer.commitSync();
}
4. 幂等性:防止重复消息¶
flowchart TD
A[生产者发送消息] --> B{网络超时?}
B -->|是| C[重试发送]
C --> D{Broker 已收到?}
D -->|是| E["❌ 未开启幂等:消息重复"]
D -->|是| F["✅ 开启幂等:Broker 去重,只保留一条"]
B -->|否| G[正常写入]
// 开启幂等生产者(Producer ID + Sequence Number 去重)
props.put("enable.idempotence", "true");
// 开启幂等后,acks 自动设为 all,retries 自动设为 MAX_INT
// 为什么这样设计:幂等性依赖 acks=all 保证消息写入,依赖重试保证消息不丢
5. 三端保障总结¶
| 环节 | 关键配置/做法 | 作用 |
|---|---|---|
| 生产者 | acks=all + 重试 + 幂等 |
确保消息成功写入所有 ISR 副本 |
| Broker | 多副本 + ISR 同步 | 即使 Leader 宕机,Follower 可接管 |
| 消费者 | 手动提交 offset + 幂等处理 | 确保消息处理成功后才标记已消费 |