I am new to Airflow. I have come across a scenario where the parent DAG needs to pass a dynamic number (let's say n) to a SubDAG, and the SubDAG then uses this number to dynamically create n parallel tasks.
The Airflow documentation doesn't cover a way to achieve this, so I have explored a couple of approaches:
Option - 1
I tried passing the number as an XCom value, but for some reason the SubDAG does not resolve the template to the passed value.
Parent DAG file

    def load_dag(**kwargs):
        number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
        dag_data = json.dumps({
            "number_of_runs": number_of_runs
        })
        return dag_data

    # ------------------ Tasks ------------------------------
    load_config = PythonOperator(
        task_id='load_config',
        provide_context=True,
        python_callable=load_dag,
        dag=dag)

    t1 = SubDagOperator(
        task_id=CHILD_DAG_NAME,
        subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args,
                       "'{{ ti.xcom_pull(task_ids='load_config') }}'"),
        default_args=default_args,
        dag=dag,
    )

Sub DAG file

    def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            schedule_interval=None)

        variabe_names = {}

        for i in range(num_of_runs):
            variabe_names['task' + str(i + 1)] = DummyOperator(
                task_id='dummy_task',
                dag=dag_subdag,
            )

        return dag_subdag

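A likely reason the XCom attempt cannot work as written: sub_dag(...) is an ordinary Python call that runs when the DAG file is parsed, so num_of_runs is the literal template string, not a rendered value. Jinja templates are only rendered for an operator's templated fields when a task executes. The sketch below reproduces this without Airflow; count_tasks is a hypothetical stand-in for the loop in sub_dag.

```python
# The exact string the parent DAG file passes to the subdag factory.
template = "'{{ ti.xcom_pull(task_ids='load_config') }}'"


def count_tasks(num_of_runs):
    """Hypothetical stand-in for the for-loop inside sub_dag."""
    return len([i for i in range(num_of_runs)])


# At parse time nothing has rendered the template, so range() gets a str.
try:
    count_tasks(template)
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```

So the factory never sees the number pulled from XCom; it has to read the value from somewhere that already exists when the file is parsed.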
Option - 2
I also tried passing number_of_runs as a global variable, which did not work.
Option - 3
We also tried writing this value to a data file, but the SubDAG throws a "file doesn't exist" error. This might be because we generate the file dynamically.
Can someone help me with this?
Recommended answer

I've done it with Option 3. The key is to return a valid DAG with no tasks if the file does not exist. So load_config generates a file containing your number of tasks (or more information if needed), and your subdag factory would look something like:

    import os
    from datetime import timedelta

    def subdag(...):
        sdag = DAG('%s.%s' % (parent, child), default_args=args,
                   schedule_interval=timedelta(hours=1))
        file_path = "/path/to/generated/file"
        # Until load_config has written the file, return the DAG with no tasks.
        if os.path.exists(file_path):
            with open(file_path) as data_file:
                list_tasks = data_file.readlines()
            for task in list_tasks:
                # strip() drops the trailing newline, which is not valid in a task_id
                DummyOperator(
                    task_id='task_' + task.strip(),
                    default_args=args,
                    dag=sdag,
                )
        return sdag

At DAG generation you will see a subdag with no tasks. At DAG execution, after load_config is done, you can see your dynamically generated subdag.
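To make the two halves of this approach concrete, here is a minimal sketch in plain Python, without Airflow. The helper names write_task_list and read_task_ids and the file name generated_tasks.txt are assumptions for illustration: the first is roughly what the load_config callable could do, the second is the part of the subdag factory that decides which tasks to create.

```python
import os
import tempfile


def write_task_list(file_path, number_of_runs):
    """Hypothetical body for load_config: write one task suffix per line."""
    with open(file_path, "w") as handle:
        for i in range(int(number_of_runs)):
            handle.write("%d\n" % (i + 1))


def read_task_ids(file_path):
    """Return task ids for the factory, or [] while the file is missing."""
    if not os.path.exists(file_path):
        return []
    with open(file_path) as handle:
        # strip() removes the newline; blank lines are skipped
        return ["task_" + line.strip() for line in handle if line.strip()]


# Hypothetical location; a real setup needs a path visible to every worker.
demo_path = os.path.join(tempfile.mkdtemp(), "generated_tasks.txt")

ids_at_parse_time = read_task_ids(demo_path)      # file not written yet
write_task_list(demo_path, 3)                     # load_config has run
ids_after_load_config = read_task_ids(demo_path)  # next parse sees the tasks

print(ids_at_parse_time)
print(ids_after_load_config)
```

In the factory, each returned id would become one DummyOperator. Returning an empty list instead of raising is what avoids the "file doesn't exist" error from the question.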