将Airflow的PostgresOperator与Jinja模板和SQL一起使用时,TemplateNotFound

编程入门 行业动态 更新时间:2024-10-13 20:21:26
本文介绍了将Airflow的PostgresOperator与Jinja模板和SQL一起使用时,TemplateNotFound的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

当尝试通过PostgresOperator使用Airflow的模板功能(通过Jinja2)时,我无法进行渲染。我很有可能做错了事,但是我对可能的问题感到很困惑。这是一个重现我收到的TemplateNotFound错误的示例:

When trying to use Airflow's templating capabilities (via Jinja2) with the PostgresOperator, I've been unable to get things to render. It's quite possible I'm doing something wrong, but I'm pretty lost as to what the issue might be. Here's an example to reproduce the TemplateNotFound error I've been getting:

airflow.cfg

airflow_home = /home/gregreda/airflow dags_folder = /home/gregreda/airflow/dags

相关DAG和变量

default_args = { 'owner': 'gregreda', 'start_date': datetime(2016, 6, 1), 'schedule_interval': None, 'depends_on_past': False, 'retries': 3, 'retry_delay': timedelta(minutes=5) } this_dag_path = '/home/gregreda/airflow/dags/example_csv_to_redshift' dag = DAG( dag_id='example_csv_to_redshift', schedule_interval=None, default_args=default_args )

/example_csv_to_redshift/csv_to_redshift.py

copy_s3_to_redshift = PostgresOperator( task_id='load_table', sql=this_dag_path + '/copy_to_redshift.sql', params=dict( AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'), AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY') ), postgres_conn_id='postgres_redshift', autocommit=False, dag=dag )

/example_csv_to_redshift/copy_to_redshift.sql

COPY public.table_foobar FROM 's3://mybucket/test-data/import/foobar.csv' CREDENTIALS 'aws_access_key_id={{ AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ AWS_SECRET_ACCESS_KEY }}' CSV NULL as 'null' IGNOREHEADER as 1;

调用气流渲染example_csv_to_redshift load_table 2016-06-14 引发以下异常。请注意,我也在另一个DAG中遇到了这个问题,这就是为什么您看到带有 example_redshift_query_to_csv 的路径的原因。

Calling airflow render example_csv_to_redshift load_table 2016-06-14 throws the exception below. Note I'm running into this issue for another DAG as well, which is why you see the path with example_redshift_query_to_csv mentioned.

[2016-06-14 21:24:57,484] {__init__.py:36} INFO - Using executor SequentialExecutor [2016-06-14 21:24:57,565] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt [2016-06-14 21:24:57,596] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt [2016-06-14 21:24:57,763] {models.py:154} INFO - Filling up the DagBag from /home/gregreda/airflow/dags [2016-06-14 21:24:57,828] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files setattr(self, attr, env.loader.get_source(env, content)[0]) File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source raise TemplateNotFound(template) TemplateNotFound: /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql [2016-06-14 21:24:57,834] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files setattr(self, attr, env.loader.get_source(env, content)[0]) File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source raise TemplateNotFound(template) TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql Traceback (most recent call last): File "/usr/local/bin/airflow", line 15, in <module> args.func(args) File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 359, in render ti.render_templates() File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1409, in render_templates rendered_content = rt(attr, content, jinja_context) File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2017, in render_template return jinja_env.get_template(content).render(**context) File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 812, in get_template return self._load_template(name, self.make_globals(globals)) File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 774, in _load_template cache_key = self.loader.get_source(self, name)[1] File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql

任何解决问题的想法都值得赞赏。

Any ideas towards a fix are much appreciated.

推荐答案

标准 PEBCAK错误。

在指定给定Airflow任务中指定SQL模板的路径时出现问题,该问题必须是相对的。

There was an issue specifying the path to the SQL template within the given Airflow task, which needed to be relative.

copy_s3_to_redshift = PostgresOperator( task_id='load_table', sql='/copy_to_redshift.sql', params=dict( AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'), AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY') ), postgres_conn_id='postgres_redshift', autocommit=False, dag=dag )

,则需要对SQL模板进行一些更改(请注意 params。 ... 这次):

Additionally, the SQL template needed to be changed slightly (note the params. ... this time):

COPY public.pitches FROM 's3://mybucket/test-data/import/heyward.csv' CREDENTIALS 'aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ params.AWS_SECRET_ACCESS_KEY }}' CSV NULL as 'null' IGNOREHEADER as 1;

更多推荐

将Airflow的PostgresOperator与Jinja模板和SQL一起使用时,TemplateNotFound

本文发布于:2023-11-24 12:37:28,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1625230.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:模板   PostgresOperator   Airflow   Jinja   TemplateNotFound

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!