How do I create a Google Cloud Dataflow Wordcount custom template in Python?


Question



I can't create a custom Google Cloud Dataflow template using the wordcount example, following the instructions here: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

I get an error saying the RuntimeValueProvider is not accessible. What am I doing wrong?

My main function wordcount.py:


"""A word-counting workflow."""

from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def __init__(self):
        self.words_counter = Metrics.counter(self.__class__, 'words')
        self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
        self.word_lengths_dist = Metrics.distribution(
            self.__class__, 'word_len_dist')
        self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')

    def process(self, element):
        """Returns an iterator over the words of this element.
        The element is a line of text.  If the line is blank, note that, too.
        Args:
          element: the element being processed
        Returns:
          The processed element.
        """
        text_line = element.strip()
        if not text_line:
            self.empty_line_counter.inc(1)
        words = re.findall(r'[\w\']+', text_line, re.UNICODE)
        for w in words:
            self.words_counter.inc()
            self.word_lengths_counter.inc(len(w))
            self.word_lengths_dist.update(len(w))
        return words


def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""

    class WordcountOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            # Use add_value_provider_argument for arguments to be templatable
            # Use add_argument as usual for non-templatable arguments
            parser.add_value_provider_argument(
                '--input',
                default='gs://wordcount_custom_template/input/example.txt',
                help='Path of the file to read from')
            parser.add_value_provider_argument(
                '--output',
                required=True,
                default='gs://wordcount_custom_template/output/count',
                help='Output file to write results to.')

    pipeline_options = PipelineOptions(['--output', 'some/output_path'])
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    wordcount_options = pipeline_options.view_as(WordcountOptions)

    # Read the text file[pattern] into a PCollection.
    lines = p | 'read' >> ReadFromText(wordcount_options.input)

    # Count the occurrences of each word.
    def count_ones(word_ones):
        (word, ones) = word_ones
        return (word, sum(ones))

    counts = (lines
              | 'split' >> (beam.ParDo(WordExtractingDoFn())
                            .with_output_types(unicode))
              | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
              | 'group' >> beam.GroupByKey()
              | 'count' >> beam.Map(count_ones))

    # Format the counts into a PCollection of strings.
    def format_result(word_count):
        (word, count) = word_count
        return '%s: %d' % (word, count)

    output = counts | 'format' >> beam.Map(format_result)

    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | 'write' >> WriteToText(wordcount_options.output)

    result = p.run()
    result.wait_until_finish()

    # Do not query metrics when creating a template which doesn't run
    if (not hasattr(result, 'has_job')  # direct runner
            or result.has_job):  # not just a template creation
        empty_lines_filter = MetricsFilter().with_name('empty_lines')
        query_result = result.metrics().query(empty_lines_filter)
        if query_result['counters']:
            empty_lines_counter = query_result['counters'][0]
            logging.info('number of empty lines: %d', empty_lines_counter.result)

        word_lengths_filter = MetricsFilter().with_name('word_len_dist')
        query_result = result.metrics().query(word_lengths_filter)
        if query_result['distributions']:
            word_lengths_dist = query_result['distributions'][0]
            logging.info('average word length: %d', word_lengths_dist.result.mean)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

My template creation code:

#!/usr/bin/env bash
python wordcount.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://wordcount_custom_template/staging \
    --temp_location gs://wordcount_custom_template/temp \
    --template_location gs://wordcount_custom_template/template/wordcount_template

The error I receive:

raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: 'gs://wordcount_custom_template/input/example.txt') not accessible

I don't really understand what this error message means, as gs://wordcount_custom_template/input/example.txt is accessible.

Full stacktrace:

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:root:==================== <function annotate_downstream_side_inputs at 0x108e5fa28> ====================
INFO:root:==================== <function lift_combiners at 0x108e5ff50> ====================
INFO:root:==================== <function expand_gbk at 0x108e5fde8> ====================
INFO:root:==================== <function sink_flattens at 0x108e5fe60> ====================
INFO:root:==================== <function greedily_fuse at 0x108e5f848> ====================
INFO:root:==================== <function sort_stages at 0x108e5faa0> ====================
INFO:root:Running (ref_AppliedPTransform_read/Read_3)+((ref_AppliedPTransform_split_4)+((ref_AppliedPTransform_pair_with_one_5)+(group/Write)))
INFO:root:start <DataOutputOperation group/Write >
INFO:root:start <DoOperation pair_with_one output_tags=['out']>
INFO:root:start <DoOperation split output_tags=['out']>
INFO:root:start <ReadOperation read/Read source=SourceBundle(weight=1.0, source=<apache_beam.io.textio._TextSource object at 0x108cfcd50>, start_position=None, stop_position=None)>
Traceback (most recent call last):
  File "wordcount.py", line 121, in <module>
    run()
  File "wordcount.py", line 100, in run
    result = p.run()
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 369, in run
    self.to_runner_api(), self.runner, self._options).run(False)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 382, in run
    return self.runner.run_pipeline(self)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
    return runner.run_pipeline(pipeline)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 215, in run_pipeline
    return self.run_via_runner_api(pipeline.to_runner_api())
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 218, in run_via_runner_api
    return self.run_stages(*self.create_stages(pipeline_proto))
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 837, in run_stages
    pcoll_buffers, safe_coders).process_bundle.metrics
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 938, in run_stage
    self._progress_frequency).process_bundle(data_input, data_output)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1110, in process_bundle
    result_future = self._controller.control_handler.push(process_bundle)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1003, in push
    response = self.worker.do_instruction(request)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 185, in do_instruction
    request.instruction_id)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 202, in process_bundle
    processor.process_bundle(instruction_id)
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 286, in process_bundle
    op.start()
  File "apache_beam/runners/worker/operations.py", line 227, in apache_beam.runners.worker.operations.ReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 228, in apache_beam.runners.worker.operations.ReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 229, in apache_beam.runners.worker.operations.ReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 231, in apache_beam.runners.worker.operations.ReadOperation.start
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 197, in get_range_tracker
    return self._get_concat_source().get_range_tracker(start_position,
  File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 123, in _f
    raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: 'gs://wordcount_custom_template/input/example.txt') not accessible

Another thing I don't understand: how can the DirectRunner be invoked, as shown in the stack trace, when I specified the DataflowRunner?

Solution

I successfully generated the template after I modified run(argv) to pick up the args from the command line:

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
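The key behavior here can be illustrated with plain argparse alone, no Beam required (the flag values below are hypothetical): parse_known_args puts every flag the parser does not recognize into a second list, which is what lets --runner, --project, --template_location, etc. flow from the command line into PipelineOptions instead of being silently dropped by a hardcoded options list.

```python
import argparse

# Minimal sketch: no arguments are registered on this parser, so every
# flag ends up in the "unknown" list returned by parse_known_args.
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(
    ['--runner', 'DataflowRunner', '--project', 'my-project'])

# pipeline_args now holds the pass-through flags for PipelineOptions.
print(pipeline_args)  # ['--runner', 'DataflowRunner', '--project', 'my-project']
```

In the original wordcount.py, `PipelineOptions(['--output', 'some/output_path'])` replaces this pass-through list entirely, so the runner flag never reaches Beam and it falls back to the DirectRunner.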

So I think the problem is that the command-line arguments are never passed to PipelineOptions: the hardcoded `PipelineOptions(['--output', 'some/output_path'])` discards them.

Also, if you'd like to make output a template argument, do not mark it as required.
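The second point can be seen with plain argparse, on which Beam's option parser is built. This is a sketch of the conflict, not the Beam API itself: required=True demands the flag at parse time (i.e. at template creation, when no --output is given), which defeats the purpose of having a default or a runtime-provided value.

```python
import argparse

# Hypothetical reproduction: a required flag with a default still must be
# passed explicitly, so parsing an empty argv fails.
parser = argparse.ArgumentParser()
parser.add_argument('--output', required=True,
                    default='gs://wordcount_custom_template/output/count')
try:
    parser.parse_args([])  # template creation passes no --output
except SystemExit:
    print('required=True rejects the call even though a default exists')
```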
