卡夫卡生产者超时异常

编程入门 行业动态 更新时间:2024-10-22 23:48:49
本文介绍了卡夫卡生产者超时异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我正在运行将数据写入 Kafka 主题的 Samza 流作业.Kafka 正在运行一个 3 节点集群.Samza 作业部署在纱线上.我们在容器日志中看到了很多这样的异常:

I am running a Samza stream job that is writing data to Kafka topic. Kafka is running a 3 node cluster. Samza job is deployed on yarn. We are seeing lot of these exceptions in container logs :

 INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:181)
        at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
        at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafkamon.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time

这 3 种异常经常出现.

These 3 types of exceptions are coming a lot.

59088 org.apache.kafkamon.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time

61015 org.apache.kafkamon.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time

62275 org.apache.kafkamon.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append

请帮助我了解这里的问题.每当它发生时,Samza 容器都会重新启动.

Please help me understand what is the issue here. Whenever its happened Samza container is getting restarted.

推荐答案

该错误表明某些记录以比从客户端发送的速度更快的速度放入队列.

The error indicates that some records are put into the queue at a faster rate than they can be sent from the client.

当您的生产者发送消息时,它们被存储在缓冲区中(在发送到目标代理之前),并且记录被分组在一起以增加吞吐量.当一个新记录被添加到批处理中时,它必须在由 request.timeout.ms 控制的 -configurable- 时间窗口内发送(默认设置为 30 秒).如果批处理在队列中的时间更长,则会抛出 TimeoutException 并且批处理记录将从队列中删除,并且不会传递给代理.

When your Producer sends messages, they are stored in buffer (before sending the to the target broker) and the records are grouped together into batches in order to increase throughput. When a new record is added to the batch, it must be sent within a -configurable- time window which is controlled by request.timeout.ms (the default is set to 30 seconds). If the batch is in the queue for longer time, a TimeoutException is thrown and the batch records will then be removed from the queue and won't be delivered to the broker.

增加 request.timeout.ms 的值应该对你有用.

Increasing the value of request.timeout.ms should do the trick for you.

如果这不起作用,您还可以尝试减小 batch.size 以便更频繁地发送批次(但这次将包含更少的消息)并确保 停留.ms 设置为 0(这是默认值).

In case this does not work, you can also try decreasing batch.size so that batches are sent more often (but this time will include fewer messages) and make sure that linger.ms is set to 0 (which is the default value).

请注意,更改任何配置参数后都需要重新启动 kafka 代理.

Note that you need to restart your kafka brokers after changing any configuration parameter.

如果您仍然收到错误消息,我认为您的网络出现了问题.您是否启用了 SSL?

If you still get the error I assume that something wrong is going on with your network. Have you enabled SSL?

这篇关于卡夫卡生产者超时异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 12:26:48,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/963096.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:生产者   卡夫卡   异常

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!