admin管理员组

文章数量:1570208

电商数仓

  • 一、Zookeeper安装
  • 1、安装ZK
  • (1)分布式安装部署
  • (2)客户端命令行操作
  • (3)集群规划
  • (4)ZK集群启动停止脚本
  • 二、Kafka安装
  • 1、 Kafka集群安装
  • (1)安装部署
  • (1.1)集群规划
  • (1.2) jar包下载
  • (1.3)集群部署
  • 2、Kafka集群启动停止脚本
  • 3、Kafka常用命令
  • 4、项目经验之Kafka机器数量计算
  • 5、项目经验之Kafka压力测试
  • 6、项目经验值Kafka分区数计算
  • 三、采集日志Flume
  • 1、日志采集Flume安装
  • (1)Flume安装部署
  • (1.1)安装地址
  • (1.2)安装部署
  • (1.3)集群规划
  • 2、项目经验之Flume组件选型
  • 3、日志采集Flume配置
  • 4、Flume拦截器
  • 5、测试Flume-Kafka通道
  • 6、日志采集Flume启动停止脚本
  • 四、消费Kafka数据Flume
  • 1、集群规划
  • 2、项目经验之Flume组件选型
  • 3、消费者Flume配置
  • 4、Flume时间戳拦截器(解决零点漂移问题)
  • 5、消费者Flume启动停止脚本
  • 6、项目经验之Flume内存优化
  • 五、采集通道启动/停止脚本(zk和kf有着一定的顺序)
  • 六、2NN页面不能显示完整信息

一、Zookeeper安装

1、安装ZK

(1)分布式安装部署

1)集群规划
在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。
2)解压安装
(1)解压Zookeeper安装包到/opt/module/目录下
[lyh@hadoop102 software]$ tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/
(2)修改/opt/module/apache-zookeeper-3.5.7-bin名称为zookeeper-3.5.7
[lyh@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
(3)同步/opt/module/zookeeper-3.5.7目录内容到hadoop103、hadoop104
[lyh@hadoop102 module]$ xsync zookeeper-3.5.7/
3)配置服务器编号
(1)在/opt/module/zookeeper-3.5.7/这个目录下创建zkData
[atguigu@hadoop102 zookeeper-3.5.7]$ mkdir zkData
(2)在/opt/module/zookeeper-3.5.7/zkData目录下创建一个myid的文件
[lyh@hadoop102 zkData]$ vi myid
添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码

在文件中添加与server对应的编号:
2

(3)拷贝配置好的zookeeper到其他机器上
[lyh@hadoop102 zkData]$ xsync myid

并分别在hadoop103、hadoop104上修改myid文件中内容为3、4

4)配置zoo.cfg文件
(1)重命名/opt/module/zookeeper-3.5.7/conf这个目录下的zoo_sample.cfg为zoo.cfg
[lyh@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
(2)打开zoo.cfg文件
[lyh@hadoop102 conf]$ vim zoo.cfg

修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.5.7/zkData
增加如下配置

#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

(3)同步zoo.cfg配置文件
[lyh@hadoop102 conf]$ xsync zoo.cfg
(4)配置参数解读
server.A=B:C:D。

  • A是一个数字,表示这个是第几号服务器;
    集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
  • B是这个服务器的地址;
  • C是这个服务器Follower与集群中的Leader服务器交换信息的端口;
  • D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

5)集群操作
(1)分别启动Zookeeper
[lyh@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
[lyh@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start
[lyh@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start
(2)查看状态
[lyh@hadoop102 zookeeper-3.5.7]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/…/conf/zoo.cfg
Mode: follower
[lyh@hadoop103 zookeeper-3.5.7]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/…/conf/zoo.cfg
Mode: leader
[lyh@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/…/conf/zoo.cfg
Mode: follower

(2)客户端命令行操作


1)启动客户端
[lyh@hadoop103 zookeeper-3.5.7]$ bin/zkCli.sh

(3)集群规划

(4)ZK集群启动停止脚本

(1)在hadoop102的/home/lyh/bin目录下创建脚本
[lyh@hadoop102 bin]$ vim zk.sh
在脚本中编写如下内容

#!/bin/bash

case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
        echo ---------- zookeeper $i 启动 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
        echo ---------- zookeeper $i 停止 ------------    
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
        echo ---------- zookeeper $i 状态 ------------    
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
	done
};;
esac

(2)增加脚本执行权限
[lyh@hadoop102 bin]$ chmod u+x zk.sh
(3)Zookeeper集群启动脚本
[lyh@hadoop102 module]$ zk.sh start
(4)Zookeeper集群停止脚本
[lyh@hadoop102 module]$ zk.sh stop

二、Kafka安装

1、 Kafka集群安装

(1)安装部署

(1.1)集群规划

(1.2) jar包下载

http://kafka.apache/downloads

(1.3)集群部署

1)解压安装包
[lyh@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
2)修改解压后的文件名称
[lyh@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka
3)在/opt/module/kafka目录下创建logs文件夹
[lyh@hadoop102 kafka]$ mkdir logs
4)修改配置文件

