Kafka宝典

编程入门 行业动态 更新时间:2024-10-27 02:18:18

Kafka<a href=https://www.elefans.com/category/jswz/34/1768325.html style=宝典"/>

Kafka宝典

Kafka

笔记内容取自尚硅谷Kafka3.0教程,以及《深入理解Kafka核心设计与实践原理》

内容还会不断充实~

概述

定义

传统定义:

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域

最新定义:

Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列

应用场景
  • 异步处理
  • 削峰
优势
  • 解耦

    允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束

  • 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

  • 缓冲

    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的的处理速度不一致的情况

  • 灵活性&峰值处理能力

    • 灵活性:动态上下线,在流量高峰时可以动态增加服务器
    • 峰值处理:削峰,延迟处理

    使系统能够应对突发的高峰流量

  • 异步通信

    允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理

模式
  1. 点对点模式

    一对一,消费者主动拉取数据,收到消息后,队列中的消息清除。队列可以有多个消费者,但对于一个消息而言,只能被一个消费者消费。

  2. 发布/订阅模式

    • 一对多,消费者消费消息后,队列不会清除消息。消息生产者将消息发布到topic中,同时有多个消费者消费该消息。和点对点模式不同,发布到topic的消息会被所有订阅者消费

    • 发布/订阅模式中,又分为两种:

      • 消费者主动拉取消息

        Kafka就是属于这种类型

        优势:速度取决于消费者,可以根据消费能力以适当的速率消费消息

        弊端:需要轮询,查看队列中是否有消息,浪费资源

      • 队列推送消息

        类似于公众号推送

        弊端:

        推送消息的速度取决于队列,各个消费者处理消息的速度可能不一致,造成消费者崩掉(推送速度 >消费者处理速度)或者资源浪费(推送速度 < 消费者处理速度)

Kafka基础架构

架构图

