Additional Parameters from Dataflow into a Beam Pipeline

This article looks at passing additional parameters from Dataflow into a Beam pipeline; it may be a useful reference for anyone running into the same problem.

Problem Description

I'm working on Dataflow and have already built my custom pipeline with the Python SDK. I would like to pass parameters from the Dataflow UI into my custom pipeline using the Additional Parameters field. Reference: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue

Then I changed add_argument to add_value_provider_argument, following the Google docs:

class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):        
        parser.add_value_provider_argument(
            "--input_topic",
            type = str,
        )
        parser.add_value_provider_argument(
            "--window_size",
            type = int,
            default = 5,
        )

def run():
    pipeline_options = PipelineOptions(pipeline_args, .....)
    custom_param = pipeline_options.view_as(CustomParams)
    .....
    pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)

After that, I tried to create a template for GCP. The command to stage it looks like:

  python custom_pipeline.py \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

But I got this error when creating the template to upload to GCS:

TypeError: expected string or bytes-like object

at the beam.io.ReadFromPubSub() line.

It looks like what I get back from add_value_provider_argument is a RuntimeValueProvider object, so I'm quite confused about what I have to do to fix this.

I tried to fix this problem, for example by casting the data type:

beam.io.ReadFromPubSub(str(custom_param.input_topic))

But I got this error:

ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").

So, does anyone have troubleshooting advice for this? I have no idea how to proceed.
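
For reference, the object that add_value_provider_argument hands back behaves like this. A minimal sketch, constructing the provider directly purely to show the behaviour (in the real pipeline it comes from pipeline_options.view_as(CustomParams).input_topic):

from apache_beam.options.value_provider import RuntimeValueProvider

# Built by hand here only for illustration; normally the SDK creates it from the
# --input_topic option declared with add_value_provider_argument.
provider = RuntimeValueProvider(
    option_name="input_topic", value_type=str, default_value=None)

# str() only returns a description such as
# "RuntimeValueProvider(option: input_topic, type: str, default_value: None)",
# never a "projects/<project>/topics/<topic>" path, which is why the ValueError
# above shows that text.
print(str(provider))

# .get() is what returns the real value, but it is only valid once the job is
# running, so it cannot be called while the template is being built.
try:
    provider.get()
except Exception as err:
    print(err)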

Solution

As mentioned by @mk_sta:

It seems that ReadFromPubSub module doesn't accept ValueProvider. Have you checked this Stack thread?

and as explained in that thread, ReadFromPubSub does not currently accept ValueProvider arguments because it is implemented as a native transform in Dataflow.
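
In practice this means that, in the Python SDK, the topic handed to ReadFromPubSub has to be a concrete "projects/<project>/topics/<topic>" string while the pipeline graph is being built. A minimal sketch of the contrast, reusing the placeholder names from the staging command above (YOUR_TOPIC is an extra placeholder for this illustration; the graph is only constructed here, not run):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Graph construction only; the pipeline is deliberately not run in this sketch.
pipeline = beam.Pipeline(options=PipelineOptions(streaming=True))

# Works: the topic is already a plain string when the graph is built.
messages = pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(
    topic="projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC")

# Fails during template creation, exactly as in the question: at this point
# custom_param.input_topic is still a RuntimeValueProvider, not a string.
# messages = pipeline | beam.io.ReadFromPubSub(topic=custom_param.input_topic)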

You can check the "I/O methods that accept runtime parameters" documentation for ValueProvider support across the different SDKs.
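
For comparison, one Python I/O that does accept runtime parameters is beam.io.ReadFromText, whose file pattern may be a ValueProvider, so the add_value_provider_argument pattern from the question works there. A minimal sketch; the --input_file option and the FileParams class are made up for this illustration:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class FileParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Deferred option: resolved only when the templated job actually runs.
        parser.add_value_provider_argument("--input_file", type=str)


def run(argv=None):
    options = PipelineOptions(argv)
    file_params = options.view_as(FileParams)

    with beam.Pipeline(options=options) as pipeline:
        # ReadFromText accepts a ValueProvider and calls .get() on it at run time,
        # so --input_file can be supplied from the Dataflow UI when the template is launched.
        pipeline | "Read files" >> beam.io.ReadFromText(file_params.input_file)


if __name__ == "__main__":
    # e.g. python read_text_template.py --input_file=gs://YOUR_BUCKET_NAME/input/*.txt
    run()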

So, at the moment, if you switch from the Python SDK to the Java SDK, the Read transform of PubSubIO does support ValueProvider.

That wraps up this look at additional parameters from Dataflow into a Beam pipeline; hopefully the answer above helps anyone hitting the same error.
