Airflow XCom pull only returns a string

Updated: 2024-10-25 12:18:31

Problem description

I have an Airflow pipeline in which I need to get a filename from a Pub/Sub subscription and then import that file into a Cloud SQL instance. I use the CloudSqlInstanceImportOperator to import the CSV file. This operator needs a body, which contains the filename and other parameters. Since I read the filename at runtime, I also have to define the body at runtime. This all works. But when I pull the body from XCom, it returns a string instead of a Python dictionary. So the CloudSqlInstanceImportOperator gives me the following error (my guess is that this is because the body is a string and not a dictionary):

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 715, in execute
    self._validate_body_fields()
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 712, in _validate_body_fields
    api_version=self.api_version).validate(self.body)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 420, in validate
    dictionary_to_validate=body_to_validate)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 341, in _validate_field
    value = dictionary_to_validate.get(field_name)
AttributeError: 'str' object has no attribute 'get'

This is the code I use:

import json
import os
from datetime import datetime, timedelta
import ast
from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def create_dag(dag_id, default_args):
    BUCKET = "{{ var.value.gp2pg_bucket }}"
    GCP_PROJECT_ID = "{{ var.value.gp2pg_project_id }}"
    INSTANCE_NAME = "{{ var.value.gp2pg_instance_name }}"

    def define_import_body(file, **kwargs):
        import_body = {
            "importContext": {
                "importUser": "databasename",
                "database": "databaseuser",
                "fileType": "csv",
                "uri": "bucketname" + file,
                "csvImportOptions": {
                    "table": "schema.tablename",
                    "columns": ["columns1", "column2"]}
            }
        }
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='import_body', value=import_body)
        print(import_body)

    def get_filename(var, **kwargs):
        message = ast.literal_eval(var)
        file = message[0].get('message').get('attributes').get('objectId')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='filename', value=file)
        print(file)

    dag = DAG(dag_id=dag_id, schedule_interval=None, default_args=default_args)

    with dag:
        t1 = PubSubPullSensor(task_id='pull-messages',
                              project="projectname",
                              ack_messages=True,
                              max_messages=1,
                              subscription="subscribtionname")

        message = "{{ task_instance.xcom_pull() }}"

        t2 = PythonOperator(
            task_id='get_filename',
            python_callable=get_filename,
            op_kwargs={'var': message},
            provide_context=True,
        )

        file = "{{ task_instance.xcom_pull(task_ids='get_filename', key='filename') }}"

        t3 = PythonOperator(
            task_id='define_import_body',
            python_callable=define_import_body,
            op_kwargs={'file': file},
            provide_context=True,
        )

        import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"

        t4 = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body= import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )

        t5 = GoogleCloudStorageToGoogleCloudStorageOperator(
            task_id='copy_files',
            source_bucket=BUCKET,
            source_object=file,
            destination_bucket=BUCKET,
            destination_object='processed/import/'+file,
        )

        t1 >> t2 >> t3 >> t4 >> t5

    return dag


dags_folder = os.getenv('DAGS_FOLDER', "./dags")
flow_config = open(f'{dags_folder}/gp2pg/flow_config.json', 'r').read()

for key, values in json.loads(flow_config).items():
    default_args = {
        "owner": "owner",
        "start_date": datetime(2020, 1, 1),
        "email": [],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag_id = f"gp2pg_{key}_data_to_pg"

    globals()[dag_id] = create_dag(dag_id, default_args)

Any idea how I could solve that problem?

Recommended answer

First, CloudSqlInstanceImportOperator is deprecated. You should use CloudSQLImportInstanceOperator from the providers package instead.

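The body parameter must follow the format described in the documentation. The validator in the traceback calls .get on it, so it has to be a dict.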

XCom is a table in the database. The data is saved in serialized form: you can't store a dict in the database as such, because a dict is an in-memory Python object. What you get back here is probably a JSON string. Try converting it to a dict:

body= json.loads(import_body)
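Whether json.loads works depends on what the string actually contains. The sketch below uses only the standard library and a hypothetical body to show the difference. It assumes, as is the case for ordinary Jinja string templates, that the "{{ ... }}" expression renders the pulled dict with str(), which yields Python literal syntax (single quotes) and not JSON. Under that assumption json.loads rejects the text while ast.literal_eval accepts it. Pushing json.dumps(import_body) instead would make json.loads applicable.

```python
import ast
import json

# Hypothetical body, shaped like the one pushed to XCom in the question.
import_body = {"importContext": {"fileType": "csv", "uri": "bucketname/file.csv"}}

# Assumption: the template renders the pulled value with str().
rendered = str(import_body)

# The rendered text uses single quotes, so it is not valid JSON.
try:
    json.loads(rendered)
    json_ok = True
except json.JSONDecodeError:
    json_ok = False

# ast.literal_eval parses Python literal syntax, so it can read the repr.
parsed = ast.literal_eval(rendered)

# If the upstream task pushes a JSON string, json.loads can read it back.
as_json = json.dumps(import_body)
round_tripped = json.loads(as_json)

print(json_ok, parsed == import_body, round_tripped == import_body)
```

Either way, the conversion has to happen at run time, after the template has been rendered; at DAG-definition time import_body is still the unrendered template text.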

(after discussion in comments)

You will need to wrap your operator in a PythonOperator so that you can convert the XCom value to a dict at run time and then use it.

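import json

def my_func(ds, **kwargs):
    ti = kwargs['ti']
    # 'previous_task_id' is a placeholder; add key=... if the value was pushed under a custom key.
    body = ti.xcom_pull(task_ids='previous_task_id')
    # xcom_pull may already return a dict; only parse when it is a JSON string.
    import_body = json.loads(body) if isinstance(body, str) else body
    op = CloudSqlInstanceImportOperator(
        project_id=GCP_PROJECT_ID,
        body=import_body,
        instance=INSTANCE_NAME,
        gcp_conn_id='postgres_default',
        task_id='sql_import_task',
        validate_body=True,
    )
    # execute() requires the task context.
    op.execute(context=kwargs)

# provide_context=True passes the context (including 'ti') to the callable.
p = PythonOperator(task_id='python_task', python_callable=my_func, provide_context=True)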
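Inside such a wrapper it is not always obvious which form the pulled value takes: a dict, a JSON string, or the Python repr of a dict. A minimal sketch of a defensive conversion helper follows. The name to_body_dict is hypothetical and not part of Airflow, and the fallback to ast.literal_eval is an assumption about how the string may have been produced.

```python
import ast
import json


def to_body_dict(value):
    """Return value as a dict, parsing it first when it is a string."""
    if isinstance(value, dict):
        return value
    if not isinstance(value, str):
        raise TypeError(f"expected dict or str, got {type(value).__name__}")
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        # Fall back to Python literal syntax, e.g. a single-quoted dict repr.
        parsed = ast.literal_eval(value)
    if not isinstance(parsed, dict):
        raise TypeError(f"parsed value is {type(parsed).__name__}, not dict")
    return parsed


print(to_body_dict("{'importContext': {'fileType': 'csv'}}"))
```

In the wrapper, the json.loads call could then be replaced with import_body = to_body_dict(body), so that the operator receives a dict whichever form the upstream task produced.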

Published: 2023-11-24 03:21:27
Link: https://www.elefans.com/category/jswz/34/1623787.html