Using template files with PythonOperator in Airflow

Updated: 2024-10-25 00:28:56

Question

How to get a BashOperator or SqlOperator to pick up an external file for its template is reasonably well documented. For the PythonOperator, however, my test of what I understand from the docs is not working. I am not sure how the templates_exts and templates_dict parameters are supposed to interact to pick up a file.

In my dags folder I've created pyoptemplate.sql and pyoptemplate.t, as well as test_python_operator_template.py.

pyoptemplate.sql:

    SELECT * FROM {{params.table}};

pyoptemplate.t:

    SELECT * FROM {{params.table}};

test_python_operator_template.py:

    # coding: utf-8
    # vim:ai:si:et:sw=4 ts=4 tw=80
    """
    # A Test of Templates in PythonOperator
    """

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime

    import pprint

    pp = pprint.PrettyPrinter(indent=4)


    def templated_function(ds, **kwargs):
        """This function will try to use templates loaded from external files"""
        pp.pprint(ds)
        pp.pprint(kwargs)


    # Define the DAG
    dag = DAG(dag_id='test_python_operator_template_dag',
              default_args={"owner": "lamblin",
                            "start_date": datetime.now()},
              template_searchpath=['/Users/daniellamblin/airflow/dags'],
              schedule_interval='@once')


    # Define the single task in this controller example DAG
    op = PythonOperator(task_id='test_python_operator_template',
                        provide_context=True,
                        python_callable=templated_function,
                        templates_dict={
                            'pyoptemplate': '',
                            'pyoptemplate.sql': '',
                            'sql': 'pyoptemplate',
                            'file1': 'pyoptemplate.sql',
                            'file2': 'pyoptemplate.t',
                            'table': '{{params.table}}'},
                        templates_exts=['.sql', '.t'],
                        params={'condition_param': True,
                                'message': 'Hello World',
                                'table': 'TEMP_TABLE'},
                        dag=dag)

The result from a run shows that table was templated correctly as a string, but none of the other entries pulled in a file for templating; file1 and file2 still hold the bare file names.

    dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
    [2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
    [2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
    [2017-01-18 23:58:07,620] {models.py:1196} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 1
    --------------------------------------------------------------------------------

    [2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
    '2017-01-18'
    {   u'END_DATE': '2017-01-18',
        u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
        u'dag': <DAG: test_python_operator_template_dag>,
        u'dag_run': None,
        u'ds_nodash': u'20170118',
        u'end_date': '2017-01-18',
        u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
        u'latest_date': '2017-01-18',
        u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
        u'params': {   'condition_param': True,
                       'message': 'Hello World',
                       'table': 'TEMP_TABLE'},
        u'run_id': None,
        u'tables': None,
        u'task': <Task(PythonOperator): test_python_operator_template>,
        u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
        u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
        'templates_dict': {   'file1': u'pyoptemplate.sql',
                              'file2': u'pyoptemplate.t',
                              'pyoptemplate': u'',
                              'pyoptemplate.sql': u'',
                              'sql': u'pyoptemplate',
                              'table': u'TEMP_TABLE'},
        u'test_mode': True,
        u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
        u'tomorrow_ds': '2017-01-19',
        u'tomorrow_ds_nodash': u'20170119',
        u'ts': '2017-01-18T00:00:00',
        u'ts_nodash': u'20170118T000000',
        u'yesterday_ds': '2017-01-17',
        u'yesterday_ds_nodash': u'20170117'}
    [2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None

Recommended answer

As of Airflow 1.8, the way the PythonOperator replaces its template_ext field in __init__ doesn't work, because tasks only check template_ext on the __class__. To create a PythonOperator that picks up SQL template files, you only need to subclass it and set the attribute at class level:

    class SQLTemplatedPythonOperator(PythonOperator):
        template_ext = ('.sql',)
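To see why the class-level attribute matters, here is a toy model of the lookup the answer describes. It is a sketch only and is not Airflow's code: ToyOperator, SqlToyOperator, render, render_templates and demo are hypothetical names, and a small regular expression stands in for Jinja. The one behaviour it reproduces is that the renderer reads template_ext from the class, so an extension list set on the instance is ignored.

```python
import os
import re
import tempfile


class ToyOperator:
    """Hypothetical stand-in for an operator; NOT Airflow's implementation."""

    template_ext = ()  # class-level default: no file extensions recognised

    def __init__(self, templates_dict, templates_exts=None):
        self.templates_dict = templates_dict
        if templates_exts:
            # Instance-level override, which the renderer below never sees.
            self.template_ext = tuple(templates_exts)


class SqlToyOperator(ToyOperator):
    template_ext = ('.sql',)  # set on the class, as the answer recommends


def render(text, params):
    """Replace {{params.name}} placeholders; a crude substitute for Jinja."""
    pattern = r"\{\{\s*params\.(\w+)\s*\}\}"
    return re.sub(pattern, lambda match: str(params[match.group(1)]), text)


def render_templates(task, search_dir, params):
    """Render each templates_dict value, loading files by class-level extension."""
    extensions = tuple(type(task).template_ext)  # the class, not the instance
    rendered = {}
    for key, value in task.templates_dict.items():
        if extensions and value.endswith(extensions):
            with open(os.path.join(search_dir, value)) as handle:
                value = handle.read()
        rendered[key] = render(value, params)
    return rendered


def demo():
    """Render the same templates_dict with both operator flavours."""
    params = {'table': 'TEMP_TABLE'}
    templates = {'file1': 'pyoptemplate.sql', 'table': '{{params.table}}'}
    with tempfile.TemporaryDirectory() as search_dir:
        with open(os.path.join(search_dir, 'pyoptemplate.sql'), 'w') as handle:
            handle.write('SELECT * FROM {{params.table}};')
        plain = render_templates(
            ToyOperator(templates, templates_exts=['.sql']), search_dir, params)
        subclassed = render_templates(
            SqlToyOperator(templates), search_dir, params)
    return plain, subclassed
```

In this model, demo() leaves file1 as the bare file name for the instance-level override, which matches the output in the question, while the subclass yields the rendered contents of the file.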

Then, to access the rendered SQL from your task when it runs:

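    def my_func(**context):
        # By the time the callable runs, 'query' holds the rendered contents
        # of my_template.sql rather than the file name.
        context['templates_dict']['query']


    SQLTemplatedPythonOperator(
        task_id='my_sql_task',  # hypothetical id; the original snippet omits it
        templates_dict={'query': 'my_template.sql'},
        params={'my_var': 'my_value'},
        python_callable=my_func,
        provide_context=True,
    )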
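Because the callable only reads context['templates_dict'], it can be exercised outside Airflow with a hand-built context. The sketch below assumes the rendered query is meant to be executed against a database. The original does not say which one, so an in-memory SQLite database with a made-up TEMP_TABLE is used purely for illustration, and run_query and fake_context are hypothetical names.

```python
import sqlite3


def run_query(query):
    """Execute the query against a throwaway in-memory database (assumption)."""
    connection = sqlite3.connect(':memory:')
    try:
        # Made-up table and row so the query has something to return.
        connection.execute('CREATE TABLE TEMP_TABLE (id INTEGER, name TEXT)')
        connection.execute("INSERT INTO TEMP_TABLE VALUES (1, 'example')")
        return connection.execute(query).fetchall()
    finally:
        connection.close()


def my_func(**context):
    """Read the rendered SQL from templates_dict and run it."""
    query = context['templates_dict']['query']
    return run_query(query)


# Hand-built context imitating what the task would receive after rendering.
fake_context = {'templates_dict': {'query': 'SELECT * FROM TEMP_TABLE;'}}
rows = my_func(**fake_context)
```

Calling the function this way checks only the callable's own logic; whether the file is actually picked up and rendered still depends on the operator class.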

Published: 2023-11-24 03:37:41
Link: https://www.elefans.com/category/jswz/34/1623833.html