Flume Components
1. Source
Compared with Exec Source and Spooling Directory Source, TailDir Source has two advantages: it can resume from the last recorded read position after a restart, and it can monitor multiple directories.
Before Flume 1.6, resumable reads required a custom Source that recorded the read offset of each file.
Exec Source can collect data in real time, but if the Flume agent is not running or the shell command fails, data is lost.
Spooling Directory Source monitors a directory, but does not support resuming from a recorded position.
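TailDir Source achieves resumption by periodically flushing each file's read offset to the position file (configured via positionFile below). Its content is a JSON array of per-file records; the inode, pos, and file values here are illustrative:

```json
[{"inode": 2496001, "pos": 1024, "file": "/tmp/logs/app.2024-01-01.log"}]
```

On restart, the source looks up each tracked file in this array and continues reading from the saved pos instead of from the beginning.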
2. Channel
Using Kafka Channel writes events directly to Kafka, eliminating the Sink stage and improving efficiency.
a1.sources = r1
a1.channels = c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.my.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.my.flume.interceptor.LogTypeInterceptor$Builder

# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
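The multiplexing selector above routes each event by its topic header, which the custom LogTypeInterceptor is responsible for setting. The following is a hypothetical, self-contained sketch of that header-stamping logic (the real interceptor implements Flume's Interceptor interface and inspects the actual log format; the "start" marker used here is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: stamp a "topic" header on each event based on its body,
// so the multiplexing channel selector can route it to c1 or c2.
public class LogTypeInterceptorSketch {

    // Stand-in for the per-event intercept step: returns the headers
    // that would be attached to a Flume event with this body.
    static Map<String, String> intercept(byte[] body) {
        Map<String, String> headers = new HashMap<>();
        String log = new String(body);
        // Assumed criterion: start logs contain a "start" marker in the body.
        if (log.contains("start")) {
            headers.put("topic", "topic_start");   // selector maps this to c1
        } else {
            headers.put("topic", "topic_event");   // selector maps this to c2
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(intercept("{\"action\":\"start\"}".getBytes()).get("topic"));
        System.out.println(intercept("{\"action\":\"click\"}".getBytes()).get("topic"));
    }
}
```

Because a1.sources.r1.selector.header = topic, whatever value this interceptor puts under the topic key decides the channel: topic_start events flow through c1 into the topic_start Kafka topic, and everything else through c2 into topic_event.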
ToDo:******