I have a requirement to log the Apache Airflow logs to stdout in JSON format. Airflow does not seem to offer this capability out of the box. I have found a couple of Python modules that are capable of this task, but I cannot get the implementation to work.
Currently, I am applying a class in airflow/utils/logging.py to modify the logger, shown below:
```python
import datetime  # needed for the nanotime field below

from pythonjsonlogger import jsonlogger


class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
    def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)",
                 style='%', *args, **kwargs):
        jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

    def process_log_record(self, log_record):
        # Rename the level field to Stackdriver's "severity"
        if log_record.get('level'):
            log_record['severity'] = log_record['level']
            del log_record['level']
        else:
            log_record['severity'] = log_record['levelname']
            del log_record['levelname']
        # Rename asctime to "timestamp"
        if log_record.get('asctime'):
            log_record['timestamp'] = log_record['asctime']
            del log_record['asctime']
        now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        log_record['nanotime'] = now
        return super(StackdriverJsonFormatter, self).process_log_record(log_record)
```
I am implementing this code in /airflow/settings.py as shown below:
```python
import sys

from airflow.utils import logging as logconf


def configure_logging(log_format=LOG_FORMAT):
    handler = logconf.logging.StreamHandler(sys.stdout)
    formatter = logconf.StackdriverJsonFormatter()
    handler.setFormatter(formatter)
    logging = logconf.logging.getLogger()
    logging.addHandler(handler)
    ''' code below was original airflow source code
    logging.root.handlers = []
    logging.basicConfig(
        format=log_format, stream=sys.stdout,
        level=LOGGING_LEVEL)
    '''
```
I have tried a couple different variations of this and can't get the python-json-logger to transform the logs to JSON. Perhaps I'm not getting to the root logger? Another option I have considered is manually formatting the logs to a JSON string. No luck with that yet either. Any alternative ideas, tips, or support are appreciated.
Cheers!
I don't know if you ever solved this problem, but after some frustrating tinkering, I ended up getting this to play nice with Airflow. For reference, I followed a lot of this article to get it working: www.astronomer.io/guides/logging/. The main issue was that Airflow's logging only accepts a string template for the log format, which json-logging can't plug into. So you have to create your own logging classes and connect them to a custom logging config class.
Copy the log template here into your $AIRFLOW_HOME/config folder, and change DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG. When you're successful, bring up Airflow and you'll get a log message on startup that says Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG. If this is the first .py file in the config folder, don't forget to add a blank __init__.py file so Python picks it up.
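For completeness, Airflow needs to be told to load that config class. A hedged sketch, assuming the module ends up importable as `logging_config` (Airflow adds $AIRFLOW_HOME/config to its path):

```shell
# Equivalent to setting logging_config_class = logging_config.LOGGING_CONFIG
# under [core] in airflow.cfg (Airflow 1.10-era; newer versions use the
# [logging] section instead).
export AIRFLOW__CORE__LOGGING_CONFIG_CLASS="logging_config.LOGGING_CONFIG"
```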
Write your custom JsonFormatter to inject into your handler. I based mine on this one.
Write the custom log handler classes. Since I was looking for JSON logging, mine look like this:
```python
from logging.handlers import RotatingFileHandler  # needed by JsonRotatingFileHandler

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger

# CustomJsonFormatter is the formatter written in the previous step


class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)
```
And:

```python
DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging_handler.JsonRotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    }
    ...
```
And JSON logs should be output, both in the DAG logs and in stdout.
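For reference, the remaining handlers get swapped the same way. A hedged sketch of what that looks like in the copied config, assuming the handler classes live in a module named `logging_handler` (as in the snippet above) and the handler keys match the Airflow 1.10 default template — adjust both to your copy:

```python
# Abbreviated LOGGING_CONFIG: only the handler entries are shown;
# your copied template keeps its formatters, loggers, etc. The paths
# and filename template here are illustrative placeholders.
LOGGING_CONFIG = {
    'handlers': {
        'console': {
            'class': 'logging_handler.JsonStreamHandler',
            'stream': 'sys.stdout',
        },
        'task': {
            'class': 'logging_handler.JsonFileTaskHandler',
            'base_log_folder': '/usr/local/airflow/logs',
            'filename_template': '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log',
        },
    },
}
```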
Hope this helps!