侧边栏壁纸
  • 累计撰写 75 篇文章
  • 累计创建 62 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Kafka 深度学习教程与面试题

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

一、Kafka 全景图

1.1 定位与演进

Kafka 由 LinkedIn 于 2011 年开源,现为 Apache 顶级项目。最初定位为分布式日志系统(Log Aggregation),后来演变为"事件流平台"(Event Streaming Platform),广泛用于日志采集、实时数据管道、流计算、CDC(Change Data Capture)等场景。

版本演进关键节点:

  • 0.8(2013):引入复制机制(Replication),具备高可用能力
  • 0.10(2016):引入 Kafka Streams,具备流处理能力
  • 0.11(2017):引入事务(Transactions)和 Exactly-Once 语义
  • 2.8(2021):引入 KRaft 模式(Early Access),开始去 ZooKeeper 依赖
  • 3.3+(2022):KRaft 模式正式生产可用(GA)
  • 4.0(2025):完全移除 ZooKeeper 模式,KRaft 成为唯一运行模式

Kafka 与 RocketMQ 的根本区别:Kafka 的设计哲学是"高吞吐的日志管道",一切设计围绕吞吐量和顺序写入优化;RocketMQ 的设计哲学是"金融级消息中间件",一切设计围绕可靠性和业务功能(事务、延迟、死信队列)。选型时记住:日志和流计算选 Kafka,业务消息选 RocketMQ。

1.2 架构全景

                        ┌──────────────────┐
                        │   ZooKeeper /    │
                        │   KRaft Quorum   │  ← 元数据管理 + Leader 选举
                        └────────┬─────────┘
                                 │
          ┌──────────────────────┼──────────────────────┐
          │                      │                      │
   ┌──────▼──────┐        ┌──────▼──────┐        ┌──────▼──────┐
   │  Broker-0   │        │  Broker-1   │        │  Broker-2   │
   │  (Leader     │        │  (Follower  │        │  (Leader    │
   │   p0, p2)    │        │   p0, p2)   │        │   p1)       │
   └──┬───────┬──┘        └──────┬──────┘        └──────┬──────┘
      │       │                 │                      │
  ┌───▼──┐ ┌──▼───┐        ┌───▼──┐               ┌───▼──┐
  │Producer│ │Consumer│       │Consumer│              │Producer│
  └──────┘ └──────┘        └──────┘               └──────┘

Kafka 集群由多个 Broker 组成,每个 Broker 是一个独立的 Kafka 进程。 Topic 被切分为多个 Partition(分区),每个 Partition 分布在不同的 Broker 上,实现水平扩展。每个 Partition 有 Leader 和 Follower 副本,Leader 负责读写,Follower 从 Leader 同步数据。

1.3 核心角色详解

ZooKeeper / KRaft

Kafka 早期版本依赖 ZooKeeper 管理元数据(Broker 列表、Topic 配置、Partition 分配、Controller 选举)。ZooKeeper 的问题:① 元数据更新通过 ZAB 协议达成共识,延迟高(几十毫秒级);② Partition 数量过多时 ZK 压力大(每个 Partition 在 ZK 上是一个 ZNode);③ Controller 故障转移依赖 ZK 的 Session 超时检测,恢复慢(秒级)。

KRaft(Kafka Raft Metadata Mode)是 Kafka 自研的元数据管理方案,用内置的 Raft 协议替代 ZooKeeper。KRaft 的优势:① 元数据存储在 Kafka 内部的 __cluster_metadata Topic 中,与消息存储共用同一套高效的日志存储引擎;② 支持百万级 Partition(ZooKeeper 模式下建议不超过 20 万);③ Controller 故障转移基于 Raft 选举,毫秒级恢复。Kafka 4.0 开始 KRaft 成为唯一模式,ZooKeeper 模式已被完全移除。

Broker

Broker 是 Kafka 的服务节点,核心职责:接收生产者消息、存储消息到磁盘、响应消费者拉取请求、管理副本同步。每个 Broker 内部的关键组件:

  • SocketServer:基于 NIO 的多线程网络层(Acceptor 线程接收连接 → Processor 线程处理请求 → RequestHandler 线程池处理业务逻辑)
  • LogManager:管理所有 Partition 的日志存储(每个 Partition 对应一个 Log 对象,由多个 LogSegment 组成)
  • ReplicaManager:管理副本同步,Follower 通过 ReplicaFetcherThread 从 Leader 拉取消息
  • GroupCoordinator:管理 Consumer Group 的 offset 和 Rebalance
  • KafkaController(仅 Controller Broker 上运行):管理 Partition Leader 选举、副本分配、Broker 上下线

Controller

Controller 是 Kafka 集群中一个特殊的 Broker(通过 ZooKeeper/KRaft 选举产生),负责集群级别的元数据管理:Partition Leader 选举、副本重新分配(Reassignment)、Broker 上下线通知。整个集群同一时刻只有一个 Controller,Controller 故障时其他 Broker 竞选成为新 Controller。

Producer

Kafka Producer 是异步、批量、高效的。核心流程:消息先写入 RecordAccumulator(本地缓冲区),按 Partition 分组攒批,由后台 Sender 线程批量发送到 Broker。这种设计用延迟换吞吐——单条消息不立即发送,而是等攒够一批再发,极大减少了网络请求次数。

Consumer

Kafka Consumer 是纯拉取模式(Pull),消费者主动从 Broker 拉取消息,Broker 不做推送。Consumer 通过 poll() 方法拉取,每次拉取返回一批消息(默认最多 500 条,max.poll.records)。Consumer Group 内的消费者共同分担 Topic 的 Partition,每个 Partition 只被组内一个消费者消费。

二、Partition 与 Replica——Kafka 最核心的抽象

2.1 Partition:并行度的最小单元

一个 Topic 可以有多个 Partition,每个 Partition 是一个有序、不可变的消息序列。Partition 是 Kafka 并行度的最小单元:生产者可以并行向不同 Partition 写入,消费者可以并行从不同 Partition 读取。

Topic: "order-events" (3 Partitions)

Partition-0:  [msg0] [msg3] [msg6] [msg9]  ...  (Leader: Broker-0)
Partition-1:  [msg1] [msg4] [msg7] [msg10] ...  (Leader: Broker-1)
Partition-2:  [msg2] [msg5] [msg8] [msg11] ...  (Leader: Broker-2)

Partition 数量的选择:太少则并行度不够(消费者数 > Partition 数时,部分消费者空闲),太多则文件描述符开销大、Rebalance 慢、Leader 选举慢。经验公式:Partition 数 = max(目标吞吐 / 单 Partition 吞吐, 消费者数量)。单 Partition 的写入吞吐约 10-50 MB/s(取决于消息大小和磁盘性能),一个消费者单线程处理一个 Partition 的消费吞吐约 5-20 MB/s。

2.2 Replica:高可用的基石

