admin管理员组

文章数量:1599280

目录

什么是消息队列与Kafka简介

为什么需要消息队列?

消息队列有什么优点和缺点?

Kafka简介

Kafka的优势和特点

Kafka与传统消息队列的对比

Kafka的架构设计

Kafka工作流程

Kafka的数据模型与消息存储机制

索引文件

数据文件

Kafka文件存储优势

Kafka 副本同步机制

ACKS 机制

生产者重试机制:

ISR 机制:

Kafka 副本数据一致性

Kafka如何保证消息可靠

生产者端可靠性

Broker 端可靠性

消费者端可靠性

其他考虑

Kafka 重试机制会导致同一条消息存储多次吗

Kafka中的消费者偏移量是如何管理的

自动提交偏移量

手动提交偏移量

偏移量的存储

偏移量的重置

Kafka中的消息如何分配给不同的消费者

生产者消息分配

消费者消息分配

分配示例

总结

Kafka什么是“零拷贝”?有什么作用?

“零拷贝”的含义

Kafka 中的“零拷贝”

“零拷贝”的作用

Kafka中的消息是如何存储的?

存储结构

存储格式

存储机制

存储优化

总结

如何确保Kafka集群的高可用

1. 分区副本(Replication)

2. 自动 Leader 选举

3. 动态分区分配

4. 配置数据保留策略

5. 监控和警报

6. 硬件和网络冗余

7. 配置容错


什么是消息队列与Kafka简介

​​消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列。
消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符 串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

  • Producer:消息生产者,负责产生和发送消息到 Broker;
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理; 

为什么需要消息队列?

  1. 屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。
  2. 异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削 峰)。
  3. 解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。
  4. 复用:一次发送多次消费。
  5. 可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。
  6. 提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。 

消息队列有什么优点和缺点?

1. 核心优点

a. 解耦 b. 异步 c. 流量削峰

2. 缺点

a. 系统可用性降低:系统引入的外部依赖越多,越容易挂掉。

b. 系统复杂度提高了

c. 一致性问题:消息传递给多个系统,部分执行成功,部分执行失败,容易导致数据不 一致

Kafka简介

Kafka是一个分布式流处理系统,可以像消息队列发布和订阅消息,分布式提供了容错性,并发处理消息的机制。

核心概念:

1. 主题(Topic)
一个主题是消息的分类或馈送名称,它是逻辑上的概念,用于组织和识别消息。
2. 分区(Partition)
主题被物理上分割为一个或多个分区,每个分区是一个有序的、不可变的消息序列。分区可以分布在不同的 Kafka 服务器(Broker)上,以实现数据的并行处理和提高系统吞吐量。
3. 生产者(Producer)
生产者是向 Kafka 主题发布消息的客户端。它们可以指定消息发送到特定的主题和分区,或者由 Kafka 自动分配。
4. 消费者(Consumer)
消费者是从 Kafka 主题读取消息的客户端。消费者可以是单个进程,也可以是多个进程组成的消费者组。
5. 消费者组(Consumer Group)
多个消费者可以组成一个消费者组,共同消费一个主题的消息。这允许消息的并行处理和在消费者失败时重新分配分区。
6. Broker
Kafka 集群中的服务器节点,负责存储和传输消息,以及管理主题和分区的状态。
7. ZooKeeper
Kafka 使用 ZooKeeper 协调集群的元数据和高可用性,尽管较新的版本提供了更多的内置协调功能,减少了对 ZooKeeper 的依赖。
8. 偏移量(Offset)
每条消息在主题的分区中有唯一的偏移量,用于标识消息的位置。消费者使用偏移量来追踪已读取消息的进度。
9. Replication(复制)
为了保证数据的高可用性和容错性,每个分区的数据会被复制到多个 Broker 上。每个分区有一个 Leader 和零个或多个 Follower。Leader 负责处理所有读写请求,Follower 从 Leader 同步数据。这保证了即使某些 Broker 失败,数据仍然可用。
10. Segments(段)
Kafka 使用 Segments 来存储和管理分区数据。每个分区的数据被分割成多个较小的 Segment 文件,每个 Segment 代表一段时间内的消息集合。这有助于提高存储效率和性能,因为较小的文件更容易管理和检索。
11. 事务
Kafka 支持事务,允许一组消息作为一个原子操作被提交或回滚,以保证消息的一致性。
这些概念共同构成了 Kafka 的基础架构,使其成为一个强大的实时数据处理和存储系统,适用于多种场景,包括实时分析、日志聚合、流式数据处理等。

