kafka no record information is available
- springcloud-config-bus报错
- 解决
springcloud-config-bus报错
[KafkaConsumerDestination{consumerDestinationName='springCloudBus', partitions=1, dlqName='null'}.container-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
at com.sun.proxy.$Proxy257.commitSync(Unknown Source)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2366)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2361)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2347)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2161)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
... 4 common frames omitted
在这种情况下,异常是在记录处理之外(处理完成后)引起的。由于重新平衡导致提交失败,而且因为记录不再可用,所以无论如何也无法重新查找。它只是重新调用异常,并将无限期地重试。
解决
kafka加入max.poll.interval.ms配置,配置如下:
spring:
kafka:
bootstrap-servers: 192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092
consumer:
#用于标识此使用者所属的使用者组的唯一字符串。
group-id: xxx
#ID在发出请求时传递给服务器: 用于服务器端日志记录。
client-id: xxx
#当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
#可选的值为latest, earliest, none
auto-offset-reset: earliest
#如果'enable.automit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
auto-commit-interval: 5000ms
#如果为true,则消费者的偏移量将在后台定期提交,默认值为true
enable-auto-commit: true
#如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答获取请求之前将阻塞的最长时间(以毫秒为单位)
#默认值为500
fetch-max-wait: 500ms
#服务器应以字节为单位返回获取请求的最小数据量,默认值为1,对应的kafka的参数为fetch.min.bytes。
fetch-min-size: 1
#心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000
heartbeat-interval: 3000ms
#密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#一次调用poll()操作时返回的最大记录数,默认值为500
max-poll-records: 500
properties:
max:
poll:
interval:
ms: 600000
更多推荐
kafka no record information is available
发布评论