Skip to content

生产者与消费者深度解析

面试官:说说 Kafka 的 Producer 和 Consumer 是怎么工作的?

:Producer 采用批量发送机制,消息先积累到缓冲区,达到 batch.size 或 linger.ms 后打包发送,配合压缩提升吞吐。Consumer 通过 Consumer Group 机制并行消费,一个分区同一 Group 内只能被一个消费者消费,Rebalance 动态调整分区分配。

面试官:那 Rebalance 是怎么触发的?为什么会造成短暂不可用?

这个追问是面试的高频考点。能讲清楚 Rebalance 的触发条件、Stop-The-World 影响、Cooperative Rebalance 优化,才能体现对 Kafka 的深入理解。


Q1:Kafka Producer 的消息发送流程是什么?必考

Section titled “Q1:Kafka Producer 的消息发送流程是什么?”

回答要点

消息发送流程

应用层
│ send(ProducerRecord)
序列化器(Serializer)
│ key/value 序列化为字节数组
分区器(Partitioner)
│ 决定消息发往哪个分区
│ 默认规则:
│ 有 key → hash(key) % 分区数(相同 key 进同一分区)
│ 无 key → 粘性分区(Sticky Partitioner,同一 Batch 发同一分区)
消息缓冲区(RecordAccumulator)
│ 按 TopicPartition 分组,积累成 Batch
Sender 线程
│ 达到 batch.size 或 linger.ms 后,打包发往 Broker
Broker(对应分区的 Leader)

关键组件对比

组件作用配置参数
序列化器将对象转为字节数组key.serializer, value.serializer
分区器决定消息发往哪个分区partitioner.class
缓冲区积累消息,批量发送batch.size, linger.ms, buffer.memory
Sender 线程异步发送 Batch 到 Brokermax.in.flight.requests.per.connection

代码示例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 16384); // 16KB
props.put("linger.ms", 5); // 等待 5ms
props.put("compression.type", "lz4"); // LZ4 压缩
props.put("acks", "all"); // 等待 ISR 全部确认
props.put("enable.idempotence", "true"); // 幂等性
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息(异步)
producer.send(new ProducerRecord<>("orders", "order-1", "data-1"));
// 关闭 Producer(刷新缓冲区)
producer.close();

本质一句话:Producer 通过序列化、分区、缓冲、批量发送四步,将消息高效写入 Kafka。


Q2:Producer 的批量发送有什么好处?batch.size 和 linger.ms 如何配置?高频

Section titled “Q2:Producer 的批量发送有什么好处?batch.size 和 linger.ms 如何配置?”

回答要点

批量发送的好处

单条发送:
每条消息一次网络请求 → 网络开销大,吞吐低
批量发送:
多条消息打包一次请求 → 网络开销小,吞吐高

性能对比

发送方式吞吐量(消息/秒)平均延迟网络请求数
单条发送10 万~5ms10 万次/秒
Batch(16KB)50 万~10ms3 万次/秒
Batch(16KB)+ LZ4100 万~15ms1 万次/秒

batch.size 和 linger.ms 的配置

batch.size=16384 # 单个 Batch 最大字节数(默认16KB)
linger.ms=0 # 等待时间(默认0,即 Batch 满才发)
# 配置策略
低延迟场景:batch.size=16384, linger.ms=0 # Batch 满即发,延迟低
高吞吐场景:batch.size=32768, linger.ms=5 # 等待 5ms 积累更多消息,吞吐高

对比表格

配置组合吞吐量延迟适用场景
batch.size 小, linger.ms=0实时消息
batch.size 大, linger.ms=0平衡场景
batch.size 大, linger.ms>0批处理

Q3:Producer 的分区策略是什么?如何保证相同 key 的消息进同一分区?高频

Section titled “Q3:Producer 的分区策略是什么?如何保证相同 key 的消息进同一分区?”

回答要点

分区策略

1. 有 key 时

// 默认分区器:DefaultPartitioner
int partition = Math.abs(key.hashCode()) % numPartitions;
// 效果:相同 key 的消息进同一分区,保证分区内有序
producer.send(new ProducerRecord("orders", "user-1", message)); // 进同一分区
producer.send(new ProducerRecord("orders", "user-1", message)); // 进同一分区

2. 无 key 时(Kafka 2.4+ 粘性分区)