每个 Partition 可以配置多个副本(replication.factor,推荐 3)。副本分为 Leader 和 Follower:Leader 处理所有读写请求,Follower 只从 Leader 同步数据(不处理客户端请求,Kafka 2.4+ 支持 Follower Read)。

ISR(In-Sync Replicas):与 Leader 保持同步的副本集合。"同步"的标准是:Follower 在 replica.lag.time.max.ms(默认 30 秒)内持续向 Leader 发送 Fetch 请求并消费消息。如果 Follower 落后太多(超过 30 秒没有 Fetch),会被从 ISR 中移除。

Partition-0:
  Leader:    Broker-0  (ISR)
  Follower:  Broker-1  (ISR)   ← 同步正常
  Follower:  Broker-2  (NOT in ISR) ← 落后太多,被移出 ISR

AR(Assigned Replicas):Partition 的所有副本列表,无论是否同步。ISR ⊆ AR。

OSR(Out-of-Sync Replicas):AR - ISR,即落后的副本。

ISR 的核心意义:当 min.insync.replicas=2(最少同步副本数)且 acks=all 时,Leader 必须确保消息被 ISR 中至少 2 个副本写入才返回 ACK。如果 ISR 中的副本数 < min.insync.replicas,Leader 会拒绝写入请求(抛出 NotEnoughReplicasException),保证数据安全性。

2.3 Leader 选举

当 Leader 宕机时,Controller 从 ISR 中选举新的 Leader(优先选 ISR 中的第一个副本)。如果 ISR 为空(所有副本都落后了),行为取决于 unclean.leader.election.enable

  • false(默认):Partition 不可用,直到原 Leader 恢复。数据最安全,但可用性低。
  • true:允许从非 ISR 副本中选举 Leader(可能丢失数据)。可用性高,但数据不安全。

生产环境建议保持 false,通过监控 ISR 收缩事件及时告警,而不是靠牺牲数据安全性来保证可用性。

三、消息存储——Kafka 高吞吐的秘密

3.1 LogSegment:日志的物理组织

Kafka 的每个 Partition 在磁盘上由一个 Log 对象管理,Log 由多个 LogSegment 组成。每个 LogSegment 包含三个文件:

/data/kafka-logs/order-events-0/
├── 00000000000000000000.log      # 消息数据(LogSegment)
├── 00000000000000000000.index    # 偏移量索引(Offset Index)
├── 00000000000000000000.timeindex # 时间戳索引(Time Index)
├── 00000000000000368769.log      # 下一个 Segment
├── 00000000000000368769.index
├── 00000000000000368769.timeindex
└── leader-epoch-checkpoint       # Leader Epoch 检查点

.log 文件:消息的实际存储文件,默认 1GB(log.segment.bytes),写满后新建下一个 Segment(文件名是起始 offset,左补零到 20 位)。消息在文件内是顺序追加的,与 RocketMQ 的 CommitLog 类似。

.index 文件:稀疏偏移量索引(Sparse Offset Index),不是每条消息都有索引条目,而是每隔一定字节间隔(默认 4KB,log.index.interval.bytes)记录一条映射:相对偏移量(4B) + 文件内位置(4B) = 8 字节/条。因为稀疏,所以索引文件很小(一个 1GB 的 log 文件对应的 index 文件约 2MB),可以全部 mmap 到内存。

.timeindex 文件:时间戳索引,格式为 时间戳(8B) + 偏移量(4B) = 12 字节/条,用于按时间查找消息(如消费端要从某个时间点开始消费)。

3.2 消息查找流程

消费者要从 offset=368900 开始读取消息:

Step 1: 定位 Segment
  二分查找文件名 → 找到 00000000000000368769.log(起始 offset 为 368769)

Step 2: 在 Index 文件中查找
  mmap 加载 .index 文件 → 二分查找 ≤ 368900 的最大索引条目
  得到:相对偏移量 131,文件位置 52480
  含义:offset 368769+131=368900 的消息,在 .log 文件的 52480 字节附近

Step 3: 从 .log 文件扫描
  seek 到 52480 位置 → 顺序扫描,找到 offset=368900 的消息 → 返回

关键点:索引是稀疏的,所以不能直接定位到精确位置,只能定位到"附近",然后顺序扫描。但由于 Segment 内的消息是有序的,扫描距离通常很短(最多几千字节),性能影响极小。

3.3 顺序写入与 Page Cache

Kafka 的写入性能之所以高,核心在于两个机制:

顺序追加写:生产者消息到达后,直接追加到当前活跃 LogSegment 的末尾。顺序写磁盘的速度接近内存写入(HDD 约 600MB/s,SSD 更高),远高于随机写。Kafka 不做复杂的内存数据结构操作,就是"往文件末尾 append"。

利用操作系统 Page Cache:Kafka 不自己管理内存缓存,而是直接依赖操作系统的 Page Cache。写入时数据先进 Page Cache(内存),由操作系统的 pdflush 线程异步刷到磁盘。读取时如果数据在 Page Cache 中就直接返回,不需要磁盘 I/O。这种设计的优势:① JVM 不需要做 GC(数据在 JVM 堆外);② 操作系统对 Page Cache 的预读和刷盘策略经过了数十年的优化;③ Broker 重启后 Page Cache 仍然有效(由操作系统管理)。

写入流程(极简):
Producer → Broker → 追加写入 Page Cache (内存) → 立即返回 ACK
                                    ↓ (异步)
                           OS pdflush → 刷到磁盘

读取流程(热点数据):
Consumer → Broker → 从 Page Cache 直接返回 (零拷贝) → NIC

3.4 零拷贝:sendfile / transferTo

消费者拉取消息时,Kafka 使用 Java NIO 的 FileChannel.transferTo() 方法(底层是 Linux 的 sendfile 系统调用),数据直接从文件描述符传输到 Socket 缓冲区,不经过用户空间。

传统 read/write (4 次拷贝 + 4 次切换):
  Disk → Kernel Buffer → User Buffer → Socket Buffer → NIC
  (DMA)    (copy)          (copy)         (DMA)
           内核→用户       用户→内核

sendfile (2 次拷贝 + 2 次切换):
  Disk → Kernel Buffer → NIC
  (DMA)    (DMA gather copy)

如果网卡支持 SG-DMA(Scatter-Gather DMA),甚至可以直接从 Kernel Buffer 传到网卡,
只有 1 次 DMA 拷贝。

零拷贝的前提:消息在传输过程中不需要被修改(Kafka 的消息是不可变的,天然满足)。如果需要对消息做转换(如压缩/解压),就不能用零拷贝,必须经过用户空间。

3.5 消息格式(Record Batch)

Kafka 0.11+ 使用 v2 消息格式(Magic=2),以 Record Batch 为单位存储:

┌──────────────── RecordBatch Header (61 bytes) ────────────────┐
│ baseOffset(8B) │ batchLength(4B) │ partitionLeaderEpoch(4B)   │
│ magic(1B)      │ crc(4B)         │ attributes(2B)             │
│ lastOffsetDelta(4B) │ baseTimestamp(8B) │ maxTimestamp(8B)    │
│ producerId(8B) │ producerEpoch(2B) │ baseSequence(4B)         │
│ recordsCount(4B)                                            │
└───────────────────────────────────────────────────────────────┘
┌──────── Record 0 ────────┐
│ length(varint) │ attributes(1B) │ timestampDelta(varlong)     │
│ offsetDelta(varint) │ keyLength(varint) │ key(bytes)          │
│ valueLength(varint) │ value(bytes) │ headersCount(varint)     │
│ headers...                                                   │
└──────────────────────────┘
┌──────── Record 1 ────────┐
│ ...                      │
└──────────────────────────┘

v2 格式的关键改进:① 使用 varint 变长编码(小数值占更少字节),比 v1 节省约 25% 空间。② 以 Batch 为单位压缩(而非单条消息压缩),压缩率更高。③ 引入了 producerId + producerEpoch + baseSequence,支持 Exactly-Once 语义的去重。

四、生产者深度机制

4.1 发送流程全链路

Producer.send(ProducerRecord)
    │
    ▼
Interceptor.onSend()           ← 拦截器链(可修改消息)
    │
    ▼
Serializer.serialize()          ← Key/Value 序列化
    │
    ▼
Partitioner.partition()         ← 分区选择
    │
    ▼
RecordAccumulator.append()      ← 写入本地缓冲区(按 Partition 分组的 Deque<RecordBatch>)
    │
    ▼
Sender Thread (后台)            ← 从 Accumulator 中取出攒好的 Batch
    │
    ├── 按 Broker 分组 Batch
    ├── 构建 ProduceRequest(每个 Broker 一个请求,包含多个 Partition 的 Batch)
    ├── 通过 NetworkClient 发送 (Netty / NIO)
    └── 收到 Response → 执行 Callback

RecordAccumulator 是 Producer 的核心组件,它的作用是将零散的单条消息攒成 Batch,减少网络请求次数。关键参数:

  • batch.size(默认 16KB):每个 Batch 的最大字节数,满了就发送
  • linger.ms(默认 0):即使 Batch 没满,等待指定毫秒后也发送(用延迟换吞吐)
  • buffer.memory(默认 32MB):Accumulator 的总内存上限,超过后 send() 阻塞或抛异常

生产环境推荐配置:batch.size=65536(64KB),linger.ms=5-50(根据延迟容忍度),buffer.memory=67108864(64MB)。

4.2 分区策略(Partitioner)

Kafka 默认的分区策略(DefaultPartitioner,3.x+ 为 UniformStickyPartitioner):

  • 有 Key 的消息:对 Key 做 Murmur2 Hash,取模 Partition 数量(hash(key) % numPartitions)。同一 Key 的消息始终路由到同一 Partition,保证 Key 级别的有序性。
  • 无 Key 的消息:Sticky 策略——随机选一个 Partition 并持续发送到该 Partition,直到 Batch 满或 linger.ms 到期,然后切换到另一个 Partition。比纯粹的轮询更高效(减少 Batch 碎片)。

自定义分区器:实现 Partitioner 接口,适合按业务规则路由(如按地域、按用户 ID 范围)。

public class RegionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        String region = extractRegion(key.toString());
        if ("cn-east".equals(region)) return 0;
        if ("cn-north".equals(region)) return 1;
        return 2;
    }
}

4.3 ACK 机制与数据可靠性

acks 参数控制生产者发送消息后需要等待多少个副本确认:

  • acks=0:不等待任何确认,发送后立即返回。吞吐最高,但消息可能丢失(Broker 宕机、网络中断)。适合日志采集等允许少量丢失的场景。
  • acks=1:等待 Leader 写入成功即返回(不等 Follower 同步)。如果 Leader 在 Follower 同步前宕机,消息丢失。适合大多数业务场景。
  • acks=all(或 acks=-1):等待 ISR 中所有副本都写入成功才返回。最安全,配合 min.insync.replicas=2 可保证至少 2 个副本有数据。适合金融、支付等场景。

生产环境的推荐组合:acks=all + min.insync.replicas=2 + replication.factor=3。这保证了即使一个 Broker 宕机,数据仍然有 2 个副本,且写入时至少有 2 个副本确认。

4.4 Producer 幂等与 Exactly-Once

幂等性(Idempotence)enable.idempotence=true(3.x 默认开启)。Broker 为每个 Producer(producerId + producerEpoch)维护一个 sequence number,如果收到的消息 sequence number 不大于已处理的,说明是重复消息,直接丢弃。幂等性保证的是单 Partition 内的 Exactly-Once(单 Producer 单 Partition 不重复)。

事务(Transactions)transactional.id 设置后,Producer 可以将多个 Partition 的写入包装在一个事务中——要么所有 Partition 都写入成功,要么都回滚。消费者设置 isolation.level=read_committed 后只能看到已提交事务的消息。

Properties props = new Properties();
props.put("transactional.id", "order-tx-001");
props.put("enable.idempotence", true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", key1, value1));
    producer.send(new ProducerRecord<>("inventory", key2, value2));
    // 发送消费 offset(用于 consume-transform-produce 场景)
    producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

事务消息的典型场景:consume-transform-produce 管道(读一个 Topic 的消息,处理后写到另一个 Topic,保证 Exactly-Once)。

五、消费者深度机制

5.1 Consumer Group 与 Partition 分配

Consumer Group 是 Kafka 消费者的组织单位。同一 Group 内的消费者共同分担 Topic 的所有 Partition,每个 Partition 只能被组内一个消费者消费。不同 Group 之间互不影响(同一个 Partition 可以被不同 Group 各消费一次)。

Topic: orders (4 Partitions)
Group-A: Consumer-0, Consumer-1, Consumer-2
Group-B: Consumer-3

分配结果:
  Group-A:
    Consumer-0: Partition-0, Partition-1
    Consumer-1: Partition-2
    Consumer-2: Partition-3
  Group-B:
    Consumer-3: Partition-0, Partition-1, Partition-2, Partition-3 (全量)

5.2 Rebalance 协议详解

Rebalance 是 Consumer Group 中 Partition 重新分配的过程,发生在以下情况:消费者加入/离开 Group、Topic 的 Partition 数量变化、消费者订阅的 Topic 列表变化。

Rebalance 的三代协议:

Eager Rebalance(经典)——所有消费者同时放弃所有 Partition,然后重新分配。问题是"Stop-The-World"效应:Rebalance 期间所有消费者停止消费,如果 Partition 很多或消费者很多,Rebalance 时间可能达到分钟级。

Eager Rebalance 流程:
1. 所有 Consumer 发送 LeaveGroup → 放弃所有 Partition
2. GroupCoordinator 收到所有 Leave → 清除所有分配
3. 所有 Consumer 重新发送 JoinGroup
4. Coordinator 选举 Leader Consumer → Leader 执行分配算法
5. Leader 将分配方案发给 Coordinator → Coordinator 分发给所有 Consumer
6. 所有 Consumer 确认新分配 → Rebalance 完成

Cooperative Rebalance(增量,2.4+)——不再一次性放弃所有 Partition,而是只调整需要变化的部分。消费者继续处理不需要移动的 Partition,极大减少了 Rebalance 对消费的影响。

Cooperative Rebalance 流程:
1. Consumer 发送 JoinGroup(不放弃现有 Partition)
2. Coordinator 计算新分配 vs 旧分配的差异
3. 只 revoke 需要移动的 Partition(发 revoke 通知)
4. 等待被 revoke 的 Partition 提交 offset
5. 将 revoke 的 Partition 分配给新消费者
6. 其他 Partition 的消费不中断

Static Membership(静态成员,2.3+)——给消费者设置 group.instance.id,消费者重启后(在 session.timeout.ms 内)不会触发 Rebalance,Coordinator 直接恢复之前的 Partition 分配。适合消费者需要频繁重启(如部署更新)的场景。

生产环境推荐:使用 Cooperative Rebalance(partition.assignment.strategy=CooperativeStickyAssignor),配合 Static Membership 减少不必要的 Rebalance。

5.3 Offset 管理

Offset 的存储位置:消费者提交的 offset 存储在内部 Topic __consumer_offsets 中(50 个 Partition,Key 为 groupId + topic + partition 的 hash)。GroupCoordinator 负责管理这个 Topic 的读写。

自动提交 vs 手动提交:

自动提交(enable.auto.commit=true,默认):消费者每隔 auto.commit.interval.ms(默认 5 秒)自动提交当前 poll 返回的最大 offset。问题:如果在提交前消费的消息处理失败了,offset 已经提交,消息就丢了。

手动提交(enable.auto.commit=false):消费者处理完消息后调用 commitSync()commitAsync() 提交 offset。更安全,但需要自己管理提交时机。

// 手动提交示例(推荐)
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // 业务处理
    }
    // 处理完所有消息后提交 offset
    consumer.commitSync(); // 同步提交,阻塞直到成功
    // 或 consumer.commitAsync(); // 异步提交,不阻塞
}