Kafka的优势和特点

优势

  1. 高吞吐量:单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳 定的性能
  2. 高性能:单节点支持上千个客户端,并保证零停机和零数据丢失,异步化处理机制
  3. 持久化:将消息持久化到磁盘。通过将数据持久化到硬盘以及replica(follower节点)防止 数据丢失
  4. 零拷贝:减少了很多的拷贝技术,以及可以总体减少阻塞事件,提高吞吐量
  5. 可靠性 :Kafka是分布式,分区,复制和容错的

特点

  1. 顺序读,顺序写
  2. 利用Linux的页缓存
  3. 分布式系统,易于向外扩展。所有的Producer、Broker和Consumer都会有多 个,均为分布式的。无需停机即可扩展机器。多个Producer、Consumer可能是不同的应用。
  4. 客户端状态维护:消息被处理的状态是在Consumer端维护,而不是由server端维 护。当失败时能自动平衡。
  5. 支持online(在线)和offline(离线)的场景。
  6. 支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。

Kafka与传统消息队列的对比

各种对比之后,有如下建议:

  1. ActiveMQ,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐;
  2. RabbitMQ,虽然erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而 言,几乎处于不可控的状态,但是毕竟是开源的,比较稳定的支持,活跃度也高,推荐 中小型公司使用;推荐
  3. RocketMQ,阿里出品,Java语言编写,经过了阿里多年双十一大促的考验,性能和稳 定性得到了充分的严重。目前在业界被广泛应用在订单,交易,充值,流计算,消息推 送,日志流式处理,binlog分发等场景;强烈推荐
  4. Kafka,如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对 没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

Kafka的架构设计

kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键 值(key),值(value)和时间戳(timestamp)。

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。
  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关 系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。 partition中的每条消息都会被分配一个有序的id(offset)。
  • 副本(Replicas)
    为了防止数据丢失以及提高系统的可用性,Kafka 为每个分区创建了一个或多个副本。这些副本分布在不同的 Kafka 经纪人(Broker)上。每个分区有一个主副本(Leader Replica)和零个或多个跟随副本(Follower Replicas)。
  • ISR (In-Sync Replicas)
    ISR 指的是与主副本保持同步的所有副本集合。当一个跟随副本长时间未能跟上主副本的变化时,它会被从 ISR 列表中移除。
  • Segment 文件
    每个分区的日志文件由多个 Segment 文件组成。Segment 文件是在磁盘上表示的物理文件,它们是 Kafka 存储消息的基本单元。
  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多 个topic。
  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的 consumer group,多个消费者可以共同消息一个Topic,Topic下的每个分区只会被分配给组内的一个消费者,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。
  • 事务
    Kafka 支持事务,允许一组消息作为一个原子操作被提交或回滚,以保证消息的一致性。
    这些概念共同构成了 Kafka 的基础架构,使其成为一个强大的实时数据处理和存储系统,适用于多种场景,包括实时分析、日志聚合、流式数据处理等。

Kafka工作流程

  1. producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
  2. producer将消息发送给该leader
  3. leader将消息写入本地log
  4. followers从leader pull消息
  5. 写入本地log后向leader发送ACK
  6. leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后 commit 的offset)并向producer发送ACK
  7. Kafka 中消息是以topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic的。
  8. topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应一个log 文 件,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追 加到该log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实 时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。 

Kafka的数据模型与消息存储机制

Kafka 有 Topic 和 Partition 两个概念,一个 Topic 可以有多个 Partition。在实际存储的时 候,Topic + Partition 对应一个文件夹,这个文件夹对应的是这个 Partition 的数据。 在 Kafka 的数据文件目录下,一个 Partition 对应一个唯一的文件夹。如果有 4 个 Topic, 每个 Topic 有 5 个 Partition,那么一共会有 4 * 5 = 20 个文件夹。而在 文件夹下,Kafka 消息是采用 Segment File 的存储方式进行存储的。 Segment File 的大概意思是:将大文件拆分成小文件来存储,这样一个大文件就变成了一段 一段(Segment 段)。这样的好处是 IO 加载速度快,不会有很长的 IO 加载时间。Kafka 的消息存储就采用了这种方式。

