Currently, I am using Spark Structured Streaming to create data frames of random data in the form (id, timestamp_value, device_id, temperature_value, comment).
Spark Dataframe per Batch:
Based on the screenshot of the data frame above, I would like to compute some descriptive statistics for the column "temperature_value", for example min, max, mean, count, and variance.
My approach to achieve this in Python is the following:
    import sys
    import json
    import psycopg2
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    from pyspark.sql.functions import from_json, col, to_json
    from pyspark.sql.types import *
    from pyspark.sql.functions import explode
    from pyspark.sql.functions import split
    from pyspark.sql.functions import get_json_object
    from pyspark.ml.stat import Summarizer
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.feature import StandardScaler
    from pyspark.sql.functions import lit, unix_timestamp
    from pyspark.sql import functions as F
    import numpy as np
    from pyspark.mllib.stat import Statistics

    spark = SparkSession.builder.appName(<spark_application_name>).getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    spark.streams.active

    data = (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "kafka_broker:<port_number>")
            .option("subscribe", <topic_name>)
            .option("startingOffsets", "latest")
            .load())

    schema = StructType([
        StructField("id", DoubleType()),
        StructField("timestamp_value", DoubleType()),
        StructField("device_id", DoubleType()),
        StructField("temperature_value", DoubleType()),
        StructField("comment", StringType())])

    telemetry_dataframe = (data.selectExpr("CAST(value AS STRING)")
                           .select(from_json(col("value").cast("string"), schema).alias("tmp"))
                           .select("tmp.*"))
    telemetry_dataframe.printSchema()

    temperature_value_selection = telemetry_dataframe.select("temperature_value")
    temperature_value_selection_new = temperature_value_selection.withColumn(
        "device_temperature",
        temperature_value_selection["temperature_value"].cast(DecimalType()))
    temperature_value_selection_new.printSchema()

    assembler = VectorAssembler(
        inputCols=["device_temperature"], outputCol="temperatures"
    )
    assembled = assembler.transform(temperature_value_selection_new)
    assembled_new = assembled.withColumn("timestamp", F.current_timestamp())
    assembled_new.printSchema()

    # scaler = StandardScaler(inputCol="temperatures", outputCol="scaledTemperatures", withStd=True, withMean=False).fit(assembled)
    # scaled = scaler.transform(assembled)

    summarizer = Summarizer.metrics("max", "min", "variance", "mean", "count")
    descriptive_table_one = (assembled_new.withWatermark("timestamp", "4 minutes")
                             .select(summarizer.summary(assembled_new.temperatures)))
    #descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").groupBy(F.col("timestamp")).agg(max(F.col('timestamp')).alias("timestamp")).orderBy('timestamp', ascending=False).select(summarizer.summary(assembled.temperatures))
    #descriptive_table_one = assembled_new.select(summarizer.summary(assembled.temperatures))
    # descriptive_table_two = temperature_value_selection_new.select(summarizer.summary(temperature_value_selection_new.device_temperature))

    # -------------------------------------------------------------------------------------

    #########################################
    #               QUERIES                 #
    #########################################
    query_1 = telemetry_dataframe.writeStream.outputMode("append").format("console").trigger(processingTime="5 seconds").start()#.awaitTermination()
    query_2 = temperature_value_selection_new.writeStream.outputMode("append").format("console").trigger(processingTime="8 seconds").start()#.awaitTermination()
    query_3 = assembled_new.writeStream.outputMode("append").format("console").trigger(processingTime="11 seconds").start()#.awaitTermination()
    #query_4_1 = descriptive_table_one.writeStream.outputMode("complete").format("console").trigger(processingTime="14 seconds").start()#.awaitTermination()
    query_4_2 = descriptive_table_one.writeStream.outputMode("append").format("console").trigger(processingTime="17 seconds").start()#.awaitTermination()

Summarizer documentation.
In the code above, I isolate the column "temperature_value" and then vectorize it (using VectorAssembler) to create the column "temperatures" of type vector.
What I would like is to output the result of the "Summarizer" function to my console. This is why I use "append" for outputMode and "console" for the format. However, I got this error: pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark'. I therefore added the "withWatermark" function, but I still get the same error with the outputMode "append".
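For context, Spark accepts "append" for a streaming aggregation only when the aggregation is grouped on the watermarked time column, or on a window over it. A global `select(summarizer.summary(...))` has no such grouping key, so adding `withWatermark` alone does not change the error. The following is a minimal sketch of a windowed alternative, not the posted approach: it swaps `Summarizer` for the built-in `min`, `max`, `avg`, `count`, and `variance` functions, the helper name `windowed_temperature_stats` is hypothetical, and the "1 minute" window length is an arbitrary assumption.

```python
def windowed_temperature_stats(df, window="1 minute", delay="4 minutes"):
    """Per-window statistics for temperature_value (hypothetical helper).

    Assumes `df` is a streaming DataFrame with a "timestamp" column and a
    numeric "temperature_value" column, like `assembled_new` in the question.
    """
    # Imported lazily so the function can be defined without Spark installed.
    from pyspark.sql import functions as F

    return (
        df.withWatermark("timestamp", delay)
        # Grouping on a window over the watermarked column is what allows
        # Spark to finalize a window and emit it in "append" mode.
        .groupBy(F.window("timestamp", window))
        .agg(
            F.min("temperature_value").alias("min"),
            F.max("temperature_value").alias("max"),
            F.avg("temperature_value").alias("mean"),
            F.count("temperature_value").alias("count"),
            F.variance("temperature_value").alias("variance"),
        )
    )
```

A possible use would be `windowed_temperature_stats(assembled_new).writeStream.outputMode("append").format("console").start()`. In "append" mode a window's row is written only after the watermark has passed the end of that window, so the first output appears with a delay of roughly the window length plus the watermark delay.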
When I tried to change the outputMode to "complete", my terminal instantly terminated the Spark streaming.
Instant streaming termination:
My questions:
How should I use the "withWatermark" function in order to output the summary statistics of the vector column "temperatures" to my console?
Is there any other approach to calculate descriptive statistics for a custom column of my data frame, which I may miss?
I appreciate any help in advance.
EDIT (20.12.2019)
A solution has been given and accepted. However, I now get the following error:
Solution
"When I tried to change the outputMode to "complete", my terminal was instantly terminating the spark streaming."
All your streaming queries are up and running, but (the main thread of) the pyspark application does not even give them a chance to run for long (since it does not await any termination due to #.awaitTermination()).
You should block the main thread of the pyspark application using StreamingQuery.awaitTermination(), e.g. query_1.awaitTermination()
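As an illustration of that advice, here is a minimal sketch of a helper that keeps the main thread alive until the started queries finish. The name `await_all` and the `timeout_s` parameter are hypothetical; the only Spark call it relies on is `StreamingQuery.awaitTermination()`, which optionally takes a timeout in seconds.

```python
def await_all(queries, timeout_s=None):
    """Block the calling thread on each streaming query in turn.

    `queries` is any iterable of objects exposing awaitTermination(),
    such as the StreamingQuery handles returned by writeStream.start().
    """
    for query in queries:
        if timeout_s is None:
            # Blocks until this query is stopped or fails with an exception.
            query.awaitTermination()
        else:
            # Waits at most timeout_s seconds for this query, then moves on.
            query.awaitTermination(timeout_s)
```

With the queries from the question, the last line of the script could be `await_all([query_1, query_2, query_3, query_4_2])`. Since all queries are already running in the background, blocking on the first one is enough to keep the others alive too. `spark.streams.awaitAnyTermination()` is a built-in alternative that returns as soon as any active query terminates.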