"Invalid arguments were passed" error for an Airflow DAG that loads MySQL data into BigQuery

This article walks through how to deal with an "Invalid arguments were passed" warning raised by an Airflow DAG that loads MySQL data into BigQuery.

Problem description


I am running a DAG in Airflow that extracts MySQL data and loads it into BigQuery. I am currently getting the following error:

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to MySqlToGoogleCloudStorageOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:

*args: ()

**kwargs: {'google_cloud_storage_connn_id': 'podioGCPConnection'} category=PendingDeprecationWarning

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to GoogleCloudStorageToBigQueryOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:

*args: ()

**kwargs: {'project_id': 'podio-data'} category=PendingDeprecationWarning
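In other words, the operator constructors do not recognise the keyword arguments google_cloud_storage_connn_id (note the extra "n") and, for GoogleCloudStorageToBigQueryOperator, project_id; unrecognised keywords fall through to **kwargs, and Airflow 1.x only warns about them instead of failing. As a rough illustration (a toy stand-in, not Airflow's actual code), the behaviour looks like this:

import warnings

# PendingDeprecationWarning is hidden by default in plain Python scripts,
# so enable it for this demonstration.
warnings.simplefilter('always')

class SketchOperator(object):
    """Toy stand-in for an Airflow 1.x operator: any keyword argument the
    constructor does not declare lands in **kwargs and only triggers a
    PendingDeprecationWarning."""
    def __init__(self, task_id, google_cloud_storage_conn_id=None, **kwargs):
        if kwargs:
            warnings.warn(
                'Invalid arguments were passed to {}. Support for passing '
                'such arguments will be dropped in Airflow 2.0. Invalid '
                'arguments were: **kwargs: {}'.format(
                    type(self).__name__, kwargs),
                PendingDeprecationWarning)
        self.task_id = task_id
        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id

# The misspelled kwarg (three "n"s) is not a declared parameter, so it ends
# up in **kwargs and produces the same kind of warning seen above:
SketchOperator(task_id='extract',
               google_cloud_storage_connn_id='podioGCPConnection')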

The code for the DAG is here:

my_connections = ['podiotestmySQL']
my_tables = ['logistics_orders',
             'logistics_waybills',
             'logistics_shipping_lines',
             'logistics_info_requests']

default_args = {
    'owner': 'tia',
    'start_date': datetime(2018, 1, 2),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('etl', default_args=default_args, schedule_interval=timedelta(days=1))

slack_notify = SlackAPIPostOperator(
    task_id='slack_notfiy',
    token='xxxxxx',
    channel='data-status',
    username='airflow',
    text='Successfully performed podio ETL operation',
    dag=dag)

for connection in my_connections:
    for table in my_tables:
        extract = MySqlToGoogleCloudStorageOperator(
            task_id="extract_mysql_%s_%s" % (connection, table),
            mysql_conn_id=connection,
            google_cloud_storage_connn_id='podioGCPConnection',
            sql="SELECT *, '%s' as source FROM podiodb.%s" % (connection, table),
            bucket='podio-reader-storage',
            filename='%s/%s/%s{}.json' % (connection, table, table),
            schema_filename='%s/schemas/%s.json' % (connection, table),
            dag=dag)

        load = GoogleCloudStorageToBigQueryOperator(
            task_id="load_bg_%s_%s" % (connection, table),
            bigquery_conn_id='podioGCPConnection',
            google_cloud_storage_conn_id='podioGCPConnection',
            bucket='podio-reader-storage',
            destination_project_dataset_table="Podio_Data1.%s/%s" % (connection, table),
            source_objects=["%s/%s/%s*.json" % (connection, table, table)],
            schema_object="%s/schemas/%s.json" % (connection, table),
            source_format='NEWLINE_DELIMITED_JSON',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            project_id='podio-data',
            dag=dag)

        load.set_upstream(extract)
        slack_notify.set_upstream(load)

Solution

Reading the source here: github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_bq.py

Please remove these parameters from the default arguments:

google_cloud_storage_connn_id = 'podioGCPConnection'
project_id = 'podio-data'

You need to create a connection in the Airflow dashboard.
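For concreteness, here is a hedged sketch of what the corrected operator calls might look like based on the answer above: the misspelled google_cloud_storage_connn_id (three "n"s) is replaced with the correctly spelled google_cloud_storage_conn_id, which is the name the question's own GoogleCloudStorageToBigQueryOperator call already uses, and project_id is dropped. The imports and the connection name podioGCPConnection come from the original DAG; everything else is left unchanged.

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Same loop body as in the DAG above, with only the two offending kwargs changed.
extract = MySqlToGoogleCloudStorageOperator(
    task_id="extract_mysql_%s_%s" % (connection, table),
    mysql_conn_id=connection,
    google_cloud_storage_conn_id='podioGCPConnection',  # was google_cloud_storage_connn_id
    sql="SELECT *, '%s' as source FROM podiodb.%s" % (connection, table),
    bucket='podio-reader-storage',
    filename='%s/%s/%s{}.json' % (connection, table, table),
    schema_filename='%s/schemas/%s.json' % (connection, table),
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bg_%s_%s" % (connection, table),
    bigquery_conn_id='podioGCPConnection',
    google_cloud_storage_conn_id='podioGCPConnection',
    bucket='podio-reader-storage',
    # project_id removed as the answer suggests; if the target project must be
    # explicit, it can usually be prefixed in destination_project_dataset_table
    # instead (an assumption, not part of the quoted answer).
    destination_project_dataset_table="Podio_Data1.%s/%s" % (connection, table),
    source_objects=["%s/%s/%s*.json" % (connection, table, table)],
    schema_object="%s/schemas/%s.json" % (connection, table),
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

As the answer notes, the podioGCPConnection connection itself still has to exist; in the Airflow web UI that is typically created under Admin > Connections.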
