Thinking of kafka

编程入门 行业动态 更新时间:2024-10-07 14:28:44

<a href=https://www.elefans.com/category/jswz/34/1749188.html style=Thinking of kafka"/>

Thinking of kafka

kafka
1、消息中间件: 
为什么会有消息中间件
    解耦合;在producer和consumer之间放一个broker(防止出现故障)
    削峰: 数据非常大,生成过程中时候会出现瞬间峰值的情况,如果我们数据存在broker里面了,对于消费者来说就没有峰值了,
    消费者比较平稳的消费
2、kafka基本架构与message结构
producer->push-> broker<-pull consumer
<key,value>
3、kafka offset存储:
zookeeper会有一份,本地的日志文件也会有一份
!kafka zookeeper节点:
    以消费者组为单位去维护offsets(因为一个消费者挂掉,不会影响组内其它消费者继续消费,就是说它的offset没有丢失)
4、kafka的高低阶消费者:
低阶:手动维护offset
    对于低阶消费者就不再有分区到消费者之间的 API 中间层了,由消费者直接找到分区进行消费,即消费者通过 ZooKeeper 找到指定分区的 Leader 在哪个 broker 上。
    首先,在 ZooKeeper 中能够找到 Kafka 所有 topic 的分区列表,并且可以找到指定分区的 Leader 在哪个 broker 上。
    消费者消费一条消息后,可以选择提交或者不提交,offset 可以缓存在 Redis 中,也可以自己存到 ZooKeeper 上。
    当正在读取的分区挂掉了,此时读取会出现异常,由于 Kafka 存在副本机制,需要从ZooKeeper 重新获取元数据,更新列表,从而继续消费分区数据。
高阶:自动维护offset,高阶消费者会出现问题?
按照一定时间间隔来提交offset,这样可能就会出现数据丢失或者数据重复
    刚拿到一组数据没处理的时候,提交了,处理数据过程中,程序挂掉了,数据丢失
    一组数据处理了的时候,处理玩挂掉了,还没有提交offset,数据重复

5、kafka分区分配原则:
分区在消费者组中平均分配->分配方式:
range分区策略
round-robin 分区策略(分区号的hashcode排序,然后轮询),
分区个数发生变化或者是消费者组的消费者数发生变化,就会触发rebalance,重分区
底层:(不是每次都随机,而是每隔一段时间随机)一旦来一个message,key为null的话。第一次随机选择一个partition存放message,然后将partitionid放在内存里面的一个缓冲结构hashmap里面,
下一次来的时候,直接从找到hashmap里面partitionid,直接存进去对应的分区,而且每隔一段时间,这个缓冲结构会清空一次,下一次来的message没有key的话会再次随机一次
6、kafka实现高吞吐:
分区机制:
分区数一般2,30分区,2个副本 分区数=消费者数的时候达到最大的吞吐量
觉得吞吐量不够,增加分区,再增加消费者个数,让它rebalence一下就可以了
由于每个 partition 只能被 一个分组内的一个消费者同时消费,因此,消费者 c4 线程(一个消费者相当于一个线程)空闲,拿不到数据,这就意味着, 在分区不变的情况下,增加消费者线程不能提高并发度。
7、kafka数据可靠性保证:
producer的三种模式:参数0 1 -1 
0:消息发给leader之后就不管了,速度快,可靠性低
1:消息发给leader之后,leader确认落盘,发送ack(应答),producer继续执行
-1/off:消息发给leader之后,不但leader确认落盘,所有flow拉取数据落盘之后才会返回ack
broker:分布式的副本机制
前两个通过配置参数就保证数据不丢失
consumer:
高低阶消费者:用低阶消费者

==========================================================================

earliest:如果再消费一次,消费不到?因为offset在最后,换一个消费者组可解决问题
消费过的消费者不能再消费
一下两个开头
hunan  xiangtan group6
Jiansu Nanjing group7
group16了
消费组组?
auto.offset.reset值含义解释

earliest

当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest

当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none

topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

默认建议用earliest。设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。

而latest 这个设置容易丢失消息,假如kafka出现问题,还有数据往topic中写,这个时候重启kafka,
这个设置会从最新产生数据的offset开始消费,中间出问题的哪些就不管了。 

