How to prevent "Execution failed: [Errno 32] Broken pipe" in Airflow?


I just started using Airflow to coordinate our ETL pipeline.

I encountered a broken-pipe error when running a DAG.

I've seen the general Stack Overflow discussion here.

My case is more on the Airflow side. According to the discussion in that post, the possible root cause is:

The broken pipe error usually occurs if your request is blocked or takes too long; after the request side times out, it closes the connection, and when the response side (the server) then tries to write to the socket, it throws a broken pipe error.

This might be the real cause in my case: I have a PythonOperator that starts another job outside of Airflow, and that job can be very lengthy (i.e. 10+ hours). I wonder what mechanism Airflow has in place that I can leverage to prevent this error.
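One common way to keep a very long external job from tripping over a pipe held by its parent process is to detach the job entirely, logging to a file instead of a pipe. This is a minimal stdlib sketch, not an Airflow API; the function name, command, and log path are all illustrative:

```python
import os
import subprocess
import tempfile

def launch_detached(cmd, log_path):
    """Start a long-running command detached from the caller.

    Redirecting stdout/stderr to a log file instead of a pipe back to
    the parent means the child can never hit a closed pipe if the
    parent (e.g. an Airflow worker) times out or goes away.
    """
    with open(log_path, "ab") as log:
        proc = subprocess.Popen(
            cmd,
            stdout=log,
            stderr=subprocess.STDOUT,   # merge stderr into the same log
            stdin=subprocess.DEVNULL,   # the child never reads from us
            start_new_session=True,     # new session: no SIGHUP from the parent
        )
    return proc.pid

log_file = os.path.join(tempfile.gettempdir(), "etl_job.log")
pid = launch_detached(["echo", "job started"], log_file)
print(pid > 0)
```

The task that launches the job then only needs to record the PID (or a job ID) and poll for completion, rather than holding a pipe open for ten hours.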

Can anyone help?

UPDATE1 20190303-1:

Thanks to @y2k-shubham for pointing me to the SSHOperator. I was able to use it to set up an SSH connection successfully and run some simple commands on the remote site (in fact the default SSH connection has to be set to localhost, because the job is on localhost), and I could see the correct results of hostname and pwd.

However, when I attempted to run the actual job, I received the same error; again, the error came from the pipeline job rather than the Airflow DAG/task.

UPDATE2: 20190303-2

I had a successful run (airflow test) with no error, followed by another failed run (scheduler) with the same broken-pipe error from the pipeline.

Answer

While I'd suggest you keep looking for a more graceful way of achieving what you want, here is the example usage you requested.

First you've got to create an SSHHook. This can be done in two ways:

  • The conventional way, where you supply all requisite settings such as host, user, and password (if needed) from the client code where you instantiate the hook. I'm citing an example from test_ssh_hook.py here, but you should go through SSHHook and its tests thoroughly to understand all possible usages.

ssh_hook = SSHHook(remote_host="remote_host", port="port", username="username", timeout=10, key_file="fake.file")

  • The Airflow way, where you put all connection details inside a Connection object that can be managed from the UI, and pass only its conn_id to instantiate your hook.

    ssh_hook = SSHHook(ssh_conn_id="my_ssh_conn_id")

    Of course, if you're relying on SSHOperator, you can directly pass the ssh_conn_id to the operator.

    ssh_operator = SSHOperator(ssh_conn_id="my_ssh_conn_id")

  • Now, if you're planning to have a dedicated task for running a command over SSH, you can use SSHOperator. Again, I'm citing an example from test_ssh_operator.py, but go through the sources for a better picture.

    task = SSHOperator(task_id="test", command="echo -n airflow", dag=self.dag, timeout=10, ssh_conn_id="ssh_default")

    But then you might want to run a command over SSH as part of a bigger task. In that case you don't want an SSHOperator; you can still use just the SSHHook. The get_conn() method of SSHHook gives you an instance of a paramiko SSHClient, with which you can run a command using the exec_command() call.

    my_command = "echo airflow"
    stdin, stdout, stderr = ssh_client.exec_command(
        command=my_command,
        get_pty=my_command.startswith("sudo"),
        timeout=10)

    If you look at the execute() method of SSHOperator, it is a fairly complicated (but reliable) piece of code trying to achieve a very simple thing. For my own usage, I created some snippets that you might want to look at:

    • For using SSHHook independently of SSHOperator, have a look at ssh_utils.py
    • For an operator that runs multiple commands over SSH (you can achieve the same thing by using bash's && operator), see MultiCmdSSHOperator
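The && trick mentioned above lets one SSH invocation run several commands, stopping at the first failure. A quick local illustration of the shell semantics (the same string would simply be the remote command over SSH):

```python
import subprocess

# '&&' runs the next command only if the previous one exited with status 0,
# so a failed extract step would prevent the load step from running.
combined = "echo extract && echo load"
result = subprocess.run(combined, shell=True, capture_output=True, text=True)
print(result.stdout.split())
```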
