I have already seen this and this question on SO and made the changes accordingly. However, my dependent DAG still gets stuck in the poking state. Below is my master DAG:
```python
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure
```
Below is my dependent DAG:
```python
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('external-dag-upstream', default_args=default_args)

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_delta=timedelta(minutes=-2),
    dag=dag
)

count_rows = JdbcOperator(
    task_id='count_rows',
    jdbc_conn_id='airflow_db2',
    sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
    dag=dag
)

count_rows.set_upstream(task_sensor)
```
Below are the logs of dependent DAG once the master DAG gets executed:
```
[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
```
Below are the logs of master DAG execution:
```
[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0
```
My assumption is that Airflow should trigger the dependent DAG if the master runs fine, correct? I have tried playing around with execution_delta, but that doesn't seem to work.
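The timestamp the sensor is poking for can be reproduced with plain datetime arithmetic. ExternalTaskSensor looks for the external task instance at `execution_date - execution_delta`, so a negative delta shifts the target *forward* in time: the sensor waits for an execution date later than its own. A minimal sketch (the sensor's own execution date here is a hypothetical value chosen to match the log line):

```python
from datetime import datetime, timedelta

# ExternalTaskSensor computes the execution date it pokes for as
#   target = execution_date - execution_delta
sensor_execution_date = datetime(2019, 1, 10, 11, 43, 47)  # hypothetical
execution_delta = timedelta(minutes=-2)

# Subtracting a negative delta moves the target two minutes FORWARD,
# which is why the log shows "Poking for ... on 2019-01-10T11:45:47".
target = sensor_execution_date - execution_delta
print(target)  # 2019-01-10 11:45:47
```

If the master DAG never has a successful `call_procedure` run at exactly that execution date, the sensor pokes forever.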
Also, schedule_interval and start_date are the same for both DAGs, so I don't think that should cause any trouble.
Am I missing something?
Accepted Answer
Make sure both DAGs start at the same time, and don't trigger either DAG manually.
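One way to satisfy this is to give both DAG files an identical, static start_date instead of one derived from `datetime.today()` (which is re-evaluated on every parse and can drift between files). A sketch of the dependent DAG under that assumption; note that `schedule_interval` is a DAG argument, not a default_args key, so placing it in default_args as in the question has no effect:

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# A fixed start_date shared verbatim by both DAG files, so their
# scheduled execution dates line up exactly.
START_DATE = datetime(2019, 1, 10)

dag = DAG(
    'external-dag-upstream',
    start_date=START_DATE,
    schedule_interval='@once',
)

# With matching schedules, execution_delta can be dropped: the default
# (None) makes the sensor poke for the SAME execution_date as its own.
task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    dag=dag,
)
```

Manually triggered runs get an execution_date of "now", which will not match the other DAG's scheduled execution_date, so the sensor pokes forever; hence the advice to avoid manual triggering.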