如上图所示,在一个文件夹下的数据会根据 Kafka 的配置拆分成多个小文件。拆分规则可以 根据文件大小拆分,也可以根据消息条数拆分,这个是 Kafka 的一个配置,这里不细说。 在 Kafka 的数据文件夹下,分为两种类型的文件:索引文件(Index File)和数据文件 (Data File)。索引文件存的是消息的索引信息,帮助快速定位到某条消息。数据文件存储的是具体的消息内容。

索引文件

索引文件的命名统一为数字格式,其名称表示 Kafka 消息的偏移量。我们假设索引文件的数字为 N,那么就代表该索引文件存储的第一条 Kafka 消息的偏移量为 N + 1,而上个文件存 储的最后一条 Kafka 消息的偏移量为 N(因为 Kafka 是顺序存储的)。例如下图的 368769.index 索引文件,其表示文件存储的第一条 Kafka 消息的偏移量为 368770。而 368769 表示的是 0000.index 这个索引文件的最后一条消息。所以 368769.index 索引文 件,其存储的 Kafka 消息偏移量范围为 368769-737337。

索引文件存储的是简单地索引数据,其格式为:「N,Position」。其中 N 表示索引文件里的 第几条消息,而 Position 则表示该条消息在数据文件(Log File)中的物理偏移地址。例如 下图中的「3,497」表示:索引文件里的第 3 条消息(即 offset 368772 的消息,368772 = 368769+3),其在数据文件中的物理偏移地址为 497。

其他的以此类推,例如:「8,1686」表示 offset 为 368777 的 Kafka 消息,其在数据文件 中的物理偏移地址为 1686。

数据文件

数据文件的命名格式与索引文件的命名格式完全一样,这里就不再赘述了。 通过上面索引文件的分析,我们已经可以根据 offset 快速定位到某个数据文件了。那接着我 们怎么读取到这条消息的内容呢?要读取到这条消息的内容,我们需要搞清楚数据文件的存 储格式。 数据文件就是所有消息的一个列表,而每条消息都有一个固定的格式,如下图所示。

从上图可以看到 Kafka 消息的物理结构,其包含了 Kafka 消息的 offset 信息、Kafka 消息的 大小信息、版本号等等。有了这些信息之后,我们就可以正确地读取到 Kafka 消息的实际内容。 

Kafka文件存储优势

Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高 效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有如下 特点:

写message

  • 消息从java堆转入page cache(即物理内存)。
  • 由异步线程刷盘,消息从page cache刷入磁盘。

读message

  • 消息直接从page cache转入socket发送出去。
  • 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁盘Load消息到page cache,然后直接从socket发出去

Kafka高效文件存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期 清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message和确定response的最大大小。
  • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

Kafka 副本同步机制

为保证producer发送的数据,能可靠到指定topic,topic的每个的partition收到 producer 发送的数据后,都需要向producer发送 ack(acknowledgement确认收到),如果 producer收到 ack,就会进行下一轮的发送。

ACKS 机制

在 Kafka 中,消息的 ACK(Acknowledgment,确认)机制与生产者的 acks 配置有关。 acks 配置表示生产者在接收到消息后等待副本同步确认的方式,具体取值有:

acks=0:

意义:生产者在成功将消息发送给 Kafka 服务端后不等待任何确认。 结果:生产者无法知道消息是否成功到达 Kafka 服务器,可能会导致消息的丢失。 这种配置下,生产者不会收到任何 ACK。

acks=1:

意义:生产者在成功将消息发送给 Kafka 服务端后,等待该分区的首领节点 (leader)确认。 结果:生产者会收到分区首领节点的 ACK。这意味着只要分区首领节点成功接收到 消息,生产者就会得到确认,而不需要等待其他副本。

acks=all 或 acks=-1:

意义:生产者在成功将消息发送给 Kafka 服务端后,等待所有分区副本确认。 结果:生产者会等待分区的所有副本都成功接收到消息并确认。这是最安全的配置, 因为只有当所有副本都确认接收到消息后,才认为消息被成功提交。

生产者重试机制:

