Airflow xcom pull only returns a string

Updated: 2024-10-25 10:25:26
Problem description

I have an Airflow pipeline where I need to get a filename from a Pub/Sub subscription and then import that file into a Cloud SQL instance. I use the CloudSqlInstanceImportOperator to import the CSV file. This operator needs a body, which contains the filename and other parameters. Since I read that filename at runtime, I also have to define the body at runtime. This all works. But when I pull the body from XCom, it returns a string instead of a Python dictionary, so the CloudSqlInstanceImportOperator gives me the following error (my guess is because the body is a string and not a dictionary):

```
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 715, in execute
    self._validate_body_fields()
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 712, in _validate_body_fields
    api_version=self.api_version).validate(self.body)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 420, in validate
    dictionary_to_validate=body_to_validate)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 341, in _validate_field
    value = dictionary_to_validate.get(field_name)
AttributeError: 'str' object has no attribute 'get'
```

This is the code I use:

```python
import json
import os
from datetime import datetime, timedelta
import ast

from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def create_dag(dag_id, default_args):
    BUCKET = "{{ var.value.gp2pg_bucket }}"
    GCP_PROJECT_ID = "{{ var.value.gp2pg_project_id }}"
    INSTANCE_NAME = "{{ var.value.gp2pg_instance_name }}"

    def define_import_body(file, **kwargs):
        import_body = {
            "importContext": {
                "importUser": "databasename",
                "database": "databaseuser",
                "fileType": "csv",
                "uri": "bucketname" + file,
                "csvImportOptions": {
                    "table": "schema.tablename",
                    "columns": ["columns1", "column2"]},
            }
        }
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='import_body', value=import_body)
        print(import_body)

    def get_filename(var, **kwargs):
        message = ast.literal_eval(var)
        file = message[0].get('message').get('attributes').get('objectId')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='filename', value=file)
        print(file)

    dag = DAG(dag_id=dag_id, schedule_interval=None, default_args=default_args)

    with dag:
        t1 = PubSubPullSensor(task_id='pull-messages',
                              project="projectname",
                              ack_messages=True,
                              max_messages=1,
                              subscription="subscribtionname")

        message = "{{ task_instance.xcom_pull() }}"

        t2 = PythonOperator(
            task_id='get_filename',
            python_callable=get_filename,
            op_kwargs={'var': message},
            provide_context=True,
        )

        file = "{{ task_instance.xcom_pull(task_ids='get_filename', key='filename') }}"

        t3 = PythonOperator(
            task_id='define_import_body',
            python_callable=define_import_body,
            op_kwargs={'file': file},
            provide_context=True,
        )

        import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"

        t4 = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body=import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )

        t5 = GoogleCloudStorageToGoogleCloudStorageOperator(
            task_id='copy_files',
            source_bucket=BUCKET,
            source_object=file,
            destination_bucket=BUCKET,
            destination_object='processed/import/' + file,
        )

        t1 >> t2 >> t3 >> t4 >> t5

    return dag


dags_folder = os.getenv('DAGS_FOLDER', "./dags")
flow_config = open(f'{dags_folder}/gp2pg/flow_config.json', 'r').read()

for key, values in json.loads(flow_config).items():
    default_args = {
        "owner": "owner",
        "start_date": datetime(2020, 1, 1),
        "email": [],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag_id = f"gp2pg_{key}_data_to_pg"
    globals()[dag_id] = create_dag(dag_id, default_args)
```

Any idea how I could solve that problem?

Recommended answer

First, CloudSqlInstanceImportOperator is deprecated. You should use CloudSQLImportInstanceOperator from the providers package.

The body param needs to be a dict, as explained in the docs.

XCom is a table in the database, and the data is saved as strings. You can't store a dict in the database, because a dict is an in-memory Python object. What you have is probably JSON (a string). Try converting it to a dict:

```python
body=json.loads(import_body)
```
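A minimal stdlib sketch of the difference (the body value is illustrative): the validator calls `.get()` on the body, which only works on a dict, and either `json.loads` or the `ast.literal_eval` approach already used in `get_filename` can recover the dict from its string form:

```python
import ast
import json

# A body like the one pushed to XCom (illustrative content).
import_body = {"importContext": {"fileType": "csv"}}

# What the validator needs: a dict, so .get() works.
assert import_body.get("importContext") is not None

# What a templated xcom_pull hands the operator: a string.
pulled = str(import_body)
try:
    pulled.get("importContext")
except AttributeError as err:
    print(err)  # 'str' object has no attribute 'get'

# Recovering the dict: json.loads if the string is JSON text ...
assert json.loads(json.dumps(import_body)) == import_body
# ... or ast.literal_eval if it is a Python repr (single quotes),
# which is what get_filename in the question already does.
assert ast.literal_eval(pulled) == import_body
```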

(After discussion in the comments)

You will need to wrap your operator with PythonOperator so you can convert the xcom to dict and use it.

```python
import json

from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def my_func(ds, **kwargs):
    ti = kwargs['ti']
    body = ti.xcom_pull(task_ids='previous_task_id')
    import_body = json.loads(body)
    op = CloudSqlInstanceImportOperator(
        project_id=GCP_PROJECT_ID,
        body=import_body,
        instance=INSTANCE_NAME,
        gcp_conn_id='postgres_default',
        task_id='sql_import_task',
        validate_body=True,
    )
    op.execute(context=kwargs)


p = PythonOperator(task_id='python_task', python_callable=my_func)
```

For Airflow >= 2.1.0: Airflow added the ability to render fields as native Python objects. You need to set render_template_as_native_obj=True in your DAG constructor. You can follow this documentation example.
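For example, a sketch of the question's DAG constructor under this setting (the dag_id is illustrative; Airflow >= 2.1.0 only):

```python
from airflow import DAG

# render_template_as_native_obj=True makes Jinja use a NativeEnvironment,
# so "{{ task_instance.xcom_pull(...) }}" renders to the pushed Python
# object (here, the import_body dict) instead of its string representation,
# and the operator's body receives a real dict.
dag = DAG(
    dag_id="gp2pg_example",  # illustrative id
    schedule_interval=None,
    render_template_as_native_obj=True,
)
```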

Published: 2023-11-24 03:21:41
Source: https://www.elefans.com/category/jswz/34/1623788.html
