How to use xcom_push=True and auto_remove=True at the same time with DockerOperator

Problem description

When running DockerOperator with xcom_push=True, xcom_all=True and auto_remove=True, the task raises an error as if the container had been deleted before its STDOUT could be read.

Take the following DAG as an example:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import PythonOperator

# Default (but overridable) arguments for Operators instantiations
default_args = {
    'owner': 'Satan',
    'depends_on_past': False,
    'start_date': datetime(2019, 11, 28),
    'retry_delay': timedelta(seconds=2),
}

# DAG definition
def createDockerOperatorTask(xcom_all, auto_remove, id_suffix):
    return DockerOperator(
        # Default args
        task_id="docker_operator" + id_suffix,
        image='centos:latest',
        container_name="container" + id_suffix,
        api_version='auto',
        command="echo 'FALSE';",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        xcom_push=True,
        xcom_all=xcom_all,
        auto_remove=auto_remove,
    )

# Use dag as python context so all tasks are "automagically" linked (in no specific order) to it
with DAG('docker_operator_xcom', default_args=default_args, schedule_interval=timedelta(days=1)) as dag:
    t1 = createDockerOperatorTask(xcom_all=True, auto_remove=True, id_suffix="_1")
    t2 = createDockerOperatorTask(xcom_all=True, auto_remove=False, id_suffix="_2")
    t3 = createDockerOperatorTask(xcom_all=False, auto_remove=True, id_suffix="_3")

    # Set tasks precedence
    dag >> t1
    dag >> t2
    dag >> t3

If we run it, the first task fails and the other two succeed. Nevertheless, the only one that runs "correctly" is docker_operator_3, because it sets the XCom value correctly while docker_operator_2 doesn't. This gives me the feeling that docker_operator_2 "tries" to read the STDOUT and, when it can't, doesn't fail (as it should, the way docker_operator_1 does).

Log of task docker_operator_1 with xcom_push=True, xcom_all=True and auto_remove=True:

*** Log file does not exist: /usr/local/airflow/logs/docker_operator_xcom/docker_operator_1/2019-12-04T20:24:21.180209+00:00/1.log
*** Fetching from: 5df603088df3:8793/log/docker_operator_xcom/docker_operator_1/2019-12-04T20:24:21.180209+00:00/1.log
[2019-12-04 20:24:24,959] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_1 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,984] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_1 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,984] {{taskinstance.py:841}} INFO - --------------------------------------------------------------------------------
[2019-12-04 20:24:24,984] {{taskinstance.py:842}} INFO - Starting attempt 1 of 1
[2019-12-04 20:24:24,985] {{taskinstance.py:843}} INFO - --------------------------------------------------------------------------------
[2019-12-04 20:24:24,998] {{taskinstance.py:862}} INFO - Executing <Task(DockerOperator): docker_operator_1> on 2019-12-04T20:24:21.180209+00:00
[2019-12-04 20:24:24,998] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'docker_operator_xcom', 'docker_operator_1', '2019-12-04T20:24:21.180209+00:00', '--job_id', '72', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/qm_operators/exp_5_prueba.py', '--cfg_path', '/tmp/tmp4_eb_wcg']
[2019-12-04 20:24:25,987] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:25,986] {{settings.py:252}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1037
[2019-12-04 20:24:26,006] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 /usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-12-04 20:24:26,006] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   """)
[2019-12-04 20:24:26,838] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:26,838] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-12-04 20:24:26,841] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:26,838] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags/qm_operators/exp_5_prueba.py
[2019-12-04 20:24:26,982] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:26,982] {{cli.py:545}} INFO - Running <TaskInstance: docker_operator_xcom.docker_operator_1 2019-12-04T20:24:21.180209+00:00 [running]> on host 5df603088df3
[2019-12-04 20:24:27,001] {{docker_operator.py:201}} INFO - Starting docker container from image centos:latest
[2019-12-04 20:24:27,519] {{logging_mixin.py:112}} INFO - Attachs: []
[2019-12-04 20:24:27,575] {{taskinstance.py:1058}} ERROR - 404 Client Error: Not Found ("No such container: 635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe")
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 261, in _raise_for_status
    response.raise_for_status()
  File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.39/containers/635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe/json

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/docker_operator.py", line 264, in execute
    if self.xcom_all else str(line)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
    return f(self, resource_id, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 855, in logs
    output = self._get_result(container, stream, res)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 451, in _get_result
    return self._get_result_tty(stream, res, self._check_is_tty(container))
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
    return f(self, resource_id, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 447, in _check_is_tty
    cont = self.inspect_container(container)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
    return f(self, resource_id, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 758, in inspect_container
    self._get(self._url("/containers/{0}/json", container)), True
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 267, in _result
    self._raise_for_status(response)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 263, in _raise_for_status
    raise create_api_error_from_http_exception(e)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
    raise cls(e, response=response, explanation=explanation)
docker.errors.NotFound: 404 Client Error: Not Found ("No such container: 635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe")
[2019-12-04 20:24:27,583] {{taskinstance.py:1089}} INFO - Marking task as FAILED.
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 Traceback (most recent call last):
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 261, in _raise_for_status
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     response.raise_for_status()
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     raise HTTPError(http_error_msg, response=self)
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.39/containers/635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe/json
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 During handling of the above exception, another exception occurred:
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 Traceback (most recent call last):
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/bin/airflow", line 37, in <module>
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     args.func(args)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     return f(*args, **kwargs)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 551, in run
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     _run(args, dag, ti)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 469, in _run
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     pool=args.pool,
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     return func(*args, **kwargs)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     result = task_copy.execute(context=context)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/lib/python3.7/site-packages/airflow/operators/docker_operator.py", line 264, in execute
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     if self.xcom_all else str(line)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     return f(self, resource_id, *args, **kwargs)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 855, in logs
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     output = self._get_result(container, stream, res)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 451, in _get_result
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     return self._get_result_tty(stream, res, self._check_is_tty(container))
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     return f(self, resource_id, *args, **kwargs)
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 447, in _check_is_tty
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     cont = self.inspect_container(container)
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     return f(self, resource_id, *args, **kwargs)
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 758, in inspect_container
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     self._get(self._url("/containers/{0}/json", container)), True
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 267, in _result
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     self._raise_for_status(response)
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 263, in _raise_for_status
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     raise create_api_error_from_http_exception(e)
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1   File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1     raise cls(e, response=response, explanation=explanation)
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 docker.errors.NotFound: 404 Client Error: Not Found ("No such container: 635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe")
[2019-12-04 20:24:29,953] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:29,952] {{local_task_job.py:124}} WARNING - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.989579 s
[2019-12-04 20:24:34,948] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:34,947] {{local_task_job.py:103}} INFO - Task exited with return code 1

Log of task docker_operator_2 with xcom_push=True, xcom_all=True and auto_remove=False:

*** Log file does not exist: /usr/local/airflow/logs/docker_operator_xcom/docker_operator_2/2019-12-04T20:24:21.180209+00:00/1.log
*** Fetching from: 5df603088df3:8793/log/docker_operator_xcom/docker_operator_2/2019-12-04T20:24:21.180209+00:00/1.log
[2019-12-04 20:24:24,794] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_2 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,829] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_2 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,829] {{taskinstance.py:841}} INFO - --------------------------------------------------------------------------------
[2019-12-04 20:24:24,829] {{taskinstance.py:842}} INFO - Starting attempt 1 of 1
[2019-12-04 20:24:24,829] {{taskinstance.py:843}} INFO - --------------------------------------------------------------------------------
[2019-12-04 20:24:24,842] {{taskinstance.py:862}} INFO - Executing <Task(DockerOperator): docker_operator_2> on 2019-12-04T20:24:21.180209+00:00
[2019-12-04 20:24:24,843] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'docker_operator_xcom', 'docker_operator_2', '2019-12-04T20:24:21.180209+00:00', '--job_id', '71', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/qm_operators/exp_5_prueba.py', '--cfg_path', '/tmp/tmpeq9uc4kw']
[2019-12-04 20:24:26,174] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:26,173] {{settings.py:252}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1035
[2019-12-04 20:24:26,226] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 /usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-12-04 20:24:26,226] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2   """)
[2019-12-04 20:24:27,685] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:27,678] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-12-04 20:24:27,685] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:27,678] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags/qm_operators/exp_5_prueba.py
[2019-12-04 20:24:27,973] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:27,971] {{cli.py:545}} INFO - Running <TaskInstance: docker_operator_xcom.docker_operator_2 2019-12-04T20:24:21.180209+00:00 [running]> on host 5df603088df3
[2019-12-04 20:24:28,017] {{docker_operator.py:201}} INFO - Starting docker container from image centos:latest
[2019-12-04 20:24:28,643] {{logging_mixin.py:112}} INFO - Attachs: []
[2019-12-04 20:24:29,783] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:29,782] {{local_task_job.py:124}} WARNING - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.989846 s
[2019-12-04 20:24:34,780] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:34,779] {{local_task_job.py:103}} INFO - Task exited with return code 0

Log of task docker_operator_3 with xcom_push=True, xcom_all=False and auto_remove=True:

*** Log file does not exist: /usr/local/airflow/logs/docker_operator_xcom/docker_operator_3/2019-12-04T20:24:21.180209+00:00/1.log
*** Fetching from: 5df603088df3:8793/log/docker_operator_xcom/docker_operator_3/2019-12-04T20:24:21.180209+00:00/1.log
[2019-12-04 20:24:24,992] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_3 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:25,031] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_3 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:25,032] {{taskinstance.py:841}} INFO - --------------------------------------------------------------------------------
[2019-12-04 20:24:25,032] {{taskinstance.py:842}} INFO - Starting attempt 1 of 1
[2019-12-04 20:24:25,032] {{taskinstance.py:843}} INFO - --------------------------------------------------------------------------------
[2019-12-04 20:24:25,054] {{taskinstance.py:862}} INFO - Executing <Task(DockerOperator): docker_operator_3> on 2019-12-04T20:24:21.180209+00:00
[2019-12-04 20:24:25,055] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'docker_operator_xcom', 'docker_operator_3', '2019-12-04T20:24:21.180209+00:00', '--job_id', '73', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/qm_operators/exp_5_prueba.py', '--cfg_path', '/tmp/tmp94dzo8w7']
[2019-12-04 20:24:26,219] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:26,219] {{settings.py:252}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1039
[2019-12-04 20:24:26,294] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 /usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-12-04 20:24:26,294] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3   """)
[2019-12-04 20:24:27,549] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:27,548] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-12-04 20:24:27,549] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:27,549] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags/qm_operators/exp_5_prueba.py
[2019-12-04 20:24:27,722] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:27,721] {{cli.py:545}} INFO - Running <TaskInstance: docker_operator_xcom.docker_operator_3 2019-12-04T20:24:21.180209+00:00 [running]> on host 5df603088df3
[2019-12-04 20:24:27,754] {{docker_operator.py:201}} INFO - Starting docker container from image centos:latest
[2019-12-04 20:24:28,329] {{logging_mixin.py:112}} INFO - Attachs: []
[2019-12-04 20:24:29,979] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:29,979] {{local_task_job.py:124}} WARNING - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.989138 s
[2019-12-04 20:24:34,974] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:34,974] {{local_task_job.py:103}} INFO - Task exited with return code 0

[Image: XComs of docker_operator_2]
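
Outside the web UI, the pushed values can also be checked with a small downstream task. This is a hedged sketch (print_xcoms and its task id are made up for illustration; it assumes it is added inside the same with DAG(...) block as the tasks above):

from airflow.operators.python_operator import PythonOperator

def print_xcoms(**context):
    ti = context['ti']
    for task_id in ('docker_operator_2', 'docker_operator_3'):
        # With xcom_push=True the operator's STDOUT lands under the
        # default 'return_value' XCom key.
        print(task_id, '->', ti.xcom_pull(task_ids=task_id))

# Inside the `with DAG(...) as dag:` block:
show_xcoms = PythonOperator(
    task_id='print_xcoms',
    python_callable=print_xcoms,
    provide_context=True,   # Airflow 1.10 needs this for **context kwargs
    trigger_rule='all_done',
)
[t2, t3] >> show_xcoms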

Even though setting auto_remove=False, as in docker_operator_2, makes the task succeed and sets the XCom correctly, the container is never removed, so future runs of the DAG will fail because the leftover container from the old run conflicts (by name) with the container of the new run.

A workaround for this is to add a downstream task that deletes the container, but it's not "clean"; a sketch of such a cleanup task follows below.
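
A minimal sketch of that cleanup-task workaround, assuming the docker Python SDK (docker-py) is installed on the worker; remove_leftover_container and the task id are made up for illustration, and the container name must match the one set in the DAG above:

import docker
from airflow.operators.python_operator import PythonOperator

def remove_leftover_container(container_name):
    # Talk to the same daemon socket the DockerOperator uses.
    cli = docker.APIClient(base_url='unix://var/run/docker.sock', version='auto')
    # force=True removes the container even if it is still running.
    cli.remove_container(container=container_name, force=True)

# Inside the `with DAG(...) as dag:` block:
cleanup_2 = PythonOperator(
    task_id='remove_container_2',
    python_callable=remove_leftover_container,
    op_kwargs={'container_name': 'container_2'},
    trigger_rule='all_done',  # clean up even when the docker task fails
)
t2 >> cleanup_2

The trigger_rule='all_done' matters here: without it, a failed docker task would leave the container behind and the next DAG run would still hit the name conflict.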

Is there a way to run DockerOperator with xcom_push=True and auto_remove=True at the same time?

Recommended answer

Reading the Docker operator source, I don't think so. It calls the Docker API client's wait method and then calls the logs method afterwards.
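
The sequence below is a paraphrased sketch of that part of execute() (reconstructed from the Airflow 1.10-era docker_operator.py; structure and names are approximate, not a verbatim copy), showing where auto_remove races with the XCom read:

# Paraphrase of DockerOperator.execute(), Airflow 1.10-era (approximate):
self.cli.start(self.container['Id'])

line = ''
for line in self.cli.logs(container=self.container['Id'], stream=True):
    self.log.info(line.strip())                # streams STDOUT while running

result = self.cli.wait(self.container['Id'])   # returns when the process exits;
                                               # with auto_remove=True the daemon
                                               # deletes the container right here
if result['StatusCode'] != 0:
    raise AirflowException('docker container failed: ' + repr(result))

if self.xcom_push_flag:
    # Line 264 from the traceback above: this logs() call has to inspect the
    # container, which no longer exists under auto_remove=True -> 404.
    return self.cli.logs(container=self.container['Id']) \
        if self.xcom_all else str(line)

This also explains the observed behaviour: docker_operator_3 (xcom_all=False) survives auto_remove=True because it only returns the last streamed line without touching the Docker API again, while xcom_all=True needs a second logs() call against a container that is already gone.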

However, the documentation for auto_remove states:

enable auto-removal of the container on daemon side when the container’s process exits.

So as soon as the operator's call to wait completes, the container will be removed and you won't be able to retrieve the logs for it.
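
If both behaviours are needed, one possible direction (an untested sketch, not something the quoted answer proposes) is to keep auto_remove=False so that wait and logs succeed, and remove the container manually after execute() has produced the XCom value. XComDockerOperator is a made-up name:

from airflow.operators.docker_operator import DockerOperator

class XComDockerOperator(DockerOperator):
    """Hypothetical variant: push STDOUT to XCom first, then remove the container."""

    def execute(self, context):
        try:
            # Must be instantiated with auto_remove=False, or the parent
            # class hits the same 404 as docker_operator_1 above.
            return super(XComDockerOperator, self).execute(context)
        finally:
            # self.cli and self.container are populated by
            # DockerOperator.execute(); force=True also covers a
            # container that is still running.
            if self.cli is not None and self.container is not None:
                self.cli.remove_container(self.container['Id'], force=True)

Used like the operators above but with xcom_push=True, xcom_all=True and auto_remove=False, this frees the container name for the next run while still pushing the logs to XCom.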
