在新列上过滤Spark DataFrame

编程入门 行业动态 更新时间:2024-10-27 12:25:23
本文介绍了在新列上过滤Spark DataFrame的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

上下文:我的数据集太大而无法容纳内存,因此我正在训练Keras RNN.我正在AWS EMR集群上使用PySpark来分批训练模型,该模型的大小足以存储在内存中.我无法实现使用 elephas 分发的模型,并且我怀疑这与我的模型是有状态的有关.不过我不确定.

Context: I have a dataset too large to fit in memory I am training a Keras RNN on. I am using PySpark on an AWS EMR Cluster to train the model in batches that are small enough to be stored in memory. I was not able to implement the model as distributed using elephas and I suspect this is related to my model being stateful. I'm not entirely sure though.

数据框对每个用户都有一行,并且从安装之日起0到29天之间已经过去了几天.查询数据库后,我对该数据框进行了许多操作:

The dataframe has a row for every user and days elapsed from the day of install from 0 to 29. After querying the database I do a number of operations on the dataframe:

query = """WITH max_days_elapsed AS ( SELECT user_id, max(days_elapsed) as max_de FROM table GROUP BY user_id ) SELECT table.* FROM table LEFT OUTER JOIN max_days_elapsed USING (user_id) WHERE max_de = 1 AND days_elapsed < 1""" df = read_from_db(query) #this is just a custom function to query our database #Create features vector column assembler = VectorAssembler(inputCols=features_list, outputCol="features") df_vectorized = assembler.transform(df) #Split users into train and test and assign batch number udf_randint = udf(lambda x: np.random.randint(0, x), IntegerType()) training_users, testing_users = df_vectorized.select("user_id").distinct().randomSplit([0.8,0.2],123) training_users = training_users.withColumn("batch_number", udf_randint(lit(N_BATCHES))) #Create and sort train and test dataframes train = df_vectorized.join(training_users, ["user_id"], "inner").select(["user_id", "days_elapsed","batch_number","features", "kpi1", "kpi2", "kpi3"]) train = train.sort(["user_id", "days_elapsed"]) test = df_vectorized.join(testing_users, ["user_id"], "inner").select(["user_id","days_elapsed","features", "kpi1", "kpi2", "kpi3"]) test = test.sort(["user_id", "days_elapsed"])

我遇到的问题是,如果没有缓存训练,我似乎无法在batch_number上进行过滤.我可以过滤数据库中原始数据集中的任何列,但不能查询数据库后在pyspark中生成的任何列:

The problem I am having is that I cannot seem to be able to filter on batch_number without caching train. I can filter on any of the columns that are in the original dataset in our database, but not on any column I have generated in pyspark after querying the database:

此: train.filter(train ["days_elapsed"] == 0).select("days_elapsed").distinct.show()仅返回0.

但是,所有这些都返回0到9之间的所有批号,而没有任何过滤:

But, all of these return all of the batch numbers between 0 and 9 without any filtering:

  • train.filter(train ["batch_number"] == 0).select("batch_number").distinct().show()
  • train.filter(train.batch_number == 0).select("batch_number").distinct().show()
  • train.filter("batch_number = 0").select("batch_number").distinct().show()
  • train.filter(col("batch_number")== 0).select("batch_number").distinct().show()

这也不起作用:

train.createOrReplaceTempView("train_table") batch_df = spark.sql("SELECT * FROM train_table WHERE batch_number = 1") batch_df.select("batch_number").distinct().show()

如果我先进行train.cache(),所有这些工作.这是绝对必要的,还是有一种无需缓存的方法?

All of these work if I do train.cache() first. Is that absolutely necessary or is there a way to do this without caching?

推荐答案

火花> = 2.3 (?-取决于SPARK-22629的进度)

Spark >= 2.3 (? - depending on a progress of SPARK-22629)

应该可以使用 asNondeterministic 方法禁用某些优化.

It should be possible to disable certain optimization using asNondeterministic method.

火花<2.3

不要使用UDF生成随机数.首先,引用文档:

Don't use UDF to generate random numbers. First of all, to quote the docs:

用户定义的函数必须是确定性的.由于优化,与查询中存在的重复调用相比,可以消除重复调用,甚至可以多次调用该函数.

The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.

即使不是UDF,也存在Spark的细微之处,这使得在处理单个记录时几乎不可能实现此权利.

Even if it wasn't for UDF, there are Spark subtleties, which make it almost impossible to implement this right, when processing single records.

Spark已经提供了 rand :

Spark already provides rand:

根据U [0.0,1.0]生成具有独立且分布均匀的(i.i.d.)样本的随机列.

Generates a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].

和 randn

从标准正态分布中生成具有独立且均匀分布(i.i.d.)样本的列.

Generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.

可用于构建更复杂的生成器功能.

which can be used to build more complex generator functions.

注意:

您的代码可能还有其他问题,但这从一开始就令人无法接受(在PySpark中生成随机数, pyspark.生成随机数的Transformer始终生成相同的数字).

There can be some other issues with your code but this makes it unacceptable from the beginning (Random numbers generation in PySpark, pyspark. Transformer that generates a random number generates always the same number).

更多推荐

在新列上过滤Spark DataFrame

本文发布于:2023-11-22 09:14:01,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1616835.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:新列上   Spark   DataFrame

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!