Apache Kafka Producer Broker连接(Apache Kafka Producer Broker Connection)

编程入门 行业动态 更新时间:2024-10-23 23:22:58
Apache Kafka Producer Broker连接(Apache Kafka Producer Broker Connection)

我有一组作为群集运行的Kafka代理实例。 我有一个客户正在向Kafka提供数据:

props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");

当我们使用tcpdump进行监控时,我可以看到只有与broker1和broker2的连接是ESTABLISHED,而对于broker3,没有来自我的生产者的连接。 我只有一个分区的单个主题。

我的问题:

经纪人数量和主题分区之间的关系如何? 我应该总是有经纪人人数=部分人数?

为什么在我的情况下,我无法连接到broker3? 或者至少我的网络监控没有显示我的Producer与broker3建立了连接?

如果我能从生产者的角度更深入地了解与经纪人的关系,那将是一件好事。

I have a set of Kafka broker instances running as a cluster. I have a client that is producing data to Kafka:

props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");

When we monitor using tcpdump, I can see that only the connections to broker1 and broker2 are ESTABLISHED while for the broker3, there is no connection from my producer. I have a single topic with just one partition.

My questions:

How is the relation between number of brokers and topic partitions? Should I always have number of brokers = number of partitons?

Why in my case, I'm not able to connect to broker3? or atleast my network monitoring does not show that a connection from my Producer is established with broker3?

It would be great if I could get some deeper insight into how the connection to the brokers work from a Producer stand point.

最满意答案

显然,你的制片人不需要连接到broker3 :)

我会尝试向您解释当您向Kafka制作数据时会发生什么情况:

你启动一些经纪人,比方说3,然后用2个分区创建一些主题foo ,复制因子2.很简单的例子,但对某人来说可能是真实的情况。 您可以为这些代理创建一个包含metadata.broker.list (或新生产者中的bootstrap.servers )的生产者。 值得一提的是,您不必指定群集中的所有代理,事实上,您只能指定其中的一个,它仍然可以工作。 我也会解释一下。 您使用您的制作人发送消息给主题foo 。 生产者查找其本地元数据缓存,以查看哪些代理是每个主题分区foo领导者,以及您的foo主题具有多少个分区。 由于这是第一次发送给生产者,因此本地缓存不包含任何内容。 生产者依次向TopicMetadataRequest中的每个代理发送TopicMetadataRequest ,直到第一次成功响应为止。 这就是为什么我提到1名经纪人只要活着就行。 返回的TopicMetadataResponse将包含有关请求的主题的信息,在您的案例中它是集群中的foo和brokers。 基本上,这个回应包含以下内容: 集群中的经纪人列表,每个经纪人都有一个ID,主机和端口。 此列表可能不包含集群中的整个经纪人列表,但应至少包含负责提供主题主题的经纪人列表。 主题元数据列表,其中每个条目具有主题名称,分区数量,每个分区的引导代理ID以及每个分区的ISR代理ID。 基于TopicMetadataResponse你的生产者建立它的本地缓存,并且现在确切地知道对主题foo分区0的请求应该到代理X. 根据主题中分区的数量,生产者会分割您的消息,并将其积累到应该作为批处理的一部分发送给某个代理的知识。 当批处理已满或linger.ms超时通过时,生产者将批处理刷新到代理。 通过“刷新”我的意思是“打开与经纪人的新连接或重新使用现有的连接,并发送ProduceRequest ”。

生产者不需要为所有经纪人开放不必要的连接,因为您正在生产的主题可能无法由一些经纪人提供服务,并且您的集群可能非常大。 想象一下,一个拥有大量主题的1000代理群集,但其中一个主题只有一个分区 - 您只需要一个连接,而不是1000个。

在你的特定情况下,我不是100%确定为什么你有2个打开的连接到经纪人,如果你只有一个分区,但我假设在元数据发现期间打开了一个连接并被缓存以供重用,第二个是实际的经纪人连接来产生数据。 但是,在这种情况下,我可能是错的。

