I am new to Airflow and created my first DAG. Here is my DAG code. I want the DAG to start now and thereafter run once in a day.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['aaaa@gmail'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'alamode', default_args=default_args, schedule_interval=timedelta(1))

create_command = "/home/ubuntu/scripts/makedir.sh "

# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
    task_id='create_directory',
    bash_command=create_command,
    dag=dag)

run_spiders = "/home/ubuntu/scripts/crawl_spiders.sh "

# t2 is the task which will invoke the spiders
t2 = BashOperator(
    task_id='web_scrawl',
    bash_command=run_spiders,
    dag=dag)

# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)
The DAG is not getting picked by Airflow. I checked the log and here is what it says.
[2017-09-12 18:08:20,220] {jobs.py:343} DagFileProcessor398 INFO - Started process (PID=7001) to work on /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,223] {jobs.py:1521} DagFileProcessor398 INFO - Processing file /home/ubuntu/airflow/dags/alamode.py for tasks to queue
[2017-09-12 18:08:20,223] {models.py:167} DagFileProcessor398 INFO - Filling up the DagBag from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,262] {jobs.py:1535} DagFileProcessor398 INFO - DAG(s) ['alamode'] retrieved from /home/ubuntu/airflow/dags/alamode.py
[2017-09-12 18:08:20,291] {jobs.py:1169} DagFileProcessor398 INFO - Processing alamode
/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty sequence. This results in a contradiction, which nonetheless can be expensive to evaluate. Consider alternative strategies for improved performance.
  'strategies for improved performance.' % expr)
[2017-09-12 18:08:20,317] {models.py:322} DagFileProcessor398 INFO - Finding 'running' jobs without a recent heartbeat
[2017-09-12 18:08:20,318] {models.py:328} DagFileProcessor398 INFO - Failing jobs without heartbeat after 2017-09-12 18:03:20.318105
[2017-09-12 18:08:20,320] {jobs.py:351} DagFileProcessor398 INFO - Processing /home/ubuntu/airflow/dags/alamode.py took 0.100 seconds
What exactly am I doing wrong? I tried changing schedule_interval to schedule_interval=timedelta(minutes=1) to see if it would start immediately, but to no avail. I can see the tasks under the DAG in the Airflow UI as expected, but their schedule status is 'no status'. Please help me here.
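A likely culprit here is start_date=datetime.now(): Airflow only triggers a run after the interval following start_date has fully elapsed, so a start_date of "now" keeps pushing the first run into the future. A minimal sketch of that arithmetic, using plain datetime (no Airflow needed) and hypothetical dates:

```python
from datetime import datetime, timedelta

# Airflow triggers a DAG run only AFTER the interval following
# start_date has fully elapsed: the first run fires at
# start_date + schedule_interval.
start_date = datetime(2017, 9, 12)   # a fixed past date
schedule_interval = timedelta(days=1)

# With a fixed past start_date, this point in time has already
# passed, so the scheduler can fire immediately. With
# start_date=datetime.now(), it would always lie a day ahead.
first_run_time = start_date + schedule_interval
print(first_run_time)  # 2017-09-13 00:00:00
```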
This issue has been resolved by following the below steps:
1) Used a much older date for start_date, together with schedule_interval=timedelta(minutes=10). Also used a real (fixed) date instead of datetime.now().
2) Added catchup=True to the DAG arguments.
3) Set the environment variable: export AIRFLOW_HOME=`pwd`/airflow_home.
4) Deleted airflow.db.
5) Moved the new code to the DAGs folder.
6) Ran the command 'airflow initdb' to create the DB again.
7) Turned on the DAG's 'ON' switch in the UI.
8) Ran the command 'airflow scheduler'.
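The effect of steps 1 and 2 above can be sketched with plain datetime arithmetic: with a fixed past start_date and catchup enabled, the scheduler creates one run for each completed interval between start_date and the present. The dates below are hypothetical and Airflow itself is not required:

```python
from datetime import datetime, timedelta

# Illustrative arithmetic only: with catchup enabled, the scheduler
# backfills one run per completed interval between start_date and now.
start_date = datetime(2017, 9, 1)
now = datetime(2017, 9, 12)
interval = timedelta(days=1)

runs = []
t = start_date
while t + interval <= now:
    runs.append(t)  # each run is stamped with its interval's start time
    t += interval

print(len(runs))  # 11 backfilled daily runs, 2017-09-01 .. 2017-09-11
```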
Here is the code which works now:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 9, 12),
    'email': ['anjana@gapro.tech'],
    'retries': 0,
    'retry_delay': timedelta(minutes=15)
}

dag = DAG(
    'alamode', catchup=False, default_args=default_args,
    schedule_interval="@daily")

# t1 is the task which will invoke the directory creation shell script
t1 = BashOperator(
    task_id='create_directory',
    bash_command='/home/ubuntu/scripts/makedir.sh ',
    dag=dag)

# t2 is the task which will invoke the spiders
t2 = BashOperator(
    task_id='web_crawl',
    bash_command='/home/ubuntu/scripts/crawl_spiders.sh ',
    dag=dag)

# To set dependency between tasks. 't1' should run before t2
t2.set_upstream(t1)