Airflow XCom pull only returns a string

Updated: 2024-10-25 12:18:31
Problem description


I have an Airflow pipeline where I need to get a filename from a Pub/Sub subscription and then import that file into a Cloud SQL instance. I use the CloudSqlInstanceImportOperator to import the CSV file. This operator needs a body, which contains the filename and other parameters. Since I read the filename at runtime, I also have to define the body at runtime. This all works. But when I pull the body from XCom, it returns a string instead of a Python dictionary, so the CloudSqlInstanceImportOperator gives me the following error (my guess is that this happens because the body is a string and not a dictionary):

    Traceback (most recent call last):
      File "/usr/local/lib/airflow/airflow/models/", line 984, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/airflow/airflow/contrib/operators/", line 715, in execute
        self._validate_body_fields()
      File "/usr/local/lib/airflow/airflow/contrib/operators/", line 712, in _validate_body_fields
        api_version=self.api_version).validate(self.body)
      File "/usr/local/lib/airflow/airflow/contrib/utils/", line 420, in validate
        dictionary_to_validate=body_to_validate)
      File "/usr/local/lib/airflow/airflow/contrib/utils/", line 341, in _validate_field
        value = dictionary_to_validate.get(field_name)
    AttributeError: 'str' object has no attribute 'get'


    import json
    import os
    from datetime import datetime, timedelta
    import ast

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.bash_operator import BashOperator
    from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
    from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


    def create_dag(dag_id, default_args):
        BUCKET = "{{ var.value.gp2pg_bucket }}"
        GCP_PROJECT_ID = "{{ var.value.gp2pg_project_id }}"
        INSTANCE_NAME = "{{ var.value.gp2pg_instance_name }}"

        def define_import_body(file, **kwargs):
            import_body = {
                "importContext": {
                    "importUser": "databasename",
                    "database": "databaseuser",
                    "fileType": "csv",
                    "uri": "bucketname" + file,
                    "csvImportOptions": {
                        "table": "schema.tablename",
                        "columns": ["columns1", "column2"],
                    },
                }
            }
            task_instance = kwargs['task_instance']
            task_instance.xcom_push(key='import_body', value=import_body)
            print(import_body)

        def get_filename(var, **kwargs):
            message = ast.literal_eval(var)
            file = message[0].get('message').get('attributes').get('objectId')
            task_instance = kwargs['task_instance']
            task_instance.xcom_push(key='filename', value=file)
            print(file)

        dag = DAG(dag_id=dag_id, schedule_interval=None, default_args=default_args)

        with dag:
            t1 = PubSubPullSensor(
                task_id='pull-messages',
                project="projectname",
                ack_messages=True,
                max_messages=1,
                subscription="subscribtionname",
            )

            message = "{{ task_instance.xcom_pull() }}"
            t2 = PythonOperator(
                task_id='get_filename',
                python_callable=get_filename,
                op_kwargs={'var': message},
                provide_context=True,
            )

            file = "{{ task_instance.xcom_pull(task_ids='get_filename', key='filename') }}"
            t3 = PythonOperator(
                task_id='define_import_body',
                python_callable=define_import_body,
                op_kwargs={'file': file},
                provide_context=True,
            )

            import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"
            t4 = CloudSqlInstanceImportOperator(
                project_id=GCP_PROJECT_ID,
                body=import_body,
                instance=INSTANCE_NAME,
                gcp_conn_id='postgres_default',
                task_id='sql_import_task',
                validate_body=True,
            )

            t5 = GoogleCloudStorageToGoogleCloudStorageOperator(
                task_id='copy_files',
                source_bucket=BUCKET,
                source_object=file,
                destination_bucket=BUCKET,
                destination_object='processed/import/' + file,
            )

            t1 >> t2 >> t3 >> t4 >> t5

        return dag


    dags_folder = os.getenv('DAGS_FOLDER', "./dags")
    flow_config = open(f'{dags_folder}/gp2pg/flow_config.json', 'r').read()

    for key, values in json.loads(flow_config).items():
        default_args = {
            "owner": "owner",
            "start_date": datetime(2020, 1, 1),
            "email": [],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 0,
            "retry_delay": timedelta(minutes=5),
        }

        dag_id = f"gp2pg_{key}_data_to_pg"
        globals()[dag_id] = create_dag(dag_id, default_args)


Any idea how I could solve that problem?



First, CloudSqlInstanceImportOperator is deprecated. You should use CloudSQLImportInstanceOperator from the Google providers package instead.

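The body parameter must follow the format described in the documentation, which means it has to be a dict rather than a string.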
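To see why the operator complains, here is a minimal sketch that runs without Airflow. It assumes the templated body reaches the operator as the text form of the dict; the variable names and the sample URI are illustrative and not taken from the question.

```python
# Stand-in for the dict that define_import_body pushes to XCom.
import_body = {"importContext": {"fileType": "csv", "uri": "bucketname/file.csv"}}

# A Jinja template renders its result to text, so the operator receives
# something equivalent to str(import_body), not the dict itself.
rendered = str(import_body)

# The validator calls .get() on the body, which a string does not have.
try:
    rendered.get("importContext")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(type(rendered).__name__)
print(error_message)
```

The message printed here matches the last line of the traceback in the question, which supports the asker's guess about the cause.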


XCom is a table in the database, and the data is saved as strings. You can't store a dict in the database as-is, because a dict is an in-memory Python object. You probably have a JSON string, so try converting it to a dict:

    body=json.loads(import_body)
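One caveat is worth checking before relying on json.loads. If the string comes from a rendered template such as "{{ task_instance.xcom_pull(...) }}", it is likely to be the Python repr of the dict (single quotes) rather than JSON. In that case json.loads raises an error and ast.literal_eval is the matching parser. The sketch below compares the two forms; the sample dict is an assumption made up for illustration.

```python
import ast
import json

import_body = {"importContext": {"fileType": "csv", "columns": ["column1", "column2"]}}

as_repr = str(import_body)         # what a rendered template typically looks like
as_json = json.dumps(import_body)  # what an explicit JSON string would look like

# json.loads only accepts JSON, which requires double-quoted strings.
try:
    json.loads(as_repr)
    repr_is_json = True
except json.JSONDecodeError:
    repr_is_json = False

# ast.literal_eval parses Python literals, so it handles the repr form.
from_repr = ast.literal_eval(as_repr)
from_json = json.loads(as_json)

print(repr_is_json, from_repr == import_body, from_json == import_body)
```

Printing the pulled value once in a task log shows which of the two forms you actually have.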


(after discussion in comments)


You will need to wrap your operator in a PythonOperator so that you can convert the XCom value to a dict and then use it.

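    def my_func(ds, **kwargs):
        ti = kwargs['ti']
        # Pull the import body pushed by the upstream task.
        body = ti.xcom_pull(task_ids='previous_task_id')
        # xcom_pull may already return a dict; only parse when it is a string.
        import_body = json.loads(body) if isinstance(body, str) else body
        op = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body=import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )
        # execute() requires the task context. Note that calling it directly
        # skips Airflow's template rendering for the inner operator.
        op.execute(context=kwargs)


    p = PythonOperator(
        task_id='python_task',
        python_callable=my_func,
        provide_context=True,  # needed on Airflow 1.x so that ds and ti are passed in
    )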
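The wrapping pattern can be tried out without an Airflow installation. The sketch below is only an illustration under stated assumptions: FakeTaskInstance and FakeImportOperator are hypothetical stand-ins (not Airflow classes), and to_dict is a helper introduced here that accepts a dict, a JSON string, or a Python-literal string.

```python
import ast
import json


def to_dict(value):
    """Return value as a dict, parsing it first if it arrived as a string."""
    if isinstance(value, dict):
        return value
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        # Fall back to the Python-literal form, e.g. "{'a': 1}".
        return ast.literal_eval(value)


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for illustration only."""

    def __init__(self, stored):
        self._stored = stored

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self._stored[(task_ids, key)]


class FakeImportOperator:
    """Hypothetical stand-in for the import operator; it only reads the body."""

    def __init__(self, body):
        self.body = body

    def execute(self, context):
        # Mirrors the validator's use of .get(), which fails on a string.
        return self.body.get("importContext")["uri"]


def my_func(**kwargs):
    ti = kwargs["ti"]
    body = ti.xcom_pull(task_ids="define_import_body", key="import_body")
    op = FakeImportOperator(body=to_dict(body))
    return op.execute(context=kwargs)


# The stored value is a string, as in the question.
stored = {
    ("define_import_body", "import_body"):
        "{'importContext': {'uri': 'bucketname/file.csv'}}"
}
result = my_func(ti=FakeTaskInstance(stored))
print(result)
```

Accepting all three input forms in one helper keeps the wrapper working whether the upstream task pushed a dict, a JSON string, or a rendered template string.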



Published: 2023-11-24 03:21:27