Apache Airflow

Programming Basics | Industry News | Updated: 2024-10-19 16:35:38
This article covers how to handle the Apache Airflow error "Cannot load the dag bag to handle failure"; it should be a useful reference for anyone hitting the same problem.

Problem description

I have created an on_failure_callback function (referring to Airflow default on_failure_callback) to handle a task's failure.

It works well when there is only one task in the DAG. However, if there are two or more tasks, a task randomly fails because the operator is null; it can be resumed manually later. The log in airflow-scheduler.out is:

```
[2018-05-08 14:24:21,237] {models.py:1595} ERROR - Executor reports task instance %s finished (%s) although the task says its %s. Was the task killed externally? NoneType
[2018-05-08 14:24:21,238] {jobs.py:1435} ERROR - Cannot load the dag bag to handle failure for . Setting task to FAILED without callbacks or retries. Do you have enough resources?
```

The DAG code is:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
import airflow
from devops.util import WechatUtil
from devops.util import JiraUtil

def on_failure_callback(context):
    ti = context['task_instance']
    log_url = ti.log_url
    owner = ti.task.owner
    ti_str = str(context['task_instance'])
    wechat_msg = "%s - Owner:%s" % (ti_str, owner)
    # Note: the imported name is WechatUtil, but this call uses WeChatUtil (capital C)
    WeChatUtil.notify_team(wechat_msg)
    jira_desc = "Please check log from url %s" % (log_url)
    JiraUtil.create_incident("DW", ti_str, jira_desc, owner)

args = {
    'queue': 'default',
    'start_date': airflow.utils.dates.days_ago(1),
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': on_failure_callback,
    'owner': 'user1',
}

dag = DAG(dag_id='test_dependence1', default_args=args, schedule_interval='10 16 * * *')

load_crm_goods = BashOperator(
    task_id='crm_goods_job',
    bash_command='date',
    dag=dag)

load_crm_memeber = BashOperator(
    task_id='crm_member_job',
    bash_command='date',
    dag=dag)

load_crm_order = BashOperator(
    task_id='crm_order_job',
    bash_command='date',
    dag=dag)

load_crm_eur_invt = BashOperator(
    task_id='crm_eur_invt_job',
    bash_command='date',
    dag=dag)

crm_member_cohort_analysis = BashOperator(
    task_id='crm_member_cohort_analysis_job',
    bash_command='date',
    dag=dag)

crm_member_cohort_analysis.set_upstream(load_crm_goods)
crm_member_cohort_analysis.set_upstream(load_crm_memeber)
crm_member_cohort_analysis.set_upstream(load_crm_order)
crm_member_cohort_analysis.set_upstream(load_crm_eur_invt)

crm_member_kpi_daily = BashOperator(
    task_id='crm_member_kpi_daily_job',
    bash_command='date',
    dag=dag)

crm_member_kpi_daily.set_upstream(crm_member_cohort_analysis)
```

I had tried updating airflow.cfg, raising the default memory from 512 to as much as 4096, but no luck. Would anyone have any advice?

I also tried updating my JiraUtil and WechatUtil as follows, encountering the same error.

WechatUtil:

```python
import requests

class WechatUtil:
    @staticmethod
    def notify_trendy_user(user_ldap_id, message):
        return None

    @staticmethod
    def notify_bigdata_team(message):
        return None
```

JiraUtil:

```python
import json
import requests

class JiraUtil:
    @staticmethod
    def execute_jql(jql):
        return None

    @staticmethod
    def create_incident(projectKey, summary, desc, assignee=None):
        return None
```

Solution

(I'm shooting tracer bullets a bit here, so bear with me if this answer doesn't get it right on the first try.)

The null-operator issue with multiple task instances is weird... It would help with troubleshooting if you could boil the current code down to an MCVE, e.g., 1–2 operators, excluding the JiraUtil and WechatUtil parts if they're not related to the callback failure.
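In the same spirit, the callback logic itself can be exercised in isolation, with stand-in objects instead of a live Airflow context. This is a sketch, not from the original post: the FakeTask/FakeTaskInstance names and values are hypothetical, and the WeChat/Jira calls are replaced by a plain return so the function can run anywhere.

```python
# Hypothetical stand-ins mimicking the attributes the callback reads.
class FakeTask:
    owner = 'user1'

class FakeTaskInstance:
    log_url = 'http://localhost:8080/log'
    task = FakeTask()

    def __str__(self):
        return '<TaskInstance: test_dependence1.crm_goods_job>'

def on_failure_callback(context):
    # Same attribute accesses and string formatting as the DAG's callback,
    # but returning the messages instead of notifying WeChat/Jira.
    ti = context['task_instance']
    wechat_msg = "%s - Owner:%s" % (str(ti), ti.task.owner)
    jira_desc = "Please check log from url %s" % ti.log_url
    return wechat_msg, jira_desc

msg, desc = on_failure_callback({'task_instance': FakeTaskInstance()})
```

If a harness like this runs cleanly, the failure is more likely in how the scheduler invokes the callback than in the callback body itself.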

Here are two ideas:

1. Can you try changing the line that fetches the task instance out of the context to see if this makes a difference?

Before:

```python
def on_failure_callback(context):
    ti = context['task_instance']
    ...
```

After:

```python
def on_failure_callback(context):
    ti = context['ti']
    ...
```
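If you're unsure which key your Airflow version populates, a defensive variant (a sketch, not from the original answer) can accept either key and bail out quietly if neither is present:

```python
def on_failure_callback(context):
    # 'ti' and 'task_instance' normally point at the same TaskInstance;
    # fall back from one key to the other, then give up gracefully.
    ti = context.get('ti') or context.get('task_instance')
    if ti is None:
        return None
    return str(ti)  # stand-in for the real notification logic
```

This also keeps the callback from raising a KeyError of its own, which would otherwise mask the original task failure.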

I saw this usage in the Airflow repo (https://github.com/apache/incubator-airflow/blob/c1d583f91a0b4185f760a64acbeae86739479cdb/airflow/contrib/hooks/qubole_check_hook.py#L88). It's possible it can be accessed both ways.

2. Can you try adding provide_context=True on the operators, either as a kwarg or in default_args?
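For instance, a minimal sketch of the default_args route (assuming Airflow 1.x, where provide_context is a supported operator argument; the callback here is a placeholder):

```python
def on_failure_callback(context):
    pass  # stand-in for the real callback

# Anything placed in default_args is passed to every operator
# constructed with these defaults, so one entry covers the whole DAG.
default_args = {
    'queue': 'default',
    'on_failure_callback': on_failure_callback,
    'provide_context': True,
}
```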

Published on elefans.com: 2023-11-23 19:55:59