问题描述
限时送ChatGPT账号..我正在尝试使用 ReplyingKafkaTemplate,但我时不时地看到下面的消息.
I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.
没有待处理的回复:ConsumerRecord(topic = request-reply-topic,分区 = 8,偏移量 = 1,创建时间 = 1544653843269,序列化密钥大小 = -1,序列化值大小 = 1609,标题 =RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value= [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110],可能是超时了,或者使用一个共享的回复主题
No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic
它源于下面的代码
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
if (this.sharedReplyTopic) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(missingCorrelationLogMessage(record, correlationId));
}
}
else if (this.logger.isErrorEnabled()) {
this.logger.error(missingCorrelationLogMessage(record, correlationId));
}
}
但只是偶尔发生
我还如下将共享的回复主题设置为 false 并试图强制更长的超时
I have also set the shared replyTopic to false as below and attempted to force a longer timeout
ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;
我的容器如下
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setIdleEventInterval(10000L);
factory.setConcurrency(3);
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
推荐答案
如果它是间歇性的,很可能回复需要很长时间才能到达.消息似乎很清楚
If it's intermittent, it's most likely the reply took too long to arrive. The message seems quite clear
可能超时,或使用共享回复主题
perhaps timed out, or using a shared reply topic
每个客户端实例必须使用自己的回复主题或专用分区.
Each client side instance must use it's own reply topic or dedicated partition.
编辑
如果收到的消息的相关 ID 与当前在 this.futures 中的条目(待回复)不匹配,您将获得日志.这只能在以下情况下发生:
You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances:
请求超时(在这种情况下会有相应的 WARN 日志).模板是 stop()ped (在这种情况下 this.futures 被清除).由于某种原因(不应该发生),已处理的回复被重新发送.在将密钥添加到 this.futures 之前收到回复(不可能发生,因为它是在发送()记录之前插入的).服务器端对同一个请求发送 2 个或多个回复.其他一些应用程序正在向同一回复主题发送数据.如果您可以使用 DEBUG 日志记录重现它,那将会有所帮助,因为我们还会在发送时记录相关键.这篇关于没有待处理的回复:ConsumerRecord的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论