Why does Apache Spark read unnecessary Parquet columns within nested structures?

Problem Description

My team is building an ETL process to load raw delimited text files into a Parquet based "data lake" using Spark. One of the promises of the Parquet column store is that a query will only read the necessary "column stripes".

But we're seeing unexpected columns being read for nested schema structures.

To demonstrate, here is a POC using Scala and the Spark 2.0.1 shell:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
  StructField("F1", IntegerType),
  StructField("F2", IntegerType),
  StructField("Orig", StructType(Seq(
    StructField("F1", StringType),
    StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
  sc.parallelize(Seq(
    Row(1, 2, Row("1", "2")),
    Row(3, null, Row("3", "ABC")))),
  schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")

Then we read the file back into a DataFrame and project to a subset of columns:

// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show

When this runs we see the expected output:

+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+

But... the query plan shows a slightly different story:

The optimized plan shows:

val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet

And explain shows:

projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>

And the INFO logs produced during execution also confirm that the Orig.F2 column is unexpectedly read:

16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))

According to the Dremel paper and the Parquet documentation, columns for complex nested structures should be independently stored and independently retrievable.
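
As a sanity check (not part of the original question), the footer of the written file can be inspected with the Parquet Hadoop API bundled with Spark to confirm that the nested fields really are stored as separate column chunks on disk. This is a minimal sketch against the data.parquet directory created above; it picks the first part file it finds in that directory:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Locate one of the part files inside the data.parquet output directory
val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
val partFile = fs.listStatus(new Path("data.parquet"))
  .map(_.getPath)
  .find(_.getName.endsWith(".parquet"))
  .get

// Read the file footer and list the column chunks in each row group
val footer = ParquetFileReader.readFooter(conf, partFile)
footer.getBlocks.asScala.foreach { block =>
  block.getColumns.asScala.foreach { col =>
    // Prints each column chunk's path; Orig.F1 and Orig.F2 appear as separate entries,
    // i.e. the nested fields are stored (and could be read) independently.
    println(col.getPath)
  }
}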

Questions:

  • Is this behavior a limitation of the current Spark query engine? In other words, does Parquet support executing this query optimally, but Spark's query planner is naive?
  • Or, is this a limitation of the current Parquet implementation?
  • Or, am I not using the Spark APIs correctly?
  • Or, am I misunderstanding how Dremel/Parquet column storage is supposed to work?
  • Possibly related: Why does query performance differ with nested columns in Spark SQL?

Accepted Answer

It's a limitation of the Spark query engine at the moment; the relevant JIRA ticket is below. Spark only handles predicate pushdown for simple types in Parquet, not nested StructTypes.

https://issues.apache.org/jira/browse/SPARK-17636
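
As a follow-up note beyond the original answer: later Spark releases (2.4 and up) added a nested schema pruning rule behind the spark.sql.optimizer.nestedSchemaPruning.enabled flag, which is on by default since Spark 3.0. On such a version the same projection should scan only Orig.F1; a minimal sketch, assuming one of those versions:

// Only relevant on Spark 2.4+; the flag is enabled by default from Spark 3.0.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

val df = spark.read.parquet("data.parquet")
val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.explain()
// With nested schema pruning active, ReadSchema narrows to
// struct<F1:int,Orig:struct<F1:string>> instead of the full Orig struct.

On older versions, a common workaround is to flatten the nested struct into top-level columns before writing, so that ordinary column pruning applies.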
