Google Cloud Composer (Airflow): Dataflow job executes successfully, but the DAG fails

This article covers a Google Cloud Composer (Airflow) problem in which the Dataflow job launched by a DAG executes successfully, but the DAG task itself fails, and how to resolve it.

Problem Description

My DAG looks like this:

import datetime

import airflow
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'dataflow_default_options': {
        'project': 'test',
        'tempLocation': 'gs://test/dataflow/pipelines/temp/',
        'stagingLocation': 'gs://test/dataflow/pipelines/staging/',
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '1',
        'region': 'asia-east1'
    }
}

dag = DAG(
    dag_id='gcs_avro_to_bq_dag',
    default_args=default_args,
    description='ETL for loading data from GCS (present in the Avro format) to BQ',
    schedule_interval=None,
    dagrun_timeout=datetime.timedelta(minutes=30))

task = DataFlowJavaOperator(
    task_id='gcs_avro_to_bq_flow_job',
    jar='gs://test/dataflow/pipelines/jobs/test-1.0-SNAPSHOT.jar',
    poll_sleep=1,
    options={
        'input': '{{ ts }}',  # templated execution timestamp
    },
    dag=dag)
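For context on how the templated input value reaches the pipeline: as far as I understand the contrib Dataflow hook, DataFlowJavaOperator merges dataflow_default_options with options and hands the result to the jar as --key=value command-line flags (details such as the generated job name are omitted here). A rough sketch of that flattening, not the hook's actual code:

# Rough sketch (not the hook's actual code): how the merged options end up
# as --key=value flags appended to the `java -jar` command line.
def build_dataflow_command(local_jar, dataflow_default_options, options):
    merged = {**dataflow_default_options, **options}
    flags = ['--{}={}'.format(key, value) for key, value in merged.items()]
    return ['java', '-jar', local_jar] + flags

# Example with values from the DAG above (the GCS jar is staged to a local
# path by the operator before it is run, per my understanding):
print(build_dataflow_command(
    '/tmp/test-1.0-SNAPSHOT.jar',
    {'project': 'test', 'region': 'asia-east1'},
    {'input': '2020-05-20T17:20:00+00:00'},  # a rendered value of '{{ ts }}'
))
# ['java', '-jar', '/tmp/test-1.0-SNAPSHOT.jar', '--project=test',
#  '--region=asia-east1', '--input=2020-05-20T17:20:00+00:00']

This is how the --input flag reaches the getInput() method of the Options interface shown further down.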

My DAG executes a jar file. The jar contains the code for running a Dataflow job that writes data from GCS to BQ. The jar by itself executes successfully.

When I try to execute the Airflow job, I see the following errors:

[2020-05-20 17:20:41,934] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,840] {gcp_api_base_hook.py:97} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-05-20 17:20:41,937] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,853] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataflow/v1b3/rest
[2020-05-20 17:20:44,338] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:44,338] {discovery.py:873} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json
[2020-05-20 17:20:45,285] {__init__.py:1631} ERROR - <HttpError 404 when requesting https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json returned "(7e83a8221abb0a9b): Information about job asia-east1 could not be found in our system. Please double check the id is correct. If it is please contact customer support.">
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1491, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 184, in execute
    self.jar, self.job_class)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 220, in start_java_dataflow
    self._start_dataflow(variables, name, command_prefix, label_formatter)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 286, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 200, in _start_dataflow
    self.poll_sleep, job_id).wait_for_done()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 44, in __init__
    self._job = self._get_job()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 63, in _get_job
    jobId=self._job_id).execute(num_retries=5)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
    raise HttpError(resp, content, uri=self.uri)

I did some more digging, and I can see the following API being called by Airflow: dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1

As you can see, the last path segment after jobs is asia-east1, so it looks like the Airflow job is using the region I provided in default_args to look up the status of the Dataflow job. I am not sure whether that is what is actually going on, but I wanted to state that observation. Am I missing something in my DAG? Also, my Java job logic looks like this:

public class GcsAvroToBQ {

    public interface Options extends PipelineOptions {
        @Description("Input")
        ValueProvider<String> getInput();

        void setInput(ValueProvider<String> value);
    }

    /**
     * Main entry point for executing the pipeline.
     *
     * @param args The command-line arguments to the pipeline.
     */
    public static void main(String[] args) {
        GcsAvroToBQ.Options options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(GcsAvroToBQ.Options.class);

        options.getJobName();

        run(options);
    }

    public static PipelineResult run(Options options) {
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // My Pipeline logic to read Avro and upload to BQ
        PCollection<TableRow> tableRowsForBQ; // Data to store in BQ
        tableRowsForBQ.apply(
                BigQueryIO.writeTableRows()
                        .to(bqDatasetName)
                        .withSchema(fieldSchemaListBuilder.schema())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        return pipeline.run();
    }
}
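To make the observation above concrete: the 404 in the traceback is raised while Airflow polls the Dataflow jobs.get endpoint, and the value that lands in the jobId slot of that request is the region string rather than a real job id. A minimal sketch of that polling call with the google-api-python-client (project and region values are taken from the DAG above; the request shape is inferred from the URL in the log, so treat it as an approximation rather than the hook's exact code):

# Minimal sketch of the polling request that fails with 404.
# Assumes google-api-python-client is installed and default credentials are available.
from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3', cache_discovery=False)

request = dataflow.projects().locations().jobs().get(
    projectId='test',
    location='asia-east1',
    jobId='asia-east1',  # the region ends up here instead of the real job id
)
response = request.execute(num_retries=5)

Any job id that does not exist under that project and region returns exactly this kind of 404, which matches the "Information about job asia-east1 could not be found" message in the log.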

Recommended Answer

This is a confirmed bug in SDK version 2.20.0:

github.com/apache/airflow/blob/master/airflow/providers/google/cloud/hooks/dataflow.py#L47
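For readers wondering why the region shows up as the job id: my reading of the linked hook code and the log above (not an authoritative account) is that the hook extracts the Dataflow job id by pattern-matching the console URL the runner prints on submission, and with Beam 2.20.0 the printed URL format changed, so a pattern written for the older format can capture the region segment instead of the job id. The URLs and pattern below are illustrative only, not the hook's actual code or Beam's actual output:

import re

# Illustrative only: a naive pattern that captures the first path segment after "/jobs/".
# The real pattern lives in the hook file linked above.
naive_pattern = re.compile(r'console\.cloud\.google\.com/dataflow.*?/jobs/([a-zA-Z0-9_-]+)')

# Hypothetical console URLs, assumed for illustration (not copied from Beam output):
old_style = ('https://console.cloud.google.com/dataflow/jobsDetail/'
             'locations/asia-east1/jobs/2020-05-20_10_20_42-1234567890?project=test')
new_style = ('https://console.cloud.google.com/dataflow/jobs/'
             'asia-east1/2020-05-20_10_20_42-1234567890?project=test')

print(naive_pattern.search(old_style).group(1))  # 2020-05-20_10_20_42-1234567890
print(naive_pattern.search(new_style).group(1))  # asia-east1 -> polled as the job id, hence the 404

As far as I can tell, pinning Beam back to 2.19.0, as suggested below, restores output the older hook can parse, which is why the downgrade works.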

Please use version 2.19.0; it should work correctly.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.19.0</version>
    <scope>runtime</scope>
</dependency>
