1. 需求

业务上对业务表就行重构,重新设计了新表,需要原有的数据准实时同步到新表(目标表)

字段类型和数据值等做了修改需要处理映射

例如

1是,2否需要映射为 1是,0否

varchar存储的时间转换为datetime

int存储的时间戳要转换为datetime

新表设计了新状态码,需要按照对照表做数据值映射

三表合一逻辑结构为

源表主键/唯一建 类别 需要的字段 源表 目标表 目标表主键
id 事实主表 10+ old_orders new_orders id(订单号)
order_id 业务延迟产生数据 2+ old_orders_extend1
order_id 业务延迟产生数据 1个 old_orders_extend2

需要实现,三张源表的的insert和update操作准实时的方式同步到同目标目标表中

2.设计

2.1 功能性设计

准实时,必须使用MySQL binlog订阅CDC的方式,使用CloudCanal作为准实时同步数据平台

字段类型和数据值的转化映射,使用CloudCanal的自定义数据处理插件功能,自编写业务处理代码(java)实现

2.2 鲁棒性设计

建表

null处理,目标表字段设置为允许为null,虽然在数据库上常规来讲设置为not null更性能,但是考虑综合收益新表就尽量不使用 not null,这样有更好的数据导入容错性

容量,目标表表字段容量大小必须>= 源表对应的字段,一般情况保持原有大小一致

数据转换

格式化,时间字段格式化,文本格式的字段类型转换为datatime类型时做好异常捕获,确保格式化为正确的日期,保障数据写入更新不出问题

脏数据,其他脏数据的预处理

2.3 CloudCanal任务设计

事实主表建立一个任务实例,这样做保障事实主表独立

业务延迟产生数据的表建立一个任务实例,这样做和事实主表的同步独立,互不影响

3.实现

3.1 目标表建表

注意符合鲁棒性设计中的建表设计

具体内容略…

3.2 编写自定义数据处理插件

常规的一些转换操作看CloudCanal提供的示例代码即可

处理技术重点在,业务延迟产生数据的时候,需要把insert和update都转换为update

处理insert时候,需要同时保留getAfterColumnMap和getBeforeColumnMap,由于insert是没有变更前数据的,所以设置getBeforeColumnMap为getAfterColumnMap即可

核心代码为

//  本文来自 www.iamle.com 流水理鱼
public static void changeOrderExtend1FillColumnMap(CustomFieldV2 oriBf, LinkedHashMap<String, CustomFieldV2> target) {
    CustomFieldV2 bf = CustomFieldV2.buildField(oriBf.getFieldName(), oriBf.getValue(), oriBf.getSqlType(), oriBf.isKey(), oriBf.isNull(), oriBf.isUpdated());
    target.put(oriBf.getFieldName(), bf);
}

private static void processOrderExtend1InsertAndUpdateToUpdate(CustomData data, List<CustomData> re, EventTypeInSdk eventType) {
    for (CustomRecordV2 oriRecord : data.getRecords()) {
        CustomRecordV2 updateRecord = new CustomRecordV2();

        if (eventType.equals(EventTypeInSdk.INSERT)) {
            // 需要处理 afterColumnMap > beforeColumnMap
            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
            }

            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
            }
        }

        if (eventType.equals(EventTypeInSdk.UPDATE)) {
            // 需要处理 beforeColumnMap
            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
            }

            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeKeyColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
            }
        }

        for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
            changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterColumnMap());
        }

        for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
            changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterKeyColumnMap());
        }

        List<CustomRecordV2> updateRecords = new ArrayList<>();
        updateRecords.add(updateRecord);

        SchemaInfo updateSchemaInfo = data.cloneSchemaInfo(data.getSchemaInfo());
        CustomData updateData = new CustomData(updateSchemaInfo, EventTypeInSdk.UPDATE, updateRecords);

        re.add(updateData);
    }
}

工程项目代码参考:

cloudcanal-data-process 本工程汇集了 CloudCanal 数据处理插件,以达成数据自定义 transformation 目标

https://gitee.com/clougence/cloudcanal-data-process

问题!
insert改写为update在增量同步阶段工作正常,在全量迁移阶段会报错!
截止2023年3月代码中没有办法判断任务阶段是在 全量迁移还是增量同步,所以只能在第一次建立任务的时候不要改写insert成为update,待任务正常执行时候,再重新上传激活代码处理包,
让insert改写update在增量同步阶段工作
由于全量迁移阶段的insert数据没同步,这个暂时只有单独手动处理更新到目标表去

2023-03-14 14:22:50.791 [full-task-executor-7-thd-1] INFO  c.c.c.b.service.task.parser.full.RdbSinglePkPageScanner - [FINISH SCAN!]null.你的库.old_orders_extend1, cost time 222 ms.migrated count is 1774
2023-03-14 14:22:51.660 [full-apply-disruptor-6-thd-3] ERROR c.c.c.task.applier.full.FullDisruptorExceptionHandler - disruptor process full event error,msg:IllegalArgumentException: unsupported one FullEvent with multi CustomData.
java.lang.IllegalArgumentException: unsupported one FullEvent with multi CustomData.
 at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:59)
 at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:18)
 at com.clougence.cloudcanal.task.data.process.pkg.V2PkgProcessor.process(V2PkgProcessor.java:125)
 at com.clougence.cloudcanal.task.data.process.pkg.CustomPkgProcessor.process(CustomPkgProcessor.java:73)
 at com.clougence.cloudcanal.task.data.process.DataProcHandlerImpl.process(DataProcHandlerImpl.java:19)
 at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:32)
 at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:12)
 at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143)
 at java.lang.Thread.run(Thread.java:748)

3.3 CloudCanal任务建立

事实主表建立一个任务实例

old_orders 到 new_orders,并配置好字段映射

业务延迟产生数据的表建立一个任务实例

old_orders_extend1 到 new_orders,并配置好字段映射

old_orders_extend2 到 new_orders,并配置好字段映射

建立任务时在配置时候 数据处理 界面上传自定义代码的jar包

自定义代码jar包更新

激活后需要重启任务(PS:需要重启至少2次才生效,这个是CloudCanal bug,截止2023-3-10 还未解决)

3.4 实现结果

源表的insert和update都会准实时的同步到目标表达到设计目标