1. 需求

业务上对业务表就行重构,重新设计了新表,需要原有的数据准实时同步到新表(目标表)

字段类型和数据值等做了修改需要处理映射

例如

1是,2否需要映射为 1是,0否

varchar存储的时间转换为datetime

int存储的时间戳要转换为datetime

新表设计了新状态码,需要按照对照表做数据值映射

三表合一逻辑结构为

源表主键/唯一建 类别 需要的字段 源表 目标表 目标表主键
id 事实主表 10+ old_orders new_orders id(订单号)
order_id 业务延迟产生数据 2+ old_orders_extend1
order_id 业务延迟产生数据 1个 old_orders_extend2

需要实现,三张源表的的insert和update操作准实时的方式同步到同目标目标表中

2.设计

2.1 功能性设计

准实时,必须使用MySQL binlog订阅CDC的方式,使用CloudCanal作为准实时同步数据平台

字段类型和数据值的转化映射,使用CloudCanal的自定义数据处理插件功能,自编写业务处理代码(java)实现

2.2 鲁棒性设计

建表

null处理,目标表字段设置为允许为null,虽然在数据库上常规来讲设置为not null更性能,但是考虑综合收益新表就尽量不使用 not null,这样有更好的数据导入容错性

容量,目标表表字段容量大小必须>= 源表对应的字段,一般情况保持原有大小一致

数据转换

格式化,时间字段格式化,文本格式的字段类型转换为datatime类型时做好异常捕获,确保格式化为正确的日期,保障数据写入更新不出问题

脏数据,其他脏数据的预处理

2.3 CloudCanal任务设计

事实主表建立一个任务实例,这样做保障事实主表独立

业务延迟产生数据的表建立一个任务实例,这样做和事实主表的同步独立,互不影响

3.实现

3.1 目标表建表

注意符合鲁棒性设计中的建表设计

具体内容略…

3.2 编写自定义数据处理插件

常规的一些转换操作看CloudCanal提供的示例代码即可

处理技术重点在,业务延迟产生数据的时候,需要把insert和update都转换为update

处理insert时候,需要同时保留getAfterColumnMap和getBeforeColumnMap,由于insert是没有变更前数据的,所以设置getBeforeColumnMap为getAfterColumnMap即可

核心代码为

//  本文来自 www.iamle.com 流水理鱼
public static void changeOrderExtend1FillColumnMap(CustomFieldV2 oriBf, LinkedHashMap<String, CustomFieldV2> target) {
    CustomFieldV2 bf = CustomFieldV2.buildField(oriBf.getFieldName(), oriBf.getValue(), oriBf.getSqlType(), oriBf.isKey(), oriBf.isNull(), oriBf.isUpdated());
    target.put(oriBf.getFieldName(), bf);
}

private static void processOrderExtend1InsertAndUpdateToUpdate(CustomData data, List<CustomData> re, EventTypeInSdk eventType) {
    for (CustomRecordV2 oriRecord : data.getRecords()) {
        CustomRecordV2 updateRecord = new CustomRecordV2();

        if (eventType.equals(EventTypeInSdk.INSERT)) {
            // 需要处理 afterColumnMap > beforeColumnMap
            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
            }

            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
            }
        }

        if (eventType.equals(EventTypeInSdk.UPDATE)) {
            // 需要处理 beforeColumnMap
            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
            }

            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeKeyColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
            }
        }

        for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
            changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterColumnMap());
        }

        for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
            changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterKeyColumnMap());
        }

        List<CustomRecordV2> updateRecords = new ArrayList<>();
        updateRecords.add(updateRecord);

        SchemaInfo updateSchemaInfo = data.cloneSchemaInfo(data.getSchemaInfo());
        CustomData updateData = new CustomData(updateSchemaInfo, EventTypeInSdk.UPDATE, updateRecords);

        re.add(updateData);
    }
}

工程项目代码参考:

cloudcanal-data-process 本工程汇集了 CloudCanal 数据处理插件,以达成数据自定义 transformation 目标