旧版(轮询):
消息1 → Partition0
消息2 → Partition1
消息3 → Partition2
→ Batch 分散,每个 Batch 都很小,吞吐低
新版(粘性分区):
消息1~100 → Partition0(持续发到这个分区直到 Batch 满)
消息101~200 → Partition1
→ Batch 集中,单个 Batch 更大,吞吐高

对比表格

分区策略Batch 大小吞吐量适用场景
有 key小(分散到多个分区)需要分区内有序
无 key(轮询)小(分散)无顺序需求
无 key(粘性)大(集中)高吞吐,无顺序需求

自定义分区器

public class OrderPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区逻辑
String orderId = (String) key;
return Math.abs(orderId.hashCode()) % cluster.partitionCountForTopic(topic);
}
}
// 配置
props.put("partitioner.class", "com.example.OrderPartitioner");

本质一句话:有 key 时用 hash 保证同一 key 进同一分区,无 key 时用粘性分区提升吞吐。


Q4:enable.idempotence=true 是什么?它解决了什么问题?高频

Section titled “Q4:enable.idempotence=true 是什么?它解决了什么问题?”

回答要点

幂等 Producer 解决的问题

没有幂等性时:
Producer 发送消息 A → Broker 写入成功 → 网络故障,ACK 未收到
Producer 重试,再次发送消息 A → Broker 再次写入
→ 消息 A 被写入两次(重复)
开启幂等性后:
Producer 发送消息 A(PID=1001, Sequence=0)
→ Broker 写入成功 → 网络故障,ACK 未收到
Producer 重试,再次发送消息 A(PID=1001, Sequence=0)
→ Broker 检测到相同 Sequence,丢弃重复消息
→ 消息 A 只写入一次

实现原理

每个 Producer 实例在初始化时,会从 Broker 获取一个唯一的 PID(Producer ID)。每条发往同一分区的消息都携带一个单调递增的序列号(Sequence Number)

消息结构(幂等时):
PID=1001, Sequence=0, data="msg1"
PID=1001, Sequence=1, data="msg2"
PID=1001, Sequence=2, data="msg3"

Broker 的去重逻辑

// Broker 为每个 (PID, Partition) 维护最新的 Sequence Number
Map<PID, Map<Partition, Integer>> sequenceCache;
void append(PID pid, Partition partition, int sequence, Message message) {
int expectedSeq = sequenceCache.get(pid).get(partition);
if (sequence == expectedSeq) {
// 正常消息,写入
log.append(message);
sequenceCache.get(pid).put(partition, sequence + 1);
} else if (sequence < expectedSeq) {
// 重复消息,丢弃(返回成功,不报错)
return;
} else if (sequence > expectedSeq + 1) {
// 消息乱序,返回 OutOfOrderSequenceException
throw new OutOfOrderSequenceException();
}
}

配置

enable.idempotence=true # 开启幂等性
# 自动设置:acks=all, retries=MAX_INT, max.in.flight.requests.per.connection=5

幂等性的局限

局限性说明解决方案
单分区只保证单个分区内不重复Kafka 事务
单会话Producer 重启后 PID 改变,无法识别旧会话的重复Kafka 事务
Consumer 侧重复幂等 Producer 只保证写入不重复Consumer 幂等消费

本质一句话:幂等 Producer 通过 PID + Sequence Number 实现单分区、单会话的去重,防止重试导致的重复写入。


Q5:Kafka 的 Consumer Group 是什么?为什么要用 Consumer Group?必考

Section titled “Q5:Kafka 的 Consumer Group 是什么?为什么要用 Consumer Group?”

回答要点

Consumer Group 定义

Consumer Group 是一组消费同一 Topic 的消费者实例集合,共享一个 group.id

核心规则

规则1:同一 Group 内,一个分区只能被一个消费者消费
规则2:不同 Group 之间互不影响,可以各自独立消费全量数据

架构示意

Topic: orders(3 个分区)
├─ Partition0
├─ Partition1
└─ Partition2
Consumer Group A (group.id = "group-a"):
Consumer1 → Partition0
Consumer2 → Partition1
Consumer3 → Partition2
Consumer Group B (group.id = "group-b"):
Consumer1 → Partition0, Partition1, Partition2 // 独立消费全量数据
Group A 和 Group B 都能完整消费全量数据,相互独立

使用价值

1. 水平扩展消费能力

单消费者:
1 个消费者消费 3 个分区 → 吞吐受限
Consumer Group:
3 个消费者各消费 1 个分区 → 并行处理,吞吐提升 3 倍