Kafka 生产者在发送消息后,如果设置了等待服务器的确认(通过 acks 参数配置),会等待 一定时间来收到来自服务器的确认(ack)。这个等待时间由 timeout.ms 参数控制,默认 是 10000 毫秒(10秒)。 如果在等待时间内没有收到服务器的确认,生产者可以选择重试发送或者处理发送失败的逻 辑。这取决于生产者的配置。通常,生产者会根据配置的重试次数和重试间隔来进行重试, 以确保消息最终被成功发送。

在 Kafka 的生产者配置中,你可以找到以下与重试相关的配置项:

retries: 定义了生产者在发送消息时的最大重试次数。

retry.backoff.ms: 定义了两次重试之间的等待时间间隔。

ISR 机制:

Kafka根据副本同步的情况,分成了3个集合:

AR(Assigned Replicas):包括ISR和OSR

ISR(In-sync Replicas):和leader副本保持同步的副本集合,可以被认为是可靠的数 据

OSR(Out-Sync Replicas):和Leader副本同步失效的副本集合

当 kafka 副本同步机制是所有follower都同步成功才返回 ack 给生产者时,如果有一个 follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,直到它 完成同步,才能发送ack。这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set (ISR-同步副本列表),意为和leader保持同步的 follower集合。根据follower发来的FETCH请求中的fetch offset判断ISR中的follower完成数 据同步是否成功。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该 时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新 的leader。

  1. ISR(In-Sync Replicas ):与leader保持同步的follower集合
  2. AR(Assigned Replicas):分区的所有副本
    • ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间 replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新 的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也 会先存放在OSR中。
    • AR=ISR+OSR。

Kafka 副本数据一致性

尽管采用 acks = all 但是也会出现 不一致的场景,例如:

假设 leader 接受了 producer 传来的数据为 8 条,ISR 中三台 follower (broker0,broker1,broker2)开始同步数据,由于网络传输,另外两台 follower 同步数 据的速率不同。当 broker1 同步了 4 条数据,broker2 已经同步了 6 条数据,此时, leader-broker0 突然挂掉,从 ISR 中选取了 broker1 作为主节点,此时 leader-broker1 同步了 4 条,broker2 同步 6,就会造成 leader 和 follower之间数据不一致问题。

HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消 费者只能拉取到这个offset之前的消息,对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(replicated) 。所有分 区副本中消息偏移量最小值。

LEO(Log End Offset),即日志末端位移(log end offset),记录了该副本底层日志 (log)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO =8,那么表示 该副本保存了 8 条消息,位移值范围是[0, 7]。LEO 的大小相当于当前日志分区中最后一 条消息的 offset 值加1,分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合 中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。

针对不同的产生原因,解决方案不同:

当服务出现故障时:如果是 Follower 发生故障,这不会影响消息写入,只不过是少了一 个备份而已。处理 相对简单一点。Kafka 会做如下处理:

  • 将故障的 Follower 节点临时踢出 ISR 集合。而其他 Leader 和 Follower 继续正常 接收消息。
  • 出现故障的 Follower 节点恢复后,不会立即加入 ISR 集合。该 Follower 节点会读 取本地记录的上一次的 HW,将自己的日志中高于 HW 的部分信息全部删除掉,然 后从 HW 开始,向 Leader 进行消息同步。
  • 等到该 Follower 的 LEO 大于等于整个 Partiton 的 HW 后,就重新加入到 ISR 集 合中。这也就是说这个 Follower 的消息进度追上了 Leader。 

如果是 Leader 节点出现故障,Kafka 为了保证消息的一致性,处理就会相对复杂一 点。

  • Leader 发生故障,会从 ISR 中进行选举,将一个原本是 Follower 的 Partition提升 为新的 Leader。这时, 消息有可能没有完成同步,所以新的 Leader 的LEO 会低于 之前 Leader 的 LEO。
  • Kafka 中的消息都只能以 Leader 中的备份为准。其他 Follower 会将各自的Log 文 件中高于 HW 的部分全部 清理掉,然后从新的 Leader 中同步数据。
  • 旧的 Leader 恢复后,将作为 Follower 节点,进行数据恢复。

Kafka如何保证消息可靠

