How to get a BashOperator or SqlOperator to pick up an external file for its template is fairly clearly documented, but with the PythonOperator my test of what I understand from the docs is not working. I am not sure how the templates_exts and templates_dict parameters are supposed to interact to pick up a file.
In my dags folder I've created pyoptemplate.sql and pyoptemplate.t, as well as test_python_operator_template.py:
pyoptemplate.sql:

    SELECT * FROM {{params.table}};

pyoptemplate.t:

    SELECT * FROM {{params.table}};

test_python_operator_template.py:
    # coding: utf-8
    # vim:ai:si:et:sw=4 ts=4 tw=80
    """
    # A Test of Templates in PythonOperator
    """
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime

    import pprint

    pp = pprint.PrettyPrinter(indent=4)


    def templated_function(ds, **kwargs):
        """This function will try to use templates loaded from external files"""
        pp.pprint(ds)
        pp.pprint(kwargs)


    # Define the DAG
    dag = DAG(dag_id='test_python_operator_template_dag',
              default_args={"owner": "lamblin",
                            "start_date": datetime.now()},
              template_searchpath=['/Users/daniellamblin/airflow/dags'],
              schedule_interval='@once')


    # Define the single task in this controller example DAG
    op = PythonOperator(task_id='test_python_operator_template',
                        provide_context=True,
                        python_callable=templated_function,
                        templates_dict={
                            'pyoptemplate': '',
                            'pyoptemplate.sql': '',
                            'sql': 'pyoptemplate',
                            'file1':'pyoptemplate.sql',
                            'file2':'pyoptemplate.t',
                            'table': '{{params.table}}'},
                        templates_exts=['.sql','.t'],
                        params={'condition_param': True,
                                'message': 'Hello World',
                                'table': 'TEMP_TABLE'},
                        dag=dag)
The result from a run shows that table was templated correctly as a string, but the others did not pull in any files for templating.
    dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
    [2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
    [2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
    [2017-01-18 23:58:07,620] {models.py:1196} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 1
    --------------------------------------------------------------------------------

    [2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
    '2017-01-18'
    {   u'END_DATE': '2017-01-18',
        u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
        u'dag': <DAG: test_python_operator_template_dag>,
        u'dag_run': None,
        u'ds_nodash': u'20170118',
        u'end_date': '2017-01-18',
        u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
        u'latest_date': '2017-01-18',
        u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
        u'params': {   'condition_param': True,
                       'message': 'Hello World',
                       'table': 'TEMP_TABLE'},
        u'run_id': None,
        u'tables': None,
        u'task': <Task(PythonOperator): test_python_operator_template>,
        u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
        u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
        'templates_dict': {   'file1': u'pyoptemplate.sql',
                              'file2': u'pyoptemplate.t',
                              'pyoptemplate': u'',
                              'pyoptemplate.sql': u'',
                              'sql': u'pyoptemplate',
                              'table': u'TEMP_TABLE'},
        u'test_mode': True,
        u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
        u'tomorrow_ds': '2017-01-19',
        u'tomorrow_ds_nodash': u'20170119',
        u'ts': '2017-01-18T00:00:00',
        u'ts_nodash': u'20170118T000000',
        u'yesterday_ds': '2017-01-17',
        u'yesterday_ds_nodash': u'20170117'}
    [2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None

Recommended answer
As of Airflow 1.8, the way PythonOperator replaces its template_ext field in __init__ doesn't work, because tasks only check template_ext on the __class__. To create a PythonOperator that picks up SQL template files, you only need to do the following:
    class SQLTemplatedPythonOperator(PythonOperator):
        template_ext = ('.sql',)
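To see why a class-level template_ext matters, here is a toy model in plain Python with no Airflow import. ToyOperator, ToySqlOperator and is_template_file are hypothetical names made up for this illustration. Only the lookup through __class__ mirrors what the answer describes; this is a sketch of the attribute-lookup behaviour, not Airflow's actual code.

```python
class ToyOperator(object):
    """Hypothetical stand-in for an operator base class; not Airflow code."""

    template_ext = ()  # class-level default: no extension is treated as a file

    def __init__(self, templates_exts=None):
        if templates_exts:
            # Creates an *instance* attribute; the class attribute is untouched.
            self.template_ext = tuple(templates_exts)

    def is_template_file(self, value):
        # The lookup goes through the class, as the answer says Airflow 1.8
        # does, so the instance attribute set in __init__ is ignored.
        exts = self.__class__.template_ext
        return any(value.endswith(ext) for ext in exts)


class ToySqlOperator(ToyOperator):
    template_ext = ('.sql',)  # class-level override, visible via __class__


plain = ToyOperator(templates_exts=['.sql', '.t'])
subclassed = ToySqlOperator()

print(plain.template_ext)                               # ('.sql', '.t')
print(plain.is_template_file('pyoptemplate.sql'))       # False
print(subclassed.is_template_file('pyoptemplate.sql'))  # True
```

The instance in the first case does carry the extensions, but nothing that reads the class attribute ever sees them, which matches the run in the question where the file names came through as plain strings.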
And then to access the SQL from your task when it runs:
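    def my_func(**context):
        # Holds the rendered contents of my_template.sql, not the file name.
        context['templates_dict']['query']

    # task_id and dag are also required; they are left out here for brevity.
    SQLTemplatedPythonOperator(
        templates_dict={'query': 'my_template.sql'},
        params={'my_var': 'my_value'},
        python_callable=my_func,
        provide_context=True,
    )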
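One way to check the callable on its own, without running a task, is to call it with a hand-built context. This is only a sketch: fake_context and the SQL string in it are made-up stand-ins for what the rendered template might look like, and the function returns the value here (the original only reads it) so that it can be inspected.

```python
def my_func(**context):
    # At run time this key is expected to hold the rendered SQL text.
    return context['templates_dict']['query']


# Hand-built context; the SQL string is a hypothetical example of rendered output.
fake_context = {'templates_dict': {'query': 'SELECT * FROM TEMP_TABLE;'}}

print(my_func(**fake_context))  # SELECT * FROM TEMP_TABLE;
```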