How to generate summary statistics (using Summarizer.metrics) in a streaming query?

Updated: 2024-10-19 13:26:27
Problem description

Currently, I am using Spark Structured Streaming to create DataFrames of random data in the form (id, timestamp_value, device_id, temperature_value, comment).

Spark Dataframe per Batch:

Based on the screenshot of the data frame above, I would like to have some descriptive statistics for the column "temperature_value". For example, min, max, mean, count, variance.

My approach in Python is the following:

import sys
import json
import psycopg2
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col, to_json
from pyspark.sql.types import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import get_json_object
from pyspark.ml.stat import Summarizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import lit,unix_timestamp
from pyspark.sql import functions as F
import numpy as np
from pyspark.mllib.stat import Statistics

spark = SparkSession.builder.appName(<spark_application_name>).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.streams.active

data = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka_broker:<port_number>")
    .option("subscribe", <topic_name>)
    .option("startingOffsets", "latest")
    .load()
)

schema = StructType([
    StructField("id", DoubleType()),
    StructField("timestamp_value", DoubleType()),
    StructField("device_id", DoubleType()),
    StructField("temperature_value", DoubleType()),
    StructField("comment", StringType())])

telemetry_dataframe = (
    data.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value").cast("string"), schema).alias("tmp"))
    .select("tmp.*")
)
telemetry_dataframe.printSchema()

temperature_value_selection = telemetry_dataframe.select("temperature_value")
temperature_value_selection_new = temperature_value_selection.withColumn(
    "device_temperature",
    temperature_value_selection["temperature_value"].cast(DecimalType()))
temperature_value_selection_new.printSchema()

assembler = VectorAssembler(
    inputCols=["device_temperature"], outputCol="temperatures"
)
assembled = assembler.transform(temperature_value_selection_new)
assembled_new = assembled.withColumn("timestamp", F.current_timestamp())
assembled_new.printSchema()

# scaler = StandardScaler(inputCol="temperatures", outputCol="scaledTemperatures", withStd=True, withMean=False).fit(assembled)
# scaled = scaler.transform(assembled)

summarizer = Summarizer.metrics("max", "min", "variance", "mean", "count")
descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").select(summarizer.summary(assembled_new.temperatures))
#descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").groupBy(F.col("timestamp")).agg(max(F.col('timestamp')).alias("timestamp")).orderBy('timestamp', ascending=False).select(summarizer.summary(assembled.temperatures))
#descriptive_table_one = assembled_new.select(summarizer.summary(assembled.temperatures))
# descriptive_table_two = temperature_value_selection_new.select(summarizer.summary(temperature_value_selection_new.device_temperature))

# -------------------------------------------------------------------------------------

#########################################
#               QUERIES                 #
#########################################

query_1 = telemetry_dataframe.writeStream.outputMode("append").format("console").trigger(processingTime = "5 seconds").start()#.awaitTermination()
query_2 = temperature_value_selection_new.writeStream.outputMode("append").format("console").trigger(processingTime = "8 seconds").start()#.awaitTermination()
query_3 = assembled_new.writeStream.outputMode("append").format("console").trigger(processingTime = "11 seconds").start()#.awaitTermination()
#query_4_1 = descriptive_table_one.writeStream.outputMode("complete").format("console").trigger(processingTime = "14 seconds").start()#.awaitTermination()
query_4_2 = descriptive_table_one.writeStream.outputMode("append").format("console").trigger(processingTime = "17 seconds").start()#.awaitTermination()

Summarizer documentation.

Based on the posted code, I am isolating the column "temperature_value" and then I vectorize it (using VectorAssembler) to create the column "temperatures" of type vector.

What I would like is to print the result of the Summarizer to my console, which is why I use the "append" output mode with the "console" format. But I got this error: pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark'. I therefore added a "withWatermark" call, but I still get the same error with outputMode "append".
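A likely reason the error persists: in append mode Spark expects the aggregation to be grouped by the watermarked event-time column (or a window over it), and the query above aggregates over the whole stream. The sketch below groups by a time window instead. It is an illustration under assumptions, not the accepted answer: it uses Spark SQL built-in aggregates rather than Summarizer, and the function name, the view name `readings`, and the 1-minute window length are hypothetical choices.

```python
# SQL for per-window statistics; `readings` is a hypothetical temp view name
# and the 1-minute window length is an arbitrary choice for illustration.
WINDOWED_STATS_SQL = """
SELECT window(`timestamp`, '1 minute') AS time_window,
       min(temperature_value)          AS min_temperature,
       max(temperature_value)          AS max_temperature,
       avg(temperature_value)          AS mean_temperature,
       variance(temperature_value)     AS temperature_variance,
       count(temperature_value)        AS reading_count
FROM readings
GROUP BY window(`timestamp`, '1 minute')
"""


def windowed_temperature_stats(spark, streaming_df):
    """Return a streaming DataFrame of per-window temperature statistics.

    streaming_df is expected to have a `timestamp` column (event time)
    and a numeric `temperature_value` column, as in the code above.
    """
    # The watermark is declared on the same column that the window uses,
    # so the grouping key carries the event-time information.
    watermarked = streaming_df.withWatermark("timestamp", "4 minutes")
    watermarked.createOrReplaceTempView("readings")
    return spark.sql(WINDOWED_STATS_SQL)
```

With the names from the question this would be called as `windowed_temperature_stats(spark, assembled_new)` and written with `outputMode("append")`. In append mode a window's row is emitted only after the watermark has passed the end of that window, so the console can stay empty for the first few minutes.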

When I tried to change the outputMode to "complete", my terminal was instantly terminating the spark streaming.

Instant streaming termination:

My questions:

  • How should I use the "withWatermark" function in order to output the summary statistics of the vector column "temperatures" to my console?

  • Is there any other approach to calculate descriptive statistics for a custom column of my data frame, which I may miss?

Thanks in advance for any help.
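On the second question, one option that may be worth trying is `foreachBatch` (available in PySpark 2.4 and later), which hands each micro-batch to a function as an ordinary, non-streaming DataFrame, so Summarizer can be applied as in batch code. This is a sketch under assumptions: `make_batch_summarizer` is a hypothetical helper, and the statistics it prints cover one micro-batch at a time, not the whole stream.

```python
def make_batch_summarizer(summarizer, vector_column="temperatures"):
    """Build a foreachBatch callback that prints summary statistics.

    summarizer: the object returned by Summarizer.metrics(...).
    vector_column: name of the vector column to summarize.
    """
    def summarize_batch(batch_df, batch_id):
        # batch_df is a static DataFrame holding only this micro-batch,
        # so no watermark or streaming output mode is involved here.
        summary = batch_df.select(summarizer.summary(batch_df[vector_column]))
        summary.show(truncate=False)

    return summarize_batch
```

With the names from the question, the callback would be attached as `assembled_new.writeStream.foreachBatch(make_batch_summarizer(summarizer)).start()`.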

EDIT (20.12.2019)

The solution has been given and accepted. However, I now get the following error:

Solution

When I tried to change the outputMode to "complete", my terminal was instantly terminating the spark streaming.

All your streaming queries are up and running, but the main thread of the PySpark application does not give them a chance to run for long. Because every .awaitTermination() call is commented out (#.awaitTermination()), the main thread does not wait for any query and the application exits.

You should block the main thread of the PySpark application using StreamingQuery.awaitTermination(), e.g. query_1.awaitTermination().
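Since the question starts several queries, a minimal sketch of one way to apply this is to start all of them first and then block once on the session's query manager with `spark.streams.awaitAnyTermination()`. The helper names `start_console_query` and `run_until_terminated` are hypothetical, introduced here only for illustration.

```python
def start_console_query(df, output_mode, interval):
    """Start a console sink on a streaming DataFrame; return the StreamingQuery."""
    return (
        df.writeStream
        .outputMode(output_mode)
        .format("console")
        .trigger(processingTime=interval)
        .start()
    )


def run_until_terminated(spark, streaming_frames):
    """Start every query, then block the main thread.

    streaming_frames: list of (DataFrame, output_mode, interval) tuples.
    """
    # Start all queries before blocking; calling awaitTermination() right
    # after the first start() would keep the later queries from starting.
    queries = [
        start_console_query(df, mode, interval)
        for df, mode, interval in streaming_frames
    ]
    # Blocks until any one of the active queries stops or fails.
    spark.streams.awaitAnyTermination()
    return queries
```

With the names from the question, a call could look like `run_until_terminated(spark, [(telemetry_dataframe, "append", "5 seconds"), (assembled_new, "append", "11 seconds")])`.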

Published: 2023-10-27 11:09:04
Article link: https://www.elefans.com/category/jswz/34/1533195.html