Simulated Shopping Data Real-Time Stream Processing (3): Real-Time Stream Data Processing

Project Overview

The project as a whole is divided into:

  1. Platform setup
  2. Simulated data source generation
  3. Real-time stream data processing
  4. Real-time data dashboard

I will cover these parts in separate posts. This post focuses on the real-time stream data processing part. The framework of the data generation and processing portion of the whole project is shown below.

For platform setup, see Platform Setup.
For simulated data source generation, see Simulated Data Source Generation.
For the real-time data dashboard, see Real-Time Data Dashboard.
Project download: download

Environment

We have finally reached the centerpiece of the whole project. This part uses Storm's Trident high-level transactional API for stream processing. It is written mainly in Java, with IDEA as the development environment; if you are not familiar with IDEA, you can refer to this article: A Brief Introduction to IDEA.

Stream Processing

This section describes the main work of the stream processing part, the main classes written, and what each class does. It is best read together with the data processing framework above.

Data Processing Flow

  1. The data source writes the generated data into Kafka
  2. A TransactionalTridentKafkaSpout receives the data from Kafka (a hedged sketch of the expected record layout follows this list)
  3. The received data is stored in MySQL
  4. Count how often each gender, product category, region, and age group appears in the data
  5. Compute the relationship between gender and age group
  6. Compute the relationship between gender and product category
  7. Write the statistics to files
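
Each Kafka message produced by the data source is expected to hold one or more records separated by newlines, each record being a tab-separated string with seven fields; the first field is a record ID and the next four are later mapped by SaveFunction to gender, age group, product category, and region. The concrete values and the meaning of the last two fields below are illustrative assumptions (the real layout is defined in the data source post); this minimal sketch only shows how such a record is split.

public class RecordFormatDemo {
    public static void main(String[] args) {
        // Hypothetical record: id, gender, age group, product category, region,
        // plus two further fields whose meaning is defined by the data source.
        String message = "1001\tmale\t20-30\tclothing\tBeijing\tx\ty";

        // SpilterFunction splits a message the same way: first by '\n', then by '\t'
        for (String record : message.split("\n")) {
            String[] fields = record.split("\t");
            System.out.println(java.util.Arrays.toString(fields));
        }
    }
}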

Main Classes and Their Roles

  1. Trident_Topology: the main topology class
  2. SpilterFunction: splits the incoming data
  3. SaveFunction: stores the split data in the database
  4. Gender_Age_Type: computes the gender/age-group and gender/product-category statistics
  5. SumWord: counts the occurrences of each gender, product category, region, and age group
  6. PrintFunction: writes the statistics to files
  7. PrintFilter_partition: a stream filter that does not actually filter anything; it only writes each tuple to the log
  8. Qurey: the MySQL query helper, used to check whether a record is already in the database before inserting it (its source is not listed in this post, so a hedged sketch is given after SaveFunction.java below)

Code

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>ssh</groupId>
  <artifactId>storm_trident</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <kafka.version>0.11.0.0</kafka.version>
    <storm.version>1.1.1</storm.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>${kafka.version}</version>
      <exclusions>
        <exclusion>
          <groupId>javax.jms</groupId>
          <artifactId>jms</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>${storm.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>${storm.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>15.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.12</version>
    </dependency>
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.2.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Trident_Topology.java

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

public class Trident_Topology {
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();

        // Kafka spout configuration: ZooKeeper address and the topic to read from
        ZkHosts zkHosts = new ZkHosts("192.168.161.100:2501");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "wordcount");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

        topology.newStream("kafka_transactionalTrident", kafkaSpout)
                .parallelismHint(2)
                .broadcast()
                // split each Kafka message into its 7 fields
                .each(new Fields("str"), new SpilterFunction(), new Fields("a", "b", "c", "d", "e", "f", "g"))
                // persist the record and pass gender/age/type/place on
                .each(new Fields("a", "b", "c", "d", "e", "f", "g"), new SaveFunction(), new Fields("gender", "age", "type", "place"))
                // run all aggregations over the same batch
                .chainedAgg()
                .aggregate(new Fields("gender", "age"), new Gender_Age_Type(), new Fields("gender-age"))
                .aggregate(new Fields("gender", "type"), new Gender_Age_Type(), new Fields("gender-type"))
                .aggregate(new Fields("gender"), new SumWord(), new Fields("gender_all"))
                .aggregate(new Fields("age"), new SumWord(), new Fields("age_all"))
                .aggregate(new Fields("type"), new SumWord(), new Fields("type_all"))
                .aggregate(new Fields("place"), new SumWord(), new Fields("place_all"))
                .chainEnd()
                .parallelismHint(1)
                // write the aggregation results to JSON files for the dashboard
                .each(new Fields("gender-age", "gender-type", "gender_all", "age_all", "type_all", "place_all"),
                        new PrintFunction(),
                        new Fields("gender_age", "gender_type", "gender", "age", "type", "place"))
                // log each result tuple
                .each(new Fields("gender_age", "gender_type", "gender", "age", "type", "place"),
                        new PrintFilter_partition());

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);

        if (args.length > 0) {
            // a topology name was given on the command line: submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], config, topology.build());
                try {
                    System.out.println("wait data");
                    Thread.sleep(3000000);
                } catch (Exception exception) {
                    System.out.println("Thread interrupted exception : " + exception);
                }
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // otherwise run in a local cluster for testing
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("trident_Topology", config, topology.build());
        }
        try {
            System.out.println("wait data");
            Thread.sleep(3000000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
    }
}

SpilterFunction.java

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpilterFunction extends BaseFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SpilterFunction.class);

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // a Kafka message may contain several records separated by '\n'
        String sentens = tuple.getString(0);
        String[] array = sentens.split("\n");
        for (int i = 0; i < array.length; i++) {
            System.out.println("spilter record:" + array[i]);
            LOGGER.info("spilter record:" + array[i]);
            // each record is a tab-separated string with 7 fields
            String[] array1 = array[i].split("\t");
            collector.emit(new Values(array1[0], array1[1], array1[2], array1[3], array1[4], array1[5], array1[6]));
        }
    }
}

SaveFunction.java

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.sql.*;

public class SaveFunction extends BaseFunction {
    private static final long serialVersionUID = 1L;

    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    static final String DB_URL = "jdbc:mysql://localhost:3306/transaction?useSSL=false&serverTimezone=UTC";
    // database user name and password; change these to match your own setup
    static final String USER = "root";
    static final String PASS = "root";

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        Qurey test = new Qurey();
        // only insert when the record is not already in the database
        if (test.match(tuple.getStringByField("a"))) {
            Connection conn = null;
            Statement stmt = null;
            try {
                // register the JDBC driver
                Class.forName(JDBC_DRIVER);
                // open the connection
                conn = DriverManager.getConnection(DB_URL, USER, PASS);
                // insert statement
                String sql = "insert into record values(?,?,?,?,?,?,?)";
                PreparedStatement pst = conn.prepareStatement(sql);
                pst.setString(1, tuple.getStringByField("a"));
                pst.setString(2, tuple.getStringByField("b"));
                pst.setString(3, tuple.getStringByField("c"));
                pst.setString(4, tuple.getStringByField("d"));
                pst.setString(5, tuple.getStringByField("e"));
                pst.setString(6, tuple.getStringByField("f"));
                pst.setString(7, tuple.getStringByField("g"));
                pst.executeUpdate();
                conn.close();
            } catch (SQLException se) {
                // handle JDBC errors
                se.printStackTrace();
            } catch (Exception e) {
                // handle Class.forName errors
                e.printStackTrace();
            } finally {
                // release resources
                try {
                    if (stmt != null) stmt.close();
                } catch (SQLException se2) {
                    // nothing to do
                }
                try {
                    if (conn != null) conn.close();
                } catch (SQLException se) {
                    se.printStackTrace();
                }
            }
        }
        // pass gender, age, type and place on to the aggregators
        collector.emit(new Values(tuple.get(1), tuple.get(2), tuple.get(3), tuple.get(4)));
    }
}
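
The Qurey class used above is not listed in this post (it is included in the project download). Based on how SaveFunction calls it, match(id) should return true when the record is not yet in the database, so the caller knows it is safe to insert. The sketch below is only a hedged guess at what it might look like: the table name "record" matches SaveFunction's insert statement, but the id column name and the exact query are assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Hypothetical sketch of the Qurey helper used by SaveFunction.
public class Qurey {
    static final String DB_URL = "jdbc:mysql://localhost:3306/transaction?useSSL=false&serverTimezone=UTC";
    static final String USER = "root";
    static final String PASS = "root";

    public boolean match(String id) {
        try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
             PreparedStatement pst = conn.prepareStatement(
                     "select 1 from record where id = ?")) {   // "id" column name is an assumption
            pst.setString(1, id);
            try (ResultSet rs = pst.executeQuery()) {
                return !rs.next();   // no existing row -> safe to insert
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;            // on error, skip the insert
        }
    }
}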

Gender_Age_Type.java

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class Gender_Age_Type extends BaseAggregator<Map<String, Map<String, Integer>>> {
    private static final long serialVersionUID = 1L;
    /** which batch this aggregation belongs to */
    private Object batchId;
    /** which partition this instance runs in */
    private int partitionId;
    /** number of partitions */
    private int numPartitions;
    /** holds the running counts */
    private Map<String, Map<String, Integer>> state;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        state = new HashMap<String, Map<String, Integer>>();
        Map<String, Integer> male = new HashMap<String, Integer>();
        Map<String, Integer> female = new HashMap<String, Integer>();
        state.put("male", male);
        state.put("female", female);
        partitionId = context.getPartitionIndex();
        numPartitions = context.numPartitions();
    }

    @Override
    public Map<String, Map<String, Integer>> init(Object batchId, TridentCollector collector) {
        this.batchId = batchId;
        return state;
    }

    @Override
    public void aggregate(Map<String, Map<String, Integer>> val, TridentTuple tuple, TridentCollector collector) {
//        System.out.println(tuple + ";partitionId=" + partitionId + ";partitions=" + numPartitions + ",batchId:" + batchId);
        String gender = tuple.getString(0);
        String age = tuple.getString(1);
        // increment the count of this age group (or product type) under the given gender
        val.get(gender).put(age, MapUtils.getInteger(val.get(gender), age, 0) + 1);
//        System.out.println("sumWord:" + val);
    }

    @Override
    public void complete(Map<String, Map<String, Integer>> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}
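
For reference, the value emitted by complete() is a nested map keyed first by gender and then by age group (or product category, when the same aggregator is reused for gender/type), for example {male={20-30=3, 30-40=1}, female={20-30=2}}; the numbers here are made-up. PrintFunction later serializes this map to a JSON file for the dashboard.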

SumWord.java

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class SumWord extends BaseAggregator<Map<String, Integer>> {
    private static final long serialVersionUID = 1L;
    /** which batch this aggregation belongs to */
    private Object batchId;
    /** which partition this instance runs in */
    private int partitionId;
    /** number of partitions */
    private int numPartitions;
    /** holds the running counts */
    private Map<String, Integer> state;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        state = new HashMap<String, Integer>();
        partitionId = context.getPartitionIndex();
        numPartitions = context.numPartitions();
    }

    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        this.batchId = batchId;
        return state;
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
//        System.out.println(tuple + ";partitionId=" + partitionId + ";partitions=" + numPartitions + ",batchId:" + batchId);
        String gender = tuple.getString(0);
        // increment the count of this value (gender, age group, type or place)
        val.put(gender, MapUtils.getInteger(val, gender, 0) + 1);
//        System.out.println("sumWord:" + val);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}
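
SumWord emits a flat map from value to count, e.g. {male=4, female=6} for the gender field (again, made-up numbers); the same aggregator is reused for the age, type, and place fields.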

PrintFunction.java

import net.sf.json.JSONObject;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.io.*;
import java.util.HashMap;

public class PrintFunction extends BaseFunction {
    private static final long serialVersionUID = 1L;

    private void save_to_json(Object map, String type) {
        HashMap<String, Character> hashmap = (HashMap<String, Character>) map;
        JSONObject json = JSONObject.fromObject(hashmap);
        File file = new File("F:\\xuanxiu\\PHPstudy\\PHPTutorial\\WWW\\bigdata\\data\\" + type + ".json");
        // create the file if it does not exist yet
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            // wrap the byte stream in a character stream
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(fileOutputStream, "utf-8");
            // buffered character output stream
            BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter);
            // write the JSON string to the file
            bufferedWriter.write(String.valueOf(json));
            // flush the buffer to force the data out
            bufferedWriter.flush();
            // close the output stream
            bufferedWriter.close();
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // write each aggregation result to its own JSON file for the dashboard
        save_to_json(tuple.get(0), "gender_age");
        save_to_json(tuple.get(1), "gender_type");
        save_to_json(tuple.get(2), "gender");
        save_to_json(tuple.get(3), "age");
        save_to_json(tuple.get(4), "type");
        save_to_json(tuple.get(5), "place");
//        System.out.println("gender:" + tuple.get(0));
//        System.out.println("age:" + tuple.get(1));
//        System.out.println("type:" + tuple.get(2));
//        System.out.println("place:" + tuple.get(3));
        collector.emit(new Values(tuple.get(0), tuple.get(1), tuple.get(2), tuple.get(3), tuple.get(4), tuple.get(5)));
    }
}

PrintFilter_partition.java

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintFilter_partition extends BaseFilter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PrintFilter_partition.class);
    private static final long serialVersionUID = 1L;

    @Override
    public boolean isKeep(TridentTuple tuple) {
        // no filtering: just log every tuple and keep it
        LOGGER.info("tuple: " + tuple);
        return true;
    }
}

Quick Demo

At this point the project is essentially complete; only the final visualization part is left, so here is a quick demo.
First start the whole platform; you can check that everything started successfully with the jps command.

There is no required startup order for the data source and the processing program; start them in any order. The results are shown below.

Here I only print the split records; more output statements are included in the code (commented out) and can be enabled as needed.

We can also package the whole program and run it on the cluster.
For how to build a jar with IDEA, see: packaging with IDEA.
After building the jar, upload it to the cluster.
Then run the jar with the following command:

storm jar storm_trident.jar Trident_Topology trident_Topology

A few words about the arguments: storm jar tells Storm to run a jar, storm_trident.jar is the name of the jar, Trident_Topology is the main class, and trident_Topology is the name of the topology.

While it is running we can open the Storm UI.

This is a visualization of the topology.

Summary

This is the most difficult part of the whole project. When reading it, focus on the processing flow and the overall approach; the specific processing techniques can be adjusted to your own level. Only one part of the project remains: the visualization, which I will explain in detail in the next post. If you are interested, stay tuned.
