I have an issue where the BashOperator is not logging all of the output from wget. It'll log only the first 1-5 lines of the output.
I have tried this with only wget as the bash command:
```python
tester = BashOperator(
    task_id='testing',
    bash_command="wget -N -r -nd --directory-prefix='/tmp/' apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip",
    dag=dag)
```
I've also tried this as part of a longer bash script that has other commands that follow wget. Airflow does wait for the script to complete before firing downstream tasks. Here's an example bash script:
```bash
#!/bin/bash
echo "Starting up..."
wget -N -r -nd --directory-prefix='/tmp/' apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
echo "Download complete..."
unzip -o /tmp/httpcomponents-client-4.5.3-src.zip -d /tmp/test_airflow
echo "Archive unzipped..."
```

The last few lines of the log file:
```
[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask:
[2017-04-13 18:33:35,068] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:35,068] {models.py:1342} INFO - Executing <Task(BashOperator): testing> on 2017-04-13 18:33:08
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,569] {bash_operator.py:71} INFO - tmp dir root location:
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: /tmp
[2017-04-13 18:33:37,571] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,571] {bash_operator.py:81} INFO - Temporary script location :/tmp/airflowtmpqZhPjB//tmp/airflowtmpqZhPjB/testingCkJgDE
[2017-04-13 18:14:54,943] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,942] {bash_operator.py:82} INFO - Running command: /var/www/upstream/xtractor/scripts/Temp_test.sh
[2017-04-13 18:14:54,951] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,950] {bash_operator.py:91} INFO - Output:
[2017-04-13 18:14:54,955] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,954] {bash_operator.py:96} INFO - Starting up...
[2017-04-13 18:14:54,958] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,957] {bash_operator.py:96} INFO - --2017-04-13 18:14:54-- apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
[2017-04-13 18:14:55,106] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,105] {bash_operator.py:96} INFO - Resolving apache.cs.utah.edu (apache.cs.utah.edu)... 155.98.64.87
[2017-04-13 18:14:55,186] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,186] {bash_operator.py:96} INFO - Connecting to apache.cs.utah.edu (apache.cs.utah.edu)|155.98.64.87|:80... connected.
[2017-04-13 18:14:55,284] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - HTTP request sent, awaiting response... 200 OK
[2017-04-13 18:14:55,285] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - Length: 1662639 (1.6M) [application/zip]
[2017-04-13 18:15:01,485] {jobs.py:2083} INFO - Task exited with return code 0
```
Edit: More testing suggests that the problem is specifically with logging the output of wget.
Recommended answer:
It's because the default operator prints only the last line. Replace the contents of airflow/operators/bash_operator.py in your Airflow installation with the following code. The file is usually in the site-packages directory of the Python installation that Airflow is installed into.
```python
from builtins import bytes
import os
import signal
import logging
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory


class BashOperator(BaseOperator):
    """
    Execute a Bash script, command or set of commands.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :type bash_command: string
    :param xcom_push: If xcom_push is True, all lines written to stdout
        (stderr is merged into stdout) will also be pushed to an XCom
        when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :type env: dict
    :param output_encoding: Output encoding of the bash command.
    :type output_encoding: str
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            bash_command,
            xcom_push=False,
            env=None,
            output_encoding='utf-8',
            *args, **kwargs):

        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards
        """
        bash_command = self.bash_command
        logging.info("tmp dir root location: \n" + gettempdir())
        # Collect every output line so the full output can be logged
        # (and optionally returned) once the command finishes.
        line_buffer = []
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                f.flush()
                fname = f.name
                # f.name is already an absolute path inside tmp_dir.
                script_location = fname
                logging.info("Temporary script "
                             "location :{0}".format(script_location))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env,
                    preexec_fn=os.setsid)

                self.sp = sp

                logging.info("Output:")
                # stderr is merged into stdout, so messages written to
                # stderr (as wget does) are read here too.
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    line_buffer.append(line)
                    logging.info(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))

                if sp.returncode:
                    raise AirflowException("Bash command failed")

        # Log the complete output again as a single message.
        logging.info("\n".join(line_buffer))
        if self.xcom_push_flag:
            return "\n".join(line_buffer)

    def on_kill(self):
        logging.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
```
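The core of the modified operator is a read, log, and buffer loop over the subprocess output. The sketch below shows that technique using only the standard library, so it can be tried outside Airflow. `run_and_log` is a hypothetical helper name, and raising `RuntimeError` stands in for `AirflowException`. Like the operator, it merges stderr into stdout. This matters for wget, which writes its messages to stderr by default.

```python
import logging
import subprocess
from typing import List

log = logging.getLogger(__name__)


def run_and_log(command: str, encoding: str = "utf-8") -> List[str]:
    """Run `command` with bash, log each output line, and return all lines.

    Hypothetical helper illustrating the operator's loop; stderr is merged
    into stdout so that messages written to stderr are captured as well.
    """
    lines: List[str] = []
    with subprocess.Popen(
        ["bash", "-c", command],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    ) as proc:
        # readline() returns b'' only at EOF, i.e. once the command has
        # closed its output, so every line is seen exactly once.
        for raw in iter(proc.stdout.readline, b""):
            line = raw.decode(encoding, errors="replace").rstrip("\r\n")
            lines.append(line)
            log.info(line)
        returncode = proc.wait()
    if returncode:
        # Stand-in for AirflowException in the operator above.
        raise RuntimeError(f"command failed with exit code {returncode}")
    return lines
```

For example, `run_and_log("echo one; echo two 1>&2")` returns `["one", "two"]`. The line written to stderr is included because of the `stderr=subprocess.STDOUT` redirect.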