2. 广播消费

订单消息需要触发多个系统:
Group A(库存系统)→ 扣减库存
Group B(积分系统)→ 增加积分
Group C(通知系统)→ 发送通知
同一消息被 3 个 Group 各消费一次,实现广播

3. 故障容错

Consumer Group 有 3 个消费者:
Consumer1 宕机 → 触发 Rebalance
→ Partition0 被分配给 Consumer2 或 Consumer3
→ 消费不中断

对比表格

对比维度单消费者Consumer Group
消费并行度低(单线程)高(多线程)
故障容错无(宕机后消费中断)有(Rebalance 恢复)
广播能力有(多个 Group)

本质一句话:Consumer Group 实现消费并行化、广播消费、故障容错三大能力。


Q6:什么是消费位移?自动提交和手动提交各有什么问题?必考

Section titled “Q6:什么是消费位移?自动提交和手动提交各有什么问题?”

回答要点

消费位移定义

消费位移(offset)记录了 Consumer Group 消费到某个分区的哪个位置,存储在 Kafka 内部 Topic __consumer_offsets 中。

存储格式

__consumer_offsets Topic:
key: groupId + topic + partition
value: committed offset
示例:
key: "group-a" + "orders" + "0"
value: 12345 // 已消费到 offset=12345

两种提交方式对比

自动提交

enable.auto.commit=true # 开启自动提交(默认)
auto.commit.interval.ms=5000 # 每 5 秒自动提交
流程:
T1: poll() 返回 100 条消息(offset 0-99)
T2: 处理消息(可能处理到 offset 50)
T3: 5 秒后自动提交 offset=99(可能还没处理完)
T4: 宕机重启 → 从 offset=99 开始消费 → offset 50-99 未处理,消息丢失
或:
T1: poll() 返回 100 条消息(offset 0-99)
T2: 5 秒后自动提交 offset=99
T3: 处理消息到 offset 50 时宕机
T4: 重启 → 从 offset=99 开始消费 → offset 50-99 未处理,消息丢失

手动提交

enable.auto.commit=false # 关闭自动提交
# 同步提交
consumer.commitSync(); // 阻塞直到提交成功
# 异步提交
consumer.commitAsync(); // 不阻塞,提交失败不重试

代码示例

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
process(record);
}
// 处理完后手动提交(同步)
consumer.commitSync();
}

对比表格

提交方式消息丢失风险重复消费风险性能
自动提交高(提交后宕机)
手动同步提交中(处理完提交前宕机)
手动异步提交中(提交失败)

推荐方案

// 批量处理 + 手动提交
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
// 同步提交(保证可靠性)
consumer.commitSync();
}
} catch (Exception e) {
// 异常处理
} finally {
consumer.close();
}

Q7:Rebalance 是怎么触发的?为什么会造成短暂不可用?必考

Section titled “Q7:Rebalance 是怎么触发的?为什么会造成短暂不可用?”

回答要点

触发 Rebalance 的场景

1. 消费者加入 Consumer Group(新实例启动)
2. 消费者离开 Consumer Group(实例下线或崩溃)
3. 消费者超过 max.poll.interval.ms 未 poll(被判定为死亡)
4. Topic 分区数变化
5. 订阅的 Topic 变化

Rebalance 流程

Group Coordinator(协调者,一个特殊的 Broker)
Step 1: 所有消费者发送 JoinGroup 请求
→ Coordinator 选出一个 Consumer 作为 Group Leader
→ Leader 收到所有 Consumer 的订阅信息
Step 2: Group Leader 执行分区分配策略
→ 计算每个 Consumer 应该分配哪些分区
Step 3: Leader 将分配结果发送给 Coordinator(SyncGroup 请求)
Step 4: Coordinator 将分配结果分发给所有 Consumer
Step 5: 所有 Consumer 按新分配恢复消费

Stop-The-World(STW)影响

Rebalance 触发
→ 所有消费者停止消费("stop the world")
→ 等待 JoinGroup + SyncGroup 完成(通常数秒)
→ 消费者恢复消费
消息积压可能在这段时间增大

时序图

时间轴:T1 → T2 → T3 → T4 → T5
T1: Consumer3 加入 Group → 触发 Rebalance
T2: Consumer1, Consumer2, Consumer3 发送 JoinGroup
T3: Coordinator 选出 Leader,Leader 计算分配方案
T4: Leader 发送 SyncGroup,Coordinator 分发分配结果
T5: 所有 Consumer 恢复消费
STW 期间:T1 到 T5,所有消费者停止消费

