Question
I have a pipeline that reads files from GCS through Pub/Sub:
```python
import json
import logging

import apache_beam as beam


class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        # Drop the last '/'-separated segment of the message id and prefix gs://
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name


class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return [element]  # returns a one-element list


class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return element  # returns the element (a string) itself

...

(p
 | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
 | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
 | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
 | 'Log Results' >> beam.ParDo(LogFn())
 # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
 | "Read File from GCS" >> beam.io.ReadAllFromText())
```
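The path expression in `ExtractFileNameFn` can be checked on its own, outside Beam. In the sketch below, `extract_file_name` and the sample message are hypothetical; it assumes the message's `id` has the form `<bucket>/<object path>/<generation>`, so dropping the last segment leaves `bucket/object path`.

```python
def extract_file_name(message):
    """Same expression as ExtractFileNameFn.process: drop the last
    '/'-separated segment of message['id'] and prefix 'gs://'."""
    return 'gs://' + "/".join(message['id'].split("/")[:-1])


# Hypothetical message; assumes 'id' is "<bucket>/<object path>/<generation>".
msg = {"id": "my-bucket/logs/2019/08/app.log/1565012345678901"}
print(extract_file_name(msg))  # gs://my-bucket/logs/2019/08/app.log
```

Because only the last segment is removed, object names that themselves contain `/` keep their full path.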
The difference between `LogFn` and `LogPassThroughFn` is the type of the return value: `LogPassThroughFn` returns the string itself, while `LogFn` returns a list. `LogFn` works well in test code, but `LogPassThroughFn` makes the pipeline fail to run. Per the answer to this issue:

> Beam Python SDK still tries to interpret the output of that ParDo as if it was a collection of elements. And it does so by interpreting the string you emitted as collection of characters.
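The quoted explanation can be illustrated with a small pure-Python model. `run_pardo` is a hypothetical stand-in, not Beam's actual implementation: it iterates whatever `process()` returns and treats each item as one output element, which is the behavior the quote describes. The two plain functions stand in for `LogFn.process` and `LogPassThroughFn.process`.

```python
# Hypothetical, simplified model of a ParDo (not Beam's real code):
# each item of whatever process() returns becomes one output element.
def run_pardo(process, elements):
    outputs = []
    for element in elements:
        for item in process(element):
            outputs.append(item)
    return outputs


def log_fn_process(element):
    return [element]   # like LogFn: a one-item list


def log_pass_through_process(element):
    return element     # like LogPassThroughFn: the bare string


paths = ["gs://bucket/a.txt"]
print(run_pardo(log_fn_process, paths))               # ['gs://bucket/a.txt']
print(run_pardo(log_pass_through_process, paths)[:5])  # ['g', 's', ':', '/', '/']
```

Under this model, a `ReadAllFromText` step placed after `LogPassThroughFn` would be handed one-character strings such as `'g'` as file patterns, which would plausibly explain the failure.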
We know `LogFn` should work correctly.
However, I notice that `ExtractFileNameFn` yields a string rather than a list. Is that correct? Then I tested it as below, yielding a list in `ExtractFileNameFn1`:
```python
class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield [file_name]  # yields a list instead of a string

...

(p
 | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
 | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
 | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
 | "Read File from GCS" >> beam.io.ReadAllFromText())
```
Now the pipeline fails to run...
My question is: what is the difference between returning a string and returning a list in a DoFn? Why can `ReadAllFromText` receive a string from `ExtractFileNameFn`, but a list from `LogFn`?
Beam version: 2.14.0
Answer
The documentation for `ParDo` says:

> Note that the DoFn must return an iterable for each element of the input PCollection. An easy way to do this is to use the yield keyword in the process method.

https://beam.apache/releases/pydoc/2.6.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo
The purpose of returning an iterable is that your input elements may not map 1-to-1 to your output elements: a single input may produce multiple outputs.
You can `yield` them as you go, or you can gather them up into a list and `return` them at the end.
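Both styles can be compared with the same kind of pure-Python model. `run_pardo` is again a hypothetical, simplified stand-in for a ParDo, and the input format (one comma-separated string naming several files) is an assumption made only for illustration.

```python
# Hypothetical, simplified model of ParDo (not Beam's real code):
# each item of the iterable returned by process() becomes one output.
def run_pardo(process, elements):
    return [item for element in elements for item in process(element)]


def split_paths_yield(element):
    # Emit outputs one at a time as they are produced.
    for path in element.split(","):
        yield path.strip()


def split_paths_return(element):
    # Gather all outputs into a list and return it at the end.
    paths = []
    for path in element.split(","):
        paths.append(path.strip())
    return paths


message = ["gs://b/a.txt, gs://b/b.txt"]  # one input element naming two files
print(run_pardo(split_paths_yield, message))   # ['gs://b/a.txt', 'gs://b/b.txt']
print(run_pardo(split_paths_return, message))  # ['gs://b/a.txt', 'gs://b/b.txt']
```

One input element produces two outputs in both cases; the generator just avoids building the intermediate list.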
So:
```python
class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name
```
is equivalent to this:
```python
class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        return [file_name]
```
The output elements of both are strings, each output element being a file name.
When you do `yield [file_name]`, each output element is actually a list containing a string.
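The nesting can be seen with the same hypothetical `run_pardo` model (a simplified stand-in, not Beam's real code). The two functions mirror `ExtractFileNameFn` (`yield file_name`) and `ExtractFileNameFn1` (`yield [file_name]`).

```python
# Hypothetical, simplified model of ParDo (not Beam's real code):
# each item of the iterable returned by process() becomes one output.
def run_pardo(process, elements):
    return [item for element in elements for item in process(element)]


def yield_string(element):  # like ExtractFileNameFn: yield file_name
    yield element


def yield_list(element):    # like ExtractFileNameFn1: yield [file_name]
    yield [element]


names = ["gs://bucket/a.txt"]
print(run_pardo(yield_string, names))  # ['gs://bucket/a.txt']
print(run_pardo(yield_list, names))    # [['gs://bucket/a.txt']]
```

`ReadAllFromText` expects each element to be a file pattern string, so the one-item lists produced by `ExtractFileNameFn1` are presumably what makes that pipeline fail; going back to `yield file_name` (or `return [file_name]`) gives it plain path strings again.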