[lyh@hadoop102 kafka]$ cd config/
[lyh@hadoop102 config]$ vi server.properties
修改或者增加以下内容:
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/data
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

5)配置环境变量

[lyh@hadoop102 module]$ sudo vi /etc/profile.d/my_env.sh

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[lyh@hadoop102 module]$ source /etc/profile.d/my_env.sh

6)分发安装包
[lyh@hadoop102 module]$ xsync kafka/
注意:分发之后记得配置其他机器的环境变量
7)分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重复

8)启动集群

依次在hadoop102、hadoop103、hadoop104节点上启动kafka
[lyh@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[lyh@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon  /opt/module/kafka/config/server.properties
[lyh@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon  /opt/module/kafka/config/server.properties

9)关闭集群

[lyh@hadoop102 kafka]$ bin/kafka-server-stop.sh
[lyh@hadoop103 kafka]$ bin/kafka-server-stop.sh
[lyh@hadoop104 kafka]$ bin/kafka-server-stop.sh

2、Kafka集群启动停止脚本

(1)在/home/lyh/bin目录下创建脚本kf.sh
[lyh@hadoop102 bin]$ vim kf.sh
在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
    done
};;
esac

(2)增加脚本执行权限
[lyh@hadoop102 bin]$ chmod u+x kf.sh
(3)kf集群启动脚本
[lyh@hadoop102 module]$ kf.sh start
(4)kf集群停止脚本
[lyh@hadoop102 module]$ kf.sh stop

3、Kafka常用命令

1)查看Kafka Topic列表

[lyh@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list

2)创建Kafka Topic
进入到/opt/module/kafka/目录下创建日志主题

[lyh@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka  --create --replication-factor 1 --partitions 1 --topic topic_log

3)删除Kafka Topic

[lyh@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_log

4)Kafka生产消息

[lyh@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic topic_log
>hello world
>lyh  lyh

5)Kafka消费消息

[lyh@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

–from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

6)查看Kafka Topic详情

[lyh@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \
--describe --topic topic_log

4、项目经验之Kafka机器数量计算

  • Kafka机器数量(经验公式)= 2 *(峰值生产速度 * 副本数 / 100)+ 1
  • 先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。

1)峰值生产速度
峰值生产速度可以压测得到。
2)副本数
副本数默认是1个,在企业里面2-3个都有,2个居多。
副本多可以提高可靠性,但是会降低网络传输效率。
比如我们的峰值生产速度是50M/s。副本数为2。
Kafka机器数量 = 2 *(50 * 2 / 100)+ 1 = 3台

5、项目经验之Kafka压力测试

1)Kafka压测
用Kafka官方自带的脚本,对Kafka进行压测。
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
Kafka压测时,在硬盘读写速度一定的情况下,可以查看到哪些地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

2)Kafka Producer压力测试

(0)压测环境准备
①hadoop102、hadoop103、hadoop104的网络带宽都设置为100mbps。
②关闭hadoop102主机,并根据hadoop102克隆出hadoop105(修改IP和主机名称)
③hadoop105的带宽不设限
④创建一个test topic,设置为3个分区2个副本

