发布时间:2025-06-24 18:45:57 作者:北方职教升学中心 阅读量:384
写出数据
这一步我们只需要直接把关联后的临时表数据写入到上面的 dwd_trade_cart_add 表中即可:
// TODO 7. 写出数据到 Kafka tableEnv.executeSql("INSERT INTO dwd_trade_cart_add SELECT * FROM cart_with_dic_table"); // TODO 8. 启动任务 env.execute("DwdTradeCartAdd");
2、于是我们创建了 base_dic 的 lookup 表;
对于交易域订单预处理表,它并不是一个事实表,我们创建它是因为:和订单相关的业务过程(下单和取消订单)都需要从 订单明细表、commit 等字段也不需要(都是 Maxwell 加的,对后面分析没有用);
最后我们的表结构大概就是这样:
CREATE TABLE topic_db( `database` STRING, `table` STRING, `type` STRING, `data` MAP<STRING,STRING>, `old` MAP<STRING,STRING>, `pt` AS PROCTIME()) WITH ( 'connector' = 'kafka', 'topic' = 'topic_db', 'properties.bootstrap.servers' = 'hadoop102:9092', 'properties,group.id' = 'cartAdd', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json')
对于这里面的 data 和 old 是 JSON 对象类型,为了考虑到以后可以直接操作而不需要再做转换,我们可以考虑使用 Flink SQL 中的复杂数据类型 Map;
此外,我们还增加了一个时间字段 pt,因为后面我们需要做 lookup join,它需要我们提供一个处理时间字段;
1.2.2、
0.2、
累积型快照事实表通常具有多个日期字段,每个日期对应业务流程中的一个关键业务过程(里程碑)比如下面的 下单日期 -> 支付日期 -> 发货日期 -> 收货日期。写出到 Kafka
// TODO 10. 数据写出 tableEnv.executeSql("INSERT INTO dwd_trade_order_pre_process SELECT * FROM result_table"); // TODO 11. 启动任务 env.execute("DwdTradeOrderPreProcess");
总结
今天创建了 1 张事实表和一张辅助表(优化之后查询订单相关表的查询效率)
对于交易域加购事务事实表来说,需要根据公司业务逻辑来确定逻辑,比如有的公司的加购操作就是新增或者更新操作(更新商品的件数),但是有的公司的加购操作都是新增;
我们创建了 topic_db 这张包含了全部业务系统 binlog 的数据表,并从中过滤出出了购物车相关的数据(type=insert 或者 update 的)。上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。实现思路
先把大致的框架列出来,很多内容比如创建 topic_db 表和 base_dic 的 lookup 表可以直接复用:
public class DwdTradeOrderPreProcess { public static void main(String[] args) throws Exception { // TODO 1. 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 生产环境中设置为kafka主题的分区数 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1.1 开启checkpoint env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck"); env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L); env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次 // 1.2 设置状态后端 env.setStateBackend(new HashMapStateBackend()); // TODO 2. 使用 DDL 方式读取 topic_db 创建表 tableEnv.executeSql(MyKafkaUtil.getTopicDb("order_pre_process")); // TODO 3. 过滤 订单明细表 // TODO 4. 过滤 订单表 // TODO 5. 过滤 订单明细活动关联表 // TODO 6. 过滤 订单明细优惠券关联表 // TODO 7. 创建 base_dic 的 lookup 表 tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL()); // TODO 8. 关联这 5 张表 // TODO 9. 创建 upsert-kafka 表 // TODO 10. 数据写出 // TODO 11. 启动任务 env.execute("DwdTradeOrderPreProcess"); }
2.1.1、
注意,预处理表在写出的时候不能再使用之前普通的 Kafka 连接器了,而是应该使用 Upsert Kafka 连接器(支持回撤流);
2.1、过滤订单明细表
注意:订单明细表不同于订单表,订单表只有新增(type 只有 insert)!但是订单明细表会有订单的修改(type 有 insert,也有 update);
除了明显用不到的比如 img_url 等字段,别的以防万一全部保留(此外,还有 topic_db 的 pt 字段也需要查出来,后面要和 lookup 表 join 使用):
// TODO 3. 过滤 订单明细表 Table orderDetailTable = tableEnv.sqlQuery("SELECT\n" + " data['id'] id,\n" + " data['user_id'] user_id,\n" + " data['sku_id'] sku_id,\n" + " data['sku_name'] sku_name,\n" + " data['order_price'] order_price,\n" + " data['sku_num'] sku_num,\n" + " data['create_time'] create_time,\n" + " data['source_type'] source_type,\n" + " data['source_id'] source_id,\n" + " data['split_total_amount'] split_total_amount,\n" + " data['split_activity_amount'] split_activity_amount,\n" + " data['split_coupon_amount'] split_coupon_amount,\n" + " pt\n" + "FROM\n" + " topic_db\n" + "WHERE\n" + " `database` = 'gmall'\n" + "AND\n" + " `table` = 'order_detail'\n"); tableEnv.createTemporaryView("order_detail_table",orderDetailTable);
2.1.2、确认收货业务过程。订单状态、订单优惠券表进行了 left join);
最终得到的预处理表中,包含订单表的订单id、
在 MyKafkaUtil 中添加方法:
public static String getKafkaSinkDDL(String topic) { return "WITH ( " + " 'connector' = 'kafka', " + " 'topic' = '" + topic + "', " + " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " + " 'format' = 'json' " + ")"; }
创建 dwd_trade_cart_add 表:
// TODO 6. 使用 DDL 方式创建加购事实表 // 前面创建的表是查询结果(Flink表,不包含配置信息比如Kafka主题等), // 这张表是要写入到Kafka主题中,所以只需要指定主题(指定主题就意味着写入该表的数据被写入到Kafka), // 这张表是Flink表它不会被消费,所以不需要指定消费者组 tableEnv.executeSql("CREATE TABLE dwd_trade_cart_add(\n" + " `id` STRING,\n" + " `user_id` STRING,\n" + " `sku_id` STRING,\n" + " `cart_price` STRING,\n" + " `sku_num` STRING,\n" + " `sku_name` STRING,\n" + " `is_checked` STRING,\n" + " `create_time` STRING,\n" + " `operate_time` STRING,\n" + " `is_ordered` STRING,\n" + " `order_time` STRING,\n" + " `source_type_name` STRING,\n" + " `source_id` STRING,\n" + " `source_type_id` STRING\n" + ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_cart_add"));
1.2.7、订单明细活动关联表、省份id、前置知识回顾0.1、交易域订单预处理表
在离线数仓中,我们针对订单这一类型创建的是累积快照事实表:
累计快照事实表是基于一个业务流程(区别于业务过程,业务过程指的是一个业务的原子操作,而业务流程是由多个有关联的业务过程组成的)中的多个关键业务过程联合处理而构建的事实表,如交易流程中的下单、关联 5 张表
// TODO 8. 关联这 5 张表 Table resultTable = tableEnv.sqlQuery("SELECT \n" + "od.id,\n" + "od.order_id,\n" + "oi.user_id,\n" + "oi.order_status,\n" + "od.sku_id,\n" + "od.sku_name,\n" + "oi.province_id,\n" + "oa.activity_id,\n" + "oa.activity_rule_id,\n" + "oc.coupon_id,\n" + "date_format(od.create_time, 'yyyy-MM-dd') date_id,\n" + "od.create_time,\n" + "date_format(oi.operate_time, 'yyyy-MM-dd') operate_date_id,\n" + "oi.operate_time,\n" + "od.source_id,\n" + "od.source_type source_type_id,\n" + "dic.dic_name source_type_name,\n" + "od.sku_num,\n" + "od.split_original_amount,\n" + "od.split_activity_amount,\n" + "od.split_coupon_amount,\n" + "od.split_total_amount,\n" + "oi.`type`,\n" + "oi.`old`,\n" + "od.od_ts,\n" + "oi.oi_ts,\n" + "current_row_timestamp() row_op_ts\n" + "FROM\n" + " order_detail_table od\n" + "JOIN\n" + " order_info_table oi\n" + "ON\n" + " od.order_id = oi.id\n" + "LEFT JOIN\n" + " order_activity_table oa\n" + "ON\n" + " oa.order_detail_id = od.id\n" + "LEFT JOIN\n" + " order_coupon_table oc\n" + "ON\n" + " oa.order_detail_id = oc.id\n" + "JOIN\n" + " base_dic FOR SYSTEM_TIME AS OF od.pt AS dic\n" + "ON\n" + " od.source_type = dic.dic_code"); tableEnv.createTemporaryView("result_table", resultTable);
2.1.6、创建 lookup 表
我们需要对字典表(base_dic)创建一张 lookup 表,用于和事实表去做 join。关联加购事实表和字典表
用 SQL 关联加购事实表和字典表,为的就是对加购来源(比如广告/用户查询/促销活动/只能推荐)做一个维度退化:
SEELCT ci.id, ci.user_id, ci.sku_id, ci.cart_price, ci.sku_num, ci.sku_name, ci.is_checked, ci.create_time, ci.operate_time, ci.is_ordered, ci.order_time, dic.dic_name source_type_name, ci.source_id, dic.dic_code source_type_idFROM cart_info ciJOIN base_dic dic FOR SYSTEM_TOME AS OF ci.pt AS dicON ci.source_type = dic.dic_code
关联后的表我们同样创建一张 Flink 临时表:
tableEnv.createTemporaryView("cart_with_dic_table",cartAddWithDicTable);
1.2.6、topic_db 表实现
首先,Flink Table API 的执行环境有一点变化:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 生产环境中设置为kafka主题的分区数StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
其它代码比如设置检查点和状态后端这里省略;
然后,我们在之前的 MyKafkaUtil 中再封装两个方法用来创建建表语句:
/** * Kafka-Source DDL 的配置信息 * * @param topic 数据源主题 * @param groupId 消费者组 * @return with 配置信息 */ public static String getKafkaDDL(String topic, String groupId) { return " with ('connector' = 'kafka', " + " 'topic' = '" + topic + "'," + " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " + " 'properties.group.id' = '" + groupId + "', " + " 'format' = 'json', " + " 'scan.startup.mode' = 'group-offsets')"; } /** * topic_db 建表语句 * @param groupId 消费者组 * @return 完整的建表语句 */ public static String getTopicDb(String groupId) { return "CREATE TABLE topic_db(\n" + " `database` STRING,\n" + " `table` STRING,\n" + " `type` STRING,\n" + " `data` MAP<STRING,STRING>,\n" + " `old` MAP<STRING,STRING>,\n" + " `pt` AS PROCTIME()\n" + ")" + getKafkaDDL("topic_db",groupId); }
执行 DDL :
// TODO 2. 使用 DDL 方式读取 topic_db 创建表 tableEnv.executeSql(MyKafkaUtil.getTopicDb("cart_add"));
这样,我们的 topic_db 表就通过 Kafka 连接器和 Table API 创建完成了,之后我们就可以直接使用 Flink SQL 去查询我们想要的表的变更数据了;
1.2.3、所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。Flink SQL 实现 join
这一块我们之前在学Flink SQL 查询的时候详细介绍过了,这里不多废话,回顾一下即可;
1、订单表、
这里,我们经过分析,订单明细表和取消订单明细表的数据来源、操作时间、过滤订单明细优惠券表
同样订单明细优惠券表只有新增,不需要过滤 type:
Table orderDetailCoupon = tableEnv.sqlQuery("select\n" + "data['order_detail_id'] order_detail_id,\n" + "data['coupon_id'] coupon_id\n" + "from `topic_db`\n" + "where `table` = 'order_detail_coupon'\n"); tableEnv.createTemporaryView("order_detail_coupon", orderDetailCoupon);
2.1.5、订单明细活动关联表,订单明细优惠券关联表等去做关联,所以为了减少重复计算,我们可以这两张表的公共关联部分提取出来;创建的预处理表需要保留订单表的 type 和 old 字段(因为订单表的粒度是一个订单,它的变化就是订单状态的变化),所以为了方便后面过滤订单明细数据和取消订单明细数据(根据 old 中 order_status 字段的值和数据中 order_status 值的变化进行判断),
和离线数仓一样,我们最终得到的预处理表的粒度也是最细粒度(订单明细表的粒度),一行代表一个商品(因为我们把订单明细表作为主表,并且和订单表进行了 join),同时具有丰富的维度属性(对订单活动、topic_db 表结构设计
首先,我们需要考虑如何将 Kafka 的 topic_db 中的数据保存到 Flink SQL 表中去?因为 topic_db 中存储的是 Maxwell 同步过来的 46 张表 JSON 格式的数据:
{ "database":"gmall-211126-flink", "table":"base_trademark", "xid" : 188, "commit" : true, "type":"update", "ts":1652499295, "data":{ "id":1, "tm_name":"三星", "icon_url":"/bbb/cccc" }, "old":{ "tm_name":"小米" }}
所以,我们需要考虑如何设计表结构?这里的 ts 字段并没有必要保留(ts 是 Maxwell 给加的,而且业务表中一般都有 create_time 和 operator_time 等字段),还有 xid、代码实现
1.2.1、过滤出加购数据
业务表中加购只两种情况,在 Maxwell 传递过来的数据中体现出来就是:
- type 字段为 insert
- date 中的 sku_num 即为该次加购操作的 sku_num
- type 字段值为 update 且 data[sku_num] > old[sku_num]
- data[sku_num] - old[sku_num] 即为该次加购操作的 sku_num
SELECT data['id'] id, data['user_id'] user_id, data['sku_id'] sku_id, data['cart_price'] cart_price, if(`type`='insert',`data`['sku_num'],CAST(CAST(`data`['sku_num'] AS INT) - CAST(`old`['sku_num'] AS INT)) AS STRING)) sku_num, data['sku_name'] sku_name, data['is_checked'] is_checked, data['create_time'] create_time, data['operate_time'] operate_time, data['is_ordered'] is_ordered, data['order_time'] order_time, data['source_id'] source_id, data['source_type'] source_type, ptFROM topic_dbWHERE `database` = 'gmall'AND `table` = 'cart_info'AND `type` = 'insert'OR ( `type` = 'update' AND 'old'['sku_num'] is not null AND CAST('old'['sku_num'] AS INT) < CAST('data'['sku_num'] AS INT) )
这样,我们就可以从 topic_db 中读取到加购数据了,这些字段全部来自于 cart_info,每行数据代表一个加购操作;
// TODO 3. 过滤加购数据 Table cartAddTable = tableEnv.sqlQuery("SELECT\n" + " data['id'] id,\n" + " data['user_id'] ,\n" + " data['sku_id'],\n" + " data['cart_price'],\n" + " if(`type`='insert',`data`['sku_num'],CAST(CAST(`data`['sku_num'] AS INT) - CAST(`old`['sku_num'] AS INT)) AS STRING)) sku_num,\n" + " data['sku_name'] sku_name,\n" + " data['is_checked'] is_checked,\n" + " data['create_time'] create_time,\n" + " data['operate_time'] operate_time,\n" + " data['is_ordered'] is_ordered,\n" + " data['order_time'] order_time,\n" + " data['source_id'] source_id,\n" + " data['source_type'] source_type,\n" + " pt\n" + "FROM\n" + " topic_db\n" + "WHERE\n" + " `database` = 'gmall'\n" + "AND\n" + " `table` = 'cart_info'\n" + "AND\n" + " `type` = 'insert'\n" + "OR (\n" + " `type` = 'update'\n" + " AND\n" + " 'old'['sku_num'] is not null\n" + " AND\n" + " CAST('old'['sku_num'] AS INT) < CAST('data'['sku_num'] AS INT)\n" + " )"); tableEnv.createTemporaryView("cart_info_table",cartAddTable);
这里我们把提取出来的加购表创建成了一张临时表,这样下面我们才能用 FlinkSQL 对它和其他表进行关联;
1.2.4、
关联订单明细表、发货、表结构都相同,差别只在业务过程和过滤条件,为了减少重复计算(重复关联多张表的数据),我们可以将两张表公共的关联过程提取出来,形成订单预处理表。
我们之前在离线数仓中(声明粒度(最细粒度) -> 确认维度(维度外键 + 退化字段) -> 确认事实(度量值)),往往都是会选择一个最细粒度的业务表作为主表(比如订单明细表,因为订单明细表的粒度是sku),然后和其他相关的业务表(比如订单表(订单表中可以获得订单的状态),订单明细活动关联表,订单明细优惠券关联表等)进行 join,保留维度外键,选择需要退化的字段,最后添加度量字段;
那么,在事实表这里其实也是一样的,我们也不可避免需要对事实数据流之间进行 join,实时数据和维表之间的 join,所以我们现在需要熟悉一下 Flink 中如何使用 DataStream API 或者 Flink SQL 实现 join:
0、创建 Upsert Kafka 表
实时数据流在 left join 的过程中很可能出现数据的撤回,而之前我们使用的普通 Kafka 连接器是仅追加模式的,如果想要将有更新操作的结果表写入到 Kafka,就需要使用 Upsert Kafka 连接器了:
在 MyKafkaUtil 中创建方法(用来给建表时,添加 Kafka 参数):
public static String getUpsertKafkaDDL(String topic) { return "WITH ( " + " 'connector' = 'upsert-kafka', " + " 'topic' = '" + topic + "', " + " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " + " 'key.format' = 'json', " + " 'value.format' = 'json' " + ")"; }
创建表格,同样相当于是生产者,需要配置 Kafka 参数(topic,k,v类型):
// TODO 9. 建立 Upsert-Kafka dwd_trade_order_pre_process 表 tableEnv.executeSql("" + "create table dwd_trade_order_pre_process(\n" + "id string,\n" + "order_id string,\n" + "user_id string,\n" + "order_status string,\n" + "sku_id string,\n" + "sku_name string,\n" + "province_id string,\n" + "activity_id string,\n" + "activity_rule_id string,\n" + "coupon_id string,\n" + "date_id string,\n" + "create_time string,\n" + "operate_date_id string,\n" + "operate_time string,\n" + "source_id string,\n" + "source_type string,\n" + "source_type_name string,\n" + "sku_num string,\n" + "split_original_amount string,\n" + "split_activity_amount string,\n" + "split_coupon_amount string,\n" + "split_total_amount string,\n" + "`type` string,\n" + "`old` map<string,string>,\n" + "od_ts string,\n" + "oi_ts string,\n" + "row_op_ts timestamp_ltz(3),\n" + "primary key(id) not enforced\n" + ")" + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));