Skip to content

最佳实践与生产环境配置


一、实战要点:开发中最容易踩的坑

Section titled “一、实战要点:开发中最容易踩的坑”
// ❌ 错误配置:acks=0,可能丢失消息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "0"); // 不等待确认,可能丢消息
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// ✅ 正确配置:acks=all,保证消息不丢失
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 等待 ISR 全部确认
props.put("min.insync.replicas", "2"); // ISR 至少 2 个副本
props.put("retries", "2147483647"); // 重试次数
props.put("enable.idempotence", "true"); // 开启幂等,防止重复
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Kafka 只保证分区内有序,跨分区不保证有序。如果需要保证同一业务key的消息有序:

// ✅ 相同 key 的消息发送到同一分区
// 方式1:使用有 key 的消息
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", // topic
orderId, // key:相同 orderId 进同一分区
orderJson // value
);
producer.send(record);
// 方式2:自定义分区器(业务特殊需求)
props.put("partitioner.class", "com.mycompany.CustomPartitioner");
// ❌ 错误:缓冲区太小,发送太快会阻塞
props.put("buffer.memory", "33554432"); // 32MB,默认值
// ✅ 调大缓冲区,或调整发送策略
props.put("buffer.memory", "134217728"); // 128MB
props.put("batch.size", "32768"); // 32KB
props.put("linger.ms", "20"); // 等待 20ms 凑够 batch
props.put("max.block.ms", "60000"); // 阻塞超时时间

// ❌ 自动提交导致重复消费
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// 场景:poll 了 100 条消息,处理到第 50 条时崩溃
// 自动提交 offset=50,但只有 50 条处理成功
// 重启后从 offset=50 开始,前 50 条被重复消费
// ✅ 手动提交避免重复消费
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record); // 处理消息
} catch (Exception e) {
// 记录失败消息到死信队列
sendToDLQ(record);
}
}
// ✅ 处理完成后提交 offset
if (!records.isEmpty()) {
consumer.commitSync(); // 同步提交,失败会抛异常
}
}
// ❌ 多线程并发消费导致顺序混乱
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 错误:不同分区并发处理,同一分区内也可能有并发
records.records(topicPartition).forEach(record -> {
executor.submit(() -> processMessage(record)); // 并发处理
});
}
// ✅ 单线程顺序处理(推荐)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 按分区顺序处理
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
for (ConsumerRecord<String, String> record : partitionRecords) {
processMessage(record); // 顺序处理
}
}
consumer.commitSync();
}
// ❌ 处理太慢触发 Rebalance,消息丢失
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理一条消息需要 10 分钟
for (ConsumerRecord<String, String> record : records) {
processMessage(record); // 处理太慢
}
// max.poll.interval.ms=300000(5分钟),超时触发 Rebalance
}
// ✅ 优化:减少每次 poll 的消息数,或增加 poll 间隔
props.put("max.poll.records", "50"); // 每次只处理 50 条
props.put("max.poll.interval.ms", "600000"); // 10 分钟超时
// 或者:异步处理 + 手动控制 offset 提交时机
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 快速 poll 到本地队列
List<ConsumerRecord<String, String>> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
batch.add(record);
}
// 异步处理,不阻塞 poll
CompletableFuture.runAsync(() -> {
for (ConsumerRecord<String, String> record : batch) {
processMessage(record);
}
// 处理完成后提交 offset
consumer.commitSync();
});
}

二、最佳实践:生产环境怎么用

Section titled “二、最佳实践:生产环境怎么用”
场景推荐方案说明
日志收集Kafka高吞吐、持久化能力强
业务解耦Kafka / RocketMQ解耦、削峰
实时计算Kafka + Flink流处理生态
事务消息RocketMQ本地事务 + 消息原子性
低延迟交易自研 / 专业 MQ优化网络、存储
消息顺序要求高RocketMQ全局有序

⚠️ Kafka 不适合的场景

  • 需要低延迟(< 1ms)的实时通讯
  • 需要复杂路由(RabbitMQ 更擅长)
  • 需要本地事务 + 消息发送的原子性(用 RocketMQ 或本地消息表)

