I'm trying to access external files in an Airflow task to read some SQL, and I'm getting "file not found". Has anyone come across this?
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)
```

The log shows:
```
IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
```

I understand that I could simply copy and paste the query into the same file, but that's really not a neat solution. There are multiple queries and the text is really big; embedding it in the Python code would compromise readability.
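For context on why this error appears: a relative path like `'sql/queryfile.sql'` is resolved against the worker process's current working directory, not against the folder the DAG file lives in. A minimal stdlib sketch of the usual fix, anchoring the path to an explicit base directory (the temp folder below is just a stand-in for your DAG folder; in a real DAG file you would use `os.path.dirname(os.path.abspath(__file__))`):

```python
import os
import tempfile

def read_query(base_dir, relative_name):
    # Join against an explicit base directory instead of relying on
    # the process's current working directory.
    path = os.path.join(base_dir, relative_name)
    with open(path) as f:
        return f.read()

# Demo with a throwaway folder standing in for the DAG folder.
demo_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_dir, 'sql'))
with open(os.path.join(demo_dir, 'sql', 'queryfile.sql'), 'w') as f:
    f.write('SELECT 1;')

query = read_query(demo_dir, 'sql/queryfile.sql')
print(query)  # SELECT 1;
```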
Recommended answer

Here is an example using a Variable to make it easy.
- First add the Variable in the Airflow UI -> Admin -> Variables, e.g. {key: 'sql_path', value: 'your_sql_script_folder'}
Then add the following code in your DAG; to use a Variable from Airflow you just read it with Variable.get.
DAG code:

```python
import airflow
from airflow.models import Variable

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
    'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
```
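For context, `template_searchpath` points Airflow's Jinja templating at one or more folders to scan for template files such as `.sql` scripts; conceptually, a bare filename is tried against each folder in turn. A rough stdlib sketch of that lookup (the function name and folder names here are made up for illustration, not Airflow's API):

```python
import os
import tempfile

def resolve_template(search_paths, filename):
    # Return the first folder on the search path that contains the file,
    # mimicking how a template search path resolves a bare filename.
    for folder in search_paths:
        candidate = os.path.join(folder, filename)
        if os.path.isfile(candidate):
            return candidate
    raise FileNotFoundError(filename)

# Demo: a temp folder stands in for 'your_sql_script_folder'.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, 'queryfile.sql'), 'w') as f:
    f.write('SELECT 1;')

found = resolve_template(['/no/such/folder', demo_dir], 'queryfile.sql')
```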
- Now you can use the SQL script name or a path under the folder stored in the Variable.
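Inside the task itself, the folder held by the Variable can then be joined with the script name. A minimal sketch using a plain dict in place of Airflow's Variable backend (the `/opt/airflow/sql` value below is hypothetical):

```python
import os

# Stand-in for Variable.get("sql_path"); the folder value is hypothetical.
variables = {'sql_path': '/opt/airflow/sql'}

def query_path(script_name):
    # Build the full path to a SQL script from the configured folder.
    return os.path.join(variables['sql_path'], script_name)

print(query_path('queryfile.sql'))  # /opt/airflow/sql/queryfile.sql
```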