本文介绍了星火流卡夫卡流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一些问题,同时试图从卡夫卡火花流读取。
I'm having some issues while trying to read from kafka with spark streaming.
我的code是:
My code is:
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor") val ssc = new StreamingContext(sparkConf, Seconds(2)) val kafkaParams = Map[String, String]( "zookeeper.connect" -> "localhost:2181", "group.id" -> "consumergroup", "metadata.broker.list" -> "localhost:9092", "zookeeper.connection.timeout.ms" -> "10000" //"kafka.auto.offset.reset" -> "smallest" ) val topics = Set("test") val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)我previously在端口9092开始在端口2181和卡夫卡服务器0.9.0.0饲养员。但我得到星火驱动程序出现以下错误:
I previously started zookeeper at port 2181 and Kafka server 0.9.0.0 at port 9092. But I get the following error in the Spark driver:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) at scala.Option.map(Option.scala:145) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)动物园管理员日志:
Zookeeper log:
[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)任何暗示?
非常感谢你。
推荐答案这个问题是有关错误的火花流 - 卡夫卡的版本。
The problem was related the wrong spark-streaming-kafka version.
如documentation
卡夫卡:星火流1.5.2与卡夫卡0.8.2.1兼容
Kafka: Spark Streaming 1.5.2 is compatible with Kafka 0.8.2.1
所以,包括
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.2</version> </dependency>在我的pom.xml(而不是0.9.0.0版本)解决了这个问题。
in my pom.xml (instead of version 0.9.0.0) solved the issue.
希望这有助于
更多推荐
星火流卡夫卡流
发布评论