Airflow DAG not getting scheduled

This article covers how to deal with an Airflow DAG that never gets scheduled.

Problem description

I am new to Airflow and created my first DAG. Here is my DAG code. I want the DAG to start now and thereafter run once in a day.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['aaaa@gmail'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'alamode',
    default_args=default_args,
    schedule_interval=timedelta(1))

create_command = "/home/ubuntu/scripts/makedir.sh "

# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
    task_id='create_directory',
    bash_command=create_command,
    dag=dag)

run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh "

# t2 is the task which will invoke the spiders
t2 = BashOperator(
    task_id='web_scrawl',
    bash_command=run_spiders,
    dag=dag)

# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)

The DAG is not getting picked up by Airflow. I checked the log and here is what it says.

[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds

What exactly am I doing wrong? I have tried changing the schedule_interval to schedule_interval=timedelta(minutes=1) to see if it starts immediately, but still no use. I can see the tasks under the DAG as expected in Airflow UI but with schedule status as 'no status'. Please help me here.

Recommended answer

This issue was resolved by following the steps below:

1) I used a much older date for start_date and schedule_interval=timedelta(minutes=10). Also, I used a real date instead of datetime.now() (see the sketch after this list for why this matters).
2) Added catchup = True in the DAG arguments.
3) Set the environment variable: export AIRFLOW_HOME=$(pwd)/airflow_home.
4) Deleted airflow.db.
5) Moved the new code to the DAGs folder.
6) Ran the command 'airflow initdb' to create the DB again.
7) Turned on the DAG's 'ON' switch through the UI.
8) Ran the command 'airflow scheduler'.
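
The key change is the start_date. In the Airflow 1.x releases this question refers to, the scheduler only creates a DAG run once a full schedule interval after start_date has passed, so a start_date of datetime.now() keeps pushing that moment into the future on every parse of the DAG file and the tasks stay in 'no status'. A minimal illustration of that rule in plain Python (the variable names are only for this sketch, not part of any Airflow API):

from datetime import datetime, timedelta

start_date = datetime(2017, 9, 12)       # a fixed date in the past
schedule_interval = timedelta(days=1)

# The first run covers [start_date, start_date + schedule_interval) and is
# only triggered once that window has closed:
first_run_triggered_after = start_date + schedule_interval
print(first_run_triggered_after)         # 2017-09-13 00:00:00

# With 'start_date': datetime.now(), this value is recomputed as "one full
# interval from now" every time the file is parsed, so the window never
# closes and no run is ever created.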

Here is the code which works now:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 9, 12),
    'email': ['anjana@gapro.tech'],
    'retries': 0,
    'retry_delay': timedelta(minutes=15)
}

dag = DAG(
    'alamode',
    catchup=False,
    default_args=default_args,
    schedule_interval="@daily")

# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
    task_id='create_directory',
    bash_command='/home/ubuntu/scripts/makedir.sh ',
    dag=dag)

# t2 is the task which will invoke the spiders
t2 = BashOperator(
    task_id='web_crawl',
    bash_command='/home/ubuntu/scripts/crawl_spiders.sh ',
    dag=dag)

# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)
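
As a side note, the same dependency can also be written with Airflow's bitshift operators, which are equivalent to set_upstream/set_downstream; a short sketch reusing the t1 and t2 tasks defined above:

# All three lines express "t1 must run before t2"; pick one:
t2.set_upstream(t1)
t1 >> t2
t1.set_downstream(t2)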
