如何动态嵌套Airflow DAG?

编程入门 行业动态 更新时间:2024-10-17 15:24:41
本文介绍了如何动态嵌套Airflow DAG?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个由三个运算符组成的简单DAG。第一个是具有我们自己功能的 PythonOperator ,其他两个是 airflow.contrib ( FileToGoogleCloudStorageOperator 和 GoogleCloudStorageToBigQueryOperator )。它们按顺序工作。根据参数,我们的自定义任务会生成许多文件,通常在2到5之间。所有这些文件都必须由后续任务分别处理。这意味着我想要几个下游分支,但是在运行DAG之前,确切知道多少个分支。

I have a simple DAG of three operators. The first one is PythonOperator with our own functionality, the other two are standard operators from airflow.contrib (FileToGoogleCloudStorageOperator and GoogleCloudStorageToBigQueryOperator to be precise). They work in sequence. Our custom task produces a number of files, typically between 2 and 5, depending on the parameters. All of these files have to be processed by subsequent tasks separately. That means I want several downstream branches, but it's unknowable how many exactly before the DAG is run.

您将如何解决此问题?

更新:

使用jhnclvr在他的BranchPythonOperator ://://stackoverflow/a/44636828/8184313>另一个答复作为出发点,我创建了一个运算符,该运算符将根据条件跳过或继续执行分支。这种方法仅是可行的,因为已知最大可能的分支数量并且足够小。

Using BranchPythonOperator that jhnclvr mentioned in his another reply as a point of departure, I created an operator that would skip or continue executing a branch, depending on condition. This approach is feasible only because highest possible number of branches is known and sufficiently small.

运算符:

class SkipOperator(PythonOperator): def execute(self, context): boolean = super(SkipOperator, self).execute(context) session = settings.Session() for task in context['task'].downstream_list: if boolean is False: ti = TaskInstance( task, execution_date=context['ti'].execution_date) ti.state = State.SKIPPED ti.start_date = datetime.now() ti.end_date = datetime.now() session.merge(ti) sessionmit() session.close()

用法:

def check(i, templates_dict=None, **kwargs): return len(templates_dict["data_list"].split(",")) > i dag = DAG( dag_name, default_args=default_args, schedule_interval=None ) load = CustomOperator( task_id="load_op", bash_command=' '.join([ './command.sh' '--data-list {{ dag_run.conf["data_list"]|join(",") }}' ]), dag=dag ) for i in range(0, 5): condition = SkipOperator( task_id=f"{dag_name}_condition_{i}", python_callable=partial(check, i), provide_context=True, templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'}, dag=dag ) gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i load_to_gcs = CustomFileToGoogleCloudStorageOperator( task_id=f"{dag_name}_to_gs_{i}", src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i, bucket=gs_bucket, dst=gs_filename, mime_type='application/json', google_cloud_storage_conn_id=connection_id, dag=dag ) load_to_bq = GoogleCloudStorageToBigQueryOperator( task_id=f"{dag_name}_to_bq_{i}", bucket=gs_bucket, source_objects=[gs_filename, ], source_format='NEWLINE_DELIMITED_JSON', destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i, bigquery_conn_id=connection_id, schema_fields={}, google_cloud_storage_conn_id=connection_id, write_disposition='WRITE_TRUNCATE', dag=dag ) condition.set_upstream(load) load_to_gcs.set_upstream(condition) load_to_bq.set_upstream(load_to_gcs)

推荐答案

看到一个相似(但不同)的问题

基本上,您不能将任务添加到 DAG 当它运行时。您需要提前知道要添加多少个任务。

Basically, you can't add tasks to a DAG when it's running. You would need to know ahead of time how many tasks you wanted to add.

您可以使用一个运算符处理N个文件。

You could process N files using a single operator.

或者,如果您还有另一个单独的dag可以处理一个文件,您可以触发该 DAG N次,在conf中传递文件名。

Or, if you have another separate dag that processes a file you could trigger that DAG N times, passing the name of the file in the conf.

请参见此处获取 TriggerDagRunOperator 。

请参见此处以获取要触发的 DAG 。

最后看到这篇帖子,上面的例子来自。

更多推荐

如何动态嵌套Airflow DAG?

本文发布于:2023-07-07 22:11:31,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1068475.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:嵌套   动态   DAG   Airflow

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!