I am writing a Spark application in which I need to evaluate streaming data against historical data that sits in a SQL Server database.
The idea is that Spark will fetch the historical data from the database, persist it in memory, and evaluate the streaming data against it.
At the moment I am getting the streaming data like this:
```python
import re

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, functions as func, Row

sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc, 10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")

# Connection details for the database that holds the data relevant for streaming
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"

# Basic data for evaluation purposes
files_count = files.flatMap(lambda file: file.split())
pattern = '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+ )(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'
tranfiles = "wasb://myserver.blob.core.windows/RealTimeInputFolder01/"


def getSqlContextInstance(sparkContext):
    """Lazily instantiate a singleton SQLContext."""
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def pre_parse(logline):
    """Read files as SQL rows in PySpark streaming using the pattern.
    For logging purposes a 0/1 flag is added, in case a line fails
    to be processed by this pattern."""
    match = re.search(pattern, logline)
    if match is None:
        return (logline, 0)
    else:
        return (Row(
            customer_id=match.group(8),
            trantype=match.group(5),
            amount=float(match.group(2))
        ), 1)


def parse():
    """The actual processing happens here."""
    parsed_tran = ssc.textFileStream(tranfiles).map(pre_parse)
    success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x: x[0])
    fail = parsed_tran.filter(lambda s: s[1] == 0).map(lambda x: x[0])
    if fail.count() > 0:
        print("number of non-parsed lines: %d" % fail.count())
    return success, fail


success, fail = parse()
```
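As a side note, the parse-or-flag approach in `pre_parse` can be sanity-checked outside Spark with plain `re`. A minimal sketch, assuming a simplified, hypothetical line format and pattern (the field names mirror the groups above, but the sample line and regex are illustrative, not the actual transaction data):

```python
import re

# Hypothetical, simplified pattern: three fields on one line.
pattern = (r"TranAmount=(?P<amount>[0-9]+\.[0-9]+) "
           r"TranDescription=(?P<trantype>[A-Za-z ]+) "
           r"dSc=(?P<customer_id>[A-Z]{2}\.[0-9]+)")

def pre_parse(logline):
    """Return (parsed_dict, 1) on success or (raw_line, 0) on failure,
    mirroring the success/fail split used in the streaming job."""
    match = re.search(pattern, logline)
    if match is None:
        return (logline, 0)
    return ({
        "customer_id": match.group("customer_id"),
        "trantype": match.group("trantype").strip(),
        "amount": float(match.group("amount")),
    }, 1)

ok = pre_parse("TranAmount=123.45 TranDescription=Grocery Purchase dSc=AB.678")
bad = pre_parse("not a transaction line")
print(ok)   # ({'customer_id': 'AB.678', 'trantype': 'Grocery Purchase', 'amount': 123.45}, 1)
print(bad)  # ('not a transaction line', 0)
```

Testing the pattern this way, before wiring it into `textFileStream`, makes regex bugs much cheaper to find than inside a streaming job.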
Now I want to evaluate it against the DataFrame that I get from the historical data:
```python
base_data = sqlContext.read.format("jdbc").options(
    driver=driver, url=dataurl, database=db,
    user=credential, password=credential, dbtable=table
).load()
```
Now, since this is returned as a DataFrame, how do I use it for my purpose? The streaming programming guide says: "You have to create a SQLContext using the SparkContext that the StreamingContext is using."
This makes me even more confused about how to use the existing DataFrame with the streaming object. Any help is highly appreciated.
Recommended answer:
To manipulate DataFrames, you always need a SQLContext, so you can instantiate it like this:
```python
sc = SparkContext("local[2]", "realtimeApp")
sqlc = SQLContext(sc)
ssc = StreamingContext(sc, 10)
```
These two contexts (SQLContext and StreamingContext) will coexist in the same job because they are associated with the same SparkContext. But keep in mind that you can't instantiate two different SparkContexts in the same job.
Once you have created a DataFrame from your DStream, you can join your historical DataFrame with the DataFrame created from the stream. To do that, I would do something like this:
```python
yourDStream.foreachRDD(
    lambda rdd: sqlContext.createDataFrame(rdd)
                          .join(historicalDF, ...)
                          ...
)
```
Think about the amount of streamed data you need for the join when you manipulate streams; you may be interested in the windowed functions.
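For reference, `window(windowLength, slideInterval)` on a DStream lets each join or aggregation see the union of the last N micro-batches instead of only the current one. A conceptual, plain-Python sketch of those semantics (the batch contents are made up, and this only models the grouping, not the Spark API):

```python
from collections import deque

def windowed_batches(batches, window_length, slide_interval):
    """Yield the union of the last `window_length` batches, advancing
    `slide_interval` batches at a time -- the grouping a windowed
    DStream applies before any join or aggregation runs."""
    buf = deque(maxlen=window_length)
    for i, batch in enumerate(batches):
        buf.append(batch)
        if (i + 1) % slide_interval == 0:
            yield [record for b in buf for record in b]

# Four hypothetical 10-second micro-batches of (customer_id, amount) records.
batches = [
    [("AB.1", 10.0)],
    [("AB.2", 20.0)],
    [("AB.3", 30.0)],
    [("AB.4", 40.0)],
]

# A window of 3 batches (30 s) sliding every batch (10 s):
for w in windowed_batches(batches, window_length=3, slide_interval=1):
    print(w)
```

The third window contains batches 1-3 and the fourth contains batches 2-4, which is why a join against a windowed stream can be much larger than a join against a single batch.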