max.poll.interval.ms(默认 5 分钟):两次 poll() 之间的最大间隔。如果消费者处理消息太慢,超过这个时间没有调用 poll(),Coordinator 会认为消费者挂了,触发 Rebalance。这个参数需要根据最慢的消息处理时间来设置——如果单条消息处理需要 30 秒,max.poll.records=100,则 max.poll.interval.ms 至少设为 30 * 100 * 1.5 = 4500000(75 分钟)。

5.4 消费者心跳与故障检测

消费者通过后台心跳线程定期向 Coordinator 发送心跳(间隔 heartbeat.interval.ms,默认 3 秒)。如果 Coordinator 在 session.timeout.ms(默认 45 秒)内没有收到心跳,就认为消费者挂了,触发 Rebalance。

Consumer 主线程:  poll() → process records → commit offset → poll() → ...
Consumer 心跳线程: heartbeat → sleep(3s) → heartbeat → sleep(3s) → ...

关键:心跳线程和消费线程是独立的。即使消费线程卡住了(处理慢),心跳线程仍在正常发送,不会触发 Rebalance。只有当消费者进程整体挂掉(心跳线程也停了)或 max.poll.interval.ms 超时才会触发 Rebalance。

六、数据可靠性与一致性

6.1 副本同步机制

Follower 通过 ReplicaFetcherThread 从 Leader 拉取消息(和消费者拉取是同一套协议),写入本地 LogSegment。Leader 维护每个 Follower 的 Log End Offset(LEO),即 Follower 已写入的最新 offset。

HW(High Watermark):ISR 中所有副本 LEO 的最小值。只有 offset < HW 的消息才对消费者可见。HW 保证了消费者永远不会读到可能被丢失的消息(即使 Leader 宕机,HW 之前的消息至少有 ISR 中的所有副本)。

Leader LEO:    100
Follower-1 LEO: 98
Follower-2 LEO: 95

HW = min(100, 98, 95) = 95
→ offset 0-94 的消息对消费者可见
→ offset 95-99 的消息已写入 Leader 但尚未同步到所有 ISR 副本,消费者看不到

Leader Epoch(2.0+):每次 Leader 选举,Epoch 加 1。Follower 恢复时通过 Epoch 判断数据一致性:如果 Follower 的 Epoch < Leader 的 Epoch,说明 Leader 换过了,Follower 需要从 Leader 拉取 Epoch 对应的起始 offset 开始重新同步。Leader Epoch 解决了之前 HW 在极端场景下的数据截断问题。

6.2 消息丢失的三个环节及防范

生产者 → Broker:消息在网络传输中丢失,或 Broker 收到后写入前宕机。防范:acks=all + retries=Integer.MAX_VALUE + delivery.timeout.ms=120000(2 分钟)。

Broker 存储:消息写入 Page Cache 但未刷盘时 Broker 宕机。防范:replication.factor=3 + min.insync.replicas=2(至少 2 个副本写入成功才算成功)。Kafka 默认不依赖同步刷盘(没有 SYNC_FLUSH 概念),而是靠多副本保证可靠性——单个 Broker 宕机,其他副本有数据。

Broker → 消费者:消费者处理完消息但在提交 offset 前崩溃,重启后从上次提交的 offset 开始消费,导致消息重复(不是丢失,但语义上是 At-Least-Once)。防范:手动提交 offset(enable.auto.commit=false),确认业务处理成功后再提交;消费端做幂等。

6.3 Exactly-Once 语义的实现

Kafka 的 Exactly-Once 需要 Producer 事务 + Consumer 隔离级别配合:

Producer:
  transactional.id = "my-app-tx"
  enable.idempotence = true
  → 保证单 Producer 的消息不重复写入

Consumer:
  isolation.level = read_committed
  → 只读取已提交事务的消息(未提交的、已回滚的不可见)

Consume-Transform-Produce:
  producer.beginTransaction();
  records = consumer.poll();
  for (record : records) {
      transformed = transform(record);
      producer.send(new ProducerRecord<>("output-topic", transformed));
  }
  producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
  producer.commitTransaction();

这套机制保证了:输入消息的消费和输出消息的写入是原子的——要么都成功,要么都回滚。典型应用:Kafka Streams 的 Exactly-Once 处理模式。

七、Kafka Streams 与 KSQL 简述

7.1 Kafka Streams 架构

Kafka Streams 是 Kafka 官方的流处理库(不是独立服务,嵌入在 Java 应用中)。核心概念:

  • Stream(KStream):无界的事件流,每条记录是独立的插入事件(INSERT)
  • Table(KTable):变更日志流,每条记录是 Key 的更新事件(UPSERT)
  • GlobalKTable:全量复制到每个应用实例的 Table(用于 Join 操作,避免 Shuffle)
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders = builder.stream("orders");

// 过滤 + 转换
orders.filter((key, value) -> value.contains("VIP"))
      .mapValues(value -> enrichOrder(value))
      .to("vip-orders");