[lyh@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 2 --partitions 3 --topic test

(1)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下

[lyh@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 10000000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

说明:
record-size是一条信息有多大,单位是字节。
num-records是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。

(2)Kafka会打印下面的信息

699884 records sent, 139976.8 records/sec (13.35 MB/sec), 1345.6 ms avg latency, 2210.0 ms max latency.
713247 records sent, 141545.3 records/sec (13.50 MB/sec), 1577.4 ms avg latency, 3596.0 ms max latency.
773619 records sent, 153862.2 records/sec (14.67 MB/sec), 2326.8 ms avg latency, 4051.0 ms max latency.
773961 records sent, 154206.2 records/sec (15.71 MB/sec), 1964.1 ms avg latency, 2917.0 ms max latency.
776970 records sent, 154559.4 records/sec (15.74 MB/sec), 1960.2 ms avg latency, 2922.0 ms max latency.
776421 records sent, 154727.2 records/sec (15.76 MB/sec), 1960.4 ms avg latency, 2954.0 ms max latency.

参数解析:Kafka的吞吐量15m/s左右是否符合预期呢?
hadoop102、hadoop103、hadoop104三台集群的网络总带宽30m/s左右,由于是两个副本,所以Kafka的吞吐量30m/s ➗ 2(副本) = 15m/s

结论:网络带宽和副本都会影响吞吐量。

(4)调整batch.size
batch.size默认值是16k。
batch.size较小,会降低吞吐量。比如说,批次大小为0则完全禁用批处理,会一条一条发送消息);
batch.size过大,会增加消息发送延迟。比如说,Batch设置为64k,但是要等待5秒钟Batch才凑满了64k,才能发送出去。那这条消息的延迟就是5秒钟。

[lyh@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 10000000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=500

输出结果

69169 records sent, 13833.8 records/sec (1.32 MB/sec), 2517.6 ms avg latency, 4299.0 ms max latency.
105372 records sent, 21074.4 records/sec (2.01 MB/sec), 6748.4 ms avg latency, 9016.0 ms max latency.
113188 records sent, 22637.6 records/sec (2.16 MB/sec), 11348.0 ms avg latency, 13196.0 ms max latency.
108896 records sent, 21779.2 records/sec (2.08 MB/sec), 12272.6 ms avg latency, 12870.0 ms max latency.

(5)linger.ms
如果设置batch size为64k,但是比如过了10分钟也没有凑够64k,怎么办?
可以设置,linger.ms。比如linger.ms=5ms,那么就是要发送的数据没有到64k,5ms后,数据也会发出去。
(6)总结
同时设置batch.size和 linger.ms,就是哪个条件先满足就都会将消息发送出去

Kafka需要考虑高吞吐量与延时的平衡。

3)Kafka Consumer压力测试

(1)Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。

[lyh@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
①参数说明:
--broker-list指定Kafka集群地址
--topic 指定topic的名称
--fetch-size 指定每次fetch的数据的大小
--messages 总共要消费的消息个数
②测试结果说明:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2021-08-03 21:17:21:778, 2021-08-03 21:18:19:775, 514.7169, 8.8749, 5397198, 93059.9514
开始测试时间,测试结束数据,共消费数据514.7169MB,吞吐量8.8749MB/s

(2)调整fetch-size

①增加fetch-size值,观察消费吞吐量。
[lyh@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 100000 --messages 10000000 --threads 1
②测试结果说明:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2021-08-03 21:22:57:671, 2021-08-03 21:23:41:938, 514.7169, 11.6276, 5397198, 121923.7355

(3)总结
吞吐量受网络带宽和fetch-size的影响

6、项目经验值Kafka分区数计算

(1)创建一个只有1个分区的topic
(2)测试这个topic的producer吞吐量和consumer吞吐量。
(3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
(4)然后假设总的目标吞吐量是Tt,那么分区数 = Tt / min(Tp,Tc)//min是取最小值
例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;
分区数 = 100 / 20 = 5分区

https://blog.csdn/weixin_42641909/article/details/89294698

分区数一般设置为:3-10个

三、采集日志Flume

1、日志采集Flume安装

(1)Flume安装部署

(1.1)安装地址

(1) Flume官网地址:http://flume.apache/
(2)文档查看地址:http://flume.apache/FlumeUserGuide.html
(3)下载地址:http://archive.apache/dist/flume/

(1.2)安装部署

(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[lyh@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3)修改apache-flume-1.9.0-bin的名称为flume
[lyh@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
(4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
[lyh@hadoop102 module]$ rm /opt/module/flume/lib/guava-11.0.2.jar

注意:删除guava-11.0.2.jar的服务器节点,一定要配置hadoop环境变量。否则会报如下异常。

Caused by: java.lang.ClassNotFoundException: com.googlemon.collect.Lists
        at java.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
(5)将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
[lyh@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[lyh@hadoop102 conf]$ vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212

(1.3)集群规划

2、项目经验之Flume组件选型

1)Source
(1)Taildir Source相比Exec Source、Spooling Directory Source的优势

  • TailDirSource(flume1.7的时候诞生的):断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。不会丢数据,但是有可能会导致数据重复。
  • Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
  • Spooling Directory Source监控目录,支持断点续传。但不能实时监控文件变化

(2)batchSize大小如何设置?
答:Event 1K左右时,500-1000合适(默认为100)

2)Channel

  • 采用Kafka
    Channel(flume1.6的时候诞生的),省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。
  • 注意在Flume1.7以前,KafkaChannel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为FlumeEvent。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

3、日志采集Flume配置

1)Flume配置分析

2)Flume的具体配置如下:
(1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件
[lyh@hadoop102 conf]$ vim file-flume-kafka.conf
在文件配置如下内容

#为各组件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile=
/opt/module/flume/taildir_position.json
#配置拦截器(ETL数据清洗 判断json是否完整)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type= com.lyh.flume.interceptor.ETLInterceptor$Builder

#描述channel
a1.channels.c1.type= org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers= hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#配置sink(没有)
#绑定source和channel以及sink和channel的关系(拼接组件)
a1.sources.r1.channels = c1

注意:com.lyh.flume.interceptor.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

4、Flume拦截器

1)创建Maven工程flume-interceptor
2)创建包名:com.lyh.flume.interceptor
3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意:scope中provided的含义是编译时用该jar包。打包时时不用。因为集群上已经存在flume的jar包。只是本地编译时用一下。
4)在com.lyh.flume.interceptor包下创建JSONUtils类

package com.lyh.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {
    public static boolean isJSONValidate(String log){
        try {
            JSON.parse(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }
}

5)在com.lyh.flume.interceptor包下创建ETLInterceptor类

package com.lyh.flume.interceptor;

import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

6)打包

7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

[lyh@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

8)分发Flume到hadoop103、hadoop104
[lyh@hadoop102 module]$ xsync flume/

9)分别在hadoop102、hadoop103上启动Flume

[lyh@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
[lyh@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

5、测试Flume-Kafka通道


(1)生成日志
[lyh@hadoop102 ~]$ lg.sh

(2)消费Kafka数据,观察控制台是否有数据获取到

[lyh@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

说明:如果获取不到数据,先检查Kafka、Flume、Zookeeper是否都正确启动。再检查Flume的拦截器代码是否正常。

6、日志采集Flume启动停止脚本

(1)在/home/lyh/bin目录下创建脚本f1.sh
[lyh@hadoop102 bin]$ vim f1.sh

在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
        done
};;	
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac
  • 说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
  • 说明2:awk 默认分隔符为空格
  • 说明3:$2是在“”双引号内部会被解析为脚本的第二个参数,但是这里面想表达的含义是awk的第二个值,所以需要将他转义,用$2表示。
  • 说明4:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

(2)增加脚本执行权限
[lyh@hadoop102 bin]$ chmod u+x f1.sh
(3)f1集群启动脚本
[lyh@hadoop102 module]$ f1.sh start
(4)f1集群停止脚本
[lyh@hadoop102 module]$ f1.sh stop

四、消费Kafka数据Flume

1、集群规划

2、项目经验之Flume组件选型

1)FileChannel和MemoryChannel区别

  • MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
  • FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
  • 选型: 金融类公司、对钱要求非常准确的公司通常会选择FileChannel;
    传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。

2)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据。

3)Sink:HDFS Sink
(1)HDFS存入大量小文件,有什么影响?

  • 元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
  • 计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

  • 官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
  • 基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount
    =0几个参数综合作用,效果如下:

