Airflow TriggerDagRunOperator: how to change the execution date

Updated: 2024-10-24 18:17:38
Problem description


I noticed that for scheduled tasks the execution date is set in the past, according to the Airflow documentation:

Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if I want to summarize data for 2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be right after all data for 2016-02-19 becomes available.
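The scheduling rule in the quoted passage can be sketched in plain Python: a run that fires at the end of a daily interval is stamped with the start of that interval. This is a minimal illustration assuming a fixed daily schedule; `execution_date_for_run` is a hypothetical helper, not part of Airflow's API.

```python
from datetime import datetime, timedelta


def execution_date_for_run(run_time, interval=timedelta(days=1)):
    """Return the start of the interval that has just closed at run_time.

    Hypothetical helper: assumes run_time falls exactly on an interval
    boundary, as it does for a daily schedule firing at midnight.
    """
    return run_time - interval


# The run that fires at 2016-02-20 00:00 GMT processes 2016-02-19's data,
# so it is stamped with the execution date 2016-02-19.
fired_at = datetime(2016, 2, 20, 0, 0)
print(execution_date_for_run(fired_at).strftime("%Y-%m-%d"))  # 2016-02-19
```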

However, when a DAG triggers another DAG, the execution time is set to now().

Is there a way to give the triggered DAG the same execution time as the triggering DAG? Of course, I could rewrite the templates to use yesterday_ds, but that is a tricky workaround.
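For context, the `yesterday_ds` workaround mentioned above boils down to subtracting one day from the run's execution date and formatting it as `YYYY-MM-DD`. The sketch below reproduces that arithmetic in plain Python (the functions are stand-ins, not Airflow's template engine) and shows why it is fragile: it only matches the controller's date when the trigger fires on the day after that date.

```python
from datetime import datetime, timedelta


def ds(execution_date):
    # Stand-in for the ds template variable: YYYY-MM-DD.
    return execution_date.strftime("%Y-%m-%d")


def yesterday_ds(execution_date):
    # Stand-in for yesterday_ds: one day before the execution date.
    return ds(execution_date - timedelta(days=1))


# Controller DAG: scheduled run for 2016-02-19, fired shortly after midnight.
controller_execution_date = datetime(2016, 2, 19)
# Triggered DAG: its execution_date is whatever now() was at trigger time.
triggered_now = datetime(2016, 2, 20, 0, 5)
print(yesterday_ds(triggered_now) == ds(controller_execution_date))  # True

# A backfill of 2016-02-10 triggered on 2016-02-20 no longer lines up.
print(yesterday_ds(triggered_now) == ds(datetime(2016, 2, 10)))  # False
```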

Solution

The following class extends TriggerDagRunOperator to allow passing the execution date as a string, which then gets converted back into a datetime. It's a bit hacky, but it is the only way I found to get the job done.

```python
import logging
from datetime import datetime

from airflow import settings
from airflow.models import DagBag
# Airflow 1.x module path; DagRunOrder no longer exists in Airflow 2.
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder
from airflow.utils.state import State


class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicitly set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    mail-archives.apache/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd)

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{execution_date}}",
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        # The callable receives (context, dag_run_obj) and returns the
        # DagRunOrder to trigger, or None to skip (here: a coin flip).
        python_callable=lambda context, dro: dro if random.getrandbits(1) else None,
        params={},
        dag=my_controller_dag
    )
    """
    # Render execution_date through Jinja, so "{{execution_date}}" becomes
    # the controller run's date as a string.
    template_fields = ('execution_date',)

    def __init__(
            self, trigger_dag_id, python_callable, execution_date,
            *args, **kwargs
    ):
        self.execution_date = execution_date
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id,
            python_callable=python_callable,
            *args, **kwargs
        )

    def execute(self, context):
        # Convert the rendered string back into a datetime.
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                # Pass the parsed datetime, not the raw template string.
                execution_date=run_id_dt,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")
```
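The core of the workaround is the round trip in execute(): the templated field arrives as a rendered string, is parsed back with strptime, and the parsed value seeds the run_id. The stand-alone sketch below reproduces only that step without Airflow. The helper name is hypothetical, and the sample string assumes `{{execution_date}}` renders as `YYYY-MM-DD HH:MM:SS`, which is what the operator's format string expects. If your Airflow version renders a different format (for example an ISO 8601 string with a `T` separator and a UTC offset), strptime raises ValueError and the format string has to be adjusted.

```python
from datetime import datetime

# Format string taken from the operator above.
RENDERED_FORMAT = "%Y-%m-%d %H:%M:%S"


def run_id_for(rendered_execution_date):
    """Hypothetical helper mirroring MMTTriggerDagRunOperator.execute().

    Parses the Jinja-rendered execution date and returns both the datetime
    to pass as execution_date and the run_id built from it.
    """
    run_id_dt = datetime.strptime(rendered_execution_date, RENDERED_FORMAT)
    return run_id_dt, "trig__" + run_id_dt.isoformat()


execution_date, run_id = run_id_for("2016-02-19 00:00:00")
print(execution_date)  # 2016-02-19 00:00:00
print(run_id)          # trig__2016-02-19T00:00:00
```

Note that the run_id is derived purely from the date, so triggering the same date twice also produces the same run_id.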

One issue you may run into when using this without setting execution_date=now(): the operator will raise a MySQL error if you try to start a DAG twice with an identical execution_date. This is because dag_id and execution_date together form a unique index on the dag_run table, and a row that duplicates an existing key cannot be inserted.
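The collision can be reproduced with a toy table in SQLite. This is only an illustration: the table below is a simplified stand-in that assumes a unique constraint on (dag_id, execution_date), not Airflow's actual dag_run schema, and SQLite raises sqlite3.IntegrityError where MySQL reports a duplicate-entry error.

```python
import sqlite3

# Simplified stand-in for a dag_run table (not Airflow's real schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE dag_run (
           dag_id TEXT NOT NULL,
           execution_date TEXT NOT NULL,
           run_id TEXT,
           UNIQUE (dag_id, execution_date)
       )"""
)


def insert_run(dag_id, execution_date):
    conn.execute(
        "INSERT INTO dag_run (dag_id, execution_date, run_id) VALUES (?, ?, ?)",
        (dag_id, execution_date, "trig__" + execution_date),
    )


insert_run("my_target_dag_id", "2016-02-19T00:00:00")

duplicate_rejected = False
try:
    # A second trigger with the same date violates the unique key.
    insert_run("my_target_dag_id", "2016-02-19T00:00:00")
except sqlite3.IntegrityError as exc:
    duplicate_rejected = True
    print("rejected:", exc)
```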

I can't think of a reason you would ever want to run the same DAG twice with the same execution_date in production anyway, but it is something I ran into while testing, so don't be alarmed by it. Simply clear the old job or use a different datetime.
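If re-triggering the same date is expected (for example while testing), another option besides clearing by hand is to check for an existing run before inserting. The sketch below does this against the same kind of toy SQLite table; `trigger_if_absent` is a hypothetical helper, and in a real deployment the lookup would go through Airflow's own metadata models instead.

```python
import sqlite3

# Toy stand-in for the dag_run table, with the same assumed unique key.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT,"
    " UNIQUE (dag_id, execution_date))"
)


def trigger_if_absent(dag_id, execution_date):
    """Insert a run unless one already exists for (dag_id, execution_date).

    Returns True if a new run was created, False if it was skipped.
    """
    exists = conn.execute(
        "SELECT 1 FROM dag_run WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchone()
    if exists:
        return False
    conn.execute(
        "INSERT INTO dag_run (dag_id, execution_date) VALUES (?, ?)",
        (dag_id, execution_date),
    )
    return True


print(trigger_if_absent("my_target_dag_id", "2016-02-19T00:00:00"))  # True
print(trigger_if_absent("my_target_dag_id", "2016-02-19T00:00:00"))  # False
```

Check-then-insert is not atomic, so with concurrent triggers the unique constraint is still the real safeguard; the check only avoids the error in the common sequential case.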

Published: 2023-11-24 03:36:30