I have set up Airflow with SQL Server as the backend (SQL Azure). The database initialization succeeded. I am trying to run a simple DAG every 2 minutes.
The DAG has two tasks: print_date, which runs date, and sleep, which runs sleep 5.
When I start the Airflow scheduler, it creates task instances for both tasks. The first one succeeds, and the second one seems to be stuck in the "running" state.
Looking at the scheduler logs, I see the following error repeatedly:
[2019-01-04 11:38:48,253] {jobs.py:397} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 389, in helper
    pickle_dags)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 1816, in process_file
    dag.sync_to_db()
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/models.py", line 4296, in sync_to_db
    DagModel).filter(DagModel.dag_id == self.dag_id).first()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2755, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2855, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2876, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2885, in _get_bind_args
    **kw
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2867, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1019, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1024, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 409, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2112, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2151, in _wrap_pool_connect
    e, dialect, self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1465, in _handle_dbapi_exception_noconnection
    exc_info
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2147, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 387, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 768, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 516, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1140, in _do_get
    self._dec_overflow()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1137, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 333, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 461, in __init__
    self.__connect(first_connect_check=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 651, in __connect
    connection = pool._invoke_creator(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/strategies.py", line 105, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 393, in connect
    return self.dbapi.connect(*cargs, **cparams)
InterfaceError: (pyodbc.InterfaceError) ('28000', u"[28000] [unixODBC][Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Login failed for user 'airflowuser'. (18456) (SQLDriverConnect)")
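The last line of the traceback carries the two codes that matter most when searching for a cause: the ODBC SQLSTATE (28000, invalid authorization specification) and the SQL Server native error number (18456, login failed). A minimal sketch of pulling both out of a scheduler log line, assuming Python 3 and the message layout shown above; the helper name parse_odbc_error is hypothetical:

```python
import re


def parse_odbc_error(message):
    """Return (sqlstate, native_error) found in a pyodbc error message.

    Either element is None when the corresponding code is not present.
    """
    # SQLSTATE is a five-character code in square brackets, e.g. [28000].
    state = re.search(r"\[([0-9A-Z]{5})\]", message)
    # The native SQL Server error number appears as digits in parentheses.
    native = re.search(r"\((\d+)\)", message)
    return (
        state.group(1) if state else None,
        int(native.group(1)) if native else None,
    )


log_tail = (
    "InterfaceError: (pyodbc.InterfaceError) ('28000', u\"[28000] [unixODBC]"
    "[Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Login failed for "
    "user 'airflowuser'. (18456) (SQLDriverConnect)\")"
)
print(parse_odbc_error(log_tail))
```

Grouping repeated scheduler errors by these two values makes it easier to tell an authentication failure apart from a network or driver problem.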
Airflow is configured to use LocalExecutor, with pyodbc connecting to SQL Azure:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
#executor = SequentialExecutor
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////home/sshuser/airflow/airflow.db
#connection string to MS SQL Server
sql_alchemy_conn = mssql+pyodbc://airflowuser@afdsqlserver76:<password>@afdsqlserver76.database.windows:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server

# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 180

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300
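The sql_alchemy_conn value above packs a user name that itself contains "@", a password, and a driver name with spaces into a single URL. SQLAlchemy's documentation recommends URL-encoding special characters in the user and password, so one way to rule out a malformed URL is to assemble it programmatically. A minimal sketch, assuming Python 3; build_mssql_url is a hypothetical helper, and the host and password in the usage lines are placeholders rather than values from the question:

```python
from urllib.parse import quote_plus


def build_mssql_url(user, password, host, port, database, driver):
    """Assemble a mssql+pyodbc URL with percent-encoded credentials."""
    template = (
        "mssql+pyodbc://{user}:{password}@{host}:{port}/{database}"
        "?driver={driver}"
    )
    return template.format(
        # quote_plus escapes characters such as "@", ":" and "/" that would
        # otherwise be read as URL delimiters.
        user=quote_plus(user),
        password=quote_plus(password),
        host=host,
        port=port,
        database=database,
        # Spaces in the driver name become "+", matching the form used above.
        driver=quote_plus(driver),
    )


url = build_mssql_url(
    user="airflowuser@afdsqlserver76",
    password="p@ss:word/1",          # placeholder password
    host="example-host",             # placeholder host name
    port=1433,
    database="airflowdb",
    driver="ODBC Driver 17 for SQL Server",
)
print(url)
```

This does not change how the scheduler connects; it only removes one source of ambiguity when comparing the Airflow setting with a standalone test script.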
The DAG is as follows:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 4),
    'email': ['airflow@example'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# Run every 2 minutes, one active run at a time, without backfilling.
dag = DAG('tutorial', default_args=default_args,
          schedule_interval='*/2 * * * *', max_active_runs=1, catchup=False)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

# sleep runs only after print_date has finished
t2.set_upstream(t1)
I am a bit lost as to why the scheduler cannot connect to the DB after it has run the first task successfully. Any pointers to resolve this are much appreciated.
I have a sample program that uses sqlalchemy to connect to SQL Azure with the same credentials, and this works:
import sqlalchemy
from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://afdadmin@afdsqlserver76:<password>@afdsqlserver76.database.windows:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server")
connection = engine.connect()

# Read the schema version that Airflow's migrations recorded
result = connection.execute("select version_num from alembic_version")
for row in result:
    print("Version:", row['version_num'])

connection.close()

Recommended answer
The issue was resolved after setting Pooling = Yes in the [ODBC] section of odbcinst.ini:
[ODBC]
Pooling = Yes
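To confirm that the setting is actually present on the machine running the scheduler, the file can be checked with the standard library, since odbcinst.ini uses INI syntax. A minimal sketch, assuming Python 3; odbc_pooling_enabled is a hypothetical helper, and the sample text stands in for the real file contents:

```python
import configparser


def odbc_pooling_enabled(ini_text):
    """Return True if the [ODBC] section sets Pooling to a truthy value."""
    # Interpolation is disabled so that "%" in driver paths is left alone.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(ini_text)
    # Option names are matched case-insensitively by default; a missing
    # section or option falls back to False.
    return parser.getboolean("ODBC", "Pooling", fallback=False)


sample = """\
[ODBC]
Pooling = Yes
"""
print(odbc_pooling_enabled(sample))
```

On unixODBC installations, running odbcinst -j prints the location of the odbcinst.ini file in use; its contents can be read and passed to the helper in place of the sample text.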