生产者端可靠性

  1. ACKs 设置:

    • acks=0: 生产者在发送消息后不等待任何确认。这种方式最快,但没有可靠性保证。
    • acks=1: 生产者在发送消息后等待来自 Leader 的确认。如果 Leader 故障,消息可能丢失。
    • acks=all: 生产者在发送消息后等待所有 ISR(In-Sync Replicas)的确认。这是最可靠的 ACK 设置。
  2. 重试机制:

    • 生产者可以配置重试策略,如 retries 和 retry.backoff.ms,以在发送失败时重新尝试发送消息。
  3. 批量发送:

    • 生产者可以设置 batch.size 和 linger.ms 来控制消息的批量发送,以减少网络通信次数,提高性能的同时保持可靠性。
  4. 压缩:

    • 生产者可以选择压缩算法(如 GZIP 或 Snappy)来减小消息体积,节省存储空间。

Broker 端可靠性

  1. 分区副本:

    • Kafka 的每个分区都有一个 Leader 和零个或多个 Follower 副本。通过配置 replication.factor,可以增加副本数量,提高容错性。
  2. ISR(In-Sync Replicas):

    • ISR 是与 Leader 保持同步的副本集合。通过设置 min.insync.replicas,可以控制至少需要多少副本同步才能接受消息。
  3. Leader 选举:

    • 如果 Leader 副本失败,一个 Follower 副本将被选为新的 Leader。通过配置 unclean.leader.election.enable,可以控制是否允许非同步副本成为 Leader。
  4. 日志清理:

    • 可以配置 log.retention.hours 和 log.retention.bytes 来控制日志保留的时间和大小,避免过早清除数据。
  5. 日志压缩:

    • Kafka 支持日志压缩,这可以减少磁盘占用,同时保持数据完整性。

消费者端可靠性

  1. 自动提交偏移量:

    • 默认情况下,消费者自动提交偏移量。这可能导致数据丢失。建议手动控制偏移量提交。
  2. 偏移量提交:

    • 手动控制偏移量提交,确保只有在消息成功处理后才提交偏移量,以防止数据丢失。
  3. 重置偏移量:

    • 如果消费者未能处理消息,可以重置偏移量以重新处理消息。

其他考虑

  1. 监控和警报:

    • 使用 Kafka 的监控工具和警报系统来监控集群状态,确保及时发现和解决问题。
  2. 容灾策略:

    • 实施地理分布的集群和数据备份策略,以应对更广泛的故障场景。

Kafka 重试机制会导致同一条消息存储多次吗

  1. 幂等性(Idempotence):

    开启生产者的幂等性模式 (enable.idempotence=true) 可以确保即使消息被重试发送,也只会被存储一次。这是因为 Broker 会在存储消息之前检查消息的序列号(sequence number),并确保同一序列号的消息只被存储一次。
  2. 消息去重:

    如果幂等性和事务都不适用,可以在消费者端实现逻辑去重。例如,可以在消费消息时检查消息的内容或 ID 是否已经处理过。

Kafka中的消费者偏移量是如何管理的

消费者偏移量决定了消费者从哪个位置开始消费消息。Kafka 提供了两种主要的方式来管理消费者偏移量:自动提交和手动提交。

自动提交偏移量

自动提交偏移量是一种较为简单的管理方式,它由 Kafka 客户端自动执行,不需要开发人员显式地进行偏移量的提交。

  1. 配置自动提交:

    • 通过设置 enable.automit 为 true 来启用自动提交。
    • 可以通过设置 automit.interval.ms 来配置自动提交的频率。
  2. 优缺点:

    • 优点:
      • 简化了开发工作,因为不需要编写额外的代码来管理偏移量。
      • 适合大多数用例,特别是在消息处理相对简单的情况下。
    • 缺点:
      • 如果消费者处理消息失败并在下一个自动提交周期之前重启,那么可能需要重新处理已经处理过的消息。
      • 如果消费者处理消息很慢,而自动提交周期很短,可能会导致消息被跳过。

手动提交偏移量