性能数据

Consumer 数量Rebalance 时间影响范围
3 个~3 秒所有分区暂停 3 秒
10 个~10 秒所有分区暂停 10 秒
100 个~30 秒所有分区暂停 30 秒

Q8:Consumer 消费慢导致频繁 Rebalance,该怎么排查和解决?高频

Section titled “Q8:Consumer 消费慢导致频繁 Rebalance,该怎么排查和解决?”

回答要点

根因分析

max.poll.interval.ms(默认5分钟)是两次 poll 之间的最大间隔。如果业务处理一批消息的时间超过这个值,Consumer 被 Coordinator 判定为”死亡”,触发 Rebalance。

排查步骤

Terminal window
# 查日志
[Consumer clientId=..., groupId=...] Rebalancing
member ... timed out, heartbeat expired
# 确认原因
是处理慢(DB 慢查询、外部接口超时)还是 poll 间隔太短

解决方案

方案1:减少每次 poll 的消息数

# 减少每次 poll 的消息数,处理更快完成
max.poll.records=50 # 从默认 500 降低

方案2:增大 poll 间隔时间

# 增大 poll 间隔时间(适合业务确实需要较长处理时间)
max.poll.interval.ms=600000 # 10 分钟

方案3:异步处理

// poll 后立即提交到线程池,主线程继续 poll
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
executor.submit(() -> {
for (ConsumerRecord<String, String> record : records) {
process(record);
}
});
// 主线程继续 poll,保持心跳
}
// 注意:需要自行管理 offset 提交顺序

方案4:使用静态成员

# 使用静态成员(重启不触发 Rebalance)
group.instance.id=consumer-1 # 固定实例 ID

对比表格

方案优点缺点适用场景
减少 max.poll.records简单吞吐降低处理确实慢
增大 max.poll.interval.ms不影响吞吐Rebalance 延迟增加处理时间可控
异步处理高吞吐复杂,需管理 offset高吞吐场景
静态成员重启不触发 Rebalance配置复杂固定消费者实例

Q9:分区分配策略有哪些?StickyAssignor 和 CooperativeStickyAssignor 有什么区别?中频

Section titled “Q9:分区分配策略有哪些?StickyAssignor 和 CooperativeStickyAssignor 有什么区别?”

回答要点

分区分配策略对比

策略分配规则Rebalance 影响推荐度
RangeAssignor(默认)按分区范围按顺序分配给消费者分配不均匀(前几个消费者多)不推荐
RoundRobinAssignor轮询分配所有分区分配均匀,但 Rebalance 后大量迁移不推荐
StickyAssignor尽量保持上次分配,最小化迁移Rebalance 影响小推荐
CooperativeStickyAssignor增量 Rebalance,不停止全部消费者Rebalance 影响最小最优,Kafka 2.4+ 推荐

Cooperative Rebalance(增量再平衡)

传统 Rebalance 是 Eager 协议:所有消费者先放弃全部分区,再重新分配(全量停止)。

旧协议(Eager):
Consumer1 放弃 P0,P1,P2 → 停止消费
Consumer2 放弃 P3,P4 → 停止消费
重新分配所有分区 → 所有人恢复消费
(全程停止消费)
新协议(Cooperative):
第一轮:只迁移需要变动的分区
Consumer1 只放弃 P2(需要给新加入的 Consumer3)
Consumer1 继续消费 P0,P1,不中断
第二轮:Consumer3 接管 P2,开始消费
(只有需要迁移的分区暂停,其余正常消费)

配置

# Consumer 配置
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

性能数据

策略Rebalance 时间消费中断范围
Eager~5 秒全部分区
Cooperative~2 秒仅变动分区

Q10:如何避免不必要的 Rebalance?静态成员是什么?中频

Section titled “Q10:如何避免不必要的 Rebalance?静态成员是什么?”

回答要点

避免不必要的 Rebalance

1. 增大 session.timeout.ms

session.timeout.ms=45000 # 心跳超时,超过则被认为宕机(默认 45s)
heartbeat.interval.ms=3000 # 心跳发送间隔(建议为 session.timeout/3)
# 调整策略
session.timeout.ms=60000 # 增大到 60 秒,减少误判

2. 增大 max.poll.interval.ms

max.poll.interval.ms=600000 # 10 分钟,避免处理慢触发 Rebalance

