问题描述
限时送ChatGPT账号..我找不到任何直接处理我面临的问题的东西,所以我在这里发帖.我有 JUnit/JBehave 测试,可以启动嵌入式 ZooKeeper 服务器、嵌入式 Kafka 服务器以及 kafka 生产者和消费者.
I haven't been able to find anything that directly handled the problem I'm facing, so I'm posting here. I have JUnit/JBehave tests that spin up an embedded ZooKeeper server, embedded Kafka server, and kafka producers and consumers.
将 kafka 从 0.7 升级到 0.8.1.1 后,我遇到以下类型的错误:
After upgrading kafka from 0.7 to 0.8.1.1, I'm encountering the following types of errors:
ERROR [kafka-request-handler-5] state.change.logger - Error on broker 1 while processing LeaderAndIsr request correlationId 7 received from controller 1 epoch 1 for partition [topicName,8]
java.lang.NullPointerException: null
at kafka.log.Log.<init>(Log.scala:60) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.log.LogManager.createLog(LogManager.scala:265) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:90) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) ~[scala-library-2.10.4.jar:na]
at kafka.cluster.Partition.makeLeader(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:305) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:304) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:304) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:258) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:217) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaApis.handle(KafkaApis.scala:189) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) [kafka_2.10-0.8.1.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
和
WARN [threadName] k.c.ConsumerFetcherManager$LeaderFinderThread - [threadName], Failed to add leader for partitions [topicName,9],[topicName,3],[topicName,0],[topicName,8],[topicName,5],[topicName,1],[topicName,6],[topicName,2],[topicName,7],[topicName,4]; will retry
kafkamon.NotLeaderForPartitionException: null
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_25]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_25]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_25]
at java.lang.reflect.Constructor.newInstance(Constructor.java:408) ~[na:1.8.0_25]
at java.lang.Class.newInstance(Class.java:438) ~[na:1.8.0_25]
at kafkamon.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na]
at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na]
at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.10-0.8.1.1.jar:na]
和
02/03 10:26:34.655 WARN [kafka-request-handler-7] kafka.server.KafkaApis - [KafkaApi-1] Offset request with correlation id 0 from client clientName on partition [topicName,5] failed due to Leader not local for partition [topicName,5] on broker 1
推荐答案
事实证明这与新的 KafkaServer 构造函数中的 Time 参数有关.
It turns out this has to do with the Time parameter in the new KafkaServer constructor.
我为 kafka.utils.Time 对象:
private KafkaServer server = new KafkaServer(config, null);
相反,您需要创建 kafka.utils.Time 接口的实现,并传入该接口的新实例:
Instead, you need to create an implementation of the kafka.utils.Time interface, and pass in a new instance of that:
private KafkaServer server = new KafkaServer(config, new SystemTime());
private static class SystemTime implements Time {
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
@Override
public void sleep(long arg0) {
try {
Thread.sleep(arg0);
} catch (InterruptedException e) {
log.error("Kafka systemtime interrupted",e);
}
}
}
这篇关于从 0.7 升级到 0.8.1.1 后生成嵌入式 kafka 队列时出错的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论