logstash+elasticsearch+kafka统一日志收集

编程入门 行业动态 更新时间:2024-10-09 05:25:46

logstash+elasticsearch+kafka统一<a href=https://www.elefans.com/category/jswz/34/1770796.html style=日志收集"/>

logstash+elasticsearch+kafka统一日志收集

文章目录

  • logstash+elasticsearch+kafka统一日志收集
    • 环境准备
    • 参数配置
    • 其他说明
    • 通过kafka搜集日志
      • 引入第三方包kafka-log4j-appender
      • 配置kafka appender
      • logstash配置kafka输入
    • 参考资料

logstash+elasticsearch+kafka统一日志收集

环境准备

以下相关软件可从()下载

  1. 安装jdk
  2. 安装elasticsearch
    从()下载最新版,解压后直接运行bin/elasticsearch.bat即可,打开elasticsearch.yml配置文件,设置network.host为外部可访问的ip,建议0.0.0.0
  3. 安装kibana
    从()下载最新版,解压后运行bin/kibana.bat启动程序
  4. 安装logstash
    从()下载最新版,解压到指定目录
  5. 安装kafka
    从(.html)下载最新的binary版本

参数配置

  1. 在logstash/config目录下新建文件log4j.conf,然后启动logstash服务(logstash.bat -f …/config/log4j.conf ),内容如下
input {log4j{mode => "server"type=>"log4j-json"port=>4712} 
}
filter {}
output {stdout { codec => rubydebug }elasticsearch { hosts => ["127.0.0.1"] }
}
  1. 在系统内log4j要配置SocketAppender输出,log4j的properties配置
log4j.rootLogger=info,logstash
# Socket,logstash
log4j.appender.logstash=org.apache.log4j.SocketAppender
log4j.appender.logstash.Port=4712
log4j.appender.logstash.RemoteHost=localhost
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true
# 工程名
log4j.appender.logstash.application=stage

其他说明

  1. logstash的elasticsearch插件默认将信息存储在logstash-%{+YYYY.MM.dd}格式的索引下面,在第一次打开kibana时,会要求创建查询索引,直接创建默认的logstash-*即可
  2. elasticsearch最好部署成集群环境,以实现不间断收集日志
  3. 本示例直接使用log4j的SocketAppender输出,如果logstash的socket服务断掉,日志会消失;如果log4j输出到kafka,再从kafka输出到logstash会避免这个问题

通过kafka搜集日志

引入第三方包kafka-log4j-appender

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-log4j-appender</artifactId><version>0.10.0.0</version>
</dependency>

配置kafka appender

## appender kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=logstash
log4j.appender.kafka.brokerList=brokerNode1:9091,brokerNode2:9092
log4j.appender.kafkapressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n

logstash配置kafka输入

input {
kafka{bootstrap_servers => "192.168.1.202:9092,192.168.1.202:9093"topics=>["logstash"]type=>"log4j-json"}   
}
filter {grok {match => { "message" => "%{TIMESTAMP_ISO8601:logtime} \[%{DATA:javaThread}\] %{WORD:project} %{LOGLEVEL:loglevel} %{DATA:msg}" }}
}
output {stdout { codec => rubydebug }elasticsearch { hosts => ["127.0.0.1"] }
}

注意:

  1. kafka的topic必须配置一致;
  2. logstash的elasticsearch插件默认端口是9092,如果配置多个地址必须写成[“127.0.0.1:9200”,“127.0.0.2:9200”]
  3. logstash可以通过filter插件,利用正则表达式提取关键字符串作为单独字段存储
  4. 当前版本的KafkaLog4jAppender代码有bug,无法输出异常栈,需重写源码,修改subAppend方法
private String subAppend(LoggingEvent event) {StringBuffer msg = new StringBuffer();msg.append((this.layout == null) ? event.getRenderedMessage() : this.layout.format(event));if (layout.ignoresThrowable()) {String[] s = event.getThrowableStrRep();if (s != null) {int len = s.length;for (int i = 0; i < len; i++) {msg.append(s[i]);msg.append(Layout.LINE_SEP);}}}return msg.toString();
}

参考资料

  • .html 日志收集系统 ELK
  • .html 官网手册
  • .html Elastic Search 分布式部署示例

更多推荐

logstash+elasticsearch+kafka统一日志收集

本文发布于:2024-02-07 09:22:35,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1755491.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:日志   logstash   elasticsearch   kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!