跳转至

Kafka 消息可靠性:如何保证消息不丢失?


消息丢失可能发生在三个环节,需要分别保障:

flowchart LR
    subgraph 生产者端
        A["acks 配置\n重试机制\n幂等生产者"]
    end
    subgraph Broker 端
        B["副本同步\nISR 机制\n持久化刷盘"]
    end
    subgraph 消费者端
        C["手动提交 offset\n幂等消费\n异常处理"]
    end
    A -->|消息写入| B -->|消息拉取| C

1. 生产者端:acks 参数

acks 值 含义 可靠性 性能 适用场景
0 不等待任何确认 最低(可能丢消息) 最高 日志收集(允许少量丢失)
1 等待 Leader 写入确认 中等(Leader 宕机可能丢) 中等 一般业务
-1/all 等待所有 ISR 副本确认 最高(不丢消息) 最低 金融、订单等核心业务

ISR(In-Sync Replicas):与 Leader 保持同步的副本集合。落后太多的副本会被踢出 ISR。 为什么 acks=all 不是默认值:acks=all 需要等待所有 ISR 副本确认,延迟更高。大多数业务场景可以接受极小概率的消息丢失(Leader 宕机的概率很低),用 acks=1 换取更低延迟。


2. Broker 端:副本同步

sequenceDiagram
    participant P as 生产者
    participant L as Leader 副本
    participant F1 as Follower 1(ISR)
    participant F2 as Follower 2(ISR)

    P->>L: 发送消息(acks=all)
    L->>F1: 同步消息
    L->>F2: 同步消息
    F1-->>L: 确认写入
    F2-->>L: 确认写入
    L-->>P: 返回成功(所有 ISR 已确认)

3. 消费者端:手动提交 Offset

// ❌ 错误做法:自动提交(可能在消息处理失败后提交,导致消息丢失)
// 原因:自动提交是定时提交,不管处理是否成功
props.put("enable.auto.commit", "true");

// ✅ 正确做法:手动提交,处理成功后再提交
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            process(record);  // 业务处理
        } catch (Exception e) {
            // 处理失败,不提交 offset,下次重新消费
            log.error("处理失败", e);
            continue;
        }
    }
    // 所有消息处理成功后,手动提交
    consumer.commitSync();
}

4. 幂等性:防止重复消息

flowchart TD
    A[生产者发送消息] --> B{网络超时?}
    B -->|是| C[重试发送]
    C --> D{Broker 已收到?}
    D -->|是| E["❌ 未开启幂等:消息重复"]
    D -->|是| F["✅ 开启幂等:Broker 去重,只保留一条"]
    B -->|否| G[正常写入]
// 开启幂等生产者(Producer ID + Sequence Number 去重)
props.put("enable.idempotence", "true");
// 开启幂等后,acks 自动设为 all,retries 自动设为 MAX_INT
// 为什么这样设计:幂等性依赖 acks=all 保证消息写入,依赖重试保证消息不丢

5. 三端保障总结

环节 关键配置/做法 作用
生产者 acks=all + 重试 + 幂等 确保消息成功写入所有 ISR 副本
Broker 多副本 + ISR 同步 即使 Leader 宕机,Follower 可接管
消费者 手动提交 offset + 幂等处理 确保消息处理成功后才标记已消费