如何限制使用 FileIO 写入的每个文件的行数

编程入门 行业动态 更新时间:2024-10-21 13:21:28
本文介绍了如何限制使用 FileIO 写入的每个文件的行数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

是否有可能使用 TextIO 或 FileIO 限制每个写入分片中的行数?

Is there a possible way to limit number of lines in each written shard using TextIO or may be FileIO?

示例:

从 Big Query - Batch Job 中读取行(例如,结果为 19500 行).进行一些转换.将文件写入 Google Cloud 存储(19 个文件,每个文件限制为 1000 条记录,一个文件有 500 条记录).Cloud Function 被触发以针对 GCS 中的每个文件向外部 API 发出 POST 请求.

这是我目前正在尝试但不起作用(试图限制每个文件 1000 行):

Here is what I'm trying to do so far but doesn't work (Trying to limit 1000 rows per file):

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
        beam.io.BigQuerySource(query=query,
                               use_standard_sql=True)) | beam.Map(json.dumps)

BQ_DATA | beam.WindowInto(GlobalWindows(), Repeatedly(trigger=AfterCount(1000)),
                              accumulation_mode=AccumulationMode.DISCARDING)
        | WriteToFiles(path='fileio', destination="csv")

我在概念上是错误的还是有其他方法可以实现这一点?

Am I conceptually wrong or is there any other way to implement this?

推荐答案

您可以在 ParDo 中实现写入 GCS 步骤并限制包含在批处理"中的元素数量像这样:

You can implement the write to GCS step inside ParDo and limit the number of elements to include in a "batch" like this:

from apache_beam.io import filesystems

class WriteToGcsWithRowLimit(beam.DoFn):
  def __init__(self, row_size=1000):
    self.row_size = row_size
    self.rows = []

  def finish_bundle(self):
     if len(self.rows) > 0:
        self._write_file()

  def process(self, element):
    self.rows.append(element)
    if len(self.rows) >= self.row_size:
        self._write_file()

  def _write_file(self):
    from time import time
    new_file = 'gs://bucket/file-{}.csv'.format(time())
    writer = filesystems.FileSystems.create(path=new_file)
    writer.write(self.rows) # may need to format
    self.rows = []
    writer.close()

BQ_DATA  | beam.ParDo(WriteToGcsWithRowLimit())

请注意,这不会创建任何少于 1000 行的文件,但您可以更改 process 中的逻辑来做到这一点.

Note that this will not create any files with less than 1000 rows, but you can change the logic in process to do that.

(编辑 1 处理余数)

(Edit 1 to handle the remainders)

(编辑 2 以停止使用计数器,因为文件将被覆盖)

(Edit 2 to stop using counters, as files will be overridden)

这篇关于如何限制使用 FileIO 写入的每个文件的行数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 23:09:41,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/971394.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:行数   文件   FileIO

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!