# 可靠性配置(生产环境推荐)
acks=all
min.insync.replicas=2
enable.idempotence=true
retries=2147483647
# 吞吐量配置
batch.size=32768 # 32KB,适当调大
linger.ms=10 # 等待时间,增加吞吐量
compression.type=lz4 # LZ4 压缩率高、CPU 开销小
buffer.memory=134217728 # 128MB 缓冲区
# 超时配置
request.timeout.ms=30000 # 请求超时
max.block.ms=60000 # 发送阻塞超时
# 可靠性配置
enable.auto.commit=false # 手动提交
auto.offset.reset=latest # 从最新消息开始消费
# 消费性能配置
fetch.min.bytes=1 # 最小拉取字节
fetch.max.wait.ms=500 # 等待时间
max.poll.records=500 # 每次 poll 条数
max.poll.interval.ms=300000 # poll 间隔(避免 Rebalance)
# Rebalance 优化
heartbeat.interval.ms=3000 # 心跳间隔
session.timeout.ms=45000 # session 超时
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# 存储配置
log.retention.hours=168 # 保留 7 天
log.retention.check.interval.ms=300000 # 检查间隔
log.segment.bytes=1073741824 # 1GB 一个 Segment
log.index.size.max.bytes=10485760 # 10MB 索引
# 副本配置
default.replication.factor=3 # 默认 3 副本
min.insync.replicas=2 # 最小同步副本数
# 网络配置
num.network.threads=3 # 网络线程数
num.io.threads=8 # IO 线程数
socket.send.buffer.bytes=102400 # 发送缓冲区
socket.receive.buffer.bytes=102400 # 接收缓冲区

规模Broker 数副本数内存磁盘网络
小型3316GB1TB SSD1Gbps
中型6332GB2TB SSD10Gbps
大型12+364GB+4TB SSD+10Gbps+
分区数 = max(目标吞吐 / 单分区生产吞吐, 目标吞吐 / 单分区消费吞吐)
经验值:
- 单分区生产吞吐:10~100MB/s(取决于消息大小)
- 单分区消费吞吐:50MB/s
示例:
- 目标吞吐:100万条/秒,每条 1KB = 1GB/s
- 需要分区数 = max(1GB/50MB, 1GB/100MB) = max(20, 10) = 20 个分区
建议分区数 = Broker 数的 2~4 倍
例如:
- 3 台 Broker:6~12 个分区
- 6 台 Broker:12~24 个分区
注意:
- 分区只能增不能减,初始不要设太多
- 分区数影响文件句柄、Controller 元数据
- 消费者数 <= 分区数才有意义

指标JMX / Prometheus告警阈值说明
消息堆积(Lag)consumer-lag> 10000消费跟不上生产
UnderReplicatedPartitionsUnderReplicatedPartitions> 0ISR 缩容
ISR Shrinks/ExpandsIsrShrinksPerSec频繁收缩副本不同步
请求延迟 P99request.latency.p99> 1000ms请求超时
网络处理器空闲率NetworkProcessorAvgIdlePercent< 0.5CPU 瓶颈
日志 flush 延迟log.flush.time.ms.p99> 1000ms磁盘 IO 瓶颈
Terminal window
# 查看消费积压
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --describe
# 查看 Topic 列表
kafka-topics.sh --bootstrap-server localhost:9092 \
--list
# 创建 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic my-topic \
--partitions 6 --replication-factor 3
# 查看 Topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic my-topic
# 重置消费位移(从最早开始)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --topic my-topic \
--reset-offsets --to-earliest --execute
# 手动平衡分区(不推荐,仅紧急情况)
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type preferred \
--topic my-topic

Terminal window
# 1. 查看积压情况
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --describe
# 输出:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 0 1000 5000 4000 ← 积压4000条
# 2. 排查方向
# - 消费者实例数是否足够?(< 分区数)
# - 消费者处理是否太慢?(查看日志)
# - 消费者是否频繁 Rebalance?(查看 Rebalance 日志)
# 3. 快速处理
# - 增加消费者实例
# - 临时跳过积压消息(谨慎使用)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --topic my-topic \
--reset-offsets --to-latest --execute
Terminal window
# 1. 检查 Broker 是否正常
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# 2. 查看 Producer 日志
# TimeoutException: Expiring 1 record(s) for xxx-topic-0: 120004 ms has passed
# 3. 可能原因
# - Broker 负载过高(检查 CPU、IO)
# - 网络延迟(检查网络)
# - 消息太大(检查 batch.size 配置)
# - 缓冲区不足(增加 buffer.memory)
Terminal window
# 1. 查看 Rebalance 日志
# Rebalancing, MemberId: xxx, Reason: xxx
# 2. 常见原因
# - 消费者处理太慢(超过 max.poll.interval.ms)
# - 心跳超时(网络抖动、GC 停顿)
# - 消费者频繁加入/离开
# 3. 解决方案
# - 使用 CooperativeStickyAssignor
# - 增加 session.timeout.ms
# - 优化业务处理速度
# - 使用静态成员(group.instance.id)

  1. 如果你发现 Kafka 的消息积压持续增加,但消费者数量已经等于分区数,应该从哪些方向排查?

  2. 在生产环境中,你会如何设计一个高可用的 Kafka 集群?需要考虑哪些因素?

  3. 如果需要保证「每条消息恰好处理一次」,除了 Kafka 本身的配置,还需要在业务层做哪些工作?