How to evaluate a Spark DStream object using a Spark DataFrame

Problem description

I am writing a Spark application in which I need to evaluate streaming data against historical data that sits in a SQL Server database.

The idea is that Spark will fetch the historical data from the database, persist it in memory, and evaluate the streaming data against it.

The streaming data I currently get looks like this:

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, functions as func, Row

sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc, 10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")

# Let's get the data from the db which is relevant for streaming
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"

# Basic data for evaluation purposes
files_count = files.flatMap(lambda file: file.split(" "))
pattern = '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+ )(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'
tranfiles = "wasb://myserver.blob.core.windows/RealTimeInputFolder01/"

def getSqlContextInstance(sparkContext):
    """Lazily instantiate a single SQLContext for the given SparkContext."""
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def pre_parse(logline):
    """Read file lines as SQL rows in PySpark Streaming using the pattern.
    A 1/0 flag is attached for logging, in case a line cannot be parsed by this pattern."""
    match = re.search(pattern, logline)
    if match is None:
        return (logline, 0)
    return (Row(customer_id=match.group(8),
                trantype=match.group(5),
                amount=float(match.group(2))), 1)

def parse():
    """The actual processing happens here."""
    parsed_tran = ssc.textFileStream(tranfiles).map(pre_parse)
    success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x: x[0])
    fail = parsed_tran.filter(lambda s: s[1] == 0).map(lambda x: x[0])
    # DStream.count() returns a DStream, not an int, so the per-batch number of
    # non-parsed lines is reported with pprint() rather than an if/print check.
    fail.count().pprint()
    return success, fail

success, fail = parse()
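Note that the code above only defines the streaming pipeline. For Spark Streaming to actually process batches, the context has to be started at the very end of the script, once all transformations and output operations (including the join discussed in the answer below) have been registered:

# Placed after all DStream transformations and outputs have been set up.
ssc.start()
ssc.awaitTermination()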

Now I want to evaluate it against the DataFrame that I get from the historical data:

base_data = sqlContext.read.format("jdbc").options(
    driver=driver, url=dataurl, database=db,
    user=credential, password=credential, dbtable=table
).load()
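To keep this historical data in memory, as described above, one option is to cache the DataFrame and materialize it once up front. This is only a sketch, assuming base_data comfortably fits in executor memory:

# Cache the historical lookup data so each micro-batch joins against memory,
# not against SQL Server.
base_data.cache()
base_data.count()                              # an action that materializes the cache eagerly
base_data.registerTempTable("stream_helper")   # optional: make it queryable via SQL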

Now, since this is returned as a DataFrame, how do I use it for my purpose? The streaming programming guide says: "You have to create a SQLContext using the SparkContext that the StreamingContext is using."

This makes me even more confused about how to use the existing DataFrame together with the streaming object. Any help is highly appreciated.

Recommended answer

To manipulate DataFrames you always need an SQLContext, so you can instantiate it like this:

sc = SparkContext("local[2]", "realtimeApp")
sqlc = SQLContext(sc)
ssc = StreamingContext(sc, 10)

These two contexts (SQLContext and StreamingContext) will coexist in the same job because they are associated with the same SparkContext. But keep in mind that you cannot instantiate two different SparkContexts in the same job.

Once you have created a DataFrame from your DStream, you can join your historical DataFrame with the DataFrame created from the stream. To do that, I would do something like:

yourDStream.foreachRDD(lambda rdd: sqlContext.createDataFrame(rdd)
                                             .join(historicalDF, ...)
                                             ...)
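Applied to the question's own objects, a minimal sketch could look like the following. The helper name join_with_history and the join key customer_id are assumptions for illustration, not part of the original answer; adapt them to the real schema of the stream_helper table:

def join_with_history(rdd):
    # Skip empty micro-batches so createDataFrame() is never called on an empty RDD.
    if rdd.isEmpty():
        return
    sqlc = getSqlContextInstance(rdd.context)   # singleton SQLContext from the question's code
    stream_df = sqlc.createDataFrame(rdd)       # Rows with customer_id, trantype, amount
    # Assumed join key: customer_id must also exist as a column of base_data.
    enriched = stream_df.join(base_data, stream_df.customer_id == base_data.customer_id)
    enriched.show()                             # or write the enriched batch wherever needed

success.foreachRDD(join_with_history)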

Think about the amount of streamed data you need for the join when you manipulate streams; you may be interested in the windowed functions.
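As a rough illustration (the window and slide durations are arbitrary here, and join_with_history is the hypothetical helper from the sketch above), the join could be restricted to a sliding window of the parsed stream:

# Re-evaluate only the last 60 seconds of transactions, every 20 seconds,
# instead of joining every raw micro-batch against the historical data.
windowed = success.window(60, 20)
windowed.foreachRDD(join_with_history)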
