Problem description
I can't create a custom Google Cloud Dataflow template using the wordcount example, following the instructions here: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
I get an error relating to the RuntimeValueProvider being inaccessible. What am I doing wrong?
My main function, wordcount.py:
"""A word-counting workflow."""
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""

  def __init__(self):
    self.words_counter = Metrics.counter(self.__class__, 'words')
    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
    self.word_lengths_dist = Metrics.distribution(
        self.__class__, 'word_len_dist')
    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')

  def process(self, element):
    """Returns an iterator over the words of this element.

    The element is a line of text. If the line is blank, note that, too.

    Args:
      element: the element being processed

    Returns:
      The processed element.
    """
    text_line = element.strip()
    if not text_line:
      self.empty_line_counter.inc(1)
    words = re.findall(r'[\w\']+', text_line, re.UNICODE)
    for w in words:
      self.words_counter.inc()
      self.word_lengths_counter.inc(len(w))
      self.word_lengths_dist.update(len(w))
    return words


def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://wordcount_custom_template/input/example.txt',
          help='Path of the file to read from')
      parser.add_value_provider_argument(
          '--output',
          required=True,
          default='gs//wordcount_custom_template/output/count',
          help='Output file to write results to.')

  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  pipeline_options.view_as(SetupOptions).save_main_session = True
  p = beam.Pipeline(options=pipeline_options)
  wordcount_options = pipeline_options.view_as(WordcountOptions)

  # Read the text file[pattern] into a PCollection.
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

  # Count the occurrences of each word.
  def count_ones(word_ones):
    (word, ones) = word_ones
    return (word, sum(ones))

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(unicode))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

  # Format the counts into a PCollection of strings.
  def format_result(word_count):
    (word, count) = word_count
    return '%s: %d' % (word, count)

  output = counts | 'format' >> beam.Map(format_result)

  # Write the output using a "Write" transform that has side effects.
  # pylint: disable=expression-not-assigned
  output | 'write' >> WriteToText(wordcount_options.output)

  result = p.run()
  result.wait_until_finish()

  # Do not query metrics when creating a template which doesn't run
  if (not hasattr(result, 'has_job')   # direct runner
      or result.has_job):              # not just a template creation
    empty_lines_filter = MetricsFilter().with_name('empty_lines')
    query_result = result.metrics().query(empty_lines_filter)
    if query_result['counters']:
      empty_lines_counter = query_result['counters'][0]
      logging.info('number of empty lines: %d', empty_lines_counter.result)

    word_lengths_filter = MetricsFilter().with_name('word_len_dist')
    query_result = result.metrics().query(word_lengths_filter)
    if query_result['distributions']:
      word_lengths_dist = query_result['distributions'][0]
      logging.info('average word length: %d', word_lengths_dist.result.mean)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
My template creation code:
#!/usr/bin/env bash
python wordcount.py \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://wordcount_custom_template/staging \
--temp_location gs://wordcount_custom_template/temp \
--template_location gs://wordcount_custom_template/template/wordcount_template
The error I receive:
raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: 'gs://wordcount_custom_template/input/example.txt') not accessible
I don't really understand what this error message means as gs://wordcount_custom_template/input/example.txt
is accessible
Full stacktrace:
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:root:==================== <function annotate_downstream_side_inputs at 0x108e5fa28> ====================
INFO:root:==================== <function lift_combiners at 0x108e5ff50> ====================
INFO:root:==================== <function expand_gbk at 0x108e5fde8> ====================
INFO:root:==================== <function sink_flattens at 0x108e5fe60> ====================
INFO:root:==================== <function greedily_fuse at 0x108e5f848> ====================
INFO:root:==================== <function sort_stages at 0x108e5faa0> ====================
INFO:root:Running (ref_AppliedPTransform_read/Read_3)+((ref_AppliedPTransform_split_4)+((ref_AppliedPTransform_pair_with_one_5)+(group/Write)))
INFO:root:start <DataOutputOperation group/Write >
INFO:root:start <DoOperation pair_with_one output_tags=['out']>
INFO:root:start <DoOperation split output_tags=['out']>
INFO:root:start <ReadOperation read/Read source=SourceBundle(weight=1.0, source=<apache_beam.io.textio._TextSource object at 0x108cfcd50>, start_position=None, stop_position=None)>
Traceback (most recent call last):
File "wordcount.py", line 121, in <module>
run()
File "wordcount.py", line 100, in run
result = p.run()
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 369, in run
self.to_runner_api(), self.runner, self._options).run(False)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 382, in run
return self.runner.run_pipeline(self)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
return runner.run_pipeline(pipeline)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 215, in run_pipeline
return self.run_via_runner_api(pipeline.to_runner_api())
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 218, in run_via_runner_api
return self.run_stages(*self.create_stages(pipeline_proto))
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 837, in run_stages
pcoll_buffers, safe_coders).process_bundle.metrics
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 938, in run_stage
self._progress_frequency).process_bundle(data_input, data_output)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1110, in process_bundle
result_future = self._controller.control_handler.push(process_bundle)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1003, in push
response = self.worker.do_instruction(request)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 185, in do_instruction
request.instruction_id)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 202, in process_bundle
processor.process_bundle(instruction_id)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 286, in process_bundle
op.start()
File "apache_beam/runners/worker/operations.py", line 227, in apache_beam.runners.worker.operations.ReadOperation.start
File "apache_beam/runners/worker/operations.py", line 228, in apache_beam.runners.worker.operations.ReadOperation.start
File "apache_beam/runners/worker/operations.py", line 229, in apache_beam.runners.worker.operations.ReadOperation.start
File "apache_beam/runners/worker/operations.py", line 231, in apache_beam.runners.worker.operations.ReadOperation.start
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 197, in get_range_tracker
return self._get_concat_source().get_range_tracker(start_position,
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 123, in _f
raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: 'gs://wordcount_custom_template/input/example.txt') not accessible
Another thing I don't understand is how can it be that I specify the DataflowRunner yet the DirectRunner is called as shown in the stacktrace?
Solution
I successfully generated the pipeline after I modified run(argv) to pick up the args from the command line:
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
So I think the problem is that argv is not passed to your program correctly.
Also, if you'd like to make output a template argument, don't mark it as required.
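To see why this matters, here is a minimal sketch of the argument-splitting step on its own (split_args is a hypothetical helper name for illustration; the answer's snippet inlines the same lines directly in run()). Flags that argparse does not recognize, such as --runner and --template_location, end up in pipeline_args, which is exactly what PipelineOptions needs. The question's hard-coded PipelineOptions(['--output', 'some/output_path']) never sees --runner, which is why the DirectRunner fallback appears in the stack trace.

```python
import argparse

def split_args(argv=None):
    """Split the command line: argparse consumes any declared
    (non-templatable) args; everything it does not recognize is
    returned untouched as pipeline_args for PipelineOptions."""
    parser = argparse.ArgumentParser()
    # parser.add_argument(...) calls for non-templatable args would go here
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args

# The runner flags are not declared on the parser, so they survive
# into pipeline_args instead of being silently dropped.
known, pipeline_args = split_args(
    ['--runner', 'DataflowRunner', '--project', 'my-project'])
print(pipeline_args)  # ['--runner', 'DataflowRunner', '--project', 'my-project']
```

With this in place, PipelineOptions(pipeline_args) receives the real command line, so --runner DataflowRunner and --template_location take effect and template creation proceeds without evaluating the RuntimeValueProvider.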