但无论如何,没有必要为第三家经纪商建立联系。

关于你的问题:“我应该总是有多少个经纪人=多少部分?” 答案很可能不是。 如果你解释你正在努力达到的目标,也许我可以指出你正确的方向,但是这个范围太广泛而无法解释。 我建议阅读这个来澄清事情。

UPD在评论中回答这个问题:

元数据缓存更新有两种情况:

如果生产商出于任何原因未能与经纪人沟通 - 这包括经纪人根本无法到达的情况,以及经纪人以错误回应的情况(例如“我不是这个分区的领导者,就离开了”)

如果没有失败发生,客户端仍然每次更新元metadata.max.age.ms ( https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients /CommonClientConfigs.java#L42-L43 )来发现新​​的代理和分区本身。

Obviously, your producer does not need to connect to broker3 :)

I'll try to explain you what happens when you are producing data to Kafka:

You spin up some brokers, let's say 3, then create some topic foo with 2 partitions, replication factor 2. Quite simple example, yet could be a real case for someone. You create a producer with metadata.broker.list (or bootstrap.servers in new producer) configured to these brokers. Worth mentioning, you don't necessarily have to specify all the brokers in your cluster, in fact you can specify only 1 of them and it will still work. I'll explain this in a bit too. You send a message to topic foo using your producer. The producer looks up its local metadata cache to see what brokers are leaders for each partition of topic foo and how many partitions does your foo topic have. As this is the first send to the producer, local cache contains nothing. Producer sends a TopicMetadataRequest to each broker in metadata.broker.list sequentially until first successful response. That's why I mentioned 1 broker in that list would work as long as it's alive. Returned TopicMetadataResponse will contain the information about requested topics, in your case it's foo and brokers in the cluster. Basically, this response contains the following: list of brokers in the cluster, where each broker has an ID, host and port. This list may not contain the entire list of brokers in the cluster, but should contain at least the list of brokers that are responsible for servicing the subject topic. list of topic metadata, where each entry has topic name, number of partitions, leader broker ID for each partition and ISR broker IDs for each partition. Based on TopicMetadataResponse your producer builds up its local cache and now knows exactly that the request for topic foo partition 0 should go to broker X. Based on number of partitions in a topic, producer partitions your message and accumulates it with the knowledge that it should be sent as a part of batch to some broker. When the batch is full or linger.ms timeout passes, your producer flushes the batch to the broker. By "flushes" I mean "opens a new connection to a broker or reuses an existing one, and sends the ProduceRequest".

The producer does not need to open unnecessary connections to all brokers, as the topic you are producing to may not be serviced by some brokers, and your cluster could be quite large. Imagine a 1000 broker cluster with lots of topics, but one of topics has just one partition - you only need that one connection, not 1000.

In your particular case I'm not 100% sure why you have 2 open connections to brokers, if you have just a single partition, but I assume one connection was opened during metadata discovery and was cached for reusing, and the second one is the actual broker connection to produce data. However, I might be wrong in this case.

But anyway, there is no need at all to have a connection for the third broker.

Regarding your question about "Should I always have number of brokers = number of partitons?" the answer is most likely no. If you explain what you are trying to achieve, maybe I'll be able to point you to the right direction, but this is too broad to explain in general. I recommend reading this to clarify things.

UPD to answer the question in comment:

Metadata cache is updated in 2 cases:

If producer fails to communicate with broker for any reason - this includes the case when the broker is not reachable at all and when broker responds with an error (like "I'm not leader for this partition anymore, go away")

If no failures happen, the client still refreshes metadata every metadata.max.age.ms (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java#L42-L43) to discover new brokers and partitions itself.

更多推荐

本文发布于:2023-08-04 12:55:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1416095.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Kafka   Apache   Producer   Connection   Broker

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!