我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它?

编程入门 行业动态 更新时间:2024-10-06 21:28:31
本文介绍了我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我想创建一个片段,该片段基于DAG是计划的还是手动触发的来传递正确的日期。 DAG每月运行一次。 DAG根据上个月的数据生成报告(SQL查询)。

I want to create a snippet that passes the correct date based on whether the DAG was scheduled or whether it was triggered manually. The DAG runs monthly. The DAG generates a report (A SQL query) based on the data of the previous month.

如果我按计划运行DAG,则可以使用以下命令获取上个月的数据: jinja片段:

If I run the DAG scheduled, I can fetch the previous month with the following jinja snippet:

execution_date.month

鉴于DAG计划在上一个周期(上个月)结束时,execution_date将正确返回上个月。但是,在手动运行时,它将返回当前月份(执行日期将是手动触发的日期)。

given that the DAG is scheduled at the end of the previous period (last month) the execution_date will correctly return the last month. However on manual runs this will return the current month (execution date will be the date of the manual trigger).

我想编写一个处理这种情况的简单宏。但是,我找不到以编程方式查询DAG是否以编程方式触发的好方法。我能想到的最好的办法是从数据库中获取 run_id (通过创建具有数据库会话的宏),然后检查 run_id 包含单词 manual 。有没有更好的方法来解决此问题?

I want to write a simple macro that deals with this case. However I could not find a good way to programmatically query whether the DAG is triggered programmatically. The best I could come up with is to fetch the run_id from the database (by creating a macro that has a DB session), check wheter the run_id contains the word manual. Is there a better way to solve this problem?

推荐答案

目前没有直接的DAG属性可以识别手动运行。 要获取此信息,您需要检查您提到的 run_id 。

There is no direct DAG property to identify manual runs for now. To get this information you would need to check the run_id as you mentioned.

但是,有一个专用宏可以获取 run_id 。您不必自己从数据库中获取数据。 以下是如何使用它的示例:

However, there is a dedicated macro get the run_id. You don't have to fetch it from the database by yourself. Here is an example on how to use it :

def some_task_py(**context): run_id = context['templates_dict']['run_id'] is_manual = run_id.startswith('manual__') is_scheduled = run_id.startswith('scheduled__') some_task = PythonOperator( task_id = 'some_task', dag=dag, templates_dict = {'run_id': '{{ run_id }}'}, python_callable = some_task_py, provide_context = True)

更多推荐

我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它?

本文发布于:2023-11-23 21:49:50,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622945.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:方式   Airflow   DAG

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!