Avoid recomputing size of all Cloud Storage files in Beam Python SDK


Question


I'm working on a pipeline that reads ~5 million files from a Google Cloud Storage (GCS) directory. I have it configured to run on Google Cloud Dataflow.

The problem is that when I start the pipeline, it takes hours "computing the size" of all of the files:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]

As you can see, it took an hour and a half (5549 seconds) to compute the size of about 5.5M files, and then it started all over again from the beginning! The second pass took another 2 hours, and then it started a third time! As of the time of writing, the job is still not available in the Dataflow console, which leads me to believe this is all happening on my local machine and not taking advantage of any distributed computing.
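
To sanity-check that the glob expansion itself is the expensive part, the same pattern can be expanded locally with Beam's FileSystems API. The snippet below is a minimal sketch, not part of the original pipeline; it only assumes apache-beam[gcp] is installed and that the same GCS credentials Beam uses are available locally.

# Minimal sketch (not from the original post): expand the same GCS glob
# locally to see how many objects it matches and how large they are.
from apache_beam.io.filesystems import FileSystems

pattern = 'gs://project/dir/*.har.gz'  # same pattern used by ReadFromText

match_result = FileSystems.match([pattern])[0]
num_files = len(match_result.metadata_list)
total_bytes = sum(m.size_in_bytes for m in match_result.metadata_list)
print(f'{num_files} files matched, {total_bytes} bytes in total')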

When I test the pipeline with a smaller input dataset (2 files) it repeats the size estimation 4 times:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.

At this rate it will take about 8 hours just to perform GCS size estimations of all 5.5M files 4 times, all before the Dataflow job has even started.

My pipeline is configured with the --runner=DataflowRunner option, so it should be running in Dataflow:

python bigquery_import.py --runner=DataflowRunner #other options...

The pipeline reads from GCS like this:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    required=True,
    help='Input Cloud Storage directory to process.')
# argv is supplied by the script's entry point (e.g. sys.argv[1:]).
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')

Refer to bigquery_import.py on GitHub for the full code.

I'm confused why this tedious process is happening outside of the Dataflow environment and why it needs to be done multiple times. Am I reading the files from GCS correctly or is there a more efficient way?

Answer

Thanks for reporting this. Beam has two transforms for reading text: ReadFromText and ReadAllFromText. ReadFromText will run into this issue, but ReadAllFromText shouldn't.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438

The downside of ReadAllFromText is that it won't perform dynamic work rebalancing, but this should not be an issue when reading a large number of files.
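
Following that suggestion, the read step from the question could be rewritten to feed the file pattern through ReadAllFromText. The sketch below is one possible form, not the answer's literal code; the Create step and step labels are illustrative, and pipeline_options is the same PipelineOptions object built earlier in the question.

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

with beam.Pipeline(options=pipeline_options) as p:
    files = (
        p
        # Pass the glob as an element so that ReadAllFromText expands and
        # reads it inside the pipeline, rather than as source configuration
        # evaluated while the job is being constructed.
        | 'FilePattern' >> beam.Create(['gs://project/dir/*.har.gz'])
        | 'ReadAll' >> ReadAllFromText()
    )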

Created https://issues.apache.org/jira/browse/BEAM-9620 for tracking issues with ReadFromText (and file-based sources in general).