https://gitee.com/clougence/cloudcanal-data-process

问题!
insert改写为update在增量同步阶段工作正常,在全量迁移阶段会报错!
截止2023年3月代码中没有办法判断任务阶段是在 全量迁移还是增量同步,所以只能在第一次建立任务的时候不要改写insert成为update,待任务正常执行时候,再重新上传激活代码处理包,
让insert改写update在增量同步阶段工作
由于全量迁移阶段的insert数据没同步,这个暂时只有单独手动处理更新到目标表去

2023-03-14 14:22:50.791 [full-task-executor-7-thd-1] INFO  c.c.c.b.service.task.parser.full.RdbSinglePkPageScanner - [FINISH SCAN!]null.你的库.old_orders_extend1, cost time 222 ms.migrated count is 1774
2023-03-14 14:22:51.660 [full-apply-disruptor-6-thd-3] ERROR c.c.c.task.applier.full.FullDisruptorExceptionHandler - disruptor process full event error,msg:IllegalArgumentException: unsupported one FullEvent with multi CustomData.
java.lang.IllegalArgumentException: unsupported one FullEvent with multi CustomData.
 at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:59)
 at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:18)
 at com.clougence.cloudcanal.task.data.process.pkg.V2PkgProcessor.process(V2PkgProcessor.java:125)
 at com.clougence.cloudcanal.task.data.process.pkg.CustomPkgProcessor.process(CustomPkgProcessor.java:73)
 at com.clougence.cloudcanal.task.data.process.DataProcHandlerImpl.process(DataProcHandlerImpl.java:19)
 at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:32)
 at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:12)
 at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143)
 at java.lang.Thread.run(Thread.java:748)

3.3 CloudCanal任务建立

事实主表建立一个任务实例

old_orders 到 new_orders,并配置好字段映射

业务延迟产生数据的表建立一个任务实例

old_orders_extend1 到 new_orders,并配置好字段映射

old_orders_extend2 到 new_orders,并配置好字段映射

建立任务时在配置时候 数据处理 界面上传自定义代码的jar包

自定义代码jar包更新

激活后需要重启任务(PS:需要重启至少2次才生效,这个是CloudCanal bug,截止2023-3-10 还未解决)

3.4 实现结果

源表的insert和update都会准实时的同步到目标表达到设计目标

1. 数据平台数仓平台架构设计大图

1.1 基于Apache Doris(以下简称Doris)的实时数仓架构大图

基于Apache Doris的实时数仓架构图 流水理鱼 wwek

1.1 架构图说明

数据源
主要是业务数据库MySQL,当然也可以是其他的关系型数据库

数据集成和处理
实时,原封不动同步的数据使用CloudCanal;需要复杂的数据加工处理,使用Flink SQL ,并用Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线,使用DataX、SeaTunel,并用海豚调度(DolphinScheduler)编排任务调度进行数据集成和处理

数据仓库
主体是使用Doris作为数据仓库
ES、MySQL、Redis作为辅助数仓,同时ES也作为搜索引擎使用,数据同步复用该套架构

数据应用
总体分类2大类
自己开发API、BI等数据应用服务
三方BI ,商业Tableau、帆软BI、乾坤,开源Superset、Metabase等之上构建数据应用服务

2. 为什么是Doris?

2.1 Doris核心优势

Apache Doris 简单易用、高性能和统一的分析数据库,他是开源的!

简单易用
部署只需两个进程,不依赖其他系统;在线集群扩缩容,自动副本修复;兼容 MySQL 协议,并且使用标准 SQL

高性能
依托列式存储引擎、现代的 MPP 架构、向量化查询引擎、预聚合物化视图、数据索引的实现,在低延迟和高吞吐查询上, 都达到了极速性能

统一数仓
单一系统,可以同时支持实时数据服务、交互数据分析和离线数据处理场景

联邦查询
支持对 Hive、Iceberg、Hudi 等数据湖和 MySQL、Elasticsearch 等数据库的联邦查询分析

