在Airflow中的自定义运算符中使用Jinja模板读取SQL文件

编程入门 行业动态 更新时间:2024-10-28 01:27:18
本文介绍了在Airflow中的自定义运算符中使用Jinja模板读取SQL文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在尝试在Airflow的自定义运算符中读取包含带有jinja模板的查询的sql文件.我已经使用PythonOperator实现了它,该函数在我使用

I am trying to read sql file that contains query with jinja templates in the custom operator in Airflow. I have already achieved it using PythonOperator that calls function where I used

def execute_query(**kwargs) sql_query = open('my_sql_query.sql').read() #(SELECT * FROM my_table WHERE date > {}) sql_query.format(kwargs['ds'])

但是我更喜欢直接在查询中使用这种语法 {{ds}} SELECT * FROM my_table WHERE date>{{ds}}

but I would prefer use this syntax {{ ds }} directly in the query like SELECT * FROM my_table WHERE date > {{ ds }}

我做了什么:

  • 我使用template_fields和template_ext创建了CustomOperator
  • class SQLOperator(BaseOperator): template_fields = ['sql'] template_ext = ('.sql',) @apply_defaults def __init__( self, name = None, sql = None, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.name = name self.sql = sql def execute(self, context): print("Name", name) # <- works print("Query", sql) # <- doesn't work and I don't know how to get the sql file content

  • 达格
  • default_args = {...} dag = DAG( 'sql_operator_test', schedule_interval='0 0 * * *', template_searchpath=['/Users/username/airflow/dags/sql/test/'], default_args=default_args) sql_task = SQLOperator( task_id='sql_process', name="Aaa", sql="/Users/username/airflow/dags/sql/test.sql", dag=dag)

  • SQL查询
  • SELECT * FROM my_table WHERE date > {{ ds }}

    我的想法不多了.是否可以选择将文件传递给操作员或获取其呈现的内容?

    I am running out of ideas. Is there any option to pass the file to the operator or get its rendered content?

    推荐答案

    您的方法还可以.我获取了您的代码,并创建了一个工作示例,显示正在根据需要对 {{ds}} 进行模板化:

    Your approach is OK. I took your code and created a working example showing {{ ds }} is being templated as required:

    创建一个 .py 文件为:

    from datetime import datetime, timedelta from airflow import DAG from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults class SQLOperator(BaseOperator): template_fields = ['sql'] template_ext = ('.sql',) @apply_defaults def __init__( self, name = None, sql = None, *args, **kwargs ) -> None: super().__init__(**kwargs) self.name = name self.sql = sql def execute(self, context): print("Name", self.name) # <- works print("Query", self.sql) # <- Also works :) default_args = { 'owner': 'a', 'start_date': datetime(2020, 3, 24, 2, 0, 0), } dag = DAG( 'sql_operator_test', schedule_interval=None, default_args=default_args) sql_task = SQLOperator( task_id='sql_process', name="Aaa", sql="test.sql", dag=dag)

    .sql 文件为:

    SELECT * FROM my_table WHERE date > {{ ds }}

    运行它会给出:

    并且从任务日志中:

    更多推荐

    在Airflow中的自定义运算符中使用Jinja模板读取SQL文件

    本文发布于:2023-10-29 03:09:36,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1538547.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:自定义   运算符   模板   文件   Airflow

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!