①文件在达到128M时会滚动生成新文件
②文件创建超3600秒时会滚动生成新文件

3、消费者Flume配置

1)Flume配置分析

2)Flume的具体配置如下:
(1)在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件
[lyh@hadoop104 conf]$ vim kafka-flume-hdfs.conf
在文件配置如下内容

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers= hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
## 时间戳拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type= com.lyh.flume.interceptor.TimeStampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir= /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/


## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path= /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

#控制生成的小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

4、Flume时间戳拦截器(解决零点漂移问题)

  • 由于Flume默认会用Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。
  • 解决的思路:拦截json日志,通过fastjson框架解析json,获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写入到HDFS。

1)在com.lyh.flume.interceptor包下创建TimeStampInterceptor类

package com.lyh.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {

    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

2)重新打包

3)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。
[lyh@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
4)分发Flume到hadoop103、hadoop104
[lyh@hadoop102 module]$ xsync flume/

5、消费者Flume启动停止脚本

(1)在/home/lyh/bin目录下创建脚本f2.sh
[lyh@hadoop102 bin]$ vim f2.sh
在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
        done

};;
esac

(2)增加脚本执行权限
[lyh@hadoop102 bin]$ chmod u+x f2.sh
(3)f2集群启动脚本
[lyh@hadoop102 module]$ f2.sh start
(4)f2集群停止脚本
[lyh@hadoop102 module]$ f2.sh stop