手动提交偏移量提供了更细粒度的控制,开发人员可以根据业务逻辑来决定何时提交偏移量。

  1. 同步提交:

    • 使用 commitSync() 方法来同步提交偏移量。这种方法会阻塞直到偏移量提交成功。
    • 适用于需要立即确认偏移量已提交的情况。
  2. 异步提交:

    • 使用 commitAsync() 方法来异步提交偏移量。这种方法不会阻塞,提交的结果通过回调函数来通知。
    • 更适合需要高性能的应用程序。
  3. 优缺点:

    • 优点:
      • 可以确保只有在消息成功处理后才提交偏移量,减少了重复处理的风险。
      • 可以更好地控制偏移量的提交时机,比如可以在事务边界或者检查点之后提交。
    • 缺点:
      • 需要更多的编码工作来处理偏移量的提交。
      • 需要处理异常和错误情况,确保偏移量正确提交。

偏移量的存储

无论使用哪种提交方式,偏移量都会存储在一个特殊的 Kafka 主题 _consumer_offsets 中。这个主题使用与普通主题相同的分区和副本机制,以保证偏移量存储的高可用性和可靠性。

偏移量的重置

消费者还可以重置其偏移量,例如在某些特定条件下,消费者可能需要从头开始消费消息或者跳过某些消息。这可以通过调用 seek() 方法来实现。

Kafka中的消息如何分配给不同的消费者

生产者消息分配

当生产者向 Kafka 发送消息时,它需要确定消息应该被发送到哪个分区。Kafka 提供了几种策略来决定消息的分配:

  1. 指定分区:

    • 生产者可以显式地为消息指定一个分区。在这种情况下,消息将直接发送到指定的分区。
  2. 基于键的分区:

    • 如果消息具有键(key),则生产者会使用该键来计算分区。通常使用的是 Murmur2 哈希算法,然后对分区数取模来确定具体的分区。
  3. 粘性分区(Sticky Partitioner):

    • 如果消息没有显式指定分区也没有键,Kafka 会使用粘性分区策略。这种策略是为了优化消息发送的性能,尽量将同一批次的消息发送到相同的分区,以减少网络传输次数。
  4. 默认分区器:

    • 如果以上条件都不满足,生产者会使用默认的分区器,通常是基于消息的键进行分区。

消费者消息分配

消费者从 Kafka 获取消息的过程涉及到消费者组的概念。一个消费者组中的所有消费者共享一个主题的分区。以下是消费者消息分配的几种策略:

  1. Range 策略:

    • 这是最常见的分配策略。对于每个主题,Kafka 将所有分区按照其序号排序,并将分区范围均等地分配给消费者。这意味着分区编号较小的分区会被优先分配给消费者,若消费者数不能整除分区数,则前面的消费者会多分配一个或多个分区。
  2. Round Robin 策略:

    • 此策略将不分主题地为每个消费者轮流分配分区,确保所有消费者在总体上获得大致相等数量的分区。不过,在单个主题内部,分区分配并不保证有序。
  3. Sticky 策略:

    • 这种策略试图在消费者之间保持一个“粘性”平衡,即在消费者数量变化时尽量保持现有的分区分配不变,以减少重新分配引起的开销。

分配示例

假设有一个主题 example-topic,它有 5 个分区,现在有两个消费者 consumer1consumer2 加入同一个消费者组 group1

  • Range 策略:

    • 如果使用 Range 策略,分区将会被分配如下:
      • consumer1 可能会得到分区 0 和 1。
      • consumer2 可能会得到分区 2、3 和 4。
  • Round Robin 策略:

    • 如果使用 Round Robin 策略,分区将会被分配如下:
      • consumer1 可能得到分区 0、2 和 4。
      • consumer2 可能得到分区 1 和 3。
  • Sticky 策略:

    • 如果使用 Sticky 策略,并且消费者组中消费者数量发生变化,例如加入了一个新的消费者 consumer3,则分配可能会尽量保持之前分配的分区不变,同时新加入的消费者会获得剩余的分区。

总结

  • 生产者通过指定分区、基于键的分区、粘性分区或默认分区器来决定消息的发送目标。
  • 消费者通过 Range 策略、Round Robin 策略或 Sticky 策略来决定如何从主题的分区中获取消息。
  • 分配策略的选择取决于具体的应用场景和需求。

Kafka的follower可以读吗?

​​​​​​为保证数据一致性, follower仅用于数据冗余和故障恢复, 不能从follower读取

Kafka什么是“零拷贝”?有什么作用?

在 Apache Kafka 中,“零拷贝”(Zero Copy)是一种优化技术,它减少了数据在内存中的拷贝次数,从而提高了数据处理的效率。这种技术特别适用于高吞吐量和低延迟的场景,如 Kafka 的数据传输过程。