none这个设置没有用过,兼容性太差,经常出问题。

kafka消费者如何才能从头开始消费某个topic的全量数据
消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):

(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);

(2)指定"auto.offset.reset"参数的值为earliest;

=============================================

topic:可以理解为一个队列,生产者和消费者面向的都是一个topic;
bin/kafka-server-start.sh -daemon  config/server.properties
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。
Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。
消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
acknowledgement 承认,感谢
数据可靠性的保证:ack(acknowledgement 确认收到)
只要group.id相同,就属于同一个消费者组
一个topic内的一个分区只能被同一消费组内的一个消费者所消费,不能被一个组内的多个消费者所消费!

Zookeeper在Kafka中的作用:
    选举某个broker为Conreoller,负责集群broker的上下线;
    所有的分区副本分配和leader选举工作。

!如何删除topic
命令删除:/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper mini5:2181 --topic recharge
    需要server.properties中设置delete.topic.enable=true否则只是标记删除,然后重启生效
手动删除:
    以删除topic test为例,步骤如下:
    1. 删除kafka存储目录(server.properties文件log.dirs配置,默认为”/tmp/kafka-logs”)相关topic目录,如test-0,test-1等
    2. 删除zookeeper目录下相关topic节点
    rmr /brokers/topics/test
    rmr /config/topics/test
    rmr /admin/delete_topics/test
!如何删除消费者消息,困扰我多年的问题
rmr /controller    rmr /brokers    rmr /consumers    rmr /latest_producer_id_block
0.8版 rmr /consumers
0.10版 rmr /brokers/topics/__consumer_offsets 这个主题用命令是删不掉的,所有要进行手动删除
    首先删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
    再删除zookeeper里面元数据信息,rmr /brokers/topics/__consumer_offsets
---------------------------------------------
kafka命令:    
启动:
kafka-server-start.sh -daemon  config/server.properties
关闭:
kafka-server-stop.sh
创建topic:
kafka-topics.sh --create --zookeeper mini5:2181 --replication-factor 1 --partitions 1 --topic recharge
查看所有topic:
kafka-topics.sh --list --zookeeper mini5:2181
删除topic
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper mini5:2181 --topic recharge
需要server.properties中设置delete.topic.enable=true否则只是标记删除,然后重启生效
对分区数进行修改:
不能修改replication-factor,以及只能对partition个数进行增加,不能减少
bin/kafka-topics.sh --alter --topic recharge --zookeeper NODE01:2181,NODE02:2181,NODE03:2181 --partitions 3
---------------------------------------------
生产消息:
kafka-console-producer.sh --broker-list mini5:9092 --topic recharge
---------------------------------------------
消费消息:
offset保存的位置?
消费者组消息保存在broker上,老板保存在zookeeper上
新版:
kafka-console-consumer.sh --bootstrap-server mini5:9092 --from-beginning --topic recharge    执行一次消费一次
kafka-console-consumer.sh --bootstrap-server master:9092 --topic recharge --consumer-property group.id=group_mytes
查看消费组信息:
kafka-consumer-groups.sh --bootstrap-server mini5:9092 --list
查看特定消费组信息:
kafka-consumer-groups.sh --bootstrap-server mini5:9092 --group group16 --describe
可以通过部署kafka manager来删除消费组信息

老版:
kafka-console-consumer.sh --zookeeper mini5:2181 --from-beginning --topic recharge    
kafka-console-consumer.sh --zookeeper mini5:2181 --topic recharge --consumer-property group.id=group_recharge 
查看消费组信息
kafka-consumer-groups.sh --zookeeper mini5:2181 --list
查看特定消费组信息:
kafka-consumer-groups.sh --zookeeper mini5:2181 --group group16 --describe (--delete 删除仅限于老版
或者是直接在zk里面删除消费者组目录,干净)

---------------------------------------------
zk相关命令:
zkCli.sh -server mini5:2181
ls /consumers 如果创建消费组组的时候没有指定zookeeper,zookeeper里的consumers时没有该消费组的信息的
get /consumers/recharge03/offsets/recharge/0
rmr path

console-consumer-75434
console-consumer-68877
recharge04
recharge03


 

 

更多推荐

Thinking of kafka

本文发布于:2024-03-09 02:06:07,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1723348.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Thinking   kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!