6、项目经验之Flume内存优化

1)问题描述:如果启动消费Flume抛出如下异常
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解决方案步骤
(1)在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop103、hadoop104服务器
[lyh@hadoop102 conf]$ xsync flume-env.sh

3)Flume内存参数设置及优化

  • JVM heap一般设置为4G或更高
  • -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
  • -Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

五、采集通道启动/停止脚本(zk和kf有着一定的顺序)

(1)在/home/lyh/bin目录下创建脚本cluster.sh
[lyh@hadoop102 bin]$ vim cluster.sh
在脚本中填写如下内容

#!/bin/bash

case $1 in
"start"){
        echo ================== 启动 集群 ==================

        #启动 Zookeeper集群
        zk.sh start

        #启动 Hadoop集群(有的是myhadoop.sh)
        hdp.sh start

        #启动 Kafka采集集群
        kf.sh start

        #启动 Flume采集集群
        f1.sh start

        #启动 Flume消费集群
        f2.sh start

        };;
"stop"){
        echo ================== 停止 集群 ==================

        #停止 Flume消费集群
        f2.sh stop

        #停止 Flume采集集群
        f1.sh stop

        #停止 Kafka采集集群
        kf.sh stop

        #停止 Hadoop集群
        hdp.sh stop

        #停止 Zookeeper集群
        zk.sh stop

};;
esac

(2)增加脚本执行权限
[lyh@hadoop102 bin]$ chmod u+x cluster.sh
(3)cluster集群启动脚本
[lyh@hadoop102 module]$ cluster.sh start
(4)cluster集群停止脚本
[lyh@hadoop102 module]$ cluster.sh stop

六、2NN页面不能显示完整信息

1)问题描述
访问2NN页面http://hadoop104:9868,看不到详细信息

2)解决办法
(1)在浏览器上按F12,查看问题原因。定位bug在61行
(2)找到要修改的文件

[lyh@hadoop102 static]$ pwd
/opt/module/hadoop-3.1.3/share/hadoop/hdfs/webapps/static
[lyh@hadoop102 static]$ vim dfs-dust.js
:set nu
修改61行 
return new Date(Number(v)).toLocaleString();

(3)分发dfs-dust.js
[lyh@hadoop102 static]$ xsync dfs-dust.js
(4)在http://hadoop104:9868/status.html 页面强制刷新

本文标签: 商数脚本顺序通道完整