“零拷贝”的含义

在传统的数据传输过程中,数据经常需要从一个内存区域复制到另一个内存区域。例如,当数据从网络接收时,它首先被复制到操作系统缓冲区,然后可能需要复制到用户空间的缓冲区,再从用户空间复制到网络发送缓冲区等。这些不必要的复制操作会消耗 CPU 时间和内存带宽。

“零拷贝”技术旨在减少这些不必要的内存复制操作,通过直接在内核空间操作数据,或者通过其他技术避免数据在不同内存区域之间的复制。

Kafka 中的“零拷贝”

在 Kafka 中,零拷贝主要体现在以下几个方面:

1. 直接内存映射

Kafka 利用 Java NIO 的 DirectByteBuffer 和直接内存映射技术来减少数据在内存中的复制次数。当消息存储在磁盘上时,它们可以被直接映射到物理内存中,而不需要先将数据加载到 JVM 堆内存中。

2. DMA (Direct Memory Access)

DMA 技术允许设备控制器直接与主内存交互,而不需要 CPU 的干预。这可以用于将数据直接从设备(如磁盘或网络接口卡)读取到内存中,或者从内存中写入到设备中。

“零拷贝”的作用

“零拷贝”技术在 Kafka 中的作用主要包括:

  1. 提高性能:

    • 减少不必要的内存复制可以显著降低 CPU 的使用率,从而提高 Kafka 的吞吐量和响应速度。
  2. 减少延迟:

    • 由于减少了数据处理的步骤,零拷贝可以减少数据传输的延迟。
  3. 节省内存资源:

    • 通过避免不必要的数据复制,可以减少内存的使用,尤其是在高并发的情况下。
  4. 简化编程模型:

    • 对于开发人员来说,零拷贝可以简化数据处理的编程模型,使代码更容易维护和理解。

Kafka中的消息是如何存储的?

存储结构

Kafka 将消息组织成主题(Topic),每个主题可以划分为多个分区(Partition)。每个分区是一个独立的日志文件,其中的消息按照顺序存储。

  1. 主题(Topic):

    • 主题是逻辑上的命名空间,用于分类消息。
    • 每个主题可以有多个分区。
  2. 分区(Partition):

    • 分区是物理上的存储单元,每个主题可以包含一个或多个分区。
    • 每个分区是一个有序的、不可变的消息队列,其中的消息按顺序追加。
    • 分区可以分布在不同的 Broker 上,以实现数据的水平扩展。
  3. 副本(Replica):

    • 为了提高可靠性和容错性,每个分区都有一个 Leader 和零个或多个 Follower 副本。
    • Leader 分区负责所有的读写操作,而 Follower 分区则通过复制 Leader 分区的数据来保持同步。

存储格式

Kafka 的存储格式是高度优化的,以支持高效的读写操作:

  1. 日志文件:

    • 每个分区对应一个或多个日志文件,每个日志文件包含一系列消息记录。
    • 日志文件被分割成较小的段(Segment),每个段对应一个 .log 文件。
    • 每个段都有一个索引文件(.index),用于快速查找消息的位置。
  2. 消息格式:

    • 每条消息由一个键(Key)、一个值(Value)和一个时间戳组成。
    • 键和值可以是任意字节序列。
    • 时间戳用于排序消息和实现时间窗口查询。
  3. 消息索引:

    • Kafka 使用稀疏索引来加快查找消息的速度。索引记录消息的偏移量和位置信息。
    • 索引文件每隔一定长度记录一次消息的位置信息,这使得查找特定消息的偏移量非常快。

存储机制

Kafka 使用了一种称为分段日志(Segmented Log)的存储机制来提高存储效率和性能:

  1. 分段日志:

    • 日志文件被分成多个段,每个段都有一个唯一的标识符(ID)。
    • 每个段包含一个 .log 文件和一个对应的 .index 文件。
    • 段的大小不是固定的,而是根据日志的写入情况动态增长的。
  2. 文件清理:

    • Kafka 提供了两种主要的文件清理策略:基于时间的清理和基于大小的清理。
    • 基于时间的清理 (log.retention.hours): 控制日志文件的保留时间。
    • 基于大小的清理 (log.retention.bytes): 控制日志文件的最大大小。
  3. 压缩:

    • Kafka 支持对日志文件进行压缩,以节省存储空间。
    • 支持的压缩算法包括 LZ4、Snappy、Gzip 和 Zstandard。