// 聚合:按用户统计订单金额
orders.groupByKey()
      .aggregate(() -> 0.0,
                 (key, value, total) -> total + extractAmount(value),
                 Materialized.as("order-totals"));

Kafka Streams 的状态存储使用 RocksDB(本地持久化),通过 Changelog Topic 实现容错。如果应用实例宕机,其他实例可以从 Changelog Topic 恢复 RocksDB 状态。

7.2 KSQL / ksqlDB

ksqlDB 是 Kafka 之上的流处理 SQL 引擎(Confluent 开源),可以用 SQL 语法做实时流处理:

-- 创建 Stream
CREATE STREAM orders (orderId VARCHAR, userId VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

-- 实时聚合:每分钟的用户订单总额
CREATE TABLE user_order_totals AS
  SELECT userId, SUM(amount) as total
  FROM orders
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY userId;

-- 实时 Join:订单 + 用户信息
CREATE STREAM enriched_orders AS
  SELECT o.orderId, o.amount, u.name, u.level
  FROM orders o
  LEFT JOIN users u ON o.userId = u.id;

八、高可用与生产部署

8.1 部署拓扑

┌─────────────────────────────────────────────────────────┐
│                    Kafka Cluster                         │
│                                                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │Broker-0  │  │Broker-1  │  │Broker-2  │              │
│  │Controller│  │          │  │          │              │
│  │Leader:   │  │Leader:   │  │Leader:   │              │
│  │  p0,p3   │  │  p1,p4   │  │  p2,p5   │              │
│  │Follower: │  │Follower: │  │Follower: │              │
│  │  p1,p2   │  │  p2,p0   │  │  p0,p1   │              │
│  └──────────┘  └──────────┘  └──────────┘              │
│                                                          │
│  KRaft Quorum (Controller + Broker 共用进程):            │
│  ┌────┐  ┌────┐  ┌────┐                                 │
│  │ B0 │  │ B1 │  │ B2 │  ← 三个节点组成 Raft 集群       │
│  └────┘  └────┘  └────┘                                 │
└─────────────────────────────────────────────────────────┘

KRaft 模式下,Controller 和 Broker 可以共用进程(Combined Mode),也可以独立部署(Separate Mode)。中小规模(< 50 个 Broker)推荐 Combined Mode 简化运维;大规模推荐 Separate Mode 隔离元数据管理与数据服务的资源竞争。

8.2 核心配置调优

Broker 端(server.properties)

# 日志存储
log.dirs=/data1/kafka-logs,/data2/kafka-logs   # 多磁盘目录,Partition 均匀分布
log.segment.bytes=1073741824                    # 1GB Segment
log.retention.hours=168                         # 保留 7 天
log.retention.bytes=-1                          # 不按大小清理(按时间)
log.cleanup.policy=delete                       # 清理策略(delete / compact)
log.index.interval.bytes=4096                   # 索引间隔 4KB

# 副本与可靠性
default.replication.factor=3                    # 默认 3 副本
min.insync.replicas=2                           # 最少 2 个同步副本
unclean.leader.election.enable=false            # 禁止非 ISR 选举
num.replica.fetchers=4                          # 副本拉取线程数

# 网络与线程
num.network.threads=8                           # 网络线程数(Processor 线程)
num.io.threads=16                               # I/O 线程数(RequestHandler 线程)
socket.send.buffer.bytes=1048576                # Socket 发送缓冲 (1MB)
socket.receive.buffer.bytes=1048576             # Socket 接收缓冲 (1MB)
socket.request.max.bytes=104857600              # 最大请求大小 (100MB)

# 消费者相关
group.initial.rebalance.delay.ms=3000           # 等待 3 秒让更多消费者加入再 Rebalance
offsets.topic.replication.factor=3              # __consumer_offsets 的副本数
transaction.state.log.replication.factor=3      # 事务日志的副本数
transaction.state.log.min.isr=2                 # 事务日志的最小 ISR

Producer 端

acks=all
retries=2147483647                    # Integer.MAX_VALUE
delivery.timeout.ms=120000            # 2 分钟
enable.idempotence=true
batch.size=65536                      # 64KB
linger.ms=10                          # 攒批 10ms
buffer.memory=67108864                # 64MB
compression.type=lz4                  # 推荐 lz4(速度/压缩率平衡最好)
max.in.flight.requests.per.connection=5  # 幂等模式下最大 5

Consumer 端

enable.auto.commit=false
max.poll.records=500                  # 单次 poll 最大记录数
max.poll.interval.ms=300000           # 5 分钟
session.timeout.ms=45000              # 45 秒
heartbeat.interval.ms=3000            # 3 秒
fetch.min.bytes=1                     # 最小拉取字节
fetch.max.wait.ms=500                 # 最大等待时间
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
auto.offset.reset=latest              # 首次消费从最新位置开始

8.3 JVM 参数建议

# Kafka Broker(推荐 JDK 17+,ZGC 或 G1GC)
-Xms12g -Xmx12g                       # 堆内存 12G(不超过 32G,避免压缩指针失效)
-XX:+UseZGC                            # ZGC(JDK 17+ 推荐,亚毫秒停顿)
# 或 -XX:+UseG1GC -XX:MaxGCPauseMillis=20(G1 方案)
-XX:+ParallelRefProcEnabled
-XX:MaxInlineLevel=15                  # 方法内联深度
-XX:+UnlockDiagnosticVMOptions
-XX:G1SummarizeRSetStatsPeriod=1       # G1 RSet 统计
-Xlog:gc*:file=/var/log/kafka/gc.log:time,uptime,level,tags:filecount=5,filesize=50m
-XX:+ExitOnOutOfMemoryError            # OOM 时退出(让 K8s/systemd 重启)

# 注意:Kafka 主要依赖 Page Cache(堆外),JVM 堆不需要特别大
# 32G 以内的机器:12G 堆 + 20G Page Cache 是较优的分配

8.4 操作系统调优

# 文件描述符
ulimit -n 1000000

# 内存
vm.swappiness=1                        # 几乎不用 swap(Kafka 严重依赖 Page Cache)
vm.dirty_ratio=60                      # 脏页占 60% 时强制刷盘
vm.dirty_background_ratio=5            # 脏页占 5% 时后台线程开始刷盘

# 网络
net.core.wmem_default=1048576          # 默认发送缓冲 1MB
net.core.rmem_default=1048576          # 默认接收缓冲 1MB
net.core.wmem_max=2097152              # 最大发送缓冲 2MB
net.core.rmem_max=2097152              # 最大接收缓冲 2MB
net.ipv4.tcp_wmem="1048576 2097152 4194304"
net.ipv4.tcp_rmem="1048576 2097152 4194304"
net.core.somaxconn=65535
net.ipv4.tcp_max_syn_backlog=65535

# 磁盘调度(SSD 用 none/noop,HDD 用 mq-deadline)
echo none > /sys/block/sda/queue/scheduler

# 文件系统(推荐 XFS,ext4 也可以,不要用 ZFS/Btrfs 用于数据目录)

8.5 监控告警关键指标

指标 告警阈值 含义
UnderReplicatedPartitions > 0 有副本不在 ISR 中,可能有 Broker 故障
OfflinePartitionsCount > 0 有 Partition 无 Leader,该 Partition 不可用
ActiveControllerCount ≠ 1 Controller 数量异常(脑裂)
IsrShrinksPerSec > 0 ISR 收缩,Follower 落后
RequestQueueSize > 100 请求队列堆积,Broker 处理能力不足
NetworkProcessorAvgIdlePercent < 0.3 网络线程繁忙,需增加 num.network.threads
LogFlushRateAndTimeMs Flush 耗时 > 100ms 磁盘 I/O 瓶颈
ConsumerLag(per group) > 10000 消费堆积
RequestHandlerAvgIdlePercent < 0.3 I/O 线程繁忙,需增加 num.io.threads

推荐监控栈:jmx_exporter(Kafka 自带 JMX 指标)+ Prometheus + Grafana。Confluent 提供的 kafka-lag-exporter 可以方便地监控所有 Consumer Group 的 Lag。

九、Log Compaction——Kafka 独有的存储特性

Log Compaction(日志压缩)是 Kafka 区别于大多数消息队列的独特功能。它保证每个 Key 至少保留最新的一条消息,旧值会被后台清理。

压缩前:
  Key=A, offset=0, value="v1"
  Key=B, offset=1, value="v1"
  Key=A, offset=2, value="v2"
  Key=C, offset=3, value="v1"
  Key=A, offset=4, value="v3"

压缩后:
  Key=B, offset=1, value="v1"
  Key=C, offset=3, value="v1"
  Key=A, offset=4, value="v3"     ← 只保留 Key=A 的最新值

配置:log.cleanup.policy=compact(或 compact,delete 组合使用)。

典型应用场景:① CDC(Change Data Capture):数据库变更流经过 Compaction 后变成最新快照。② 物化视图:KTable 的 Changelog Topic 使用 Compaction,保证每个 Key 只保留最新状态。③ 配置推送:应用的配置变更写入 Kafka,Compaction 保证消费者总是读到最新配置。

十、面试深度题 & 参考答案

基础概念层(初级)

Q1:Kafka 为什么吞吐这么高?

五个核心设计:① 顺序写磁盘——每个 Partition 的 LogSegment 是顺序追加写入,HDD 顺序写约 600MB/s,接近内存速度。② 零拷贝(sendfile/transferTo)——消费拉取时数据直接从文件到 Socket,不经过用户空间,减少 2 次拷贝和 2 次上下文切换。③ 批量发送——Producer 的 RecordAccumulator 将多条消息攒成 Batch 后一次性发送,减少网络请求次数。④ 压缩——Batch 级别压缩(lz4/snappy/zstd),减少网络传输量和磁盘占用。⑤ 利用操作系统 Page Cache——Kafka 不自己管理内存缓存,直接依赖 OS 的 Page Cache,避免 JVM GC 开销,且 Broker 重启后 Cache 仍然有效。

Q2:Kafka 的 Partition 和 RocketMQ 的 MessageQueue 有什么区别?

两者都是 Topic 下的分区,是最小并行单元,但有几个关键区别。存储方式:Kafka 每个 Partition 独立一个文件(LogSegment),Partition 多时随机写性能下降;RocketMQ 所有 Topic 的所有 Queue 共享 CommitLog 顺序写,写入性能不受 Topic/Queue 数量影响。副本机制:Kafka 原生支持 Partition 级别的 Leader-Follower 复制,ISR 机制精细到每个 Partition;RocketMQ 的复制是 Broker 级别的(Master-Slave),整个 Broker 的数据一起复制。消费模型:Kafka 一个 Partition 只能被同一 Group 内的一个消费者消费(强绑定);RocketMQ 的 MessageQueue 在 Rebalance 后也类似,但支持更灵活的消费模式(如广播模式每个实例都收全量)。

Q3:Kafka 如何保证消息不丢失?

三端保障:Producer 端使用 acks=all + retries=MAX_VALUE + enable.idempotence=true,确保消息至少被 ISR 中所有副本写入。Broker 端使用 replication.factor=3 + min.insync.replicas=2 + unclean.leader.election.enable=false,保证至少 2 个副本有数据且不允许非 ISR 选举(可能丢数据)。Consumer 端使用 enable.auto.commit=false,手动在业务处理成功后再提交 offset,消费端做幂等处理。

最容易丢消息的环节:acks=1 时 Leader 写入后还没同步给 Follower 就宕机了;消费者使用自动提交 offset,消费处理到一半崩溃,重启后从上次提交的 offset 开始消费,跳过了部分消息。

原理深度层(中级)

Q4:什么是 ISR?ISR 为空时会发生什么?

ISR(In-Sync Replicas)是与 Leader 保持同步的副本集合。"同步"的判断标准:Follower 在 replica.lag.time.max.ms(默认 30 秒)内有 Fetch 请求成功拉取了数据。如果 Follower 因网络延迟、GC 停顿、磁盘故障等原因落后 Leader 超过 30 秒,就会被移出 ISR。

ISR 为空意味着所有 Follower 都落后了。此时如果 Leader 宕机:unclean.leader.election.enable=false(默认)时,Partition 不可用,直到原 Leader 恢复或某个 Follower 追上来。unclean.leader.election.enable=true 时,从非 ISR 副本中选 Leader,Partition 可用但可能丢失落后部分的消息。

生产环境通常保持 false,同时监控 IsrShrinksPerSec 指标,ISR 收缩时立即告警排查。

Q5:Kafka 的 Rebalance 机制是什么?为什么 Rebalance 是"万恶之源"?

Rebalance 是 Consumer Group 中 Partition 重新分配的过程。之所以被称为"万恶之源",是因为 Eager Rebalance 模式下所有消费者必须同时放弃所有 Partition(Stop-The-World),期间所有消费停止。如果 Group 有 100 个消费者、1000 个 Partition,Rebalance 可能持续数分钟甚至更长,期间消息堆积急剧增长。

触发 Rebalance 的常见原因:① 消费者加入或退出(包括因心跳超时被踢出)。② max.poll.interval.ms 超时(消费太慢,被 Coordinator 认为挂了)。③ Topic 的 Partition 数量变化。④ 消费者订阅的 Topic 列表变化。

减少不必要 Rebalance 的方案:① 使用 Cooperative Rebalance(增量式,不中断不需要移动的 Partition)。② 使用 Static Membership(group.instance.id),消费者重启不触发 Rebalance。③ 合理设置 session.timeout.ms(心跳超时)和 max.poll.interval.ms(处理超时),避免误判。④ 增加 group.initial.rebalance.delay.ms,等待更多消费者加入后再 Rebalance(适合批量启动场景)。

Q6:HW(High Watermark)和 LEO(Log End Offset)的区别是什么?HW 解决了什么问题?

LEO 是每个副本已写入的最新 offset + 1(下一条消息的 offset)。HW 是 ISR 中所有副本 LEO 的最小值。只有 offset < HW 的消息才对消费者可见。

HW 解决的问题:保证消费者不会读到可能丢失的消息。假设 Leader LEO=100,Follower LEO=95(HW=95),如果此时 Leader 宕机,Follower 被选为新 Leader,那么 offset 95-99 的消息在原 Leader 上但不在新 Leader 上,这些消息会丢失。HW=95 保证了消费者最多只读到 offset 94,不会读到可能丢失的 95-99。

HW 的更新时机:Leader 在收到 Follower 的 Fetch 请求时,根据 Follower 报告的 LEO 更新 HW,并将新的 HW 返回给 Follower(Follower 也更新自己的 HW)。

Q7:Kafka 为什么用 ZooKeeper?KRaft 解决了什么问题?

Kafka 早期用 ZooKeeper 做三件事:① 存储集群元数据(Broker 列表、Topic 配置、Partition 分配)。② Controller 选举(通过 ZK 临时节点竞争)。③ 故障检测(依赖 ZK Session 超时)。

ZooKeeper 的问题:① 元数据更新延迟高(ZAB 协议,几十毫秒),Partition 数量多时 ZK 压力大(每个 Partition 是一个 ZNode,20 万个以上性能急剧下降)。② Controller 故障转移慢(依赖 ZK Session 超时,通常 6-18 秒)。③ 运维复杂度高(需要独立维护 ZK 集群)。

KRaft 用内置的 Raft 协议替代 ZK:① 元数据存储在 __cluster_metadata Topic 中,利用 Kafka 自身高效的日志存储引擎,支持百万级 Partition。② Controller 选举基于 Raft,毫秒级故障转移。③ 不再需要维护外部 ZK 集群,部署和运维大幅简化。

Q8:Kafka 的 Log Compaction 是什么?和 delete 策略有什么区别?

Log Compaction(log.cleanup.policy=compact)保证每个 Key 至少保留最新的一条消息,后台的 LogCleaner 线程定期扫描并删除旧版本。Delete 策略(log.cleanup.policy=delete)按时间或大小清理过期的 LogSegment,不管 Key 是否重复。

两者可以组合使用(compact,delete):先做 Compaction 保留每个 Key 的最新值,再对超过保留时间的数据做 Delete。

Compaction 的典型应用:CDC 场景(数据库 binlog 变更流经过 Compaction 变成最新快照),KTable 的 Changelog Topic(保证每个 Key 的最新状态可用)。

生产实战层(高级)

Q9:线上 Consumer Lag 持续增长,你怎么排查和处理?

排查步骤:① 确认 Lag 发生在哪个 Topic 的哪个 Partition(用 kafka-consumer-groups.sh --describe),如果所有 Partition 的 Lag 均匀增长,说明消费速率整体跟不上;如果只有个别 Partition 增长,可能是数据倾斜(某些 Key 的消息量特别大)或该 Partition 的消费者有问题。② 看消费者的处理耗时(应用日志 + APM 监控),常见原因:慢 SQL、外部 API 超时、单条消息处理逻辑太重。③ 看消费者的 poll 间隔和 max.poll.records,如果 max.poll.records 太大导致处理时间超过 max.poll.interval.ms,会触发 Rebalance,Rebalance 期间消费停止,Lag 继续增长,形成恶性循环。④ 看 Broker 端的 RequestQueueSize 和磁盘 I/O,排除 Broker 瓶颈。

应急处理:① 临时增加消费者实例数(不超过 Partition 数)。② 减小 max.poll.records(如从 500 降到 100),降低单次处理量,避免超时。③ 如果是代码 bug 导致处理卡住,先重启消费者恢复消费,再修复代码。

长期优化:① 异步化消费处理(消费线程只做分发,实际处理交给异步线程池)。② 增加 Partition 数量,提高并行度上限。③ 对热点 Key 做分散处理(加随机后缀,避免数据倾斜)。④ 配置合理的消费线程池和 max.poll.records

Q10:Kafka 如何实现消息的 Exactly-Once 语义?

Kafka 的 Exactly-Once 分两层:

单 Partition 幂等enable.idempotence=true,Broker 为每个 producerId + producerEpoch 维护 sequence number,重复的消息(sequence number 不大于已处理的)被自动丢弃。保证单 Producer 对单 Partition 的 Exactly-Once。

跨 Partition 事务:设置 transactional.id,Producer 可以将多个 Partition 的写入包装在一个事务中。消费者设置 isolation.level=read_committed 后只读取已提交事务的消息(未提交的、已回滚的不可见)。事务的实现:Broker 端有一个 __transaction_state Topic 记录事务状态(Prepare / Commit / Abort),事务提交时 Broker 将事务标记为 Commit 并让所有涉及的 Partition 对消费者可见。

完整的 Consume-Transform-Produce Exactly-Once:Producer 在事务中同时写入输出消息和消费者 offset(sendOffsetsToTransaction),保证"消费输入 + 生产输出"是原子的。

Q11:Kafka 集群扩容(加 Broker)后,已有的 Partition 会自动迁移到新 Broker 吗?

不会自动迁移。新增的 Broker 只会被新创建的 Topic 或新增的 Partition 使用。已有的 Partition 仍然在原来的 Broker 上,需要手动执行 Partition Reassignment(kafka-reassign-partitions.sh)将部分 Partition 迁移到新 Broker。

迁移过程:① 生成迁移计划(--generate),指定哪些 Partition 迁移到哪些 Broker。② 执行迁移(--execute),Kafka 开始在新 Broker 上创建副本并从 Leader 同步数据。③ 验证迁移(--verify),确认所有副本同步完成。④ 迁移期间旧副本继续服务,同步完成后 Controller 更新元数据,新副本加入 ISR,旧副本被删除。

注意:迁移过程会产生大量的网络流量(从 Leader 复制数据到新 Follower),建议在低峰期执行,并通过 --throttle 参数限制复制速率(如 --throttle 50000000 限制为 50MB/s)。

Q12:Kafka 的消息积压和 RocketMQ 有什么不同?处理策略有何差异?

Kafka 的消息积压(Consumer Lag)处理与 RocketMQ 有几个关键差异:

Kafka 的消费者和 Partition 是强绑定关系(一个 Partition 只能被同一 Group 内一个消费者消费),所以扩容的上限是 Partition 数量,超过后多余消费者空闲。RocketMQ 的 Rebalance 机制类似,但 Queue 数量通常更灵活(可以通过 mqadmin updateTopic -w 快速增加)。

Kafka 没有内置的重试队列和死信队列,消费失败需要自己实现(通常是将失败消息发到 error Topic,由另一个消费者处理)。RocketMQ 内置了重试 Topic 和死信队列(%RETRY% + %DLQ%),消费失败自动重试。

Kafka 的 Partition 迁移(扩容后均衡数据)需要手动操作 kafka-reassign-partitions。RocketMQ 的 Queue 分布在多个 Broker 上,新增 Broker 后新 Topic 自动均衡,但已有 Topic 也需要手动调整。

处理策略的共同点:先排查消费端瓶颈 → 优化处理逻辑 → 扩容消费者(不超过 Partition/Queue 数)→ 必要时增加 Partition/Queue 数量。

架构设计层(资深)

Q13:设计一个基于 Kafka 的 CDC(Change Data Capture)系统,你怎么做?

CDC 的目标是将数据库的变更(INSERT/UPDATE/DELETE)实时同步到其他系统(如 Elasticsearch、数据仓库、缓存)。

架构设计:① 数据源:MySQL binlog → Debezium(CDC 连接器)→ Kafka Topic(按表分 Topic,Key 为主键,Value 为变更事件 JSON)。② 日志压缩:CDC Topic 配置 log.cleanup.policy=compact,保证每个主键只保留最新状态,消费者重启后可以从 Compaction 后的快照恢复。③ 数据分发:Kafka Connect(JDBC Sink / ES Sink)将 Kafka 中的变更写入目标系统。④ Schema 管理:使用 Schema Registry(Confluent 开源)管理 Avro/Protobuf Schema,保证数据格式兼容性。

关键设计点:① Debezium 的 offset 存储在 Kafka Connect 的 offset Topic 中,支持断点续传。② CDC 消息需要保证表级别的顺序(同一行的变更必须有序),Debezium 默认以主键为 Kafka Key,保证同一行的消息路由到同一 Partition。③ 对于大表的全量初始同步,Debezium 支持 Snapshot 模式(先全量读取再做增量 binlog)。

Q14:如何设计一个支持每秒百万消息的 Kafka 集群?

百万消息/秒的设计需要从多维度考量:

容量规划:假设平均消息大小 1KB,百万消息/秒 ≈ 1GB/s 写入吞吐。单个 Broker(SSD、8 核 32G)的写入吞吐约 200-500 MB/s,需要 3-5 个 Broker。考虑峰值和冗余(N+1),推荐 6 个 Broker。

Topic 与 Partition 设计:按业务拆分 Topic,每个 Topic 的 Partition 数 = 目标吞吐 / 单 Partition 吞吐。假设单 Partition 写入约 20 MB/s,百万消息(1GB/s)需要 50 个 Partition 分布在 6 个 Broker 上。消费者端,每个 Consumer Group 的实例数 = Partition 数(或按比例分配)。

硬件选型:Broker 使用 NVMe SSD(顺序写 3GB/s+),32G 内存(12G JVM 堆 + 20G Page Cache),万兆网卡(10Gbps ≈ 1.25GB/s,跨机房复制需要足够带宽)。

网络:同机房部署(跨机房复制延迟高),Broker 之间使用万兆内网。生产者使用 acks=all 时,跨 Broker 同步的延迟约 0.5-1ms(同机房)。

监控与告警:重点监控 UnderReplicatedPartitionsConsumerLagRequestQueueSize、磁盘使用率,配置 Grafana Dashboard 和 PagerDuty 告警。

Q15:Kafka 和 Pulsar 的架构对比,各自适合什么场景?

两者都是分布式消息流平台,但架构设计有根本差异:

存储架构:Kafka 是存算一体(Broker 既做计算又做存储),Partition 和 Broker 绑定;Pulsar 是存算分离(Broker 无状态 + BookKeeper 做存储),Topic 可以跨多个 Bookie 节点。Kafka 扩缩容需要迁移 Partition 数据,Pulsar 扩缩容只需加节点(数据自动均衡)。

多租户:Pulsar 原生支持多租户(Tenant → Namespace → Topic),资源隔离粒度更细;Kafka 需要通过 Quota + 独立集群实现多租户。

跨地域复制:Pulsar 内置 Geo-Replication(配置即可开启);Kafka 需要 MirrorMaker 2 或 Confluent Replicator。

适用场景:Kafka 生态更成熟(Confluent 商业支持、ksqlDB、Schema Registry、丰富的 Connector),适合已有大数据生态(Spark/Flink/Hadoop)的团队,以及日志、流计算、CDC 场景。Pulsar 适合对多租户、跨地域、弹性扩缩容有强需求的场景(如 SaaS 平台、全球化部署)。

性能对比:同等硬件下 Kafka 的吞吐更高(存算一体减少了网络跳转),Pulsar 的延迟更稳定(存算分离避免了 Broker 间的 I/O 竞争)。

十一、源码阅读指南

11.1 推荐阅读顺序

第 1 周:理解消息写入全链路
  入口:KafkaProducer.send() → Sender.run()
  重点:RecordAccumulator 攒批 → Sender 分组发送 → ProduceRequest 构建
  Broker 端:ProduceRequest 处理 → Log.append() → ReplicaManager 同步

第 2 周:理解消息存储
  入口:Log.append() → LogSegment.append()
  重点:MappedByteBuffer 写入 → Index 更新 → 刷盘策略
  消费端:FileRecords.read() → sendfile 零拷贝

第 3 周:理解消费与 Rebalance
  入口:KafkaConsumer.poll() → ConsumerCoordinator
  重点:分区分配 → Fetcher 拉取 → offset 管理 → Rebalance 协议

第 4 周:理解 Controller 与副本管理
  入口:KafkaController(Controller 模块)→ ReplicaManager
  重点:Leader 选举 → ISR 管理 → Partition Reassignment
  KRaft:QuorumController → MetadataLog(__cluster_metadata Topic)

11.2 调试技巧

  1. 本地启动单节点 Kafka(KRaft 模式,无需 ZooKeeper),配置文件设置 num.partitions=1 简化调试。
  2. Log.append() 打断点,跟踪一条消息从接收到写入 LogSegment 的全过程。
  3. ConsumerCoordinator.onJoinComplete() 打断点,观察 Rebalance 后的分区分配结果。
  4. 使用 kafka-dump-log.sh 工具查看 LogSegment 的二进制内容(解码消息格式)。
  5. 使用 Arthas 或 JFR(Java Flight Recorder)观察生产环境的线程行为和性能瓶颈。

十二、学习资源

资源 说明
Kafka 官方文档 最权威的参考,Design 章节写得极好
Kafka: The Definitive Guide (2nd) Confluent 团队编写,覆盖原理与实践
Kafka 核心技术与实践(极客时间) 胡夕老师出品,中文最佳 Kafka 课程
Kafka 源码 Scala + Java 混编,3.x 开始逐步迁移到纯 Java
Confluent Developer Confluent 官方学习平台,有大量教程和示例
Kafka KRaft 设计文档 KIP-500,理解 KRaft 的设计动机

本教程基于 Kafka 3.x / 4.x(KRaft 模式),部分内容标注了与旧版本(ZooKeeper 模式)的差异。

建议学习方式:每读完一节就动手实验(本地 Docker 启动 Kafka + kafkacat 工具收发消息),结合源码断点验证理解。面试前重点准备 Q3/Q5/Q6/Q9/Q10/Q13 这六道高频深度题。

0

评论区