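Simulated Shopping Data Real-Time Stream Processing (3): Real-Time Stream Data Processing
Project Overview
The project consists of the following parts:
- Platform setup
- Simulated data source generation
- Real-time stream data processing
- Real-time data dashboard
I cover each part in a separate blog post. This post focuses on real-time stream data processing. The framework for the project's data generation and processing is shown below.
Platform setup: see the post on platform setup
Simulated data source generation: see the post on simulated data source generation
Real-time data dashboard: see the post on the real-time data dashboard
Project download: download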
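Environment
We have finally reached the core of the project. This part does the stream processing with Trident, Storm's high-level API with transactional batch processing. The code is written in Java using IntelliJ IDEA; if you are not familiar with IDEA, see: Basic use of IDEA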
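Trident processes the stream in small batches, and each batch carries a transaction id (txid). With a transactional spout such as TransactionalTridentKafkaSpout, a replayed batch has the same txid and exactly the same tuples, which is what allows a state update to be applied exactly once: store the txid next to each value and skip the update when the stored txid equals the current one. The sketch below illustrates only that idea in plain Java. It is not Storm's State API, the names `TxidCounterSketch`, `apply` and `get` are hypothetical, and it assumes updates arrive in txid order, which Trident guarantees for state updates. Note that the topology in this post writes its results through ordinary functions and aggregators rather than a Trident State, so it does not get this guarantee automatically.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of txid-based "transactional state":
 * every stored count remembers the txid of the batch that last updated it,
 * so replaying the same batch does not change the count a second time.
 */
public class TxidCounterSketch {
    // A stored count together with the txid of the batch that produced it
    static final class TxValue {
        final long txid;
        final long count;

        TxValue(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    private final Map<String, TxValue> store = new HashMap<>();

    /** Applies one batch's partial count for a key; returns false if the batch was already applied. */
    public boolean apply(long txid, String key, long delta) {
        TxValue current = store.get(key);
        if (current != null && current.txid == txid) {
            // Same txid seen again: this is a replay, the update is already in the store
            return false;
        }
        long base = (current == null) ? 0 : current.count;
        store.put(key, new TxValue(txid, base + delta));
        return true;
    }

    /** Current count for a key, 0 if never seen. */
    public long get(String key) {
        TxValue v = store.get(key);
        return v == null ? 0 : v.count;
    }

    public static void main(String[] args) {
        TxidCounterSketch s = new TxidCounterSketch();
        s.apply(1, "male", 3);
        s.apply(2, "male", 2);
        s.apply(2, "male", 2); // replay of batch 2 is ignored
        System.out.println(s.get("male")); // prints 5
    }
}
```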
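Stream Processing
Below is an overview of the main work in the stream processing part, the classes I wrote, and the role of each one; it is easiest to follow together with the data processing framework above.
Data processing flow
- The data source writes the generated data to Kafka
- TransactionalTridentKafkaSpout receives the data
- The received data is stored in MySQL
- Count the occurrences of each gender, product category, region, and age group
- Count the relationship between gender and age group
- Count the relationship between gender and product category
- Write the statistics to files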
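To make the counting steps above concrete without a running cluster, here is a plain-Java sketch of what happens to one Kafka message. It assumes, as SpilterFunction and SaveFunction below do, that a message holds one or more records separated by `\n`, each with 7 tab-separated fields, and that fields 1 to 4 are gender, age group, product category and region. The class name `BatchFlowSketch` and the sample records are hypothetical; MySQL storage and file output are left out, and the counts here cover one message only.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java sketch of one batch: split the message into records,
 * split each record on tabs, and count the values of fields 1..4.
 */
public class BatchFlowSketch {
    // Names of fields 1..4 (field 0 is the record key used for the duplicate check)
    static final String[] NAMES = {"gender", "age", "type", "place"};

    /** Returns field name -> (value -> number of occurrences) for one message. */
    public static Map<String, Map<String, Integer>> process(String message) {
        Map<String, Map<String, Integer>> counts = new HashMap<>();
        for (String name : NAMES) {
            counts.put(name, new HashMap<>());
        }
        for (String line : message.split("\n")) {
            // limit -1 keeps trailing empty fields
            String[] f = line.split("\t", -1);
            if (f.length < 7) {
                continue; // skip malformed records instead of throwing
            }
            for (int i = 0; i < NAMES.length; i++) {
                counts.get(NAMES[i]).merge(f[i + 1], 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String msg = "1\tmale\t20-30\tbook\tBeijing\tx\ty\n"
                + "2\tfemale\t20-30\tfood\tShanghai\tx\ty";
        System.out.println(process(msg).get("age")); // prints {20-30=2}
    }
}
```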
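Main classes and their roles
- Trident_Topology: the main topology class
- SpilterFunction: splits the incoming data into records and fields
- SaveFunction: stores the split records in the database
- Gender_Age_Type: counts the gender-age group and gender-product type relationships
- SumWord: counts the occurrences of each gender, product category, region, and age group
- PrintFunction: writes the statistics to files
- PrintFilter_partition: a stream filter; it filters nothing here and only writes each tuple to the log
- Qurey: a MySQL query helper that checks, before a record is inserted, whether it is already in the database (its source is not included in this post)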
Code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ssh</groupId>
    <artifactId>storm_trident</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <kafka.version>0.11.0.0</kafka.version>
        <storm.version>1.1.1</storm.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <!-- When packaging for "storm jar", storm-core is normally given
             <scope>provided</scope>, because the cluster already supplies it -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <!-- main class of this project (default package) -->
                    <mainClass>Trident_Topology</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Trident_Topology.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

public class Trident_Topology {
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();
        // ZooKeeper address used by Kafka (adjust to your own cluster)
        ZkHosts zkHosts = new ZkHosts("192.168.161.100:2501");
        // Read the "wordcount" topic; each message is deserialized as a String in field "str"
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "wordcount");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

        topology.newStream("kafka_transactionalTrident", kafkaSpout)
                .parallelismHint(2)
                // broadcast(): every tuple is sent to every partition of the next operation
                .broadcast()
                // Split each message into records with 7 fields a..g
                .each(new Fields("str"), new SpilterFunction(),
                        new Fields("a", "b", "c", "d", "e", "f", "g"))
                // Store each record in MySQL and emit gender, age, type, place
                .each(new Fields("a", "b", "c", "d", "e", "f", "g"), new SaveFunction(),
                        new Fields("gender", "age", "type", "place"))
                // Run all aggregators over the same batch; the result is one tuple with all six outputs
                .chainedAgg()
                .aggregate(new Fields("gender", "age"), new Gender_Age_Type(), new Fields("gender-age"))
                .aggregate(new Fields("gender", "type"), new Gender_Age_Type(), new Fields("gender-type"))
                .aggregate(new Fields("gender"), new SumWord(), new Fields("gender_all"))
                .aggregate(new Fields("age"), new SumWord(), new Fields("age_all"))
                .aggregate(new Fields("type"), new SumWord(), new Fields("type_all"))
                .aggregate(new Fields("place"), new SumWord(), new Fields("place_all"))
                .chainEnd()
                .parallelismHint(1)
                // Write the six result maps to JSON files
                .each(new Fields("gender-age", "gender-type", "gender_all", "age_all", "type_all", "place_all"),
                        new PrintFunction(),
                        new Fields("gender_age", "gender_type", "gender", "age", "type", "place"))
                // Log every result tuple
                .each(new Fields("gender_age", "gender_type", "gender", "age", "type", "place"),
                        new PrintFilter_partition());

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);

        if (args.length > 0) {
            // Cluster mode: args[0] is the topology name; the topology keeps running after main returns
            try {
                StormSubmitter.submitTopology(args[0], config, topology.build());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // Local mode: run in-process for a while, then shut the local cluster down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("trident_Topology", config, topology.build());
            try {
                System.out.println("wait data");
                Thread.sleep(3000000);
            } catch (InterruptedException exception) {
                System.out.println("Thread interrupted exception : " + exception);
            }
            cluster.shutdown();
        }
    }
}
SpilterFunction.java
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpilterFunction extends BaseFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SpilterFunction.class);

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // One Kafka message may carry several records, one per line
        String sentence = tuple.getString(0);
        String[] lines = sentence.split("\n");
        for (String line : lines) {
            System.out.println("spilter record:" + line);
            LOGGER.info("spilter record:" + line);
            // Each record has 7 tab-separated fields; -1 keeps trailing empty fields
            String[] fields = line.split("\t", -1);
            if (fields.length < 7) {
                LOGGER.warn("skip malformed record:" + line);
                continue;
            }
            collector.emit(new Values(fields[0], fields[1], fields[2], fields[3],
                    fields[4], fields[5], fields[6]));
        }
    }
}
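One detail matters when splitting each record on tabs: `String.split(regex)` drops trailing empty strings, so a record whose last fields are empty yields fewer than 7 elements, and reading index 5 or 6 would throw ArrayIndexOutOfBoundsException. Passing a limit of -1 keeps those empty fields. The small demo below shows the difference; the class name and the sample record are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical demo: split(regex) versus split(regex, -1) on a tab-separated record. */
public class SplitLimitDemo {
    /** Number of fields produced with or without keeping trailing empty strings. */
    public static int fieldCount(String line, boolean keepTrailingEmpty) {
        return keepTrailingEmpty ? line.split("\t", -1).length : line.split("\t").length;
    }

    public static void main(String[] args) {
        // Record whose last two fields are empty
        String line = "1\tmale\t20-30\tbook\tBeijing\t\t";
        System.out.println(Arrays.toString(line.split("\t"))); // prints [1, male, 20-30, book, Beijing]
        System.out.println(line.split("\t", -1).length);       // prints 7
    }
}
```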
SaveFunction.java
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SaveFunction extends BaseFunction {
    private static final long serialVersionUID = 1L;
    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    static final String DB_URL = "jdbc:mysql://localhost:3306/transaction?useSSL=false&serverTimezone=UTC";
    // Database user name and password: change these to match your own setup
    static final String USER = "root";
    static final String PASS = "root";

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        Qurey test = new Qurey();
        // Insert only when Qurey reports that the record keyed by field "a" is not stored yet
        if (test.match(tuple.getStringByField("a"))) {
            // SQL statement (insert)
            String sql = "insert into record values(?,?,?,?,?,?,?)";
            try {
                // Register the JDBC driver
                Class.forName(JDBC_DRIVER);
                // try-with-resources closes the statement and the connection exactly once
                try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
                     PreparedStatement pst = conn.prepareStatement(sql)) {
                    String[] names = {"a", "b", "c", "d", "e", "f", "g"};
                    for (int i = 0; i < names.length; i++) {
                        pst.setString(i + 1, tuple.getStringByField(names[i]));
                    }
                    pst.executeUpdate();
                }
            } catch (ClassNotFoundException | SQLException e) {
                // Driver loading or JDBC error
                e.printStackTrace();
            }
        }
        // Pass fields b..e downstream as gender, age, type, place
        collector.emit(new Values(tuple.get(1), tuple.get(2), tuple.get(3), tuple.get(4)));
    }
}
Gender_Age_Type.java
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class Gender_Age_Type extends BaseAggregator<Map<String, Map<String, Integer>>> {
    private static final long serialVersionUID = 1L;
    /** The batch being processed */
    private Object batchId;
    /** Index of this partition */
    private int partitionId;
    /** Number of partitions */
    private int numPartitions;
    /** Counts: gender -> (age group or product type -> count) */
    private Map<String, Map<String, Integer>> state;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        state = new HashMap<String, Map<String, Integer>>();
        // Pre-create both genders so the output always contains them
        state.put("male", new HashMap<String, Integer>());
        state.put("female", new HashMap<String, Integer>());
        partitionId = context.getPartitionIndex();
        numPartitions = context.numPartitions();
    }

    @Override
    public Map<String, Map<String, Integer>> init(Object batchId, TridentCollector collector) {
        this.batchId = batchId;
        // The same map is returned for every batch, so counts accumulate across batches
        return state;
    }

    @Override
    public void aggregate(Map<String, Map<String, Integer>> val, TridentTuple tuple,
                          TridentCollector collector) {
        // System.out.println(tuple + ";partitionId=" + partitionId + ";partitions=" + numPartitions + ",batchId:" + batchId);
        String gender = tuple.getString(0);
        String age = tuple.getString(1); // age group, or product type for "gender-type"
        // computeIfAbsent avoids a NullPointerException for gender values other than male/female
        Map<String, Integer> counts = val.computeIfAbsent(gender, k -> new HashMap<String, Integer>());
        counts.put(age, MapUtils.getInteger(counts, age, 0) + 1);
        // System.out.println("sumWord:" + val);
    }

    @Override
    public void complete(Map<String, Map<String, Integer>> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}
SumWord.java
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class SumWord extends BaseAggregator<Map<String, Integer>> {
    private static final long serialVersionUID = 1L;
    /** The batch being processed */
    private Object batchId;
    /** Index of this partition */
    private int partitionId;
    /** Number of partitions */
    private int numPartitions;
    /** Counts: value (gender, age group, product type or region) -> count */
    private Map<String, Integer> state;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        state = new HashMap<String, Integer>();
        partitionId = context.getPartitionIndex();
        numPartitions = context.numPartitions();
    }

    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        this.batchId = batchId;
        // The same map is returned for every batch, so counts accumulate across batches
        return state;
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
        // System.out.println(tuple + ";partitionId=" + partitionId + ";partitions=" + numPartitions + ",batchId:" + batchId);
        String word = tuple.getString(0);
        val.put(word, MapUtils.getInteger(val, word, 0) + 1);
        // System.out.println("sumWord:" + val);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}
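In both Gender_Age_Type and SumWord, `state` is created once in prepare() and init() returns that same map for every batch, so the emitted maps are running totals rather than per-batch counts, which suits a dashboard that shows cumulative figures. The trade-offs are that the totals live only in worker memory (they are lost when a worker restarts) and that a replayed batch would be counted again. The plain-Java simulation below, with the hypothetical class `AggregatorLifecycleSketch`, mimics the init/aggregate/complete calls Trident makes for each batch and contrasts a shared map with a fresh map per batch; it is not the Storm API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of an aggregator's per-batch lifecycle:
 * init() once per batch, aggregate() once per tuple, complete() emits the result.
 */
public class AggregatorLifecycleSketch {
    private final Map<String, Integer> shared = new HashMap<>();
    private final boolean cumulative;

    public AggregatorLifecycleSketch(boolean cumulative) {
        this.cumulative = cumulative;
    }

    // Shared map: running totals across batches; new map: counts for this batch only
    Map<String, Integer> init() {
        return cumulative ? shared : new HashMap<>();
    }

    void aggregate(Map<String, Integer> val, String word) {
        val.merge(word, 1, Integer::sum);
    }

    /** Runs one batch and returns a copy of what complete() would emit. */
    public Map<String, Integer> runBatch(List<String> batch) {
        Map<String, Integer> val = init();
        for (String word : batch) {
            aggregate(val, word);
        }
        return new HashMap<>(val);
    }

    public static void main(String[] args) {
        AggregatorLifecycleSketch running = new AggregatorLifecycleSketch(true);
        AggregatorLifecycleSketch perBatch = new AggregatorLifecycleSketch(false);
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("male", "female"), Arrays.asList("male"));
        for (List<String> batch : batches) {
            System.out.println(running.runBatch(batch) + " vs " + perBatch.runBatch(batch));
        }
    }
}
```

With a shared map the second batch reports male=2 and female=1; with a fresh map it reports only male=1.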
PrintFunction.java
import net.sf.json.JSONObject;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

public class PrintFunction extends BaseFunction {
    private static final long serialVersionUID = 1L;
    // Data directory of the dashboard web site (adjust to your own path)
    private static final String OUTPUT_DIR = "F:\\xuanxiu\\PHPstudy\\PHPTutorial\\WWW\\bigdata\\data\\";

    /** Serializes a result map to JSON and overwrites OUTPUT_DIR/<type>.json */
    private void save_to_json(Object map, String type) {
        JSONObject json = JSONObject.fromObject(map);
        File file = new File(OUTPUT_DIR + type + ".json");
        // FileOutputStream creates the file if it does not exist and truncates it otherwise;
        // try-with-resources flushes and closes the writer even if write() fails
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8))) {
            writer.write(json.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        save_to_json(tuple.get(0), "gender_age");
        save_to_json(tuple.get(1), "gender_type");
        save_to_json(tuple.get(2), "gender");
        save_to_json(tuple.get(3), "age");
        save_to_json(tuple.get(4), "type");
        save_to_json(tuple.get(5), "place");
        // System.out.println("gender:" + tuple.get(2));
        // System.out.println("age:" + tuple.get(3));
        // System.out.println("type:" + tuple.get(4));
        // System.out.println("place:" + tuple.get(5));
        collector.emit(new Values(tuple.get(0), tuple.get(1), tuple.get(2),
                tuple.get(3), tuple.get(4), tuple.get(5)));
    }
}
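PrintFunction overwrites the JSON files in place while the dashboard may be reading them, so a reader can occasionally see an empty or half-written file. A common remedy, sketched here as an assumption rather than part of the original project, is to write to a temporary file in the same directory and then rename it over the target with `Files.move(..., ATOMIC_MOVE)`. On POSIX file systems this is an atomic rename that replaces the target; the `Files.move` Javadoc says that whether an existing target is replaced under ATOMIC_MOVE is implementation specific, so it is worth checking on the Windows path used above. The class name `AtomicJsonWriter` is hypothetical, and the JSON text is assumed to come from `JSONObject.fromObject(map).toString()`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical helper: replace <dir>/<name>.json atomically so that readers
 * see either the old document or the new one, never a partial write.
 */
public class AtomicJsonWriter {
    public static Path write(Path dir, String name, String json) throws IOException {
        Files.createDirectories(dir);
        Path target = dir.resolve(name + ".json");
        // Temp file in the same directory, so the rename stays on one file system
        Path tmp = Files.createTempFile(dir, name, ".tmp");
        try {
            Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));
            // Atomic rename over the target; other copy options would be ignored with ATOMIC_MOVE
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            Files.deleteIfExists(tmp); // no-op after a successful move
        }
        return target;
    }
}
```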
PrintFilter_partition.java
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintFilter_partition extends BaseFilter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PrintFilter_partition.class);
    private static final long serialVersionUID = 1L;

    // Keeps every tuple and only writes it to the log
    @Override
    public boolean isKeep(TridentTuple tuple) {
        LOGGER.info("printed tuple:" + tuple);
        return true;
    }
}
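Quick demo
At this point the project is essentially complete; only the visualization part is left, so here is a quick demonstration.
First start all the platform services; you can confirm that they started successfully with the jps command.
The data source and the processing program can be started in either order. The results are shown below.
Here I only print the split records; the code contains more output statements (commented out) that you can enable as needed.
We can also package the whole program and run it on the cluster.
For how to build a jar with IDEA, see: how to package with IDEA
After building the jar, upload it to the cluster.
Then run the jar with the following command: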
storm jar storm_trident.jar Trident_Topology trident_Topology
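The arguments are: `storm jar` tells Storm to run a jar; storm_trident.jar is the name of the jar; Trident_Topology is the name of the main class; and trident_Topology is the name of the topology. Storm passes the arguments after the class name to main(), so args[0] is "trident_Topology"; because args.length > 0, the program submits the topology to the cluster under that name with StormSubmitter instead of starting a LocalCluster.
While it runs, we can check the Storm UI.
This is a visualization of the topology.
Summary
This part is the hardest in the whole project. When reading it, focus on the processing flow and the ideas behind it; the specific techniques can be adapted to your own level. Only the last part of the project remains, the visualization, which I will explain in detail in the next post. Stay tuned if you are interested.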