Deduplicating Big Data with Flink and a Redis Bloom Filter

Flink is an open-source stream-processing framework developed under the Apache Software Foundation; it has an active community, with Alibaba playing a leading role.

A Bloom filter is a compact probabilistic data structure and a powerful tool for deduplicating massive data sets.

Redisson is a Java Redis client recommended by the official Redis project; it builds richer abstractions on top of Redis, including a distributed Bloom filter.

Combining the three makes it quick to implement streaming deduplication.
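
Before wiring it into Flink, here is a minimal standalone sketch of Redisson's Bloom filter API. The Redis address, filter name, and sizing below are illustrative assumptions rather than values taken from the job:

import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class BloomFilterDemo {
    public static void main(String[] args) {
        // Hypothetical single-node Redis address; replace with your own.
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);

        // Back the filter with a Redis structure named "demo"; size it for the
        // expected number of distinct keys and an acceptable false-positive rate.
        RBloomFilter<String> bloomFilter = redisson.getBloomFilter("demo");
        bloomFilter.tryInit(1_000_000L, 0.03);

        System.out.println(bloomFilter.contains("key-1")); // false
        bloomFilter.add("key-1");
        System.out.println(bloomFilter.contains("key-1")); // true

        redisson.shutdown();
    }
}

The Flink job below applies the same kind of filter inside a streaming pipeline: it consumes raw records from Kafka, splits and keys them, drops anything the filter has already seen, and writes the first occurrence of each key back to Kafka.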

package bill;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.InputStream;
import java.util.Properties;

/**
 * Created by Bill on 2019-5-9.
 */
public class KafkaMessageStreaming {

    public static void main(String[] args) throws Exception {
        String inTopic = args[0];
        String outTopic = args[1];
        String redisUrl = args[2];
        String jobName = args[3];
        String kafkaProps = args[4];

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(300000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Pass the Redis address and job name to every operator via global job parameters.
        Configuration conf = new Configuration();
        conf.setString("redis_url", redisUrl);
        conf.setString("job_name", jobName);
        env.getConfig().setGlobalJobParameters(conf);

        // Load the Kafka consumer/producer properties from a file on the classpath.
        Properties props = new Properties();
        InputStream in = KafkaMessageStreaming.class.getClassLoader().getResourceAsStream(kafkaProps);
        props.load(in);

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(inTopic, new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        FlinkKafkaProducer010<Tuple2<String, String>> producer =
                new FlinkKafkaProducer010<Tuple2<String, String>>(outTopic, new SerializationSchema<Tuple2<String, String>>() {
                    @Override
                    public byte[] serialize(Tuple2<String, String> element) {
                        return (element.f0 + "," + element.f1).getBytes();
                    }
                }, props);

        env.addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(1, 2, 3)
                .flatMap(new UniqAnalysis())
                .addSink(producer);

        env.execute(jobName);
    }
}
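
The job assigns timestamps and watermarks with a MessageWaterEmitter class that the original post does not show. The following is a minimal sketch of such a class, assuming the second comma-separated field of each raw record is an epoch-millisecond event time (the same field MessageSplitter parses below):

package bill;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Hypothetical stand-in for the MessageWaterEmitter referenced above; it is not
 * part of the original post.
 */
public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            if (parts.length > 1) {
                try {
                    return Long.parseLong(parts[1]);
                } catch (NumberFormatException ignored) {
                    // fall through and reuse the previous timestamp
                }
            }
        }
        return previousElementTimestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        // Emit a watermark after every element; simple, if a little chatty.
        return new Watermark(extractedTimestamp);
    }
}
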
package bill;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class UniqAnalysis extends RichFlatMapFunction<Tuple4<Long, String, String, String>, Tuple2<String, String>> {

    private String redisUrl = null;
    private String jobName = null;
    transient private RedissonClient redisson = null;
    transient private RBloomFilter<String> bloomFilter = null;
    private Tuple2<String, String> domainFirstTime = new Tuple2<>("", "");

    @Override
    public void flatMap(Tuple4<Long, String, String, String> in, Collector<Tuple2<String, String>> out) {
        // Only emit a record the first time its (f1, f2, f3) combination is seen.
        if (!this.bloomFilter.contains(in.f1 + in.f2 + in.f3)) {
            this.bloomFilter.add(in.f1 + in.f2 + in.f3);
            domainFirstTime.f0 = String.valueOf(in.f0);
            domainFirstTime.f1 = in.f1 + "," + in.f2 + "," + in.f3;
            out.collect(domainFirstTime);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Configuration globConf = (Configuration) globalParams;
        this.redisUrl = globConf.getString("redis_url", null);
        this.jobName = globConf.getString("job_name", null);
        if (this.redisUrl != null) {
            // Connect to a single Redis server and initialize the shared Bloom filter.
            Config redissonConfig = new Config();
            redissonConfig.useSingleServer().setAddress(this.redisUrl);
            redisson = Redisson.create(redissonConfig);
            this.bloomFilter = redisson.getBloomFilter("first");
            this.bloomFilter.tryInit(100_000_000, 0.03);
            System.out.println(System.currentTimeMillis() + " [" + this.jobName + "] UniqAnalysis open, bloomFilter: " + bloomFilter);
        }
    }

    @Override
    public void close() {
        System.out.println(System.currentTimeMillis() + " [" + this.jobName + "] redisson.shutdown()");
        this.redisson.shutdown();
    }
}
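
Note that every parallel subtask of UniqAnalysis opens the same Redisson Bloom filter, stored in Redis under the name "first", so deduplication is global across the whole job rather than per subtask. tryInit(100_000_000, 0.03) sizes the filter for roughly 100 million distinct keys at about a 3% false-positive rate; since Bloom filters can produce false positives but never false negatives, no duplicate gets through, but a small fraction of genuinely new records may be wrongly discarded.
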
package bill;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

/**
 * Created by Bill on 2019-5-9.
 */
public class MessageSplitter implements FlatMapFunction<String, Tuple4<Long, String, String, String>> {

    private Tuple4<Long, String, String, String> domainFirstTime = new Tuple4<>(0L, "", "", "");

    @Override
    public void flatMap(String value, Collector<Tuple4<Long, String, String, String>> out) {
        if (value != null && value.contains(",")) {
            String[] parts = value.split(",");
            // Expect 9 comma-separated fields; keep the timestamp (field 1) and fields 4-6.
            if (parts.length == 9 && parts[6].length() != 0) {
                domainFirstTime.f0 = Long.parseLong(parts[1]);
                domainFirstTime.f1 = parts[4];
                domainFirstTime.f2 = parts[5];
                domainFirstTime.f3 = parts[6];
                out.collect(domainFirstTime);
            }
        }
    }
}
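
To launch the job, pass five positional arguments: the input Kafka topic, the output topic, the Redis address handed to Redisson (typically of the form redis://host:port for Redisson 3.x), the Flink job name, and the name of a Kafka properties file on the classpath that supplies at least bootstrap.servers and group.id for the consumer.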