我在Google Cloud Composer中创建了一个动态Airflow DAG,并在网络服务器中列出了该文件,然后运行(回填)没有错误.但是,存在一些问题:
I have a dynamic Airflow DAG in Google Cloud Composer gets created, listed in the web-server and ran (backfill) without error. However, there are issues:
尝试修复此问题几天...任何提示都会有所帮助.谢谢!
Trying to fix this for couple days...any hint will be helpful. Thank you!
from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator from google.cloud import storage from airflow.models import Variable import json args = { 'owner': 'xxx', 'start_date':'2020-11-5', 'provide_context': True } dag = DAG( dag_id='dynamic', default_args=args ) def return_bucket_files(bucket_name='xxxxx', **kwargs): client = storage.Client() bucket = client.get_bucket(bucket_name) blobs = bucket.list_blobs() file_list = [blob.name for blob in blobs] return file_list def dynamic_gcs_to_gbq_etl(file, **kwargs): mapping = json.loads(Variable.get("xxxxx")) database = mapping[0][file] table = mapping[1][file] task=GoogleCloudStorageToBigQueryOperator( task_id= f'gcs_load_{file}_to_gbq', bucket='xxxxxxx', source_objects=[f'{file}'], destination_project_dataset_table=f'xxx.{database}.{table}', write_disposition="WRITE_TRUNCATE", autodetect=True, skip_leading_rows=1, source_format='CSV', dag=dag) return task start_task = DummyOperator( task_id='start', dag=dag ) end_task = DummyOperator( task_id='end', dag=dag) push_bucket_files = PythonOperator( task_id="return_bucket_files", provide_context=True, python_callable=return_bucket_files, dag=dag) for file in return_bucket_files(): gcs_load_task = dynamic_gcs_to_gbq_etl(file) start_task >> push_bucket_files >> gcs_load_task >> end_task 推荐答案此问题意味着Web服务器无法从侧面填充DAG包-此问题很可能不是您的DAG特有的.
This issue means that the Web Server is failing to fill in the DAG bag on its side - this problem is most likely not with your DAG specifically.
我的建议是立即尝试重新启动Web服务器(通过安装一些虚拟软件包).
My suggestion would be right now to try and restart the web server (via the installation of some dummy package).
此帖子和此处.
更多推荐
“似乎丢失了Dag".Cloud Composer Airflow Dynamic DAG中的错误
发布评论