星火流卡夫卡流

编程入门 行业动态 更新时间:2024-10-21 13:23:30
本文介绍了星火流卡夫卡流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一些问题,同时试图从卡夫卡火花流读取。

I'm having some issues while trying to read from kafka with spark streaming.

我的code是:

My code is:

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor") val ssc = new StreamingContext(sparkConf, Seconds(2)) val kafkaParams = Map[String, String]( "zookeeper.connect" -> "localhost:2181", "group.id" -> "consumergroup", "metadata.broker.list" -> "localhost:9092", "zookeeper.connection.timeout.ms" -> "10000" //"kafka.auto.offset.reset" -> "smallest" ) val topics = Set("test") val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

我previously在端口9092开始在端口2181和卡夫卡服务器0.9.0.0饲养员。但我得到星火驱动程序出现以下错误:

I previously started zookeeper at port 2181 and Kafka server 0.9.0.0 at port 9092. But I get the following error in the Spark driver:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) at scala.Option.map(Option.scala:145) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)

动物园管理员日志:

Zookeeper log:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

任何暗示?

非常感谢你。

推荐答案

这个问题是有关错误的火花流 - 卡夫卡的版本。

The problem was related the wrong spark-streaming-kafka version.

如documentation

卡夫卡:星火流1.5.2与卡夫卡0.8.2.1兼容

Kafka: Spark Streaming 1.5.2 is compatible with Kafka 0.8.2.1

所以,包括

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.2</version> </dependency>

在我的pom.xml(而不是0.9.0.0版本)解决了这个问题。

in my pom.xml (instead of version 0.9.0.0) solved the issue.

希望这有助于

更多推荐

星火流卡夫卡流

本文发布于:2023-11-26 00:29:15,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631934.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:星火   卡夫卡

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!