How do I display a streaming DataFrame (show fails with AnalysisException)?

Problem description

So I have some data streaming in a Kafka topic; I'm taking this streaming data and placing it into a DataFrame. I want to display the data inside the DataFrame:

import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta

# Pull in the Kafka source packages when PySpark launches.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'

topic_name = "my-topic"
kafka_broker = "localhost:9092"

producer = KafkaProducer(bootstrap_servers=kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)

# Produce one timestamped message per second for 30 seconds.
while datetime.now() < terminate:
    producer.send(topic=topic_name, value=str(datetime.now()).encode('utf-8'))
    time.sleep(1)

# Build a streaming DataFrame over the topic.
readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()
readDF = readDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

readDF.writeStream.format("console").start()
readDF.show()  # <-- this call raises the AnalysisException below

producer.close()

However, I keep getting this error:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
...
Traceback (most recent call last):
  File "test2.py", line 30, in <module>
    readDF.show()
  File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
    print(self._jdf.showString(n, 20))
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

I don't understand why the exception is happening: I'm calling writeStream.start() right before show(). I tried getting rid of selectExpr(), but that made no difference. Does anyone know how to display a stream-sourced DataFrame? I'm using Python 3.6.1, Kafka 0.10.2.1, and Spark 2.2.0.

Answer

A streaming DataFrame doesn't support the show() method. When you call start(), Spark launches a background thread that streams the input data to the sink, and since you are using the console sink, it prints the data to the console. You don't need to call show().

Remove readDF.show() and add a sleep after start() so the query has time to run; you should then see the data in the console, for example:

query = readDF.writeStream.format("console").start()
time.sleep(10)  # give the background query time to read and print batches
query.stop()
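
If you'd rather not hard-code a sleep, a StreamingQuery also exposes awaitTermination(), which blocks the driver until the query stops or an optional timeout (in seconds) elapses. A minimal sketch, reusing the same readDF:

query = readDF.writeStream.format("console").start()
query.awaitTermination(10)  # block up to 10 seconds while batches print
query.stop()                # stop the query if it is still running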

You also need to set startingOffsets to earliest; otherwise the Kafka source will start from the latest offset and, because your producer finishes writing before the query starts, it will fetch nothing.

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic_name) \
    .load()
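
As an aside: if the goal is specifically to inspect the stream with show()-style output, one common workaround is the memory sink, which materializes each micro-batch into an in-memory table that you can query as a normal batch table. A minimal sketch, reusing the readDF above; the table name kafka_msgs is arbitrary:

# Stream into an in-memory table instead of the console.
query = readDF.writeStream \
    .format("memory") \
    .queryName("kafka_msgs") \
    .start()

time.sleep(10)  # let a few micro-batches accumulate

# The memory sink's table is a batch table, so show() works on it.
spark.sql("SELECT * FROM kafka_msgs").show(truncate=False)
query.stop()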
