Limiting the maximum size of dataframe partitions

Problem description

When I write out a dataframe to, say, csv, a .csv file is created for each partition. Suppose I want to limit the max size of each file to, say, 1 MB. I could do the write multiple times and increase the argument to repartition each time. Is there a way I can calculate ahead of time what argument to use for repartition to ensure the max size of each file is less than some specified size?

I imagine there might be pathological cases where all the data ends up on one partition. So make the weaker assumption that we only want to ensure that the average file size is less than some specified amount, say 1 MB.
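For concreteness, a minimal sketch of the setup being described, with placeholder input/output paths and an arbitrary partition count:

    // One part-*.csv file is produced per partition, so the argument to
    // repartition controls how many output files there are and, roughly,
    // how large each one is.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("csv-partitions").getOrCreate()
    val df = spark.read.parquet("/data/input")   // placeholder input path

    df.repartition(8)                            // placeholder partition count
      .write
      .option("header", "true")
      .csv("/data/output")                       // placeholder output path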

Answer

1. Single dataframe solution

I was trying to find some clever idea that would not kill the cluster at the same time, and the only thing that came to my mind was:

  • Calculate the size of a serialized row
  • Get the number of rows in the DataFrame
  • Repartition, dividing by the expected partition size
  • Should work?

The code should look more like this:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import org.apache.spark.sql.DataFrame

    // just a helper function from stackoverflow/a/39371571/1549135:
    // serializes a value and returns its size in bytes
    def getBytes(value: Any): Long = {
      val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(stream)
      oos.writeObject(value)
      oos.close()
      stream.toByteArray.length
    }

    val df: DataFrame = ???                        // your df
    val rowSize = getBytes(df.head)                // serialized size of one row
    val rowCount = df.count()
    val partitionSize = 1000000                    // million bytes in MB?
    val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
    df.repartition(noPartitions).write.format(...) // save to csv

While my first choice was to calculate each row's byte size, that would be terribly inefficient. So, unless the data size differs greatly from row to row, I would say that this solution will work. You could also calculate the size of every n-th row. You get the idea.
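A rough sketch of that sampling idea, assuming an arbitrary 1% sample fraction and reusing the getBytes helper above:

    // Estimate the average serialized row size from a small sample rather
    // than a single row; fall back to df.head if the sample comes back empty.
    val sampledRows = df.sample(withReplacement = false, fraction = 0.01).collect()
    val avgRowSize: Double =
      if (sampledRows.isEmpty) getBytes(df.head).toDouble
      else sampledRows.map(r => getBytes(r).toDouble).sum / sampledRows.length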

Also, I just 'hope' that Long will be big enough to hold the expected size when calculating noPartitions. If not (if you have a lot of rows), maybe it would be better to change the order of operations, e.g.:

    val noPartitions: Int = (rowSize / partitionSize * rowCount).toInt

Again, this is just a drafted idea with no domain knowledge about your data.
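Another way to sidestep both the overflow and the integer truncation (rowSize / partitionSize floors to zero whenever a row is smaller than a partition) would be to do the arithmetic in Double and round up, for example:

    // Compute the partition count in floating point, then round up and
    // clamp to at least 1 so repartition never receives 0.
    val noPartitions: Int =
      math.max(1, math.ceil(rowSize.toDouble * rowCount / partitionSize).toInt)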

While going through the apache-spark docs I found an interesting cross-system solution:

spark.sql.files.maxPartitionBytes, which sets:

The maximum number of bytes to pack into a single partition when reading files.

The default value is 134217728 (128 MB).

So I suppose you could set it to 1000000 (1 MB) and it will have a permanent effect on your DataFrames. However, too small a partition size may greatly impact your performance!

You can set it during SparkSession creation:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.sql.files.maxPartitionBytes", 1000000) // 1 MB, as discussed above
      .getOrCreate()
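If the session already exists, the same property can also be set at runtime via spark.conf, for example:

    // Adjust the read-side partition packing limit on a live session.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 1000000L)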

All of the above is only valid if (if I remember correctly) the csv is written out with the same number of files as there are partitions in the DataFrame.
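A quick way to sanity-check that assumption is to compare the DataFrame's partition count with the number of part files produced by the write (the output path is a placeholder):

    // The number of part-*.csv files under the output directory should match
    // the partition count reported here (plus a _SUCCESS marker file).
    println(s"partitions before write: ${df.rdd.getNumPartitions}")
    df.write.csv("/data/output")                 // placeholder output path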
