Airflow scheduler with SQL Server backend and pyodbc

Updated: 2024-10-11 17:28:39

Problem description

I have set up Airflow with SQL Server as the backend (SQL Azure). Init DB is successful. I am trying to run a simple DAG every 2 minutes.

The DAG has 2 tasks:

  • print date
  • sleep

When I start the airflow scheduler, it creates task instances for both tasks; the first one succeeds and the second one seems to be stuck in the "running" state.

Looking at the scheduler logs, I see the following error repeatedly.

[2019-01-04 11:38:48,253] {jobs.py:397} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 389, in helper
    pickle_dags)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 1816, in process_file
    dag.sync_to_db()
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/models.py", line 4296, in sync_to_db
    DagModel).filter(DagModel.dag_id == self.dag_id).first()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2755, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2855, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2876, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2885, in _get_bind_args
    **kw
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2867, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1019, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1024, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 409, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2112, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2151, in _wrap_pool_connect
    e, dialect, self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1465, in _handle_dbapi_exception_noconnection
    exc_info
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2147, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 387, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 768, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 516, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1140, in _do_get
    self._dec_overflow()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1137, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 333, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 461, in __init__
    self.__connect(first_connect_check=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 651, in __connect
    connection = pool._invoke_creator(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/strategies.py", line 105, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 393, in connect
    return self.dbapi.connect(*cargs, **cparams)
InterfaceError: (pyodbc.InterfaceError) ('28000', u"[28000] [unixODBC][Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Login failed for user 'airflowuser'. (18456) (SQLDriverConnect)")

Airflow is configured to use LocalExecutor and pyodbc to connect to SQL Azure:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
#executor = SequentialExecutor
executor = LocalExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines, more information
# on their website
#sql_alchemy_conn = sqlite:////home/sshuser/airflow/airflow.db

# Connection string to MS SQL Server
sql_alchemy_conn = mssql+pyodbc://airflowuser@afdsqlserver76:<password>@afdsqlserver76.database.windows:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server

# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 180

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300
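As a general aside (not something the original question raises): in a SQLAlchemy URL like the one above, any special characters in the username or password must be percent-encoded, since the Azure-style `user@server` login contains an `@` that can make SQLAlchemy mis-parse the URL and produce a login failure. A minimal sketch using only the standard library, with made-up credentials and the usual `<server>.database.windows.net` Azure SQL host form assumed:

```python
from urllib.parse import quote_plus

# Hypothetical credentials for illustration only (not the real ones).
# The '@' in an Azure-style login, and any symbols in the password,
# must be percent-encoded before being embedded in the URL.
user = quote_plus("airflowuser@afdsqlserver76")  # '@' becomes %40
password = quote_plus("p@ss:w/rd")               # '@' -> %40, ':' -> %3A, '/' -> %2F

# Host below assumes the standard Azure SQL form (<server>.database.windows.net);
# substitute your actual server name.
conn_uri = (
    "mssql+pyodbc://" + user + ":" + password +
    "@afdsqlserver76.database.windows.net:1433/airflowdb"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)
print(conn_uri)
```

If a connection string built this way still fails with error 18456, the credentials themselves are the next thing to verify.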

The DAG is as follows:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 4),
    'email': ['airflow@example'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval='*/2 * * * *',
          max_active_runs=1, catchup=False)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

I am a bit lost on why the scheduler would not be able to connect to the DB after it has run the first task successfully. Any pointers to resolve this are much appreciated.

I have a sample program that uses sqlalchemy to connect to SQL Azure using the same credentials, and this works.

import sqlalchemy
from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://afdadmin@afdsqlserver76:<password>@afdsqlserver76.database.windows:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server")
connection = engine.connect()
result = connection.execute("select version_num from alembic_version")
for row in result:
    print("Version:", row['version_num'])
connection.close()

Recommended answer

The issue was resolved after Pooling = True was set in odbcinst.ini:

[ODBC]
Pooling = Yes
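For context on where this setting lives: with unixODBC, the `Pooling` flag goes in the `[ODBC]` section of `odbcinst.ini`, and the driver's own entry can carry a `CPTimeout` value controlling how long an idle pooled connection is kept before being closed. A sketch of what the full file might look like; the driver path, library version, and timeout below are assumptions that vary by installation:

```ini
[ODBC]
Pooling = Yes

[ODBC Driver 17 for SQL Server]
Description = Microsoft ODBC Driver 17 for SQL Server
; Path and file name are assumptions -- check your msodbcsql17 install
Driver = /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.so
UsageCount = 1
; Seconds a pooled connection may sit idle before being closed
CPTimeout = 120
```

Note that this enables pooling at the ODBC driver-manager level, separate from the SQLAlchemy-level pooling configured by `sql_alchemy_pool_enabled` in airflow.cfg.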
