Flume Custom Interceptors: ETL Filtering and Type Classification


The Event interface in Flume's source

package org.apache.flume;

import java.util.Map;

/**
 * Basic representation of a data object in Flume.
 * Provides access to data as it flows through the system.
 */
public interface Event {

    /**
     * Returns a map of key-value pairs describing the data stored in the body.
     */
    public Map<String, String> getHeaders();

    /**
     * Set the event headers.
     * @param headers Map of headers to replace the current headers.
     */
    public void setHeaders(Map<String, String> headers);

    /**
     * Returns the raw byte array of the data contained in this event.
     */
    public byte[] getBody();

    /**
     * Sets the raw byte array of the data contained in this event.
     * @param body The data.
     */
    public void setBody(byte[] body);
}
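
To see the interface in action, Flume's EventBuilder helper (org.apache.flume.event.EventBuilder, in flume-ng-core) can construct events; a minimal sketch, with a made-up log line as the body:

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class EventDemo {
    public static void main(String[] args) {
        // Build an event whose body is a raw log line
        Event event = EventBuilder.withBody("1549696509054|{\"cm\":{}}", StandardCharsets.UTF_8);
        // Headers start out empty; interceptors typically add routing entries here
        event.getHeaders().put("topic", "topic_event");
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
        System.out.println(event.getHeaders());
    }
}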

The Interceptor interface in Flume's source

public interface Interceptor {

    /**
     * Any initialization / startup needed by the Interceptor.
     */
    public void initialize();

    /**
     * Interception of a single {@link Event}.
     * @param event Event to be intercepted
     * @return Original or modified event, or {@code null} if the Event
     *         is to be dropped (i.e. filtered out).
     */
    public Event intercept(Event event);

    /**
     * Interception of a batch of {@linkplain Event events}.
     * @param events Input list of events
     * @return Output list of events. The size of the output list MUST NOT BE
     *         GREATER than the size of the input list (i.e. transformation and
     *         removal ONLY). Also, this method MUST NOT return {@code null}.
     *         If all events are dropped, an empty List is returned.
     */
    public List<Event> intercept(List<Event> events);

    /**
     * Perform any closing / shutdown needed by the Interceptor.
     */
    public void close();

    /** Builder implementations MUST have a no-arg constructor */
    public interface Builder extends Configurable {
        public Interceptor build();
    }
}

Log-data validation utility class

import org.apache.commons.lang.math.NumberUtils;

/**
 * Utility class for validating log data.
 */
public class LogUtil {

    public static boolean validateStart(String log) {
        if (log == null) {
            return false;
        }
        // A start log is a bare JSON object: {...}
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
            return false;
        }
        return true;
    }

    public static boolean validateEvent(String log) {
        // Format: server timestamp | json
        // e.g. 1349696509054 | {"cm":{"ln":"-...............}
        if (log == null) {
            return false;
        }
        // Split on the pipe separator
        String[] logContents = log.split("\\|");
        // Validate the field count
        if (logContents.length != 2) {
            return false;
        }
        // Validate the server timestamp (13-digit epoch millis)
        if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
            return false;
        }
        // Validate the JSON payload
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) {
            return false;
        }
        return true;
    }
}
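
A quick way to sanity-check the validators is a throwaway main method (hypothetical, not part of the original class):

public class LogUtilTest {
    public static void main(String[] args) {
        // Valid event log: 13-digit epoch millis, a pipe, then JSON
        System.out.println(LogUtil.validateEvent("1549696509054|{\"cm\":{\"ln\":\"-90.4\"}}")); // true
        // Invalid: timestamp is not 13 digits
        System.out.println(LogUtil.validateEvent("12345|{\"cm\":{}}"));                         // false
        // Valid start log: a bare JSON object
        System.out.println(LogUtil.validateStart("{\"action\":\"start\"}"));                    // true
    }
}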

Flume ETL interceptor implementation

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

/**
 * Custom interceptor that filters JSON log data
 * by implementing the Interceptor interface.
 */
public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No initialization needed
    }

    /**
     * Filter method for a single event: returns the event if its
     * body is valid, or null so that Flume drops it.
     */
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        // Convert the body to a string
        String log = new String(body, Charset.forName("UTF-8"));
        // Validate the data
        if (log.contains("start")) {
            if (LogUtil.validateStart(log)) {
                return event;
            }
        } else if (LogUtil.validateEvent(log)) {
            return event;
        }
        return null;
    }

    /**
     * Filter method for a batch of events: keeps only the valid ones.
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> listEvent = new ArrayList<>();
        // Check every event in the batch
        for (Event event : events) {
            Event intercept = intercept(event);
            if (intercept != null) {
                listEvent.add(intercept);
            }
        }
        return listEvent;
    }

    @Override
    public void close() {
    }

    /**
     * Builder required by Flume, implemented as a static inner class.
     */
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
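
A minimal check of the drop/keep behavior (a hypothetical demo class, assuming the classes above are on the classpath; uses Flume's EventBuilder):

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class LogETLInterceptorDemo {
    public static void main(String[] args) {
        Interceptor interceptor = new LogETLInterceptor.Builder().build();
        interceptor.initialize();

        // A well-formed event log passes through unchanged
        Event good = EventBuilder.withBody("1549696509054|{\"cm\":{}}", StandardCharsets.UTF_8);
        System.out.println(interceptor.intercept(good) != null);   // true

        // A malformed line (no timestamp, no pipe) is dropped
        Event bad = EventBuilder.withBody("not a log line", StandardCharsets.UTF_8);
        System.out.println(interceptor.intercept(bad) == null);    // true

        interceptor.close();
    }
}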

Flume type-classification interceptor

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Interceptor that classifies events by type, writing the type from the
 * body into the header so a channel selector can route on it.
 */
public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    /**
     * Writes the log type found in the body into the header.
     */
    @Override
    public Event intercept(Event event) {
        // 1. Get the body data
        byte[] eventBody = event.getBody();
        String log = new String(eventBody, Charset.forName("UTF-8"));
        // 2. Get the headers
        Map<String, String> headers = event.getHeaders();
        // 3. Classify and tag the event
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        // Return the tagged event (returning null would drop it)
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            if (event != null) {
                interceptors.add(intercept(event));
            }
        }
        return interceptors;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
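
And a quick check that the header is tagged correctly (again a hypothetical demo, using EventBuilder):

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class LogTypeInterceptorDemo {
    public static void main(String[] args) {
        LogTypeInterceptor interceptor = new LogTypeInterceptor();

        // A body containing "start" is routed to topic_start
        Event start = EventBuilder.withBody("{\"action\":\"start\"}", StandardCharsets.UTF_8);
        System.out.println(interceptor.intercept(start).getHeaders().get("topic")); // topic_start

        // Anything else is routed to topic_event
        Event other = EventBuilder.withBody("1549696509054|{\"cm\":{}}", StandardCharsets.UTF_8);
        System.out.println(interceptor.intercept(other).getHeaders().get("topic")); // topic_event
    }
}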

When packaging the custom interceptors, build against the same dependency versions as the target Flume installation; a version mismatch will cause errors at runtime. Then place the resulting jar in Flume's lib directory.
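
With Maven, for example, you would declare the Flume dependency with provided scope so the jar does not bundle Flume classes; the version number below is an assumption, match it to your cluster:

<!-- pom.xml excerpt: build against the cluster's Flume version (1.9.0 is an assumption) -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>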


Flume configuration file

# Component definitions
a1.sources = r1
a1.channels = c1 c2

# configure source
# Read data in TAILDIR mode
a1.sources.r1.type = TAILDIR
# Record the read offset of each tailed log file
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
# Location of the log files to tail
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

# interceptors
a1.sources.r1.interceptors = i1 i2
# ETL interceptor
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.LogETLInterceptor$Builder
# Type interceptor
a1.sources.r1.interceptors.i2.type = com.flume.interceptor.LogTypeInterceptor$Builder

# Route data by log type, using the "topic" header set by i2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channels
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# start-type logs go to channel c1 / topic_start
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# event-type logs go to channel c2 / topic_event
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

Here com.flume.interceptor.LogETLInterceptor$Builder and com.flume.interceptor.LogTypeInterceptor$Builder are the fully qualified class names of the custom interceptors' Builder classes.

Flume start/stop script

#!/bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103
    do
        echo " -------- starting collection Flume on $i --------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
    done
};;
"stop"){
    for i in hadoop102 hadoop103
    do
        echo " -------- stopping collection Flume on $i --------"
        ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs kill"
    done
};;
esac
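
Assuming the script is saved as f1.sh (the file name here is an assumption) and made executable, it is invoked as:

chmod +x f1.sh
./f1.sh start    # start the collection agents on hadoop102 and hadoop103
./f1.sh stop     # stop them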

Note 1: nohup keeps the process running after you log out or close the terminal. The name means "no hang-up": the command runs immune to hangup signals.

Note 2: /dev/null is the Linux null device. Everything written to it is discarded, hence its nickname, the "black hole".

  • Standard input (fd 0): input from the keyboard, /proc/self/fd/0
  • Standard output (fd 1): output to the screen (console), /proc/self/fd/1, i.e. the normal log output
  • Standard error (fd 2): output to the screen (console), /proc/self/fd/2, i.e. the error log output
  • &: run the command in the background
  • 2>&1: redirect standard error (fd 2) to the same destination as standard output (fd 1), so both streams end up in /dev/null (see the example below)
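
Order matters here: 2>&1 duplicates stderr onto whatever stdout points to at that moment, so it must come after the >/dev/null redirection. A small shell illustration:

ls /nonexistent > /dev/null 2>&1    # both streams discarded
ls /nonexistent 2>&1 > /dev/null    # error still prints: stderr was duplicated
                                    # onto the terminal before stdout was redirected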

Note 3:
\$2 is the second field extracted by awk. Written as a bare $2, it would instead be expanded by the local shell as the script's second argument before the command reaches the remote host, which is why it is escaped.

ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs kill

xargs: takes the output of the preceding command and passes it as arguments to the following command (here, the PIDs to kill).

file-flume-kafka: the name of the configuration file; grepping for it matches the agent process that was started with it.
