尚硅谷 Big Data Project《在线教育之实时数仓》(Real-Time Data Warehouse for Online Education), Notes 007


Video: 尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

Contents

Chapter 9 Data Warehouse Development: The DWD Layer

P053

P054

P055

P056

P057

P058

P059

P060

P061

P062

P063

P064

P065


Chapter 9 Data Warehouse Development: The DWD Layer

P053

9.6 User Domain: User Registration Transaction Fact Table
9.6.1 Main Task

Read the user table data and the page log data, join the two tables to fill in the dimensional information for the user registration action, and write the result to the Kafka user registration topic.

P054

9.6.4 Code

Reference: Kafka | Apache Flink (the Flink Kafka SQL connector documentation).
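The code in this chapter builds its Kafka source and sink DDL through two project helpers, KafkaUtil.getKafkaDDL and KafkaUtil.getUpsertKafkaDDL, which are not reproduced in these notes. Below is a rough sketch of what they could return, assuming the standard Flink `kafka` and `upsert-kafka` connector options and a hypothetical broker list; the actual utility in the course code may differ.

// Hypothetical sketch of the KafkaUtil connector helpers used in this chapter.
// Connector option names follow the Flink "kafka" / "upsert-kafka" documentation;
// the broker list and startup mode are assumptions.
public class KafkaUtil {
    static final String BOOTSTRAP_SERVERS = "node001:9092,node002:9092,node003:9092"; // assumed cluster

    // WITH clause for a regular Kafka source table read as JSON.
    public static String getKafkaDDL(String topic, String groupId) {
        return " WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + topic + "',\n" +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',\n" +
                "  'properties.group.id' = '" + groupId + "',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")";
    }

    // WITH clause for an upsert-kafka sink table; the DDL that uses it must declare
    // a PRIMARY KEY ... NOT ENFORCED, as the sink tables below do.
    public static String getUpsertKafkaDDL(String topic) {
        return " WITH (\n" +
                "  'connector' = 'upsert-kafka',\n" +
                "  'topic' = '" + topic + "',\n" +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',\n" +
                "  'key.format' = 'json',\n" +
                "  'value.format' = 'json'\n" +
                ")";
    }
}

The KafkaUtil.createTopicDb(tableEnv, groupId) call used in the classes below presumably registers a topic_db source table over the Maxwell change-log topic in the same way, exposing the `table`, `type`, `data` and `ts` fields that the SQL filters reference.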

 

P055

//TODO 4 Read the page topic dwd_traffic_page_log
//TODO 5 Filter the user table data
//TODO 6 Filter the dimension info from the registration logs

P056

package com.atguigu.edu.realtime.app.dwd.db;

import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author yhm
 * @create 2023-04-23 17:36
 */
public class DwdUserUserRegister {
    public static void main(String[] args) {
        // TODO 1 Create the environment and set the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2 Set the table state TTL
        // tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10L));
        EnvUtil.setTableEnvStateTtl(tableEnv, "10s");

        // TODO 3 Read the topic_db data
        String groupId = "dwd_user_user_register2";
        KafkaUtil.createTopicDb(tableEnv, groupId);
        // tableEnv.executeSql("select * from topic_db").print();

        // TODO 4 Read the page topic dwd_traffic_page_log
        tableEnv.executeSql("CREATE TABLE page_log (\n" +
                "  `common` map<string,string>,\n" +
                "  `page` string,\n" +
                "  `ts` string\n" +
                ")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));

        // TODO 5 Filter the user table data
        Table userRegister = tableEnv.sqlQuery("select \n" +
                "    data['id'] id,\n" +
                "    data['create_time'] create_time,\n" +
                "    date_format(data['create_time'],'yyyy-MM-dd') create_date,\n" +
                "    ts\n" +
                "from topic_db\n" +
                "where `table`='user_info'\n" +
                "and `type`='insert'" +
                "");
        tableEnv.createTemporaryView("user_register", userRegister);

        // TODO 6 Filter the dimension info from the registration logs
        Table dimLog = tableEnv.sqlQuery("select \n" +
                "    common['uid'] user_id,\n" +
                "    common['ch'] channel, \n" +
                "    common['ar'] province_id, \n" +
                "    common['vc'] version_code, \n" +
                "    common['sc'] source_id, \n" +
                "    common['mid'] mid_id, \n" +
                "    common['ba'] brand, \n" +
                "    common['md'] model, \n" +
                "    common['os'] operate_system \n" +
                "from page_log\n" +
                "where common['uid'] is not null \n"
                // "and page['page_id'] = 'register'"
        );
        tableEnv.createTemporaryView("dim_log", dimLog);

        // TODO 7 Join the two tables
        Table resultTable = tableEnv.sqlQuery("select \n" +
                "    ur.id user_id,\n" +
                "    create_time register_time,\n" +
                "    create_date register_date,\n" +
                "    channel,\n" +
                "    province_id,\n" +
                "    version_code,\n" +
                "    source_id,\n" +
                "    mid_id,\n" +
                "    brand,\n" +
                "    model,\n" +
                "    operate_system,\n" +
                "    ts, \n" +
                "    current_row_timestamp() row_op_ts \n" +
                "from user_register ur \n" +
                "left join dim_log pl \n" +
                "on ur.id=pl.user_id");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 8 Write the data to Kafka
        tableEnv.executeSql(" create table dwd_user_user_register(\n" +
                "    user_id string,\n" +
                "    register_time string,\n" +
                "    register_date string,\n" +
                "    channel string,\n" +
                "    province_id string,\n" +
                "    version_code string,\n" +
                "    source_id string,\n" +
                "    mid_id string,\n" +
                "    brand string,\n" +
                "    model string,\n" +
                "    operate_system string,\n" +
                "    ts string,\n" +
                "    row_op_ts TIMESTAMP_LTZ(3) ,\n" +
                "    PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_user_user_register"));
        tableEnv.executeSql("insert into dwd_user_user_register " +
                "select * from result_table");
    }
}
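EnvUtil.getExecutionEnvironment and EnvUtil.setTableEnvStateTtl are project utilities that these notes do not show. Below is a minimal sketch of what they might do, assuming a checkpointed environment and the idle-state-retention API already hinted at by the commented-out line in TODO 2; the checkpoint interval and any state-backend settings are assumptions, the exact values come from the course code.

// Hypothetical sketch of the EnvUtil helpers referenced above; the real course
// utility may also configure the state backend and checkpoint storage.
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EnvUtil {
    // Create a stream environment with the given parallelism and enable checkpointing.
    public static StreamExecutionEnvironment getExecutionEnvironment(int parallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.enableCheckpointing(60_000L); // assumed checkpoint interval
        return env;
    }

    // Expire idle join state after the given duration, e.g. "10s", so the
    // regular (unbounded) joins in this chapter do not keep state forever.
    public static void setTableEnvStateTtl(StreamTableEnvironment tableEnv, String ttl) {
        tableEnv.getConfig().setIdleStateRetention(Duration.parse("PT" + ttl.toUpperCase()));
    }
}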

P057

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_user_user_register

P058

9.7 Trade Domain: Order Placement Transaction Fact Table
9.7.1 Main Task

Read the topic_db topic from Kafka and filter out the order detail and order table records, read the dwd_traffic_page_log topic and filter out the order page logs, join the three tables to build the trade-domain order placement transaction fact table, and write it to the corresponding Kafka topic.

P059

DwdTradeOrderDetail: TODO 1 ~ TODO 7

P060

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_order_detail
package com.atguigu.edu.realtime.app.dwd.db;

import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author yhm
 * @create 2023-04-24 15:18
 */
public class DwdTradeOrderDetail {
    public static void main(String[] args) {
        // TODO 1 Create the environment and set the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2 Set the table state TTL
        EnvUtil.setTableEnvStateTtl(tableEnv, "10s");

        // TODO 3 Read the business data topic_db from Kafka
        String groupId = "dwd_trade_order_detail";
        KafkaUtil.createTopicDb(tableEnv, groupId);

        // TODO 4 Read the log data dwd_traffic_page_log from Kafka
        tableEnv.executeSql("create table page_log(\n" +
                "    common map<String,String>,\n" +
                "    page map<String,String>,\n" +
                "    ts string\n" +
                ")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));

        // TODO 5 Filter the order detail table
        Table orderDetail = tableEnv.sqlQuery("select \n" +
                "    data['id'] id,\n" +
                "    data['course_id'] course_id,\n" +
                "    data['course_name'] course_name,\n" +
                "    data['order_id'] order_id,\n" +
                "    data['user_id'] user_id,\n" +
                "    data['origin_amount'] origin_amount,\n" +
                "    data['coupon_reduce'] coupon_reduce,\n" +
                "    data['final_amount'] final_amount,\n" +
                "    data['create_time'] create_time,\n" +
                "    date_format(data['create_time'], 'yyyy-MM-dd') create_date,\n" +
                "    ts\n" +
                "from topic_db\n" +
                "where `table`='order_detail'\n" +
                "and type='insert'");
        tableEnv.createTemporaryView("order_detail", orderDetail);

        // TODO 6 Filter the order table
        Table orderInfo = tableEnv.sqlQuery("select \n" +
                "    data['id'] id, \n" +
                "    data['out_trade_no'] out_trade_no, \n" +
                "    data['trade_body'] trade_body, \n" +
                "    data['session_id'] session_id, \n" +
                "    data['province_id'] province_id\n" +
                "from topic_db\n" +
                "where `table`='order_info'\n" +
                "and type='insert'");
        tableEnv.createTemporaryView("order_info", orderInfo);

        // TODO 7 Get the order page logs
        Table orderLog = tableEnv.sqlQuery("select \n" +
                "    common['sid'] session_id,\n" +
                "    common['sc'] source_id\n" +
                "from page_log\n" +
                "where page['page_id']='order'");
        tableEnv.createTemporaryView("order_log", orderLog);

        // TODO 8 Join the three tables
        Table resultTable = tableEnv.sqlQuery("select \n" +
                "    od.id,\n" +
                "    od.course_id,\n" +
                "    od.course_name,\n" +
                "    od.order_id,\n" +
                "    od.user_id,\n" +
                "    od.origin_amount,\n" +
                "    od.coupon_reduce,\n" +
                "    od.final_amount,\n" +
                "    od.create_time,\n" +
                "    oi.out_trade_no,\n" +
                "    oi.trade_body,\n" +
                "    oi.session_id,\n" +
                "    oi.province_id,\n" +
                "    ol.source_id,\n" +
                "    ts,\n" +
                "    current_row_timestamp() row_op_ts \n" +
                "from order_detail od\n" +
                "join order_info oi\n" +
                "on od.order_id=oi.id\n" +
                "left join order_log ol\n" +
                "on oi.session_id=ol.session_id");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 9 Create the upsert-kafka sink table
        tableEnv.executeSql("create table dwd_trade_order_detail( \n" +
                "    id string,\n" +
                "    course_id string,\n" +
                "    course_name string,\n" +
                "    order_id string,\n" +
                "    user_id string,\n" +
                "    origin_amount string,\n" +
                "    coupon_reduce string,\n" +
                "    final_amount string,\n" +
                "    create_time string,\n" +
                "    out_trade_no string,\n" +
                "    trade_body string,\n" +
                "    session_id string,\n" +
                "    province_id string,\n" +
                "    source_id string,\n" +
                "    ts string,\n" +
                "    row_op_ts TIMESTAMP_LTZ(3) ,\n" +
                "    primary key(id) not enforced \n" +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_detail"));

        // TODO 10 Write the result to Kafka
        tableEnv.executeSql("insert into dwd_trade_order_detail " +
                "select * from result_table");
    }
}

P061

9.8 Trade Domain: Payment Success Transaction Fact Table
9.8.1 Main Task

Filter the payment success data from the Kafka topic_db topic, read the order fact data from the dwd_trade_order_detail topic, join the two tables into a payment success wide table, and write it to the Kafka payment success topic.
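The class itself is not reproduced in these notes; the next part only shows the verification commands. The sketch below illustrates the join described above. The payment table name (payment_info), the success filter (type = 'update', payment_status = '1602'), the TTL value, and the column list are assumptions for illustration, so follow the course code for the exact conditions.

package com.atguigu.edu.realtime.app.dwd.db;

import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical sketch of DwdTradePaySucDetail; table name, status filter and
// column list are assumptions, not the course's exact code.
public class DwdTradePaySucDetail {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // A longer TTL than the order join, because payment can lag the order.
        EnvUtil.setTableEnvStateTtl(tableEnv, "905s"); // assumed value

        String groupId = "dwd_trade_pay_suc_detail";
        KafkaUtil.createTopicDb(tableEnv, groupId);

        // Read the order fact data written by DwdTradeOrderDetail.
        tableEnv.executeSql("create table dwd_trade_order_detail(\n" +
                "    id string,\n" +
                "    order_id string,\n" +
                "    user_id string,\n" +
                "    final_amount string,\n" +
                "    ts string\n" +  // only the columns used below; the real table has more
                ")" + KafkaUtil.getKafkaDDL("dwd_trade_order_detail", groupId));

        // Filter payment success records from topic_db (table/status values assumed).
        Table paySuc = tableEnv.sqlQuery("select\n" +
                "    data['order_id'] order_id,\n" +
                "    data['payment_type'] payment_type,\n" +
                "    data['callback_time'] callback_time,\n" +
                "    ts\n" +
                "from topic_db\n" +
                "where `table`='payment_info'\n" +
                "and `type`='update'\n" +
                "and data['payment_status']='1602'");
        tableEnv.createTemporaryView("pay_suc", paySuc);

        // Join payments with order details and write to an upsert-kafka topic.
        Table result = tableEnv.sqlQuery("select\n" +
                "    od.id,\n" +
                "    od.order_id,\n" +
                "    od.user_id,\n" +
                "    od.final_amount,\n" +
                "    ps.payment_type,\n" +
                "    ps.callback_time,\n" +
                "    ps.ts\n" +
                "from dwd_trade_order_detail od\n" +
                "join pay_suc ps\n" +
                "on od.order_id=ps.order_id");
        tableEnv.createTemporaryView("result_table", result);

        tableEnv.executeSql("create table dwd_trade_pay_suc_detail(\n" +
                "    id string,\n" +
                "    order_id string,\n" +
                "    user_id string,\n" +
                "    final_amount string,\n" +
                "    payment_type string,\n" +
                "    callback_time string,\n" +
                "    ts string,\n" +
                "    primary key(id) not enforced\n" +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_pay_suc_detail"));
        tableEnv.executeSql("insert into dwd_trade_pay_suc_detail select * from result_table");
    }
}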

P062

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_pay_suc_detail
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar

P063

9.9 Dynamic Splitting of Fact Tables

9.9.1 Main Task

Each of the remaining DWD fact tables takes the change data of a single business-database table from topic_db, filters it by certain conditions, and writes it to the corresponding Kafka topic. Because their processing logic is similar and fairly simple, these tables can be handled in one program by dynamic splitting driven by a configuration table.

For example: read the coupon-use data and write it to the Kafka coupon-use topic.
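BaseDBApp below relies on a MySQL configuration table edu_config.dwd_table_process (read with Flink CDC) and on a DwdBroadcastProcessFunction, neither of which is printed in these notes. Here is a rough sketch of the config bean and the broadcast matching logic, assuming each config row carries the source table, the change type to keep, and the sink topic; field and column names are illustrative, not the course's exact code.

// Hypothetical sketch of the config bean and broadcast matching logic used by
// BaseDBApp; field and column names are assumptions.
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

class DwdTableProcess {
    public String sourceTable;   // business table name in topic_db, e.g. coupon_use
    public String sourceType;    // change type to keep, e.g. insert / update
    public String sinkTable;     // target Kafka topic, e.g. dwd_trade_coupon_use
    // getters/setters omitted for brevity
}

class DwdBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private final MapStateDescriptor<String, DwdTableProcess> stateDescriptor;

    DwdBroadcastProcessFunction(MapStateDescriptor<String, DwdTableProcess> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    // Each CDC record of dwd_table_process updates the broadcast state, keyed by "table:type".
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        BroadcastState<String, DwdTableProcess> state = ctx.getBroadcastState(stateDescriptor);
        DwdTableProcess config = JSONObject.parseObject(value).getObject("after", DwdTableProcess.class);
        if (config != null) {
            state.put(config.sourceTable + ":" + config.sourceType, config);
        }
    }

    // A topic_db record is forwarded only if a matching config row exists; the sink
    // topic is attached so the Kafka sink can route the record.
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        ReadOnlyBroadcastState<String, DwdTableProcess> state = ctx.getBroadcastState(stateDescriptor);
        String key = value.getString("table") + ":" + value.getString("type");
        DwdTableProcess config = state.get(key);
        if (config != null) {
            JSONObject data = value.getJSONObject("data");
            data.put("sink_table", config.sinkTable);
            out.collect(data);
        }
    }
}

The sink_table field attached here is what the Kafka sink in TODO 8 of BaseDBApp reads and removes to choose the target topic.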

P064

BaseDBApp

//TODO 1 Create the environment and set the state backend

//TODO 2 Read the main topic_db business stream

//TODO 3 Clean and convert the topic_db data

//TODO 4 Read the dwd config table with Flink CDC

//TODO 5 Create the broadcast stream

//TODO 6 Connect the two streams

//TODO 7 Filter out the data needed for the dwd tables

P065

package com.atguigu.edu.realtime.app.dwd.db;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DwdBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.bean.DwdTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author yhm
 * @create 2023-04-24 18:05
 */
public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the environment and set the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main topic_db business stream
        String groupId = "base_DB_app";
        DataStreamSource<String> dbStream = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", groupId), WatermarkStrategy.noWatermarks(), "base_db");

        // TODO 3 Clean and convert the topic_db data
        SingleOutputStreamOperator<JSONObject> jsonObjStream = dbStream.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    // Drop Maxwell bootstrap records, keep only real change data
                    if (!("bootstrap-start".equals(type) || "bootstrap-insert".equals(type) || "bootstrap-complete".equals(type))) {
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        jsonObjStream.print();

        // TODO 4 Read the dwd config table with Flink CDC
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("edu_config")
                .tableList("edu_config.dwd_table_process")
                // Deserialization format for the CDC records
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Startup mode: take an initial snapshot, then read the binlog
                .startupOptions(StartupOptions.initial())
                .build();

        // TODO 5 Create the broadcast stream
        DataStreamSource<String> tableProcessStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "dwd_table_process");
        MapStateDescriptor<String, DwdTableProcess> dwdTableProcessState = new MapStateDescriptor<>("dwd_table_process_state", String.class, DwdTableProcess.class);
        BroadcastStream<String> broadcastDS = tableProcessStream.broadcast(dwdTableProcessState);

        // TODO 6 Connect the two streams
        BroadcastConnectedStream<JSONObject, String> connectStream = jsonObjStream.connect(broadcastDS);

        // TODO 7 Filter out the data needed for the dwd tables
        SingleOutputStreamOperator<JSONObject> processStream = connectStream.process(new DwdBroadcastProcessFunction(dwdTableProcessState));

        // TODO 8 Write the data to Kafka
        processStream.sinkTo(KafkaUtil.getKafkaProducerBySchema(new KafkaRecordSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, KafkaSinkContext context, Long timestamp) {
                String topic = element.getString("sink_table");
                element.remove("sink_table");
                return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());
            }
        }, "base_db_app_trans"));

        // TODO 9 Execute the job
        env.execute();
    }
}

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_cart_add
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar 

Start Maxwell.
