无待处理的回复:ConsumerRecord

编程入门 行业动态 更新时间:2024-10-25 08:23:50
本文介绍了无待处理的回复:ConsumerRecord的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在尝试使用ReplyingKafkaTemplate,并且间歇性地继续看到以下消息.

I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.

没有待处理的回复:ConsumerRecord(topic = request-reply-topic,分区= 8,偏移= 1,CreateTime = 1544653843269,序列化密钥size = -1,序列化值size = 1609,标头=RecordHeaders(headers = [RecordHeader(key = kafka_correlationId,值= [-14,65,21,-118,70,-94,72,87,-113,-91,92,72,-124,-110,-64,-94])],isReadOnly = false),键=空,相关性ID:[-18271255759235816475365319231847350110],可能已超时,或使用共享的回复主题

No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic

这将来自下面的代码

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId); if (future == null) { if (this.sharedReplyTopic) { if (this.logger.isDebugEnabled()) { this.logger.debug(missingCorrelationLogMessage(record, correlationId)); } } else if (this.logger.isErrorEnabled()) { this.logger.error(missingCorrelationLogMessage(record, correlationId)); } }

但只是偶尔发生

我还如下将共享的ReplyTopic设置为false,并试图强制更长的超时时间

I have also set the shared replyTopic to false as below and attempted to force a longer timeout

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container); replyKafkaTemplate.setSharedReplyTopic(false); replyKafkaTemplate.setReplyTimeout(10000); return replyKafkaTemplate;

我的容器如下

@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(false); factory.getContainerProperties().setPollTimeout(1000); factory.getContainerProperties().setIdleEventInterval(10000L); factory.setConcurrency(3); factory.setReplyTemplate(kafkaTemplate()); return factory; }

推荐答案

如果是断断续续的,则很可能回复时间太长.消息似乎很清楚

If it's intermittent, it's most likely the reply took too long to arrive. The message seems quite clear

可能超时,或使用共享的回复主题

perhaps timed out, or using a shared reply topic

每个客户端实例必须使用其自己的回复主题或专用分区.

Each client side instance must use it's own reply topic or dedicated partition.

编辑

如果收到的消息的相关性ID与this.futures中的当前条目不匹配(待答复),则会得到日志.这只能在以下情况下发生:

You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances:

  • 请求超时(在这种情况下,将有一个对应的WARN日志).
  • 对模板进行stop()操作(在这种情况下,将清除this.futures).
  • 由于某些原因(不会发生)重新发送已经处理过的回复.
  • 将密钥添加到this.futures之前已收到答复.(因为它是在send()保存记录之前插入的,因此不会发生.)
  • 服务器端针对同一请求发送2个或更多回复.
  • 其他一些应用程序正在将数据发送到相同的答复主题.如果您可以使用DEBUG日志重现它,那将会有所帮助,因为然后我们还将相关密钥也记录在发送中.
  • 更多推荐

    无待处理的回复:ConsumerRecord

    本文发布于:2023-11-09 20:22:54,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1573341.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:ConsumerRecord

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!