Python Airflow

This article covers how to handle the value returned from a PythonOperator in Python Airflow.

Problem Description

I have written a DAG with multiple PythonOperators:

task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                             provide_context=True,
                             python_callable=Task1, dag=dag1)

def Task1(**kwargs):
    return kwargs['dag_run'].conf.get('file')

From the PythonOperator I am calling the Task1 method. That method returns a value, and that value needs to be passed to the next PythonOperator. How can I get the value from the task1 variable, or how can I get the value returned by the Task1 method?

Updated:

def Task1(**kwargs):
    file_name = kwargs['dag_run'].conf.get('file')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key='file', value=file_name)
    return file_name

t1 = PythonOperator(task_id='Task1', provide_context=True,
                    python_callable=Task1, dag=dag)

t2 = BashOperator(
    task_id='Moving_bucket',
    bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids="Task1", key="file") }} ',
    dag=dag,
)

t2.set_upstream(t1)

Recommended Answer

You might want to check out Airflow's XCom: airflow.apache.org/concepts.html#xcoms

If you return a value from a function, this value is stored in XCom. In your case, you could access it like so from other Python code:

task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')
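
For a fuller picture, here is a minimal self-contained sketch of this pattern with two PythonOperators. The dag_id and the producer/consumer task names are made up for illustration, and it assumes an Airflow 1.x-style setup where provide_context=True passes the context into the callable:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('xcom_example', start_date=datetime(2018, 1, 1), schedule_interval=None)

def producer(**kwargs):
    # Whatever the callable returns is stored in XCom under the key 'return_value'.
    return 'some_file_name.csv'

def consumer(**kwargs):
    task_instance = kwargs['task_instance']
    # Pull the value returned by the 'producer' task.
    file_name = task_instance.xcom_pull(task_ids='producer')
    print('Got file name from XCom: %s' % file_name)

t1 = PythonOperator(task_id='producer', provide_context=True,
                    python_callable=producer, dag=dag)
t2 = PythonOperator(task_id='consumer', provide_context=True,
                    python_callable=consumer, dag=dag)

t2.set_upstream(t1)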

Or in a template like this:

{{ task_instance.xcom_pull(task_ids='Task1') }}

If you want to specify a key, you can push into XCom (from inside a task):

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)

Then later on you can access it like so:

task_instance.xcom_pull(task_ids='my_task', key='the_key')
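
Putting the push and the pull together, a rough sketch with a custom key could look like this (the task ids push_task/pull_task and the pushed value are illustrative, not from the original question):

def push_callable(**kwargs):
    task_instance = kwargs['task_instance']
    # Store a value under an explicit key instead of the default 'return_value'.
    task_instance.xcom_push(key='the_key', value='my string value')

def pull_callable(**kwargs):
    task_instance = kwargs['task_instance']
    # Retrieve the value that 'push_task' stored under 'the_key'.
    value = task_instance.xcom_pull(task_ids='push_task', key='the_key')
    print(value)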

编辑1

Follow-up question: I am not using the value inside another function; instead, how do I pass the value to another PythonOperator, e.g. t2 = BashOperator(task_id='Moving_bucket', bash_command='python /home/raw.py %s' % file_name, dag=dag) -- I want to access the file_name returned by Task1.

First of all, it seems to me that the value is, in fact, not being passed to another PythonOperator but to a BashOperator.

Secondly, this is already covered in my answer above. The bash_command field is templated (see template_fields in the source: github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py). Hence, we can use the templated version:

BashOperator(
    task_id='Moving_bucket',
    bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids="Task1") }} ',
    dag=dag,
)

编辑2

Explanation: Airflow works like this: it will execute Task1, then populate XCom, and then execute the next task. So for your example to work, Task1 needs to execute first, and Moving_bucket must run downstream of Task1.

Since you are using the return value of the function, you could also omit key='file' from xcom_pull and not set it manually in the function.
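
As a rough sketch of that simplification (reusing the task ids and the dag object from the question's snippet, so the imports and the DAG definition are assumed to already exist), the code could look roughly like this:

def Task1(**kwargs):
    # Returning the value is enough: Airflow stores it in XCom automatically.
    return kwargs['dag_run'].conf.get('file')

t1 = PythonOperator(task_id='Task1', provide_context=True,
                    python_callable=Task1, dag=dag)

t2 = BashOperator(
    task_id='Moving_bucket',
    # Without an explicit key, xcom_pull returns the value returned by Task1.
    bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids="Task1") }} ',
    dag=dag,
)

t2.set_upstream(t1)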
