I’m currently trying to use Airflow to orchestrate a process where some operators are defined dynamically and depend on the output of another (earlier) operator.
In the code below t1 updates a text file with new records (these are actually read from an external queue but, for simplicity, I hard coded them as A, B and C here). Then, I want to create separate operators for each record read from that text file. These operators will create directories A, B and C, respectively, and in Airflow UI will be seen as separate bash processes Create_directory_A, Create_directory_B and Create_directory_C.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'Test_DAG',
    description="Lorem ipsum.",
    start_date=datetime(2017, 3, 20),
    schedule_interval=None,
    catchup=False
)

def create_text_file(list_of_rows):
    text_file = open('text_file.txt', 'w')
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()

def read_text():
    txt_file = open('text_file.txt', 'r')
    # strip the trailing newline so task ids stay clean
    return [element.strip() for element in txt_file.readlines()]

t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{ params.dir_name }}",
        params={'dir_name': row},
        dag=dag
    )
    t1 >> t2

In Airflow's documentation I can see that the scheduler will execute it [the DAG] periodically to reflect any changes.
Answer
You cannot create tasks dynamically that depend on the output of an upstream task. You're mixing up schedule time and execution time. DAG definitions and tasks are created at schedule time; DAG runs and task instances are created at execution time. Only a task instance can produce output.
The Airflow scheduler will build the dynamic graph with whatever text_file.txt contains at schedule time. These tasks are then shipped off to the workers.
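This parse-time vs. run-time split can be demonstrated without Airflow at all. The sketch below (plain Python, my own illustration) simulates it: the "scheduler" computes the task list from the file's current contents, and a later rewrite of the file, standing in for the t1 task instance running, has no effect on the already-built list.

```python
import os
import tempfile

# A scratch file standing in for text_file.txt.
workdir = tempfile.mkdtemp()
path = os.path.join(workdir, 'text_file.txt')

# Contents at "schedule time", when the DAG file is parsed.
with open(path, 'w') as f:
    f.write('A\nB\n')

# The "scheduler" builds the task list from what the file contains now.
with open(path) as f:
    task_ids = ['Create_directory_{}'.format(row.strip()) for row in f]

# Later, the t1 "task instance" rewrites the file at "execution time"...
with open(path, 'w') as f:
    f.write('A\nB\nC\n')

# ...but the already-computed task list is unchanged: no task for C.
print(task_ids)  # ['Create_directory_A', 'Create_directory_B']
```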
A worker will eventually execute the t1 task instance and create a new text_file.txt, but at this point, the list of t2 tasks has already been calculated by the scheduler and sent off to the workers.
So, whatever the latest t1 task instance dumps into text_file.txt will be used the next time the scheduler decides it's time to run the DAG.
If your task is fast and your workers are not backlogged, that will be the contents from the previous DAG run. If they are backlogged, text_file.txt contents may be stale, and if you're really unlucky, the scheduler reads the file while a task instance is writing to it, and you'll get incomplete data from read_text().
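One way around this (my sketch, not part of the answer above) is to move the dynamic work inside a single downstream task, so the file is read at execution time, after t1 has written it. The helper name `create_directories` is made up for illustration:

```python
import os

def create_directories(base_dir, records_path):
    """Read the records file at execution time and create one directory per record.

    Because this runs inside a single task instance, it always sees the
    file the upstream task just wrote, never a stale parse-time copy.
    """
    with open(records_path) as f:
        records = [line.strip() for line in f if line.strip()]
    for record in records:
        os.makedirs(os.path.join(base_dir, record), exist_ok=True)
    return records
```

You would wrap this in one PythonOperator placed downstream of t1. The trade-off is that the directories no longer show up as separate Create_directory_A/B/C tasks in the Airflow UI; there is just one task that does all the work.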