在producer.send期间获取ProducerFencedException的原因是什么?

编程入门 行业动态 更新时间:2024-10-28 08:24:08
本文介绍了在producer.send期间获取ProducerFencedException的原因是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

尝试将大约5万条消息加载到KAFKA主题中.在少数运行开始时,但并非总是如此.

Trying to load around 50K messages into KAFKA topic. In the beginning of few runs getting below exception but not all the time.

org.apache.kafkamon.KafkaException: Cannot execute transactional method because we are in an error state at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?] at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?] at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?] at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?] ... Caused by: org.apache.kafkamon.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

代码块如下:

public void persistUpdatesPostAction(List<Message> messageList ) { if ((messageList == null) || (messageList.isEmpty())) { return; } logger.createDebug("Messages in batch(postAction) : "+ messageList.size()); Producer<String,String> producer = KafkaUtils.getProducer(Thread.currentThread().getName()); try { producer.beginTransaction(); createKafkaBulkInsert1(producer, messageList, "Topic1"); createKafkaBulkInsert2(producer, messageList, "Topic2"); createKafkaBulkInsert3(producer, messageList, "Topic3"); producermitTransaction(); } catch (Exception e) { producer.abortTransaction(); producer.close(); KafkaUtils.removeProducer(Thread.currentThread().getName()); } } ----------- static Properties setPropertiesProducer() { Properties temp = new Properties(); temp.put("bootstrap.servers", "localhost:9092"); temp.put("acks", "all"); temp.put("retries", 1); temp.put("batch.size", 16384); temp.put("linger.ms", 5); temp.put("buffer.memory", 33554432); temp.put("key.serializer", "org.apache.kafkamon.serialization.StringSerializer"); temp.put("value.serializer", "org.apache.kafkamon.serialization.StringSerializer"); return temp; } public static Producer<String, String> getProducer(String aThreadId) { if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) { Properties temp = producerProps; temp.put("transactional.id", aThreadId); Producer<String, String> producer = new KafkaProducer<String, String>(temp); producerMap.put(aThreadId, producer); producer.initTransactions(); return producer; } return producerMap.get(aThreadId); } public static void removeProducer(String aThreadId) { logger.createDebug("Removing Thread ID :" + aThreadId); if (producerMap.get(aThreadId) == null) return; producerMap.remove(aThreadId); }

推荐答案

由以下原因引起:org.apache.kafkamon.errors.ProducerFencedException:生产者尝试以旧时代进行手术.要么有一个新的生产商相同的transactionalId,或者生产者的交易已被过期经纪人.

Caused by: org.apache.kafkamon.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

此异常消息不是很有帮助.我相信,要说经纪人不再具有客户发送的交易ID的任何记录,这是尝试.这可能是因为:

This exception message is not very helpful. I believe that it is trying to say that the broker no longer has any record of the transaction-id that is being sent by the client. This can either be because:

  • 其他人正在使用相同的transaction-id并已提交.以我的经验,除非您在客户之间共享交易ID,否则这种可能性很小.我们使用 UUID.randomUUID()确保我们的ID是唯一的.
  • 交易超时,并被经纪人自动化删除.
  • Someone else was using the same transaction-id and committed it already. In my experience, this is less likely unless you are sharing transaction-ids between clients. We ensure that our ids are unique using UUID.randomUUID().
  • The transaction timed out and was removed by broker automation.

在我们的例子中,我们经常会因事务超时而产生此异常.有2个属性可控制经纪人在中止交易并将其遗忘之前记住交易的时间.

In our case, we were hitting transaction timeouts every so often that generated this exception. There are 2 properties that govern how long the broker will remember a transaction before aborting it and forgetting about it.

  • transaction.max.timeout.ms –一个 broker 属性,用于指定直到中止和忘记事务之前的最大毫秒数.许多Kafka版本的默认值似乎是900000(15分钟).卡夫卡的文件说:

  • transaction.max.timeout.ms -- A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. Default in many Kafka versions seems to be 900000 (15 minutes). Documentation from Kafka says:

允许的最大交易超时.如果客户请求的交易时间超过了该时间,则经纪人将在InitProducerIdRequest中返回错误.这样可以防止客户的超时时间过长,从而使消费者无法阅读交易中包含的主题.

The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

  • transaction.timeout.ms -一个生产者客户端属性,用于设置创建事务时的超时时间(以毫秒为单位).许多Kafka版本的默认值似乎是60000(1分钟).卡夫卡的文件说:

  • transaction.timeout.ms -- A producer client property that sets the timeout in milliseconds when a transaction is created. Default in many Kafka versions seems to be 60000 (1 minute). Documentation from Kafka says:

    在主动中止正在进行的事务之前,事务协调器将等待生产者更新事务状态的最长时间(以毫秒为单位).

    The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

  • 如果客户端中设置的 transaction.timeout.ms 属性超过了代理中的 transaction.max.timeout.ms 属性,则生产者将立即抛出类似以下异常:

    If the transaction.timeout.ms property set in the client exceeds the transaction.max.timeout.ms property in the broker, the producer will immediately throw something like the following exception:

    org.apache.kafkamon.KafkaException: Unexpected error in InitProducerIdResponse The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).

    更多推荐

    在producer.send期间获取ProducerFencedException的原因是什么?

    本文发布于:2023-10-08 14:05:32,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1472852.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:原因   producer   send   ProducerFencedException

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!