Airflow 1.9 - Cannot get logs to write to S3

Question

I'm running Airflow 1.9 on Kubernetes in AWS. I would like the logs to go to S3, because the Airflow containers themselves are not long-lived.

I've read the various threads and documents that describe the process, but I still cannot get it working. First, a test that demonstrates to me that the S3 configuration and permissions are valid. This was run on one of our worker instances.

Writing a file to S3 with Airflow:

airflow@airflow-worker-847c66d478-lbcn2:~$ id
uid=1000(airflow) gid=1000(airflow) groups=1000(airflow)
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
airflow@airflow-worker-847c66d478-lbcn2:~$ python
Python 3.6.4 (default, Dec 21 2017, 01:37:56)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> s3 = airflow.hooks.S3Hook('s3_logs')
/usr/local/lib/python3.6/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing S3Hook directly from <module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'> has been deprecated. Please import from '<module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
>>> s3.load_string('put this in s3 file', airflow.conf.get('core', 'remote_base_log_folder') + "/airflow-test")
[2018-02-23 18:43:58,437] {{base_hook.py:80}} INFO - Using connection to: vevo-dev-us-east-1-services-airflow

Now let's retrieve the file from S3 and look at the contents. Everything looks good here.

root@4f8171d4fe47:/# aws s3 cp s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test .
download: s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test to ./airflow-test
root@4f8171d4fe47:/# cat airflow-test
put this in s3 fileroot@4f8171d4fe47:/stringer#

So the Airflow S3 connection seems fine, except that Airflow jobs do not use S3 for logging. Here are the settings I have; I figure something is either wrong or missing.

The environment variables on the running worker/scheduler/master instances are:

airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep -i s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
S3_BUCKET=vevo-dev-us-east-1-services-airflow

This shows that the s3_logs connection exists in Airflow:

airflow@airflow-worker-847c66d478-lbcn2:~$ airflow connections -l|grep s3
│ 's3_logs' │ 's3' │ 'vevo-dev- us-...vices-airflow' │ None │ False │ False │ None │
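The same check can also be done from Python. This is an illustrative alternative, not part of the original post; BaseHook.get_connection raises if the conn_id does not exist, so it doubles as an existence test:

# Hypothetical Python-side check that the s3_logs connection is registered.
from airflow.hooks.base_hook import BaseHook

conn = BaseHook.get_connection('s3_logs')
print(conn.conn_type, conn.host, conn.extra)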

I put this file, https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py, in place in my Docker image. You can see an example on one of our workers:

airflow@airflow-worker-847c66d478-lbcn2:~$ ls -al /usr/local/airflow/config/
total 32
drwxr-xr-x. 2 root    root    4096 Feb 23 00:39 .
drwxr-xr-x. 1 airflow airflow 4096 Feb 23 00:53 ..
-rw-r--r--. 1 root    root    4471 Feb 23 00:25 airflow_local_settings.py
-rw-r--r--. 1 root    root       0 Feb 16 21:35 __init__.py
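A quick way to confirm that this copy, rather than the template shipped inside the airflow package, is the one Python resolves is a hedged sketch like the following; it assumes /usr/local/airflow/config is on PYTHONPATH so the file imports as a top-level airflow_local_settings module:

# Sketch: check which airflow_local_settings module Python picks up.
# The path and module name are assumptions about this particular image.
import importlib

mod = importlib.import_module('airflow_local_settings')
print(mod.__file__)  # expect /usr/local/airflow/config/airflow_local_settings.py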

We have edited the file to define the REMOTE_BASE_LOG_FOLDER variable. Here is the diff between our version and the upstream version:

index 899e815..897d2fd 100644
--- a/var/tmp/file
+++ b/config/airflow_local_settings.py
@@ -35,7 +35,8 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 # Storage bucket url for remote logging
 # s3 buckets should start with "s3://"
 # gcs buckets should start with "gs://"
-REMOTE_BASE_LOG_FOLDER = ''
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'remote_base_log_folder')
+
 DEFAULT_LOGGING_CONFIG = {
     'version': 1,

Here you can see that the setting is correct on one of our workers:

>>> import airflow
>>> airflow.conf.get('core', 'remote_base_log_folder')
's3://vevo-dev-us-east-1-services-airflow/logs/'

Based on the fact that REMOTE_BASE_LOG_FOLDER starts with 's3' and REMOTE_LOGGING is True:

>>> airflow.conf.get('core', 'remote_logging')
'True'

I would expect this block, https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py#L122-L123, to evaluate to true and route the logs to S3.

Can anyone who has S3 logging working on 1.9 please point out what I am missing? I would like to submit a PR to the upstream project to update the docs, as this seems to be a pretty common problem, and as near as I can tell the upstream documents are either not valid or frequently get misinterpreted.

Thanks! G.

Answer

Yeah, I also had trouble setting it up based on the docs alone; I had to go through Airflow's code to figure it out. There are multiple things you could have missed.

Some things to check (a small verification sketch follows the list):
1. Make sure you have the log_config.py file and that it is in the correct directory: ./config/log_config.py. Also make sure you didn't forget the __init__.py file in that directory.
2. Make sure you defined the s3.task handler and set its formatter to airflow.task.
3. Make sure you set the airflow.task and airflow.task_runner handlers to s3.task.
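One extra step that is easy to miss (an editorial addition, not part of the original answer): Airflow 1.9 only loads a custom logging config if logging_config_class in airflow.cfg points at it (for example log_config.LOGGING_CONFIG) and the directory containing it is on PYTHONPATH; task_log_reader should also be set to s3.task so the web UI reads from the same handler. A minimal sketch to confirm on a worker that the custom config is actually in effect:

# Sketch: after applying the settings above, check which handler serves the
# airflow.task logger. In 1.9, importing airflow should apply the logging config.
import logging

import airflow  # noqa: F401  (import triggers logging setup)

for handler in logging.getLogger('airflow.task').handlers:
    # Expect airflow.utils.log.s3_task_handler.S3TaskHandler here; a plain
    # FileTaskHandler means the custom LOGGING_CONFIG was not picked up.
    print(type(handler))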

Here is a log_config.py file that works for me:

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from airflow import configuration as conf

# TO DO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = 's3://your_path_to_airflow_logs'

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        # 'gcs.task': {
        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        #     'formatter': 'airflow.task',
        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        #     'gcs_log_folder': GCS_LOG_FOLDER,
        #     'filename_template': FILENAME_TEMPLATE,
        # },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
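One possible variation (my suggestion, not part of the original answer): instead of hard-coding the bucket in S3_LOG_FOLDER, reuse the value the question already sets through AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER, mirroring what the asker's diff does in airflow_local_settings.py:

# Variation on the S3_LOG_FOLDER line above: pull the bucket from the existing
# Airflow setting instead of hard-coding it.
S3_LOG_FOLDER = conf.get('core', 'remote_base_log_folder')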
