使用一个 Python 任务的输出并用作 Airflow 上另一个 Python 任务的输入

编程入门 行业动态 更新时间:2024-10-24 18:27:31
本文介绍了使用一个 Python 任务的输出并用作 Airflow 上另一个 Python 任务的输入的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

所以我正在使用 Apache Airflow 创建一个数据流,以获取一些存储在 Pandas Dataframe 中的数据,然后将其存储到 MongoDB 中.所以我有两种 python 方法,一种用于获取数据并返回数据帧,另一种用于将其存储到相关数据库中.如何获取一项任务的输出并将其作为另一项任务的输入?这就是我目前所拥有的(总结和浓缩版)

So I'm creating a data flow with Apache Airflow for grabbing some data that's stored in a Pandas Dataframe and then storing it into MongoDB. So I have two python methods, one for fetching the data and returning the dataframe and the other for storing it into the relevant database. How do I take the output of one task and feed it as the input to another task? This is what I have so far (summarized and condensed version)

我研究了 xcom pull 和 push 的概念,这就是我在下面实现的,我还看到有一个用于气流的 MongoHook,但不太确定如何使用它.

I looked into the concept of xcom pull and push and that's what I implemented below , I also saw that there's a MongoHook for Airflow but wasn't quite sure on how to use it.

import pandas as pd import pymongo import airflow from datetime import datetime, timedelta from airflow.models import DAG from airflow.operators.python_operator import PythonOperator def get_data(name, **context): data = pd.read_csv('dataset.csv') df = data.loc[data.name == name] context['ti'].xcom_push(task_ids=['get-data'], value=data) def push_to_db(df, dbname, collection): client = pymongo.MongoClient(-insert creds here-) db = client[dbname][collection] data = df.to_dict(orient='records') db.insert_many(data) args = { 'owner': 'Airflow', 'start_date': airflow.utils.dates.days_ago(2), } dag = DAG( dag_id='simple_xcom', default_args=args, start_date=datetime(2019, 09, 02), schedule_interval="@daily", retries=2 ) task1 = PythonOperator(task_id='get-data', params=['name': 'John'], python_callable=get_data, provide_context=True, dag=dag) task2 = PythonOperator(task_id='load-db', params=['df': context['ti'].xcom_pull(task_ids=['get-data'], key='data'), 'dbname': 'person', 'table': 'salary'), python_callable=push_to_db, provide_context=True, dag=dag) task1 >> task2

每次我尝试运行它时,它都会显示上下文不存在.所以也许我在将一项任务的输出作为另一个任务的输入方面做错了什么?

Everytime I try to run it, it displays that context does not exist. So maybe I'm doing some wrong in terms of feeding the output of one task as the input to another?

推荐答案

查看示例 xcom DAG.

Have a look at the example xcom DAG.

github/apache/airflow/blob/master/airflow/example_dags/example_xcom.py

更多推荐

使用一个 Python 任务的输出并用作 Airflow 上另一个 Python 任务的输入

本文发布于:2023-11-24 10:31:53,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1624849.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Python   Airflow

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!