3. 减少每次 poll 的消息数

max.poll.records=100 # 从默认 500 降低,处理更快完成

静态成员

静态成员(Static Membership)允许消费者重启后保持原有的分区分配,不触发 Rebalance。

# Consumer 配置
group.instance.id=consumer-1 # 固定实例 ID(全局唯一)
# 效果
Consumer 重启 → Coordinator 识别到相同的 group.instance.id
→ 恢复原有的分区分配,不触发 Rebalance

对比表格

对比维度动态成员(默认)静态成员
重启行为触发 Rebalance不触发 Rebalance
分区分配重新分配恢复原有分配
消费中断全部暂停仅重启的消费者暂停
配置复杂度中(需配置 group.instance.id)

适用场景

动态成员:
消费者数量频繁变化(弹性伸缩)
静态成员:
消费者数量固定,偶尔重启(定时任务、版本发布)

Q11:Kafka 如何保证消息的顺序消费?实战

Section titled “Q11:Kafka 如何保证消息的顺序消费?”

回答要点

Kafka 只保证分区内有序,跨分区不保证。要实现顺序消费,需要从三个层面保证:

1. 生产端:相同业务 key 发到同一分区

// 相同 orderId 的消息进同一分区
producer.send(new ProducerRecord("orders", orderId, message));
// 分区器逻辑
int partition = Math.abs(orderId.hashCode()) % numPartitions;

2. 分区端:单分区内消息天然有序

每个 Partition 是一个 append-only 的顺序日志,消息按写入顺序分配递增的 offset,Consumer 按 offset 顺序消费,保证分区内严格有序。

3. 消费端:单线程消费一个分区

Consumer Group 的分区与消费者是 1:1 分配的,每个分区只有一个线程消费。如果消费者内部使用多线程处理,顺序会被打乱,需要在业务层按序列号排序后处理。

代码示例

// 单线程消费(保证顺序)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record); // 顺序处理
}
consumer.commitSync();
}

极端场景(全局顺序)

Topic 只设 1 个分区,1 个消费者
→ 完全串行,吞吐量极低
→ 慎用,仅用于严格全局顺序场景(流水号)

对比表格

方案顺序保证并行度吞吐量适用场景
单分区全局顺序极低极低流水号、全局事件
Key 分区局部顺序用户维度、订单维度
Consumer 排序最终顺序允许短暂乱序

Q12:Consumer 端如何实现消息的幂等消费(防止重复处理)?实战

Section titled “Q12:Consumer 端如何实现消息的幂等消费(防止重复处理)?”

回答要点

Kafka 的 At Least Once 语义决定了重复消费无法完全避免,必须在业务层做幂等处理。

常见幂等方案

1. 数据库唯一约束

-- 以消息的唯一业务 ID 作为主键或唯一索引
CREATE TABLE orders (
order_id VARCHAR(50) PRIMARY KEY,
user_id VARCHAR(50),
amount DECIMAL(10, 2),
...
);
-- 消费逻辑
INSERT IGNORE INTO orders(order_id, ...) VALUES(?, ...);
-- 重复消费时,INSERT 被忽略,不会重复处理

2. Redis SETNX 去重

String key = "consumed:" + topic + ":" + partition + ":" + offset;
if (redis.setnx(key, "1", 24 * 3600)) {
// 未消费过,处理消息
processMessage(record);
}
// 否则跳过(重复消息)

3. 状态机

// 消息处理结果存 DB,重复消费时检查状态
Order order = db.query("SELECT status FROM orders WHERE order_id=?", orderId);
if (order.status == "CREATED") {
// 首次消费,处理消息
process(record);
db.update("UPDATE orders SET status='PAID' WHERE order_id=?", orderId);
} else {
// 已处理,跳过
}

对比表格

方案性能可靠性适用场景
数据库唯一约束低(DB 写入)简单场景
Redis SETNX高(内存)中(Redis 故障)高频场景
状态机有状态流转的业务

代码示例(数据库唯一约束):

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String orderId = record.key();
try {
// 插入订单(唯一约束)
db.execute("INSERT INTO orders(order_id, ...) VALUES(?, ...)", orderId);
} catch (DuplicateKeyException e) {
// 重复消费,跳过
continue;
}
}
consumer.commitSync();
}

Q13:auto.offset.reset=latest 和 earliest 分别在什么时候生效?已有消费记录时会用到吗?中频

