我正在尝试使用ReplyingKafkaTemplate,并且间歇性地继续看到以下消息.
I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.
没有待处理的回复:ConsumerRecord(topic = request-reply-topic,分区= 8,偏移= 1,CreateTime = 1544653843269,序列化密钥size = -1,序列化值size = 1609,标头=RecordHeaders(headers = [RecordHeader(key = kafka_correlationId,值= [-14,65,21,-118,70,-94,72,87,-113,-91,92,72,-124,-110,-64,-94])],isReadOnly = false),键=空,相关性ID:[-18271255759235816475365319231847350110],可能已超时,或使用共享的回复主题
No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic
这将来自下面的代码
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId); if (future == null) { if (this.sharedReplyTopic) { if (this.logger.isDebugEnabled()) { this.logger.debug(missingCorrelationLogMessage(record, correlationId)); } } else if (this.logger.isErrorEnabled()) { this.logger.error(missingCorrelationLogMessage(record, correlationId)); } }但只是偶尔发生
我还如下将共享的ReplyTopic设置为false,并试图强制更长的超时时间
I have also set the shared replyTopic to false as below and attempted to force a longer timeout
ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container); replyKafkaTemplate.setSharedReplyTopic(false); replyKafkaTemplate.setReplyTimeout(10000); return replyKafkaTemplate;我的容器如下
@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(false); factory.getContainerProperties().setPollTimeout(1000); factory.getContainerProperties().setIdleEventInterval(10000L); factory.setConcurrency(3); factory.setReplyTemplate(kafkaTemplate()); return factory; } 推荐答案如果是断断续续的,则很可能回复时间太长.消息似乎很清楚
If it's intermittent, it's most likely the reply took too long to arrive. The message seems quite clear
可能超时,或使用共享的回复主题
perhaps timed out, or using a shared reply topic
每个客户端实例必须使用其自己的回复主题或专用分区.
Each client side instance must use it's own reply topic or dedicated partition.
编辑
如果收到的消息的相关性ID与this.futures中的当前条目不匹配(待答复),则会得到日志.这只能在以下情况下发生:
You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances:
更多推荐
无待处理的回复:ConsumerRecord
发布评论