I am trying to read a SQL file that contains a query with Jinja templates in a custom operator in Airflow. I have already achieved this with a PythonOperator that calls a function in which I used:
```python
def execute_query(**kwargs):
    # my_sql_query.sql contains: SELECT * FROM my_table WHERE date > {}
    with open('my_sql_query.sql') as f:
        sql_query = f.read()
    # str.format returns a new string, so keep the result
    sql_query = sql_query.format(kwargs['ds'])
```
but I would prefer to use the {{ ds }} syntax directly in the query, like:

```sql
SELECT * FROM my_table WHERE date > {{ ds }}
```
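As a side note on why the two syntaxes are not interchangeable, here is a small stdlib-only illustration (the ds value is a made-up example). In str.format, doubled braces are escapes for literal braces. A file written with Jinja-style {{ ds }} therefore cannot just be passed through the format-based function above. It has to be rendered by Jinja instead.

```python
# Placeholder style used by the format-based function above.
format_style = "SELECT * FROM my_table WHERE date > {}"
# Jinja style the question wants to use in the .sql file.
jinja_style = "SELECT * FROM my_table WHERE date > {{ ds }}"

ds = "2020-03-24"  # hypothetical example value for the ds macro

# str.format fills the positional {} placeholder as intended.
print(format_style.format(ds))
# -> SELECT * FROM my_table WHERE date > 2020-03-24

# "{{" and "}}" are literal-brace escapes in str.format, so the
# Jinja-style placeholder is NOT substituted; it only loses one brace pair
# (the unused positional argument is silently ignored).
print(jinja_style.format(ds))
# -> SELECT * FROM my_table WHERE date > { ds }
```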
What I have tried:
I am running out of ideas. Is there any way to pass the file to the operator, or to get its rendered content?
Answer
Your approach is OK. I took your code and created a working example showing that {{ ds }} is templated as required:
Create a .py file with the following content:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SQLOperator(BaseOperator):
    # Fields listed here are rendered with Jinja before execute() runs.
    template_fields = ['sql']
    # If a templated field's value ends with one of these extensions,
    # Airflow loads that file and renders its contents instead.
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            name=None,
            sql=None,
            *args,
            **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.sql = sql

    def execute(self, context):
        print("Name", self.name)  # <- works
        print("Query", self.sql)  # <- Also works :)


default_args = {
    'owner': 'a',
    'start_date': datetime(2020, 3, 24, 2, 0, 0),
}

dag = DAG(
    'sql_operator_test',
    schedule_interval=None,
    default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="test.sql",
    dag=dag)
```

And test.sql, in the same folder as the DAG file:
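```sql
SELECT * FROM my_table WHERE date > {{ ds }}
```

Running the task renders test.sql before execute() is called, so the task log shows the "Query" line with {{ ds }} already replaced by the run's date.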
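What makes this work is template_ext. When the value of a field listed in template_fields ends with one of those extensions, Airflow treats the value as a template file name. It loads that file from the template search path (the DAG file's folder by default) and renders its contents with Jinja before execute() runs. The sketch below imitates that resolution step using only the standard library. It is not Airflow's code: render_field is a hypothetical helper, and its regex substitution is a deliberately simplified stand-in for Jinja that only handles bare {{ name }} expressions.

```python
import os
import re
import tempfile

# Simplified stand-in for Jinja (assumption for illustration only):
# matches bare {{ name }} expressions, no filters or macros.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_field(value, template_ext, search_path, context):
    """Hypothetical helper imitating how a templated field is resolved."""
    if isinstance(value, str) and value.endswith(tuple(template_ext)):
        # The value looks like a template file name: use the file's contents.
        with open(os.path.join(search_path, value)) as f:
            value = f.read()
    # Replace each {{ name }} with the matching entry from the context.
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)


# Usage: write test.sql into a temporary "DAG folder" and render it.
with tempfile.TemporaryDirectory() as dag_folder:
    with open(os.path.join(dag_folder, "test.sql"), "w") as f:
        f.write("SELECT * FROM my_table WHERE date > {{ ds }}")
    rendered = render_field("test.sql", (".sql",), dag_folder,
                            {"ds": "2020-03-24"})
    print(rendered)  # SELECT * FROM my_table WHERE date > 2020-03-24
```

A value that does not end in .sql is rendered as an inline template string instead. If the .sql files live outside the DAG folder, Airflow's DAG accepts a template_searchpath argument to add directories to the search path.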