zk在这里的作用:

  • 存储kafka集群信息
  • 存储消费者消费到的位置信息
    • 即:消费到第几条了
    • 消费者本地内存也会存储该条数信息,平时就是读取并维护本地的信息;但当机器挂掉重启后,会先去zk获取该信息,然后再在本地内存继续维护)
    • 0.9之后将位置信息存储到kafka里一个系统创建的topic中
      • 为何改存到kafka?
      • 因为消费者本身就需要维护与kafka的连接,去获取消息,如果将位置信息放在zk,则还需要请求zk获取信息,速度不如kafka(注:Kafka消息存到磁盘,默认存七天

名词解释

  • Broker:可以理解为起了Kafka进程的服务器
  • Topic:主题,可以理解为一个队列,用来将消息分类,便于发送和消费
  • Partition:分区,用来提高Kafka集群的并发处理能力,每一个partition是一个有序的队列(个人理解Partition就是对Topic的又一次细分,分布式的多个Broker是为了避免单个机器的性能造成的阻塞,多个partition是为了避免同一内存区域的IO阻塞)
  • Replication:副本。为保证集群中某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
  • Leader:每个分区多个副本的”主“,生产者发送数据的对象,消费者消费数据的对象都是leader
  • Follower:
    • Leader的副本,每个分区多个副本的”从“,实时从leader同步数据,保持和leader数据的同步。用来备份数据(不直接对生产者和消费者提供读写),避免Leader所在机器挂掉后数据丢失。(所以,同一topic同一partition的Follower和Leader一定不在同一台机器)
    • leader发生故障时,某个Follower会成为新的leader
  • Consumer Group:消费者组,可以理解为一个大的消费者,目的是提高消费能力
    • 和其他普通消费者一样,需要订阅一个topic
    • 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    • 一个消费者组里面的不同消费者,只能消费kafka中这个topic的不同partition的数据。(个人理解:建立partition就是为了提高消息的读写速度,对于同一个topic,消息的写入根据partition区分开了,消费者消费的时候如果不分开,会降低消息消费速度且造成重复消费,那partition的意义就不大了)

基本操作

安装

命令行操作

topic

查看该机器上所有topic:

kafka-topics.sh --list --zookeeper ip:zk端口

创建topic:

kafka-topics.sh --create --topic topic名称 --zookeeper ip:zk端口 --partitions 分区数 --replication-factor 副本数#注:副本数不能大于当前可用的Broker数,分区数可以大于当前可用的Broker数
#副本数 包括 leader 和 follower

删除topic:

kafka-topics.sh --delete --topic first --zookeeper ip:zk端口#注:执行效果:
#Topic first is marked for deletion.   标记为删除
#Note: This will have no impact if delete.topic.enable is not set to true.  只有当delete.topic.enable设为true时才会真正删除

查看topic详情:

kafka-topics.sh --describe --zookeeper ip:zk端口 --topic topic名称
消息

生产者发送消息

kafka-console-producer.sh --topic first --broker-list kafkaIP:kafka端口

消费者消费消息

kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 #从当前开始消费
#或者:
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 --from-begining #从头开始消费

架构深入

工作流程

说明:

  • 偏移量不是全局唯一的,是分区唯一的
  • kafka只保证消息的分区内有序,不保证全局有序
    • 有序是指消费消息的顺序和生产消息的顺序一致
  • kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
  • topic是逻辑上的概念,而partition是物理上的概念
    • 每个partition对应一个文件夹(文件夹名 = topic - partition),文件夹内有.log文件,该log文件存储的就是producer生产的数据。producer生产的数据会不断追加到该log文件末端,且每条数据都有自己的offset。

    • 消费者都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

    • 个人更倾向于《深入理解Kafka核心设计与实践原理》上的叙述:

      分区同主题一样是一个逻辑的概念而没有物理上的存在

      主题、分区、副本和Log(日志)的关系如下图,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切的说是Log层面才有实际物理上的存在

生产者

消息发送流程

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator的大小可通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器端的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数默认60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。

注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一致多个ProducerRecord

  • ProducerBatch是一个消息批次,包含ProducerRecord,使字节使用更加紧凑
  • 将较小的ProducerRecord拼凑成一个较大的ProducerBatch,可以减少网络请求的次数以提升整体的吞吐量

如果生产者要向很多分区发送消息,则可将buffer.memory参数适当调大以增加整体吞吐量(buffer.memory大,RecordAccumulator则大,因RecordAccumulator中为每个分区都维护了一个双端队列,所以,RecordAccumulator大,每个分区分到的空间就大,可缓存的消息就多)。

在Kafka生产者客户端中,使用java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放比较耗费资源,故RecordAccumulator内部有一个BufferPool,主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数指定,默认16KB。

ProducerBatch的大小和batch.size的关系:

当一条消息(ProducerRecord)进入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数设定的大小,如果不超过,则以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

向RecordAccumulator中追加消息源码:

public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock,boolean abortOnNewBatch,long nowMs) throws InterruptedException {// We keep track of the number of appending thread to make sure we do not miss batches in// abortIncompleteBatches().appendsInProgress.incrementAndGet();ByteBuffer buffer = null;if (headers == null) headers = Record.EMPTY_HEADERS;try {// check if we have an in-progress batch// 获取 or 创建队列Deque<ProducerBatch> dq = getOrCreateDeque(tp);synchronized (dq) {if (closed)throw new KafkaException("Producer closed while send in progress");// 尝试向ProducerBatch中添加数据RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult != null)// 向已有的 ProducerBatch 中追加成功,直接返回即可return appendResult;}// we don't have an in-progress record batch try to allocate a new batchif (abortOnNewBatch) {// Return a result that will cause another call to append.return new RecordAppendResult(null, false, false, true);}byte maxUsableMagic = apiVersions.maxUsableProduceMagic();// 计算批次大小设置值,和真实消息(序列化和压缩后)大小 取较大值int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);// BufferPool分配内存(按照上一步计算得到的size来分配,如果size超过了16k,则这块区域不会被BufferPool复用)(BufferPool只对特定大小的ByteBuffer进行管理,这个特定大小由batch.size指定,默认16k)buffer = free.allocate(size, maxTimeToBlock);// Update the current time in case the buffer allocation blocked above.nowMs = time.milliseconds();synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException("Producer closed while send in progress");//再向 ProducerBatch 中追加试试RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}//不再舔了,我自己新建一个ProducerBatch// 封装ByteBufferMemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);// 再封装ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,callback, nowMs));// 将ProducerBatch添加到队列末尾dq.addLast(batch);incomplete.add(batch);// Don't deallocate this buffer in the finally block as it's being used in the record batchbuffer = null;return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);}} finally {if (buffer != null)free.deallocate(buffer);appendsInProgress.decrementAndGet();}}

Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息是属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个topic中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。

在转换成<Node, List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了。这里的Request是指Kafka的各种协议请求,对于消息发送而言就是具体的ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。通过max.in.flight.requests.per.connection参数可限制每个连接(也就是客户端与每个Node之间的连接)最多缓存的请求数,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多请求了,除非有缓存的请求已经收到了响应(Response)。

如果响应成功,则会清理InFlightRequests中的请求,以及RecordAAccumulater中对应分区中的数据;

如果响应失败,则会进行重试,重试次数可通过retries参数进行设置,默认为int类型的最大值。

我们发送消息通常只指定了topic,那么生产者客户端如何知道要发往哪个broker节点呢?这就需要元数据

元数据是指kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR,ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

元数据的更新(二者满足其一即可触发更新):

  • 当客户端中没有要使用的元数据时
  • 超过metedata.max.age.ms时间没有更新元数据(默认5分钟)

当需要更新元数据时,会先挑选出latestLoadedNode(即InFlightRequests中还未确认的请求个数最小的Node),然后向这个Node发送MeteDataRequest请求来获取具体的元数据信息。这个更新操作由Sender线程发起,在创建完MeteDataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时类似。元数据由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步由synchronized和final关键字来保障。

重要参数
参数名称说明
bootstrap.servers生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例如ip:port,ip1:port,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker里查找到其他 broker 信息。
key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memoryRecordAccumulator 缓冲区总大小, 默认 32m。
batch.size缓冲区一批数据最大值, 默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据, Leader 收到数据后应答。
-1(all):生产者发送过来的数据, Leader+和 isr 队列里面的所有节点收齐数据后应答。-1 和all 是等价的。 Kafka3.0中默认值是-1,之前版本默认是1。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字
retries当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。 默认是 int 最大值, 2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms。
enable.idempotence是否开启幂等性, 默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。 默认是 none,也就是不压缩。支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd。
生产者分区
分区好处
  • 便于合理使用存储资源, 每个Partition在一个Broker上存储, 可以把海量的数据按照分区切割成一
    块一块数据存储在多台Broker上。 合理控制分区的任务, 可以实现负载均衡的效果。
  • 提高并行度, 生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据

分区策略
老版本

分区原则

发送的数据要封装成一个ProduceRecord对象,该对象中有partition、key、value等属性

  • 指明partition的情况下,直接将指明的值作为partition值
  • 没有指明partition值,但有key值的情况下,将key的hash值与当前topic的partition存活数进行取余得到partition值
  • 既没有指明partition值又没有key值的情况下,采用轮询的方式选取分区。但谁来做第一个分区?kafka采用的机制是:在第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与当前topic可用的partition总数取余得到partition值。(由于每次调用都会在这个整数上自增,所以取余后的结果也是自增或等于初始值,也就达到了轮询每个partition的效果)。这就是round-robin算法

源码

//这个方法是默认的分区策略类里的,能进到这个方法,说明肯定没有指定partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取当前topic的partition数目List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();//没有指定keyif (keyBytes == null) {int nextValue = this.nextValue(topic);//当前topic存活的partition数List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {//从存活的partition里选取一个partition返回int part = Utils.toPositive(nextValue) % availablePartitions.size();return ((PartitionInfo)availablePartitions.get(part)).partition();

更多推荐

Kafka宝典

本文发布于:2024-03-05 03:35:04,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1711213.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:宝典   Kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!