Section titled “Q13:auto.offset.reset=latest 和 earliest 分别在什么时候生效?已有消费记录时会用到吗?”

回答要点

生效条件

auto.offset.reset 只在以下情况生效

  1. Consumer Group 第一次消费该 Topic(__consumer_offsets 中没有该 Group 对该分区的 offset 记录)
  2. 已记录的 offset 超出了分区的有效范围(消息被删除,比如 retention 过期)

正常情况

Consumer 从 __consumer_offsets 中读取上次提交的 offset,从那里继续消费,auto.offset.reset 不起作用。

对比表格

配置值无消费记录时行为已有消费记录时行为
earliest从最早消息开始消费从上次提交的 offset 继续
latest从最新消息开始消费(跳过历史)从上次提交的 offset 继续
none抛出异常从上次提交的 offset 继续

常见误解

误解:重启消费者会重置 offset
正确:重启消费者(group.id 不变)会从上次提交的 offset 继续
想从头重新消费,需要:
方法1:改 group.id(新 Group 没有历史 offset)
方法2:手动重置 offset

手动重置 offset

Terminal window
# 重置到最早
kafka-consumer-groups.sh --bootstrap-server ... --group my-group \
--topic my-topic --reset-offsets --to-earliest --execute
# 重置到最新
kafka-consumer-groups.sh --bootstrap-server ... --group my-group \
--topic my-topic --reset-offsets --to-latest --execute
# 重置到指定 offset
kafka-consumer-groups.sh --bootstrap-server ... --group my-group \
--topic my-topic --reset-offsets --to-offset 1000 --execute

Q14:Consumer 端的消费积压如何快速排查和处理?实战

Section titled “Q14:Consumer 端的消费积压如何快速排查和处理?”

回答要点

排查步骤

Terminal window
# 查看消费积压(Lag)
kafka-consumer-groups.sh --bootstrap-server ... \
--group my-group --describe
# 输出:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 0 1000 5000 4000 ← 积压4000条

处理策略

短期积压(消费者处理慢)

1. 增加消费者实例(不超过分区数)
→ 或先增加分区数再增加消费者
2. 临时降级:跳过非关键消息(如监控数据)
→ 先消费业务核心消息
3. 优化消费逻辑
→ 批量写 DB、异步化

持续积压(生产速率 > 消费速率)

1. 优化消费端代码
→ 减少 DB 查询次数、批量写入
2. 增加消费者和分区数(需重新部署)
→ 分区数从 3 增加到 6
→ 消费者从 3 增加到 6
3. 消息分级
→ 高优先级 Topic 和低优先级 Topic 分开
→ 分别配置消费者数量

历史积压处理

Terminal window
# 如果允许跳过历史数据,直接重置到最新 offset
kafka-consumer-groups.sh ... --reset-offsets --to-latest --execute

性能数据

优化措施吞吐提升实施难度
增加消费者实例
批量写 DB
异步处理
增加分区数高(需重新部署)

监控告警

# Prometheus + Grafana
监控指标:
- kafka_consumer_group_lag(消费积压)
- kafka_consumer_group_offset(消费位移)
- kafka_consumer_group_members(消费者数量)
告警阈值:
- Lag > 10000 → 消费积压严重
- Lag 增长速率 > 1000/分钟 → 需立即处理

总结:Kafka 生产消费的面试答题思路

Section titled “总结:Kafka 生产消费的面试答题思路”

Producer 机制

  • 批量发送提升吞吐,batch.size 和 linger.ms 需权衡
  • 分区策略:有 key 用 hash 保证顺序,无 key 用粘性分区提升吞吐
  • 幂等 Producer 通过 PID + Sequence Number 防止重试重复写入

Consumer Group 机制

  • Consumer Group 实现并行消费、广播消费、故障容错
  • 手动提交 offset 避免 At Least Once 的重复消费问题
  • Rebalance 触发后 STW,需优化减少影响

Rebalance 优化

  • 增大 session.timeout.ms 和 max.poll.interval.ms 避免误判
  • 使用 CooperativeStickyAssignor 减少中断范围
  • 使用静态成员避免重启触发 Rebalance

消费语义与幂等

  • 分区内有序通过 key 分区 + 单线程消费实现
  • 幂等消费通过数据库唯一约束、Redis SETNX、状态机实现
  • 监控 Lag 及时发现消费积压

掌握这些链式追问的答案,你就能在 Kafka 生产消费面试中拿高分!