Creating and using connections in Airflow operators at runtime

Updated: 2024-10-21 06:29:27
Problem description

Note: this question is different from

  • Export environment variables at runtime with airflow
  • Set Airflow Env Vars at Runtime

I have to trigger certain tasks on remote systems from my Airflow DAG. The straightforward way to achieve this is SSHHook.

The problem is that the remote system is an EMR cluster which is itself created at runtime (by an upstream task) using EmrCreateJobFlowOperator. So while I can get hold of the job_flow_id of the launched EMR cluster (using XCom), what I need is an ssh_conn_id to pass to each downstream task.

Looking at the docs and code, it is evident that Airflow will look up this connection (using conn_id) in the db and in environment variables, so the problem boils down to being able to set either of these two at runtime (from within an operator).
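For illustration, a connection supplied through the environment is read from a variable named `AIRFLOW_CONN_` followed by the upper-cased conn_id, with a connection URI as its value. The sketch below only builds that name/value pair with the standard library; the helper `connection_env_var`, the conn_id `ssh_emr` and the host name are hypothetical and not part of the original question.

```python
from urllib.parse import quote


def connection_env_var(conn_id, conn_type, host, login=None, port=None):
    """Return the (name, value) pair Airflow would read for this conn_id.

    Hypothetical helper: it only formats strings, it does not touch Airflow.
    """
    name = "AIRFLOW_CONN_" + conn_id.upper()
    # Percent-encode the login so special characters survive URI parsing.
    userinfo = quote(login, safe="") + "@" if login else ""
    netloc = userinfo + host + (":{}".format(port) if port else "")
    return name, "{}://{}".format(conn_type, netloc)


# Assumed example values for an EMR master node reachable over SSH.
name, uri = connection_env_var(
    "ssh_emr", "ssh", "master.example.internal", login="hadoop", port=22
)
print(name, uri)
```

Setting such a variable only helps if it is present in the environment of the process that runs the downstream task, which is exactly the difficulty described here.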

This seems a rather common problem, because if it isn't achievable then the utility of EmrCreateJobFlowOperator would be severely hampered; but I haven't come across any example demonstrating it.

  • Is it possible to create (and also destroy) either of these from within an Airflow operator?
    • Connection (persisted in Airflow's db)
    • Environment Variable (should be accessible to all downstream tasks and not just the current task, as told here)
  • If not, what are my options?

I'm on

    • Airflow v1.10
    • Python 3.6.6
    • emr-5.15 (can upgrade if required)
    Recommended answer

    Connections come from the ORM

    Yes, you can create connections at runtime, even at DAG creation time if you're careful enough. Airflow is completely transparent about its internal models, so you can interact with the underlying SQLAlchemy directly. As exemplified originally in this answer, it's as easy as:

    from airflow import settings
    from airflow.models import Connection


    def create_conn(username, password, host=None):
        # Build the Connection model; the conn_id is derived from the username.
        new_conn = Connection(conn_id=f'{username}_connection',
                              login=username,
                              host=host if host else None)
        new_conn.set_password(password)

        # Persist it through Airflow's own SQLAlchemy session.
        session = settings.Session()
        session.add(new_conn)
        session.commit()

    Here you can, of course, set any other extra Connection properties you may require for the EMR connection.
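As a rough sketch of how this could be adapted to the EMR case, including the "destroy" half of the question, the helpers below derive a conn_id from the job_flow_id, store extra properties, and delete the row again. Everything here is an assumption for illustration: the function names, the `ssh_emr_<job_flow_id>` naming scheme, the `hadoop` login and the `key_file` extra are not from the original answer. The Airflow imports are kept inside the functions so the pure helper can be tried without an Airflow installation.

```python
import json


def emr_ssh_conn_kwargs(job_flow_id, master_dns, key_file):
    """Build keyword arguments for an SSH Connection to one EMR master node."""
    return {
        "conn_id": "ssh_emr_{}".format(job_flow_id),  # hypothetical naming scheme
        "conn_type": "ssh",
        "host": master_dns,
        "login": "hadoop",  # assumption: the cluster uses the usual EMR login
        "extra": json.dumps({"key_file": key_file}),  # assumption: key-based auth
    }


def create_connection(conn_kwargs):
    """Persist the connection, replacing any stale row with the same conn_id."""
    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    try:
        session.query(Connection).filter(
            Connection.conn_id == conn_kwargs["conn_id"]
        ).delete()
        session.add(Connection(**conn_kwargs))
        session.commit()
    finally:
        session.close()


def delete_connection(conn_id):
    """Remove the connection once the cluster has been terminated."""
    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    try:
        session.query(Connection).filter(Connection.conn_id == conn_id).delete()
        session.commit()
    finally:
        session.close()
```

A task running right after the cluster is created could pull the job_flow_id from XCom and call `create_connection(emr_ssh_conn_kwargs(...))`; downstream tasks can then rebuild the same conn_id from the job_flow_id, and a final task can call `delete_connection`.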

    As for environment variables: the obstacle is not a limitation of Airflow or Python, but (AFAIK for every major OS) environments are bound to the lifetime of a process. When you export a variable in bash, for example, you're simply stating that when you spawn child processes, you want that variable copied to the child's environment. This means that the parent process can't change the child's environment after its creation, and the child can't change the parent's environment.
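The one-way inheritance described above can be observed with a small standard-library experiment. This is only an illustrative sketch; the variable name `DEMO_VAR` and its values are made up.

```python
import os
import subprocess
import sys

# Code run by the child: report what it inherited, then modify its own copy.
CHILD_CODE = (
    "import os\n"
    "print(os.environ.get('DEMO_VAR'))\n"
    "os.environ['DEMO_VAR'] = 'changed-by-child'\n"
)


def run_child():
    """Spawn a child Python process and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        stdout=subprocess.PIPE,
        universal_newlines=True,
        check=True,
    )
    return result.stdout.strip()


os.environ["DEMO_VAR"] = "set-by-parent"
seen_by_child = run_child()  # the child received a copy of the parent's value
seen_by_parent_afterwards = os.environ["DEMO_VAR"]  # the child's change is not visible here
print(seen_by_child, seen_by_parent_afterwards)
```

The child sees the value that existed when it was spawned, and its own assignment disappears with it when it exits.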

    In short, only the process itself can change its environment after it's created. And considering that worker processes are Airflow subprocesses, it's hard to control the creation of their environments as well. What you can do is write the environment variables into a file and intentionally update the current environment with overrides from that file on each task start.
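A minimal sketch of that file-based workaround, using only the standard library, might look like the following. The function names, the JSON format, the file name and the `EMR_MASTER_DNS` variable are all assumptions for illustration; in a real deployment the file would also have to live somewhere every worker can read.

```python
import json
import os
import tempfile


def dump_env_overrides(path, overrides):
    """Write the variables an upstream task wants to share (values must be strings)."""
    with open(path, "w") as handle:
        json.dump(overrides, handle)


def apply_env_overrides(path):
    """Call at the start of each task: merge the file into this process's environment."""
    if not os.path.exists(path):
        return {}
    with open(path) as handle:
        overrides = json.load(handle)
    os.environ.update(overrides)
    return overrides


# Demonstration with a temporary file standing in for shared storage.
overrides_path = os.path.join(tempfile.mkdtemp(), "airflow_env_overrides.json")
dump_env_overrides(overrides_path, {"EMR_MASTER_DNS": "master.example.internal"})
applied = apply_env_overrides(overrides_path)
```

Because each task process applies the overrides to itself, this stays consistent with the rule that only a process can change its own environment.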

    Published: 2023-11-24 04:31:46
    Article link: https://www.elefans.com/category/jswz/34/1623979.html