Airflow BashOperator log does not contain the full output

Updated: 2024-10-25 00:24:11
Problem Description

I have an issue where the BashOperator is not logging all of the output from wget. It'll log only the first 1-5 lines of the output.

I have tried this with only wget as the bash command:

tester = BashOperator(
    task_id='testing',
    bash_command="wget -N -r -nd --directory-prefix='/tmp/' apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip",
    dag=dag)

I've also tried this as part of a longer bash script that has other commands that follow wget. Airflow does wait for the script to complete before firing downstream tasks. Here's an example bash script:

#!/bin/bash
echo "Starting up..."
wget -N -r -nd --directory-prefix='/tmp/' apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
echo "Download complete..."
unzip /tmp/httpcomponents-client-4.5.3-src.zip -o -d /tmp/test_airflow
echo "Archive unzipped..."

The last few lines of the log file:

[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask:
[2017-04-13 18:33:35,068] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:35,068] {models.py:1342} INFO - Executing <Task(BashOperator): testing> on 2017-04-13 18:33:08
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,569] {bash_operator.py:71} INFO - tmp dir root location:
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: /tmp
[2017-04-13 18:33:37,571] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,571] {bash_operator.py:81} INFO - Temporary script location :/tmp/airflowtmpqZhPjB//tmp/airflowtmpqZhPjB/testingCkJgDE
[2017-04-13 18:14:54,943] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,942] {bash_operator.py:82} INFO - Running command: /var/www/upstream/xtractor/scripts/Temp_test.sh
[2017-04-13 18:14:54,951] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,950] {bash_operator.py:91} INFO - Output:
[2017-04-13 18:14:54,955] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,954] {bash_operator.py:96} INFO - Starting up...
[2017-04-13 18:14:54,958] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,957] {bash_operator.py:96} INFO - --2017-04-13 18:14:54-- apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
[2017-04-13 18:14:55,106] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,105] {bash_operator.py:96} INFO - Resolving apache.cs.utah.edu (apache.cs.utah.edu)... 155.98.64.87
[2017-04-13 18:14:55,186] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,186] {bash_operator.py:96} INFO - Connecting to apache.cs.utah.edu (apache.cs.utah.edu)|155.98.64.87|:80... connected.
[2017-04-13 18:14:55,284] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - HTTP request sent, awaiting response... 200 OK
[2017-04-13 18:14:55,285] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - Length: 1662639 (1.6M) [application/zip]
[2017-04-13 18:15:01,485] {jobs.py:2083} INFO - Task exited with return code 0

Edit: More testing suggests that it's a problem logging the output of wget.
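One plausible explanation (an assumption on my part, not confirmed in the thread) is that wget writes its progress bar to the terminal using carriage returns (`\r`) rather than newlines, so a line-based reader sees a long run of progress updates as one single, never-finished "line". A minimal sketch that simulates this:

```python
import subprocess

# Simulate wget-style progress output: updates separated by carriage
# returns (\r) rather than newlines, followed by a real newline at the end.
# A line-based reader collapses all the \r-separated updates into a
# single element instead of seeing them as separate lines.
proc = subprocess.Popen(
    ["bash", "-c", r"printf 'a\rb\rc\n'; echo done"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
lines = [raw.decode() for raw in iter(proc.stdout.readline, b"")]
proc.wait()
print(lines)  # the three \r-separated updates arrive as one "line"
```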

Accepted Answer

It's because the default operator logs only the last line of the output. Replace the code inside airflow/operators/bash_operator.py, wherever your Airflow is installed, with the following. Usually, you need to look at where your Python is and then go to site-packages.
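The essence of the patch is a read-and-buffer loop over the subprocess's stdout: log every line as it arrives and keep all of them, instead of retaining only the last one. A minimal standalone sketch (using a trivial echo script in place of a real task):

```python
import subprocess

# Read the child's stdout line by line, emit each line as it arrives,
# and accumulate everything in a buffer so the full output survives.
proc = subprocess.Popen(
    ["bash", "-c", "echo one; echo two; echo three"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
line_buffer = []
for raw in iter(proc.stdout.readline, b""):
    line = raw.decode("utf-8").rstrip()
    line_buffer.append(line)
    print(line)          # stands in for logging.info(line)
proc.wait()
full_output = "\n".join(line_buffer)
```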

from builtins import bytes
import os
import signal
import logging
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory


class BashOperator(BaseOperator):
    """
    Execute a Bash script, command or set of commands.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :type bash_command: string
    :param xcom_push: If xcom_push is True, the last line written to stdout
        will also be pushed to an XCom when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the
        default behavior. (templated)
    :type env: dict
    :type output_encoding: output encoding of bash command
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            bash_command,
            xcom_push=False,
            env=None,
            output_encoding='utf-8',
            *args, **kwargs):
        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards
        """
        bash_command = self.bash_command
        logging.info("tmp dir root location: \n" + gettempdir())
        line_buffer = []
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
                f.write(bytes(bash_command, 'utf_8'))
                f.flush()
                fname = f.name
                script_location = tmp_dir + "/" + fname
                logging.info("Temporary script "
                             "location :{0}".format(script_location))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env,
                    preexec_fn=os.setsid)
                self.sp = sp
                logging.info("Output:")
                line = ''
                # Read and buffer every line instead of keeping only the
                # last one -- this is the actual fix.
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    line_buffer.append(line)
                    logging.info(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))
                if sp.returncode:
                    raise AirflowException("Bash command failed")
        logging.info("\n".join(line_buffer))
        if self.xcom_push_flag:
            return "\n".join(line_buffer)

    def on_kill(self):
        logging.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
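Note that the replacement keeps the original on_kill behavior: because the child is started with `preexec_fn=os.setsid`, bash becomes the leader of its own process group, so `os.killpg` tears down bash together with anything it spawned (such as a long-running wget). A minimal sketch of that pattern:

```python
import os
import signal
import subprocess

# Start a shell in its own process group (what preexec_fn=os.setsid does
# in the operator), then terminate the entire group with one signal.
proc = subprocess.Popen(
    ["bash", "-c", "sleep 30"],
    preexec_fn=os.setsid,
)
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
proc.wait()
# A negative return code means the process was killed by that signal.
```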
