How to overwrite the Parquet file that a DataFrame is read from in Spark

Problem Description

This is a microcosm of the problem I am facing, where I am getting an error. Let me try to reproduce it here.

I am saving a DataFrame as parquet, but when I reload the DataFrame from the parquet files and save it once again as parquet, I get an error.

from pyspark.sql.functions import col

valuesCol = [('Male', '2019-09-06'), ('Female', '2019-09-06'), ('Male', '2019-09-07')]
df = spark.createDataFrame(valuesCol, ['sex', 'date'])

# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex') == 'Male')

# Save it back - this produces the ERROR
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

The error message:

executor 22): java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

Another SO question addresses this issue. The proposed solution was to refresh the table with code like the snippet below, but that did not help. The issue is with the refreshing of the metadata, and I don't know how to refresh it.

df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
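A note on why this refresh attempt does not help: refreshTable invalidates cached metadata for a table or view looked up by name, while the stale file listing here is keyed by the data's path. PySpark also exposes spark.catalog.refreshByPath, which invalidates cached data and metadata for any DataFrame built on a given path; whether that alone makes an in-place overwrite safe is not confirmed by the question, but it is the more direct way to refresh path-based metadata. A minimal sketch, keeping the '.../temp' placeholder from the question:

# Invalidate cached metadata and file listings for this path
spark.catalog.refreshByPath('.../temp')

# Re-read the data after the refresh
df = spark.read.format('parquet').load('.../temp')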

Workaround for this problem: A non-elegant way to solve this issue is to save the DataFrame as a parquet file with a different name, then delete the original parquet file, and finally rename the new parquet file to the old name.

# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back as temp1, as opposed to the original temp
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')

# Delete the original parquet folder
shutil.rmtree('.../temp')

# Rename the parquet folder
os.rename('.../temp1', '.../temp')
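A side note on this workaround: the stack trace shows a maprfs:/// URI, so os and shutil presumably only work here because the cluster filesystem is NFS-mounted under /mapr. Below is a sketch of the same delete-and-rename done through Hadoop's FileSystem API instead, which works for any Hadoop-compatible filesystem. Note that spark._jvm and sparkContext._jsc are internal PySpark attributes, and the '...' paths stay placeholders as in the question:

# Get a Hadoop FileSystem handle via PySpark's JVM gateway (internal API)
jvm = spark._jvm
conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(conf)
Path = jvm.org.apache.hadoop.fs.Path

# Delete the original parquet folder (True = recursive) ...
fs.delete(Path('.../temp'), True)

# ... then rename the new folder to the old name
fs.rename(Path('.../temp1'), Path('.../temp'))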

But the problem is that some of the DataFrames are quite big, so this may not be the best way to deal with it. Not to mention that I am not sure whether renaming will cause problems with the metadata.

Recommended Answer

One solution for this error is to cache the DataFrame, trigger an action on it (for example, df.show()), and then save the parquet file in "overwrite" mode. This works because Spark evaluates lazily: without the cache, the overwrite starts deleting the files at the target path while the plan feeding the write is still reading from those very files; materializing the data in the cache first removes that dependency.

In Python:

df = spark.read.parquet("path_to_parquet")
# ... apply your transformations to df, producing new_df ...
new_df.cache()
new_df.show()
new_df.write.format("parquet") \
    .mode("overwrite") \
    .save("path_to_parquet")
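For concreteness, here is a sketch of the fix applied to the reproduction at the top of the question ('.../temp' remains a placeholder). One caveat: df.show() only pulls a handful of rows, so depending on the plan it may not bring every partition into the cache; an action such as count() scans everything, and the approach assumes the executors can actually hold the cached data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Read and transform
df = spark.read.format('parquet').load('.../temp')
new_df = df.where(col('sex') == 'Male')

# Materialize every partition in the cache before touching the files
new_df.cache()
new_df.count()

# The write no longer needs to read the files it is about to overwrite
new_df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')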