多种导入
支持从 HDFS/S3 等批量拉取导入和 MySQL Binlog/Kafka 等流式拉取导入;支持通过HTTP接口进行微批量推送写入和 JDBC 中使用 Insert 实时推送写入

生态丰富
Spark 利用 Spark Doris Connector 读取和写入 Doris;Flink Doris Connector 配合 Flink CDC 实现数据 Exactly Once 写入 Doris;利用 DBT Doris Adapter,可以很容易的在 Doris 中完成数据转化

2.2 实际使用体验

查询性能满足需求
实战下来,千万级数据主表,再join几个小表,聚合查询,8C16G硬件配置单机运行Doris,查询能在3s内
join支持友好,一个查询关联10+个表后查询也毫无压力

文本作者为 流水理鱼 wwek https://www.iamle.com

2.3 杀鸡用牛刀Hadoop、Hive

Hadoop、Hive大象固然好,但是对于大部分中小型企业来说,这个就是牛刀,现在的主流数据湖也是牛刀,大部分中小公司研发都会面临杀鸡难道用牛刀的情况。
传统的大数据基本都是玩离线的,业务要求的实时性如何做到
所以TB级别的数据应该用对应的解决方案,那就是MPP架构的Doris

2.4 平替商业分析型数据库

能够平替阿里云ADB(阿里云的分析型数据库)
和相同的数据库表,想同的服务器配置,在体验上大部分Doris比阿里云ADB更快,小部分相当或更慢(非严格的测试对比,仅仅是自己特定的场景下的结果)

3、让架构实际落地的渐进式方案

3.1 V1.0 解决MySQL不能做OLAP分析查询的问题

绝大多数中小公司都有朴实的需求,我要实时大屏,业务数据统计报表
而这个时候你发现MySQL已经不支持当前的报表统计查询了,已经卡爆了,加从库也不行
这就是V1.0 落地方案要解决的问题

MySQL是行存数据库也即是面向OLTP的,不是面向OLAP分析查询的,所以不适合做数据报表等数据应用,数据量不大,时间跨度不大用MySQL从库还能一战,但终归你会遇到MySQL已经不支持你的报表查询SQL的时候

如何最低成本的让各类数据应用能实时、高性能的查询,答案是Doirs
V1.0 落地数仓核心是:原封不动的实时同步了一份业务数据库MySQL中的表到 Doris

对应架构图中
数据源 数据集成 数据仓库 数据应用
业务MySQL 》 CloudCanal 》 Doris 》 数据查询应用
其实这就相当于完成了数据仓库的ODS层,直接用ODS层,在数仓中使用原始业务库表,是最简单的开始,这个时候已经支持绝大多数据应用了,业务需求大多数都能得到满足。

数据应用方面可以很易用,因为Doirs支持MySQL协议,又支持标准SQL
所以不管是商业或开源BI软件,自己程序开发API都能快速进行数据应用服务支持

在落地成本方面,1-2人开发者(甚至还不是数据开发工程师),服务器16C32G * 3 人、机资源即可落地

3.2 V2.0 解决更大的数据、更复杂的数据加工的问题

如果有更多的数据集成的需求,还有更复杂的ETL数据加工的需求
推荐使用:
实时 Flink SQL 使用 Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线 DataX、SeaTunnel 使用海豚(DolphinScheduler)调度编排和调度

实时场景下一般使用CDC,离线场景下一般使用SQL查询抽取

3.3 V3.0 走向成熟的数仓分层、数据治理

该阶段建立 数仓分层模型
建立数仓分层模型的好处:数据结构化更清晰、数据血缘追踪、增强数据复用能力、简化复杂问题、减少业务影响、统一数据口径

ADS层
数据集市

DWS层
分析主题域,“轻粒度汇总表”

DWD层
业务主题划分域,并打成“事实明细宽表”

ODS层
贴源层,也就是业务数据原封不动的同步过来存储到数仓

DIM层
维度数据

具体的细节怎么设计、规范怎么定,这已超出了本文的范围,到了这个阶段您也不需要看本文了