ES8 Production Practice — Pod Log Collection (the ELK Approach)


Introduction to the ELK Collection Approach

Overview

When collecting logs at scale from a large cluster, Filebeat offers higher throughput than Fluent Bit, so we run one Filebeat collector container per Kubernetes node as a DaemonSet; it collects the logs produced by business containers and buffers them in a Kafka message queue. Using Kafka's consumer-group mechanism, several Logstash replicas are deployed, and the Logstash cluster consumes the queue and writes to ES. This prevents write failures during sudden traffic peaks and improves processing capacity and availability.

Collection Architecture

Kafka Deployment

For production, the recommended way to deploy Kafka is via an operator, and Strimzi is currently the most popular operator option. If the cluster's data volume is small, NFS shared storage is sufficient; for larger volumes, use local PV storage.

Deploying the operator

The operator can be deployed with helm or with plain YAML manifests; here we use helm:

[root@tiaoban kafka]# helm repo add strimzi https://strimzi.io/charts/
"strimzi" has been added to your repositories
[root@tiaoban kafka]# helm install strimzi -n kafka strimzi/strimzi-kafka-operator
NAME: strimzi
LAST DEPLOYED: Sun Oct  8 21:16:31 2023
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.37.0

To create a Kafka cluster refer to the following documentation:
https://strimzi.io/docs/operators/latest/deploying.html#deploying-cluster-operator-helm-chart-str

Check that the operator pod is running:

[root@tiaoban strimzi-kafka-operator]# kubectl get pod -n kafka
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-56fdbb99cb-gznkw   1/1     Running   0          17m
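As a further sanity check, the operator should now have registered the Strimzi CRDs. A minimal sketch (the exact CRD list varies with the Strimzi version):

# Verify that the Strimzi CRDs (Kafka, KafkaTopic, KafkaUser, ...) are registered
kubectl get crd | grep strimzi.io
# Inspect the operator logs if anything looks off
kubectl logs -n kafka deploy/strimzi-cluster-operator --tail=20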

Example files

The official Strimzi repository ships example files for all kinds of scenarios; the manifests can be downloaded from the project's GitHub releases:

[root@tiaoban kafka]# ls
strimzi-kafka-operator
[root@tiaoban kafka]# wget https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.37.0/strimzi-0.37.0.tar.gz
[root@tiaoban kafka]# tar -zxf strimzi-0.37.0.tar.gz
[root@tiaoban kafka]# cd strimzi-0.37.0/examples/kafka
[root@tiaoban kafka]# ls
kafka-ephemeral-single.yaml  kafka-ephemeral.yaml  kafka-jbod.yaml  kafka-persistent-single.yaml  kafka-persistent.yaml  nodepools
  • kafka-persistent.yaml: deploys a persistent cluster with three ZooKeeper and three Kafka nodes. (Recommended)
  • kafka-jbod.yaml: deploys a persistent cluster with three ZooKeeper and three Kafka nodes, each Kafka node using multiple persistent volumes.
  • kafka-persistent-single.yaml: deploys a persistent cluster with a single ZooKeeper node and a single Kafka node.
  • kafka-ephemeral.yaml: deploys an ephemeral cluster with three ZooKeeper and three Kafka nodes.
  • kafka-ephemeral-single.yaml: deploys an ephemeral cluster with three ZooKeeper nodes and one Kafka node.

Creating PVC resources

Using NFS storage as an example, create the PVCs ahead of time: three for ZooKeeper and three for Kafka persistent data.

[root@tiaoban kafka]# cat kafka-pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-my-cluster-zookeeper-0
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-my-cluster-zookeeper-1
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-my-cluster-zookeeper-2
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-0-my-cluster-kafka-0
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-0-my-cluster-kafka-1
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-0-my-cluster-kafka-2
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
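A short sketch for applying the claims and confirming they bind, assuming the nfs-client StorageClass above exists and provisions dynamically:

kubectl apply -f kafka-pvc.yaml
# All six claims should reach the Bound state before the cluster is created
kubectl get pvc -n kafka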

Deploying Kafka and ZooKeeper

Following the kafka-persistent.yaml example from the official repository, deploy a persistent cluster with three ZooKeeper nodes and three Kafka nodes.

[root@tiaoban kafka]# cat kafka.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.5.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.5"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
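Apply the manifest and wait for the cluster to come up; a sketch using kubectl wait (the timeout is arbitrary):

kubectl apply -f kafka.yaml
# Block until the Kafka custom resource reports Ready
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka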

Verification

Checking the resources, the pods and services have been created successfully.

[root@tiaoban kafka]# kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-7c68d4b9d9-tg56j   3/3     Running   0          2m15s
my-cluster-kafka-0                            1/1     Running   0          2m54s
my-cluster-kafka-1                            1/1     Running   0          2m54s
my-cluster-kafka-2                            1/1     Running   0          2m54s
my-cluster-zookeeper-0                        1/1     Running   0          3m19s
my-cluster-zookeeper-1                        1/1     Running   0          3m19s
my-cluster-zookeeper-2                        1/1     Running   0          3m19s
strimzi-cluster-operator-56fdbb99cb-gznkw     1/1     Running   0          97m
[root@tiaoban kafka]# kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-cluster-kafka-bootstrap    ClusterIP   10.99.246.133   <none>        9091/TCP,9092/TCP,9093/TCP                     3m3s
my-cluster-kafka-brokers      ClusterIP   None            <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   3m3s
my-cluster-zookeeper-client   ClusterIP   10.109.106.29   <none>        2181/TCP                                       3m28s
my-cluster-zookeeper-nodes    ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP                     3m28s
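Before wiring up filebeat it is worth a quick produce/consume smoke test from inside the cluster. A sketch using the Kafka CLI shipped in the Strimzi image (the image tag is assumed to match the operator and broker versions used above):

# Producer: type a few test lines, then Ctrl-C
kubectl -n kafka run kafka-producer -ti --rm --restart=Never \
  --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 \
  -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic test
# Consumer: the same lines should come back
kubectl -n kafka run kafka-consumer -ti --rm --restart=Never \
  --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 \
  -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic test --from-beginning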

Deploying kafka-ui

Create the ConfigMap and ingress resources, specifying the Kafka connection address in the ConfigMap. Using Traefik as an example, create an IngressRoute so the UI can be reached by domain name.

[root@tiaoban kafka]# cat kafka-ui.yaml 
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-ui-helm-values
  namespace: kafka
data:
  KAFKA_CLUSTERS_0_NAME: "kafka-cluster"
  KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "my-cluster-kafka-brokers.kafka.svc:9092"
  AUTH_TYPE: "DISABLED"
  MANAGEMENT_HEALTH_LDAP_ENABLED: "FALSE"
---
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: kafka-ui
  namespace: kafka
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`kafka-ui.local`)
      kind: Rule
      services:
        - name: kafka-ui
          port: 80
[root@tiaoban kafka]# kubectl apply -f kafka-ui.yaml 
configmap/kafka-ui-helm-values created
ingressroute.traefik.containo.us/kafka-ui created

Deploy kafka-ui with helm, referencing the ConfigMap above (this assumes the kafka-ui chart repository has already been added with helm repo add):

[root@tiaoban kafka]# helm install kafka-ui kafka-ui/kafka-ui -n kafka --set existingConfigMap="kafka-ui-helm-values"
NAME: kafka-ui
LAST DEPLOYED: Mon Oct  9 09:56:45 2023
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
1. Get the application URL by running these commands:
  export POD_NAME=$(kubectl get pods --namespace kafka -l "app.kubernetes.io/name=kafka-ui,app.kubernetes.io/instance=kafka-ui" -o jsonpath="{.items[0].metadata.name}")
  echo "Visit http://127.0.0.1:8080 to use your application"
  kubectl --namespace kafka port-forward $POD_NAME 8080:8080

To verify access, add a hosts record 192.168.10.100 kafka-ui.local and open the site, as sketched below.
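For example, assuming Traefik's web entrypoint is reachable at 192.168.10.100:

echo "192.168.10.100 kafka-ui.local" | sudo tee -a /etc/hosts
# Expect an HTTP 200 from the kafka-ui frontend
curl -I http://kafka-ui.local/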

Filebeat Deployment and Configuration

Resource manifests

  • rbac.yaml: creates the filebeat ServiceAccount and the filebeat ClusterRole, grants the role read access to cluster resources, and binds the two together.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: elk
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: filebeat
  namespace: elk
rules:
  - apiGroups: ["", "apps", "batch"]
    resources: ["*"]
    verbs:
      - get
      - watch
      - list
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: filebeat
  namespace: elk
subjects:
  - kind: ServiceAccount
    name: filebeat
    namespace: elk
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
  • filebeat-conf.yaml: uses filebeat.autodiscover to discover and collect pod logs automatically, so logs from newly created pods are never missed, and ships them to the Kafka message queue.
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: elk
data:
  filebeat.yml: |-
    filebeat.autodiscover:
      providers:  # enable autodiscovery of pod logs
        - type: kubernetes
          node: ${NODE_NAME}
          hints.enabled: true
          hints.default_config:
            type: container
            paths:
              - /var/log/containers/*${data.kubernetes.container.id}.log
            exclude_files: ['.*filebeat-.*']  # do not collect filebeat's own logs
            multiline:  # keep multi-line log entries together
              pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
              negate: true
              match: after
    processors:
      - add_kubernetes_metadata:  # attach kubernetes metadata
          in_cluster: true
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: "/var/log/containers/"
      - drop_event:  # do not collect DEBUG logs
          when:
            contains:
              message: "DEBUG"
    output.kafka:
      hosts: ["my-cluster-kafka-brokers.kafka.svc:9092"]
      topic: "pod_logs"
      partition.round_robin:
        reachable_only: false
      required_acks: -1
      compression: gzip
    monitoring:  # self-monitoring settings
      enabled: true
      cluster_uuid: "ZUnqLCRqQL2jeo5FNvMI9g"
      elasticsearch:
        hosts: [":9200"]
        username: "elastic"
        password: "2zg5q6AU7xW5jY649yuEpZ47"
        ssl.verification_mode: "none"
  • filebeat.yaml: runs one filebeat container on every node via a DaemonSet, mounting the filebeat config file, the data directory, and the host's log directories. Once all three manifests exist, apply them as sketched after the code.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: filebeat
  namespace: elk
  labels:
    app: filebeat
spec:
  selector:
    matchLabels:
      app: filebeat
  template:
    metadata:
      labels:
        app: filebeat
    spec:
      serviceAccountName: filebeat
      dnsPolicy: ClusterFirstWithHostNet
      containers:
        - name: filebeat
          image: harbor.local/elk/filebeat:8.9.1
          args: ["-c", "/etc/filebeat/filebeat.yml", "-e"]
          env:
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          securityContext:
            runAsUser: 0
          resources:
            limits:
              cpu: 500m
              memory: 1Gi
          volumeMounts:
            - name: timezone
              mountPath: /etc/localtime
            - name: config
              mountPath: /etc/filebeat/filebeat.yml
              subPath: filebeat.yml
            - name: data
              mountPath: /usr/share/filebeat/data
            - name: containers
              mountPath: /var/log/containers
              readOnly: true
            - name: logs
              mountPath: /var/log/pods
      volumes:
        - name: timezone
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
        - name: config
          configMap:
            name: filebeat-config
        - name: data
          hostPath:
            path: /var/lib/filebeat-data
            type: DirectoryOrCreate
        - name: containers
          hostPath:
            path: /var/log/containers
        - name: logs
          hostPath:
            path: /var/log/pods
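With the three manifests in place, a sketch of rolling them out and watching the DaemonSet:

kubectl apply -f rbac.yaml -f filebeat-conf.yaml -f filebeat.yaml
# One pod should be scheduled per schedulable node
kubectl rollout status daemonset/filebeat -n elk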

Verification

Checking the pods, one filebeat collector container is running on every node of the cluster.

[root@tiaoban ~]# kubectl get pod -n elk | grep filebeat
filebeat-8p24s             1/1     Running        0      29s
filebeat-chh9b             1/1     Running        0      29s
filebeat-dl28d             1/1     Running        0      29s
filebeat-gnkt6             1/1     Running        0      29s
filebeat-m4rfx             1/1     Running        0      29s
filebeat-w4pdz             1/1     Running        0      29s

Checking the Kafka topics, a topic named pod_logs has been created. At this point we raise its partition count to 2 so that multiple Logstash replicas can consume in parallel, as sketched below.
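A sketch of raising the partition count with the stock Kafka CLI from inside a broker pod (if the topic is managed through a Strimzi KafkaTopic resource, make the same change there instead):

kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --alter --topic pod_logs --partitions 2
# Confirm the new partition count
kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --describe --topic pod_logs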

Logstash Deployment and Configuration

Building the image

The default Logstash image does not include the GeoIP database files, so resolving IP geolocation would fail. Build a Logstash image that bundles the GeoIP database ahead of time and push it to the Harbor registry.

[root@tiaoban elk]# cat Dockerfile
FROM docker.elastic.co/logstash/logstash:8.9.1
ADD GeoLite2-City.mmdb /etc/logstash/GeoLite2-City.mmdb
[root@tiaoban elk]# docker build -t harbor.local/elk/logstash:v8.9.1 .
[root@tiaoban elk]# docker push harbor.local/elk/logstash:v8.9.1

Resource manifests

  • logstash-log4j2.yaml: when Logstash runs in a container its logs go to the console by default and are not written to files; the logs directory contains only gc.log. This log4j2 configuration writes the logs to files so that Fleet can collect and analyze them.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-log4j2
  namespace: elk
data:
  log4j2.properties: |
    status = error
    name = LogstashPropertiesConfig
    appender.console.type = Console
    appender.console.name = plain_console
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
    appender.json_console.type = Console
    appender.json_console.name = json_console
    appender.json_console.layout.type = JSONLayout
    appender.json_console.layout.compact = true
    appender.json_console.layout.eventEol = true
    appender.rolling.type = RollingFile
    appender.rolling.name = plain_rolling
    appender.rolling.fileName = ${sys:ls.logs}/logstash-plain.log
    appender.rolling.filePattern = ${sys:ls.logs}/logstash-plain-%d{yyyy-MM-dd}-%i.log.gz
    appender.rolling.policies.type = Policies
    appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
    appender.rolling.policies.time.interval = 1
    appender.rolling.policies.time.modulate = true
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 30
    appender.rolling.avoid_pipelined_filter.type = PipelineRoutingFilter
    appender.json_rolling.type = RollingFile
    appender.json_rolling.name = json_rolling
    appender.json_rolling.fileName = ${sys:ls.logs}/logstash-json.log
    appender.json_rolling.filePattern = ${sys:ls.logs}/logstash-json-%d{yyyy-MM-dd}-%i.log.gz
    appender.json_rolling.policies.type = Policies
    appender.json_rolling.policies.time.type = TimeBasedTriggeringPolicy
    appender.json_rolling.policies.time.interval = 1
    appender.json_rolling.policies.time.modulate = true
    appender.json_rolling.layout.type = JSONLayout
    appender.json_rolling.layout.compact = true
    appender.json_rolling.layout.eventEol = true
    appender.json_rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.json_rolling.policies.size.size = 100MB
    appender.json_rolling.strategy.type = DefaultRolloverStrategy
    appender.json_rolling.strategy.max = 30
    appender.json_rolling.avoid_pipelined_filter.type = PipelineRoutingFilter
    appender.routing.type = PipelineRouting
    appender.routing.name = pipeline_routing_appender
    appender.routing.pipeline.type = RollingFile
    appender.routing.pipeline.name = appender-${ctx:pipeline.id}
    appender.routing.pipeline.fileName = ${sys:ls.logs}/pipeline_${ctx:pipeline.id}.log
    appender.routing.pipeline.filePattern = ${sys:ls.logs}/pipeline_${ctx:pipeline.id}.%i.log.gz
    appender.routing.pipeline.layout.type = PatternLayout
    appender.routing.pipeline.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
    appender.routing.pipeline.policy.type = SizeBasedTriggeringPolicy
    appender.routing.pipeline.policy.size = 100MB
    appender.routing.pipeline.strategy.type = DefaultRolloverStrategy
    appender.routing.pipeline.strategy.max = 30
    rootLogger.level = ${sys:ls.log.level}
    rootLogger.appenderRef.console.ref = ${sys:ls.log.format}_console
    rootLogger.appenderRef.rolling.ref = ${sys:ls.log.format}_rolling
    rootLogger.appenderRef.routing.ref = pipeline_routing_appender
    # Slowlog
    appender.console_slowlog.type = Console
    appender.console_slowlog.name = plain_console_slowlog
    appender.console_slowlog.layout.type = PatternLayout
    appender.console_slowlog.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
    appender.json_console_slowlog.type = Console
    appender.json_console_slowlog.name = json_console_slowlog
    appender.json_console_slowlog.layout.type = JSONLayout
    appender.json_console_slowlog.layout.compact = true
    appender.json_console_slowlog.layout.eventEol = true
    appender.rolling_slowlog.type = RollingFile
    appender.rolling_slowlog.name = plain_rolling_slowlog
    appender.rolling_slowlog.fileName = ${sys:ls.logs}/logstash-slowlog-plain.log
    appender.rolling_slowlog.filePattern = ${sys:ls.logs}/logstash-slowlog-plain-%d{yyyy-MM-dd}-%i.log.gz
    appender.rolling_slowlog.policies.type = Policies
    appender.rolling_slowlog.policies.time.type = TimeBasedTriggeringPolicy
    appender.rolling_slowlog.policies.time.interval = 1
    appender.rolling_slowlog.policies.time.modulate = true
    appender.rolling_slowlog.layout.type = PatternLayout
    appender.rolling_slowlog.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
    appender.rolling_slowlog.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling_slowlog.policies.size.size = 100MB
    appender.rolling_slowlog.strategy.type = DefaultRolloverStrategy
    appender.rolling_slowlog.strategy.max = 30
    appender.json_rolling_slowlog.type = RollingFile
    appender.json_rolling_slowlog.name = json_rolling_slowlog
    appender.json_rolling_slowlog.fileName = ${sys:ls.logs}/logstash-slowlog-json.log
    appender.json_rolling_slowlog.filePattern = ${sys:ls.logs}/logstash-slowlog-json-%d{yyyy-MM-dd}-%i.log.gz
    appender.json_rolling_slowlog.policies.type = Policies
    appender.json_rolling_slowlog.policies.time.type = TimeBasedTriggeringPolicy
    appender.json_rolling_slowlog.policies.time.interval = 1
    appender.json_rolling_slowlog.policies.time.modulate = true
    appender.json_rolling_slowlog.layout.type = JSONLayout
    appender.json_rolling_slowlog.layout.compact = true
    appender.json_rolling_slowlog.layout.eventEol = true
    appender.json_rolling_slowlog.policies.size.type = SizeBasedTriggeringPolicy
    appender.json_rolling_slowlog.policies.size.size = 100MB
    appender.json_rolling_slowlog.strategy.type = DefaultRolloverStrategy
    appender.json_rolling_slowlog.strategy.max = 30
    logger.slowlog.name = slowlog
    logger.slowlog.level = trace
    logger.slowlog.appenderRef.console_slowlog.ref = ${sys:ls.log.format}_console_slowlog
    logger.slowlog.appenderRef.rolling_slowlog.ref = ${sys:ls.log.format}_rolling_slowlog
    logger.slowlog.additivity = false
    logger.licensereader.name = logstash.licensechecker.licensereader
    logger.licensereader.level = error
    # Silence http-client by default
    logger.apache_http_client.name = org.apache.http
    logger.apache_http_client.level = fatal
    # Deprecation log
    appender.deprecation_rolling.type = RollingFile
    appender.deprecation_rolling.name = deprecation_plain_rolling
    appender.deprecation_rolling.fileName = ${sys:ls.logs}/logstash-deprecation.log
    appender.deprecation_rolling.filePattern = ${sys:ls.logs}/logstash-deprecation-%d{yyyy-MM-dd}-%i.log.gz
    appender.deprecation_rolling.policies.type = Policies
    appender.deprecation_rolling.policies.time.type = TimeBasedTriggeringPolicy
    appender.deprecation_rolling.policies.time.interval = 1
    appender.deprecation_rolling.policies.time.modulate = true
    appender.deprecation_rolling.layout.type = PatternLayout
    appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
    appender.deprecation_rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.deprecation_rolling.policies.size.size = 100MB
    appender.deprecation_rolling.strategy.type = DefaultRolloverStrategy
    appender.deprecation_rolling.strategy.max = 30
    logger.deprecation.name = org.logstash.deprecation, deprecation
    logger.deprecation.level = WARN
    logger.deprecation.appenderRef.deprecation_rolling.ref = deprecation_plain_rolling
    logger.deprecation.additivity = false
    logger.deprecation_root.name = deprecation
    logger.deprecation_root.level = WARN
    logger.deprecation_root.appenderRef.deprecation_rolling.ref = deprecation_plain_rolling
    logger.deprecation_root.additivity = false
  • logstash-conf.yaml: adjusts the Logstash settings, disabling the built-in metrics collection and pinning the ES cluster uuid.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
  namespace: elk
data:
  logstash.conf: |
    api.enabled: true
    api.http.port: 9600
    xpack.monitoring.enabled: false
    monitoring.cluster_uuid: "ZUnqLCRqQL2jeo5FNvMI9g"
  • pod-pipeline.yaml: the pipeline rules for pod logs: read from Kafka, strip unnecessary fields, and write into the ES cluster.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-pod-pipeline
  namespace: elk
data:
  pipeline.conf: |
    input {
      kafka {
        bootstrap_servers => "my-cluster-kafka-brokers.kafka.svc:9092"
        auto_offset_reset => "latest"
        topics => ["pod_logs"]
        codec => "json"
        group_id => "pod"
      }
    }
    filter {
      mutate {
        remove_field => ["agent","event","ecs","host","[kubernetes][labels]","input","log","orchestrator","stream"]
      }
    }
    output {
      elasticsearch {
        hosts => [":9200"]
        data_stream => "true"
        data_stream_type => "logs"
        data_stream_dataset => "pod"
        data_stream_namespace => "elk"
        user => "elastic"
        password => "2zg5q6AU7xW5jY649yuEpZ47"
        ssl_enabled => "true"
        ssl_verification_mode => "none"
      }
    }
  • pod-logstash.yaml: deploys a 2-replica Logstash, mounting the pipeline, the log4j2 and Logstash config files, and the log path.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-pod
  namespace: elk
spec:
  replicas: 2
  selector:
    matchLabels:
      app: logstash-pod
  template:
    metadata:
      labels:
        app: logstash-pod
        monitor: enable
    spec:
      securityContext:
        runAsUser: 0
      containers:
        - image: harbor.local/elk/logstash:v8.9.1
          name: logstash-pod
          resources:
            limits:
              cpu: "1"
              memory: 1Gi
          args:
            - -f
            - /usr/share/logstash/pipeline/pipeline.conf
          env:
            - name: XPACK_MONITORING_ENABLED
              value: "false"
          ports:
            - containerPort: 9600
          volumeMounts:
            - name: timezone
              mountPath: /etc/localtime
            - name: config
              mountPath: /usr/share/logstash/config/logstash.conf
              subPath: logstash.conf
            - name: log4j2
              mountPath: /usr/share/logstash/config/log4j2.properties
              subPath: log4j2.properties
            - name: pipeline
              mountPath: /usr/share/logstash/pipeline/pipeline.conf
              subPath: pipeline.conf
            - name: log
              mountPath: /usr/share/logstash/logs
      volumes:
        - name: timezone
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
        - name: config
          configMap:
            name: logstash-config
        - name: log4j2
          configMap:
            name: logstash-log4j2
        - name: pipeline
          configMap:
            name: logstash-pod-pipeline
        - name: log
          hostPath:
            path: /var/log/logstash
            type: DirectoryOrCreate
  • logstash-svc.yaml: creates a Service that exposes the Logstash monitoring endpoint. Apply all five manifests as sketched after this list.
apiVersion: v1
kind: Service
metadata:
  name: logstash-monitor
  namespace: elk
spec:
  selector:
    monitor: enable
  ports:
    - port: 9600
      targetPort: 9600
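With all five manifests ready, a sketch of applying them and checking the rollout:

kubectl apply -f logstash-log4j2.yaml -f logstash-conf.yaml -f pod-pipeline.yaml \
  -f pod-logstash.yaml -f logstash-svc.yaml
kubectl rollout status deployment/logstash-pod -n elk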

Adding metrics collection

In the Fleet integration policy, install the Logstash integration and configure the metrics endpoint address as :9600.
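Independently of Fleet, the endpoint can be checked by hand through the Service created above; a minimal sketch run from any pod inside the cluster:

# /_node/stats is Logstash's monitoring API
curl -s http://logstash-monitor.elk.svc:9600/_node/stats | head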

Verification

Checking the pods, both Logstash replicas are running normally.

[root@tiaoban ~]# kubectl get pod -n elk | grep logstash
logstash-pod-7bb6f6c8c6-ffc4b       1/1     Running   0       58s
logstash-pod-7bb6f6c8c6-qv9kd       1/1     Running   0       58s

Log in to Kibana and open the monitoring page: filebeat and Logstash metrics and log data are being collected successfully.

Checking the data streams, a data stream named logs-pod-elk has been created.

Inspecting the data stream contents, the pod's node, namespace, container, log message, and other fields are stored and parsed successfully.
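The same check can be made against the ES API directly; a sketch (substitute the real ES host, which this article leaves out):

# List the data stream and its backing indices
curl -k -u elastic:2zg5q6AU7xW5jY649yuEpZ47 "https://<es-host>:9200/_data_stream/logs-pod-elk?pretty"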

Custom Log Parsing

Requirements

By default, Filebeat collects every pod's logs and automatically attaches namespace, pod, and container metadata, while the entire log line is stored in the log field.
Taking the log-demo application's logs as an example: with everything stored in a single log field, the logs cannot be filtered and analyzed by individual attributes, so Logstash parsing rules are needed to extract the custom log content.

Resource manifests

  • myapp-pipeline.yaml: reads from Kafka; when an event's [kubernetes][deployment][name] field equals log-demo it is parsed further, and all other events are dropped. Detailed Logstash configuration is covered in earlier articles.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-myapp-pipeline
  namespace: elk
data:
  pipeline.conf: |
    input {
      kafka {
        bootstrap_servers => "my-cluster-kafka-brokers.kafka.svc:9092"
        auto_offset_reset => "latest"
        topics => ["pod_logs"]
        codec => "json"
        group_id => "myapp"
      }
    }
    filter {
      if [kubernetes][deployment][name] == "log-demo" {
        grok {
          match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \| %{LOGLEVEL:level} %{SPACE}* \| (?<class>[__main__:[\w]*:\d*]+) \- %{GREEDYDATA:content}" }
        }
        mutate {
          gsub => ["content", "'", '"']
          lowercase => [ "level" ]
        }
        json {
          source => "content"
        }
        geoip {
          source => "remote_address"
          database => "/etc/logstash/GeoLite2-City.mmdb"
          ecs_compatibility => disabled
        }
        mutate {
          remove_field => ["agent","event","ecs","host","[kubernetes][labels]","input","log","orchestrator","stream","content"]
        }
      } else {
        drop {}
      }
    }
    output {
      elasticsearch {
        hosts => [":9200"]
        data_stream => "true"
        data_stream_type => "logs"
        data_stream_dataset => "myapp"
        data_stream_namespace => "elk"
        user => "elastic"
        password => "2zg5q6AU7xW5jY649yuEpZ47"
        ssl_enabled => "true"
        ssl_verification_mode => "none"
      }
    }
  • myapp-logstash.yaml: the same 2-replica Logstash Deployment as above, mounting the logstash-myapp-pipeline ConfigMap instead; apply both manifests as sketched after this list.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-myapp
  namespace: elk
spec:
  replicas: 2
  selector:
    matchLabels:
      app: logstash-myapp
  template:
    metadata:
      labels:
        app: logstash-myapp
        monitor: enable
    spec:
      securityContext:
        runAsUser: 0
      containers:
        - image: harbor.local/elk/logstash:v8.9.1
          name: logstash-myapp
          resources:
            limits:
              cpu: "1"
              memory: 1Gi
          args:
            - -f
            - /usr/share/logstash/pipeline/pipeline.conf
          env:
            - name: XPACK_MONITORING_ENABLED
              value: "false"
          ports:
            - containerPort: 9600
          volumeMounts:
            - name: timezone
              mountPath: /etc/localtime
            - name: config
              mountPath: /usr/share/logstash/config/logstash.conf
              subPath: logstash.conf
            - name: log4j2
              mountPath: /usr/share/logstash/config/log4j2.properties
              subPath: log4j2.properties
            - name: pipeline
              mountPath: /usr/share/logstash/pipeline/pipeline.conf
              subPath: pipeline.conf
            - name: log
              mountPath: /usr/share/logstash/logs
      volumes:
        - name: timezone
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
        - name: config
          configMap:
            name: logstash-config
        - name: log4j2
          configMap:
            name: logstash-log4j2
        - name: pipeline
          configMap:
            name: logstash-myapp-pipeline
        - name: log
          hostPath:
            path: /var/log/logstash
            type: DirectoryOrCreate
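Rolling out the myapp pipeline mirrors the pod pipeline; a sketch:

kubectl apply -f myapp-pipeline.yaml -f myapp-logstash.yaml
kubectl rollout status deployment/logstash-myapp -n elk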

Verification

Checking the data streams, a data stream named logs-myapp-elk has been created.

Inspecting the data stream contents, the log fields have been parsed successfully.

Notes

Kafka partition count

Note that within a consumer group, each partition is consumed by at most one consumer; when a group contains more consumers than the topic has partitions, only as many consumers as there are partitions consume simultaneously while the rest stay idle. To raise Logstash consumption throughput, increase the topic's partition count appropriately, but be aware that a very large number of partitions also lengthens recovery time after a Kafka cluster failure.
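Partition assignment and consumer lag within the group can be observed directly; a sketch using the consumer-groups CLI from a broker pod:

# Shows which consumer owns each partition of pod_logs, plus its lag
kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 --describe --group pod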

Logstash replica count

Logstash replicas = Kafka partitions ÷ threads per Logstash instance (the kafka input's consumer_threads, default 1; it can be raised for high volumes, ideally to no more than 4). For example, a 6-partition topic consumed with 3 threads per instance calls for 2 replicas.

Complete Manifests

All yaml files from this walkthrough have been uploaded to git repositories at the following addresses:

github

gitee

References

Deploying Strimzi with helm
Collecting k8s logs with filebeat autodiscover
Running filebeat in a kubernetes cluster
Adding kubernetes metadata with the filebeat processor
Dropping specified events in filebeat

More

WeChat official account

The WeChat official account is updated in sync with this blog; follow 崔亮的博客 to get new articles as soon as they are published.

Blog site

崔亮的博客 focuses on devops automation and operations and shares quality IT operations articles; more original operations and development posts are available on the site.