存储优化

Kafka 采用了多种优化技术来提高存储性能:

  1. 零拷贝:

    • Kafka 利用 Java NIO 的 DirectByteBuffer 和直接内存映射技术来减少数据在内存中的复制次数。
  2. 预分配:

    • Kafka 提前创建新的日志段文件,以减少文件系统的 I/O 开销。
  3. 磁盘缓存:

    • 利用操作系统级别的缓存来提高数据的读取速度。

总结

Kafka 通过将消息组织成主题和分区,并采用高效的存储格式和机制,实现了高度优化的数据存储。这些技术使得 Kafka 能够支持大规模的实时数据流处理,同时保持高吞吐量、低延迟和高可靠性。

如何确保Kafka集群的高可用

1. 分区副本(Replication)

Kafka 的核心特性之一就是分区副本。每个分区都有一个 Leader 和零个或多个 Follower 副本。Leader 负责所有的读写操作,而 Follower 复制 Leader 的数据以确保数据冗余。

  1. 配置副本因子:

    • 通过设置 replication.factor 配置项来指定每个分区的副本数量。推荐至少设置为 3,以确保足够的冗余。
  2. 配置 ISR (In-Sync Replicas):

    • ISR 是一组与 Leader 保持同步的副本。Kafka 通过心跳机制监控副本状态,确保 ISR 中的副本始终与 Leader 保持同步。
  3. 配置最小 ISR:

    • 通过设置 min.insync.replicas 配置项来确保至少有多少个副本必须与 Leader 保持同步才能接受写操作。

2. 自动 Leader 选举

当一个分区的 Leader 发生故障时,Kafka 会自动从该分区的 Follower 副本中选举一个新的 Leader。这是通过 ZooKeeper 协调完成的。

  1. ZooKeeper 配置:

    • 确保 Kafka 集群与 ZooKeeper 集群之间的连接稳定,并且 ZooKeeper 集群本身也是高可用的。
  2. Leader 选举机制:

    • Kafka 使用 ZooKeeper 来协调 Leader 选举过程,确保选举过程的原子性和一致性。

3. 动态分区分配

Kafka 允许动态地添加或删除 Broker,而不会丢失数据或中断服务。当 Broker 节点加入或离开集群时,Kafka 会自动重新分配分区的副本。

  1. 动态加入/离开 Broker:

    • Kafka 支持动态地增加或减少 Broker 节点,以应对负载变化或故障。
  2. 动态分区分配:

    • Kafka 的 kafka-reassign-partitions.sh 工具可以用来手动重新分配分区副本,以优化负载均衡。

4. 配置数据保留策略

数据保留策略确保了数据不会被过早地清除,从而减少了数据丢失的风险。

  1. 基于时间的数据保留:

    • 通过设置 log.retention.hours 来指定日志文件的保留时间。
  2. 基于大小的数据保留:

    • 通过设置 log.retention.bytes 来指定每个主题的日志文件最大大小。

5. 监控和警报

实施有效的监控和警报策略可以帮助及时发现和解决问题。

  1. 监控工具:

    • 使用 Prometheus、Grafana 或其他监控工具来监控 Kafka 集群的状态。
  2. 健康检查:

    • 定期检查 Kafka Broker、ZooKeeper 和其他组件的健康状况。
  3. 警报系统:

    • 配置警报系统来通知管理员关于潜在的问题,如 Broker 下线、分区不平衡等。

6. 硬件和网络冗余

确保基础设施的高可用性同样重要。

  1. 多数据中心部署:

    • 如果可能的话,将 Kafka Broker 部署在多个数据中心,以实现地理分布式的高可用性。
  2. 网络冗余:

    • 确保网络连接的冗余,以避免单点故障。

7. 配置容错

确保 Kafka 配置文件中包含了适当的容错设置。

  1. Broker 容错:

    • 设置 unclean.leader.election.enable 为 false 以禁止不干净的 Leader 选举。
  2. 日志清理:

    • 设置合理的日志清理策略,如 log.cleaner.dedupe.buffer.size 和 log.cleanup.policy

本文标签: 面试题Kafka