Airflow BigQueryOperator: how to save query results in a partitioned table?

Question

I have a simple DAG:

```python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    sql = """
    SELECT *
    FROM `another_dataset.another_table`
    """
    bq_query = BigQueryOperator(
        bql=sql,
        destination_dataset_table='my_dataset.my_table20180524',
        task_id='bq_query',
        bigquery_conn_id='my_bq_connection',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        query_params={},
    )
    start >> bq_query >> end
```

When executing the bq_query task, the SQL query result gets saved in a sharded table. I want it saved in a daily partitioned table instead. To do so, I only changed destination_dataset_table to my_dataset.my_table$20180524. I then got the error below when executing the task:

Partitioning specification must be provided in order to create partitioned table

How can I tell BigQuery to save the query result to a daily partitioned table? My first guess was to use query_params in BigQueryOperator, but I couldn't find any example of how to use that parameter.

Edit:

I'm using the google-cloud==0.27.0 Python client... and it's the one used in prod :(

Answer

You first need to create an empty partitioned destination table. Follow the instructions here: link to create an empty partitioned table.
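If you have the bq command-line tool available, creating the empty day-partitioned destination table can be sketched as follows. This is an assumption on my part, not part of the original answer; the dataset and table names come from the question, and the schema columns are placeholders you would replace with your real schema:

```shell
# Hedged sketch: create an empty day-partitioned table with the bq CLI.
# Requires authenticated gcloud/bq access to your GCP project.
# col1/col2 are placeholder columns, not from the original question.
bq mk --table \
  --time_partitioning_type=DAY \
  my_dataset.my_table \
  col1:STRING,col2:INTEGER
```

Once the table exists with a partitioning spec, writing to the my_dataset.my_table$YYYYMMDD decorator no longer raises the "Partitioning specification must be provided" error.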

Then run the Airflow pipeline again. You can try this code:

```python
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Build the partition decorator for today's partition, e.g. my_dataset.my_table$20180524
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date

with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    sql = """
    SELECT *
    FROM `another_dataset.another_table`
    """
    bq_query = BigQueryOperator(
        bql=sql,
        # destination_dataset_table is a templated field; the Jinja
        # expression must be quoted and is rendered from `params` below.
        destination_dataset_table='{{ params.t_name }}',
        task_id='bq_query',
        bigquery_conn_id='my_bq_connection',
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        params={'t_name': table_name},
    )
    start >> bq_query >> end
```
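For context on why the error appears at all: BigQuery refuses to write to a $YYYYMMDD partition decorator unless the destination table already has a partitioning specification, or the job itself carries one via the timePartitioning field of the query job configuration in the BigQuery jobs REST API. Below is a minimal sketch of such a job configuration as a plain dict (field names are from the BigQuery REST API; table values are from the question; the project id is a placeholder):

```python
# Minimal sketch of a BigQuery query-job configuration (REST API field
# names) that requests day partitioning on the destination table, so
# CREATE_IF_NEEDED produces a partitioned table.
job_config = {
    "query": {
        "query": "SELECT * FROM `another_dataset.another_table`",
        "useLegacySql": False,
        "destinationTable": {
            "projectId": "my-project",  # placeholder project id
            "datasetId": "my_dataset",
            "tableId": "my_table",
        },
        "writeDisposition": "WRITE_TRUNCATE",
        "createDisposition": "CREATE_IF_NEEDED",
        "timePartitioning": {"type": "DAY"},
    }
}

print(job_config["query"]["timePartitioning"]["type"])  # → DAY
```

Newer Airflow releases expose this as a time_partitioning argument on BigQueryOperator, though the google-cloud==0.27.0 setup mentioned in the question may predate it.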

So what I did was create a dynamic table name variable and pass it to the BQ operator.
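The dynamic-table-name logic above can be isolated into a small helper, which also makes it easy to pin the partition to a specific date instead of always using the current time. The function name is mine, not from the original answer:

```python
import datetime


def partition_table_name(base_table, date=None):
    """Return a BigQuery partition decorator, e.g. my_dataset.my_table$20180524.

    If no date is given, default to the current UTC date.
    """
    date = date or datetime.datetime.utcnow()
    return base_table + "$" + date.strftime("%Y%m%d")


print(partition_table_name("my_dataset.my_table", datetime.datetime(2018, 5, 24)))
# → my_dataset.my_table$20180524
```

In a real DAG you would usually prefer Airflow's {{ ds_nodash }} macro over datetime.now() for the date part, so that backfilled runs write to the partition of their execution date rather than the day the task happens to run.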

Published: 2023-11-24 11:53:13