I have a simple DAG
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    # Standard SQL uses backticks (not single quotes) around table names.
    sql = """
    SELECT *
    FROM `another_dataset.another_table`
    """

    bq_query = BigQueryOperator(
        task_id='bq_query',
        bql=sql,
        destination_dataset_table='my_dataset.my_table20180524',
        bigquery_conn_id='my_bq_connection',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        query_params={})

    start >> bq_query >> end
When executing the bq_query task, the SQL query gets saved in a sharded table. I want it to get saved in a daily partitioned table. To do so, I only changed destination_dataset_table to my_dataset.my_table$20180524, as in the excerpt below.
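A minimal excerpt of the same operator call, with only the destination changed to a day-partition decorator:

    # The only change: write into the 20180524 partition via the $ decorator.
    bq_query = BigQueryOperator(
        task_id='bq_query',
        bql=sql,
        destination_dataset_table='my_dataset.my_table$20180524',
        bigquery_conn_id='my_bq_connection',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED')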
Executing the task then failed with the error:

Partitioning specification must be provided in order to create partitioned table
How can I tell BigQuery to save the query result to a daily partitioned table? My first guess was to use query_params in BigQueryOperator, but I couldn't find any example of how to use that parameter.
Edit:
I'm using the google-cloud==0.27.0 Python client ... and it's the one used in Prod :(
Accepted answer
You first need to create an empty partitioned destination table. Follow the instructions here: link to create an empty partitioned table.
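For instance, a minimal sketch using a recent google-cloud-bigquery client (the project, table, and schema names are placeholders; note that the 0.27-era client mentioned in the question exposed partitioning differently, roughly table.partitioning_type = 'DAY'):

    from google.cloud import bigquery

    client = bigquery.Client()

    # Placeholder project/dataset/table and a minimal placeholder schema.
    table = bigquery.Table(
        'my-project.my_dataset.my_table',
        schema=[bigquery.SchemaField('some_column', 'STRING')],
    )
    # Partition by ingestion day so my_table$YYYYMMDD decorators are accepted.
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
    )
    client.create_table(table)

The bq CLI equivalent should be something like bq mk --time_partitioning_type=DAY my_dataset.my_table.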
Then run the Airflow pipeline below again. You can try this code:
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Build the partition decorator for today's date, e.g. my_dataset.my_table$20180524.
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date

with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    sql = """
    SELECT *
    FROM `another_dataset.another_table`
    """

    bq_query = BigQueryOperator(
        task_id='bq_query',
        bql=sql,
        # destination_dataset_table is a templated field, so the Jinja
        # expression below is rendered from params at run time. Note this
        # goes through params, not query_params (which are BigQuery query
        # parameters, not template variables).
        destination_dataset_table='{{ params.t_name }}',
        bigquery_conn_id='my_bq_connection',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        params={'t_name': table_name},
        dag=dag)

    start >> bq_query >> end
So what I did was create a dynamic table name variable and pass it to the BQ operator via params, which the templated destination_dataset_table field renders at run time.
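As a side note, instead of computing the date with datetime.now() when the DAG file is parsed, Airflow's built-in execution-date macros can be used directly in the templated field, so backfills write into the partition matching each run's execution date. A minimal sketch, assuming the same sql, connection, and DAG as above:

    # {{ ds_nodash }} is Airflow's built-in YYYYMMDD execution-date macro.
    bq_query = BigQueryOperator(
        task_id='bq_query',
        bql=sql,
        destination_dataset_table='my_dataset.my_table${{ ds_nodash }}',
        bigquery_conn_id='my_bq_connection',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        dag=dag)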