最佳实践与生产环境配置
一、实战要点:开发中最容易踩的坑
Section titled “一、实战要点:开发中最容易踩的坑”1.1 Producer 常见问题
Section titled “1.1 Producer 常见问题”问题1:消息丢失
Section titled “问题1:消息丢失”// ❌ 错误配置:acks=0,可能丢失消息Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "0"); // 不等待确认,可能丢消息KafkaProducer<String, String> producer = new KafkaProducer<>(props);// ✅ 正确配置:acks=all,保证消息不丢失Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all"); // 等待 ISR 全部确认props.put("min.insync.replicas", "2"); // ISR 至少 2 个副本props.put("retries", "2147483647"); // 重试次数props.put("enable.idempotence", "true"); // 开启幂等,防止重复KafkaProducer<String, String> producer = new KafkaProducer<>(props);问题2:顺序保证
Section titled “问题2:顺序保证”Kafka 只保证分区内有序,跨分区不保证有序。如果需要保证同一业务key的消息有序:
// ✅ 相同 key 的消息发送到同一分区// 方式1:使用有 key 的消息ProducerRecord<String, String> record = new ProducerRecord<>( "orders", // topic orderId, // key:相同 orderId 进同一分区 orderJson // value);producer.send(record);
// 方式2:自定义分区器(业务特殊需求)props.put("partitioner.class", "com.mycompany.CustomPartitioner");问题3:消息积压(缓冲区满)
Section titled “问题3:消息积压(缓冲区满)”// ❌ 错误:缓冲区太小,发送太快会阻塞props.put("buffer.memory", "33554432"); // 32MB,默认值
// ✅ 调大缓冲区,或调整发送策略props.put("buffer.memory", "134217728"); // 128MBprops.put("batch.size", "32768"); // 32KBprops.put("linger.ms", "20"); // 等待 20ms 凑够 batchprops.put("max.block.ms", "60000"); // 阻塞超时时间1.2 Consumer 常见问题
Section titled “1.2 Consumer 常见问题”问题1:重复消费
Section titled “问题1:重复消费”// ❌ 自动提交导致重复消费props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000");
// 场景:poll 了 100 条消息,处理到第 50 条时崩溃// 自动提交 offset=50,但只有 50 条处理成功// 重启后从 offset=50 开始,前 50 条被重复消费// ✅ 手动提交避免重复消费props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("orders"));
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) { try { processMessage(record); // 处理消息 } catch (Exception e) { // 记录失败消息到死信队列 sendToDLQ(record); } }
// ✅ 处理完成后提交 offset if (!records.isEmpty()) { consumer.commitSync(); // 同步提交,失败会抛异常 }}问题2:消息顺序混乱
Section titled “问题2:消息顺序混乱”// ❌ 多线程并发消费导致顺序混乱while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 错误:不同分区并发处理,同一分区内也可能有并发 records.records(topicPartition).forEach(record -> { executor.submit(() -> processMessage(record)); // 并发处理 });}// ✅ 单线程顺序处理(推荐)while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 按分区顺序处理 for (TopicPartition tp : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(tp); for (ConsumerRecord<String, String> record : partitionRecords) { processMessage(record); // 顺序处理 } } consumer.commitSync();}问题3:Rebalance 期间消息丢失
Section titled “问题3:Rebalance 期间消息丢失”// ❌ 处理太慢触发 Rebalance,消息丢失while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理一条消息需要 10 分钟 for (ConsumerRecord<String, String> record : records) { processMessage(record); // 处理太慢 } // max.poll.interval.ms=300000(5分钟),超时触发 Rebalance}// ✅ 优化:减少每次 poll 的消息数,或增加 poll 间隔props.put("max.poll.records", "50"); // 每次只处理 50 条props.put("max.poll.interval.ms", "600000"); // 10 分钟超时
// 或者:异步处理 + 手动控制 offset 提交时机while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 快速 poll 到本地队列 List<ConsumerRecord<String, String>> batch = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { batch.add(record); }
// 异步处理,不阻塞 poll CompletableFuture.runAsync(() -> { for (ConsumerRecord<String, String> record : batch) { processMessage(record); } // 处理完成后提交 offset consumer.commitSync(); });}二、最佳实践:生产环境怎么用
Section titled “二、最佳实践:生产环境怎么用”2.1 场景选型
Section titled “2.1 场景选型”| 场景 | 推荐方案 | 说明 |
|---|---|---|
| 日志收集 | Kafka | 高吞吐、持久化能力强 |
| 业务解耦 | Kafka / RocketMQ | 解耦、削峰 |
| 实时计算 | Kafka + Flink | 流处理生态 |
| 事务消息 | RocketMQ | 本地事务 + 消息原子性 |
| 低延迟交易 | 自研 / 专业 MQ | 优化网络、存储 |
| 消息顺序要求高 | RocketMQ | 全局有序 |
⚠️ Kafka 不适合的场景:
- 需要低延迟(< 1ms)的实时通讯
- 需要复杂路由(RabbitMQ 更擅长)
- 需要本地事务 + 消息发送的原子性(用 RocketMQ 或本地消息表)
2.2 参数调优
Section titled “2.2 参数调优”Producer 关键参数
Section titled “Producer 关键参数”# 可靠性配置(生产环境推荐)acks=allmin.insync.replicas=2enable.idempotence=trueretries=2147483647
# 吞吐量配置batch.size=32768 # 32KB,适当调大linger.ms=10 # 等待时间,增加吞吐量compression.type=lz4 # LZ4 压缩率高、CPU 开销小buffer.memory=134217728 # 128MB 缓冲区
# 超时配置request.timeout.ms=30000 # 请求超时max.block.ms=60000 # 发送阻塞超时Consumer 关键参数
Section titled “Consumer 关键参数”# 可靠性配置enable.auto.commit=false # 手动提交auto.offset.reset=latest # 从最新消息开始消费
# 消费性能配置fetch.min.bytes=1 # 最小拉取字节fetch.max.wait.ms=500 # 等待时间max.poll.records=500 # 每次 poll 条数max.poll.interval.ms=300000 # poll 间隔(避免 Rebalance)
# Rebalance 优化heartbeat.interval.ms=3000 # 心跳间隔session.timeout.ms=45000 # session 超时partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignorBroker 关键参数
Section titled “Broker 关键参数”# 存储配置log.retention.hours=168 # 保留 7 天log.retention.check.interval.ms=300000 # 检查间隔log.segment.bytes=1073741824 # 1GB 一个 Segmentlog.index.size.max.bytes=10485760 # 10MB 索引
# 副本配置default.replication.factor=3 # 默认 3 副本min.insync.replicas=2 # 最小同步副本数
# 网络配置num.network.threads=3 # 网络线程数num.io.threads=8 # IO 线程数socket.send.buffer.bytes=102400 # 发送缓冲区socket.receive.buffer.bytes=102400 # 接收缓冲区2.3 集群规划
Section titled “2.3 集群规划”机器配置建议
Section titled “机器配置建议”| 规模 | Broker 数 | 副本数 | 内存 | 磁盘 | 网络 |
|---|---|---|---|---|---|
| 小型 | 3 | 3 | 16GB | 1TB SSD | 1Gbps |
| 中型 | 6 | 3 | 32GB | 2TB SSD | 10Gbps |
| 大型 | 12+ | 3 | 64GB+ | 4TB SSD+ | 10Gbps+ |
分区规划公式
Section titled “分区规划公式”分区数 = max(目标吞吐 / 单分区生产吞吐, 目标吞吐 / 单分区消费吞吐)
经验值:- 单分区生产吞吐:10~100MB/s(取决于消息大小)- 单分区消费吞吐:50MB/s
示例:- 目标吞吐:100万条/秒,每条 1KB = 1GB/s- 需要分区数 = max(1GB/50MB, 1GB/100MB) = max(20, 10) = 20 个分区分区数规划建议
Section titled “分区数规划建议”建议分区数 = Broker 数的 2~4 倍
例如:- 3 台 Broker:6~12 个分区- 6 台 Broker:12~24 个分区
注意:- 分区只能增不能减,初始不要设太多- 分区数影响文件句柄、Controller 元数据- 消费者数 <= 分区数才有意义2.4 监控运维
Section titled “2.4 监控运维”关键监控指标
Section titled “关键监控指标”| 指标 | JMX / Prometheus | 告警阈值 | 说明 |
|---|---|---|---|
| 消息堆积(Lag) | consumer-lag | > 10000 | 消费跟不上生产 |
| UnderReplicatedPartitions | UnderReplicatedPartitions | > 0 | ISR 缩容 |
| ISR Shrinks/Expands | IsrShrinksPerSec | 频繁收缩 | 副本不同步 |
| 请求延迟 P99 | request.latency.p99 | > 1000ms | 请求超时 |
| 网络处理器空闲率 | NetworkProcessorAvgIdlePercent | < 0.5 | CPU 瓶颈 |
| 日志 flush 延迟 | log.flush.time.ms.p99 | > 1000ms | 磁盘 IO 瓶颈 |
常用运维命令
Section titled “常用运维命令”# 查看消费积压kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group my-group --describe
# 查看 Topic 列表kafka-topics.sh --bootstrap-server localhost:9092 \ --list
# 创建 Topickafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic my-topic \ --partitions 6 --replication-factor 3
# 查看 Topic 详情kafka-topics.sh --bootstrap-server localhost:9092 \ --describe --topic my-topic
# 重置消费位移(从最早开始)kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group my-group --topic my-topic \ --reset-offsets --to-earliest --execute
# 手动平衡分区(不推荐,仅紧急情况)kafka-leader-election.sh --bootstrap-server localhost:9092 \ --election-type preferred \ --topic my-topic2.5 问题排查
Section titled “2.5 问题排查”问题1:消息积压
Section titled “问题1:消息积压”# 1. 查看积压情况kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group my-group --describe
# 输出:# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG# orders 0 1000 5000 4000 ← 积压4000条
# 2. 排查方向# - 消费者实例数是否足够?(< 分区数)# - 消费者处理是否太慢?(查看日志)# - 消费者是否频繁 Rebalance?(查看 Rebalance 日志)
# 3. 快速处理# - 增加消费者实例# - 临时跳过积压消息(谨慎使用)kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group my-group --topic my-topic \ --reset-offsets --to-latest --execute问题2:Producer 发送超时
Section titled “问题2:Producer 发送超时”# 1. 检查 Broker 是否正常kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# 2. 查看 Producer 日志# TimeoutException: Expiring 1 record(s) for xxx-topic-0: 120004 ms has passed
# 3. 可能原因# - Broker 负载过高(检查 CPU、IO)# - 网络延迟(检查网络)# - 消息太大(检查 batch.size 配置)# - 缓冲区不足(增加 buffer.memory)问题3:Rebalance 频繁
Section titled “问题3:Rebalance 频繁”# 1. 查看 Rebalance 日志# Rebalancing, MemberId: xxx, Reason: xxx
# 2. 常见原因# - 消费者处理太慢(超过 max.poll.interval.ms)# - 心跳超时(网络抖动、GC 停顿)# - 消费者频繁加入/离开
# 3. 解决方案# - 使用 CooperativeStickyAssignor# - 增加 session.timeout.ms# - 优化业务处理速度# - 使用静态成员(group.instance.id)-
如果你发现 Kafka 的消息积压持续增加,但消费者数量已经等于分区数,应该从哪些方向排查?
-
在生产环境中,你会如何设计一个高可用的 Kafka 集群?需要考虑哪些因素?
-
如果需要保证「每条消息恰好处理一次」,除了 Kafka 本身的配置,还需要在业务层做哪些工作?