Problem description
I am trying to load around 50K messages into a Kafka topic. At the beginning of a few runs I get the exception below, but not every time.
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]
...
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.
The code is as follows:
public void persistUpdatesPostAction(List<Message> messageList) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : " + messageList.size());

    Producer<String, String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());

    try {
        producer.beginTransaction();

        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");

        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}
-----------
static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}
public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}
Recommended answer
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
This exception message is not very helpful. I believe that it is trying to say that the broker no longer has any record of the transaction-id that is being sent by the client. This can either be because:
1. Someone else was using the same transaction-id and committed it already. In my experience, this is less likely unless you are sharing transaction-ids between clients. We ensure that our ids are unique using UUID.randomUUID() (see the sketch after this list).
2. The transaction timed out and was removed by broker automation.
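As a minimal sketch of that unique-id approach (not code from the original post; the class name, helper name, and the "my-app-" prefix are made up for illustration), each producer instance gets its own transactional.id:

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class UniqueTransactionalIdSketch {
    // Hypothetical helper: builds a transactional producer whose transactional.id
    // cannot collide with any other instance, so no other producer can fence it.
    public static Producer<String, String> newTransactionalProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // A fresh UUID per producer instance keeps the transactional.id unique.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-" + UUID.randomUUID());
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        return producer;
    }
}

Note that a randomly generated id means a restarted application will not fence its previous instance, so this choice favors avoiding accidental ProducerFencedException over cross-restart zombie fencing.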
In our case, we were hitting transaction timeouts every so often, which generated this exception. There are two properties that govern how long the broker will remember a transaction before aborting it and forgetting about it.
transaction.max.timeout.ms -- A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. The default in many Kafka versions seems to be 900000 (15 minutes). Documentation from Kafka says:
The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.
transaction.timeout.ms -- A producer client property that sets the transaction timeout in milliseconds when a transaction is created. The default in many Kafka versions seems to be 60000 (1 minute). Documentation from Kafka says:
The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
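Putting the two together, here is a hedged configuration sketch that reuses setPropertiesProducer() from the question (assumed here to live in the question's KafkaUtils class); the 5-minute client timeout and the raised broker ceiling are illustrative values, not recommendations from the original answer:

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TransactionTimeoutSketch {
    // Hypothetical helper: a transactional producer with an explicit transaction.timeout.ms.
    public static Producer<String, String> newProducerWithTimeout() {
        Properties props = KafkaUtils.setPropertiesProducer();   // base settings from the question (assumed location)
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-" + UUID.randomUUID());
        // Client-side transaction timeout: how long the coordinator waits for a status
        // update before proactively aborting. Must not exceed the broker's
        // transaction.max.timeout.ms (default 900000 ms / 15 minutes).
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000); // 5 minutes, illustrative
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        return producer;
    }
}

// Broker-side ceiling, set in server.properties (raise only if long transactions are truly needed):
// transaction.max.timeout.ms=1800000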
If the transaction.timeout.ms property set in the client exceeds the transaction.max.timeout.ms property in the broker, the producer will immediately throw something like the following exception:
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse
The transaction timeout is larger than the maximum value allowed by the broker
(as configured by transaction.max.timeout.ms).
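For completeness, a small sketch of how that failure surfaces in client code; the 1-hour timeout is a deliberately bad, assumed value, and KafkaUtils.setPropertiesProducer() is again the helper from the question (assumed location):

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TooLargeTimeoutSketch {
    public static void main(String[] args) {
        Properties props = KafkaUtils.setPropertiesProducer();
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-" + UUID.randomUUID());
        // 1 hour: larger than the broker's default transaction.max.timeout.ms of 15 minutes.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3600000);
        Producer<String, String> producer = new KafkaProducer<>(props);
        // The broker rejects the InitProducerIdRequest, so this call fails right away
        // with "Unexpected error in InitProducerIdResponse".
        producer.initTransactions();
    }
}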