1. 需求
业务上对业务表就行重构,重新设计了新表,需要原有的数据准实时同步到新表(目标表)
字段类型和数据值等做了修改需要处理映射
例如
1是,2否需要映射为 1是,0否
varchar存储的时间转换为datetime
int存储的时间戳要转换为datetime
新表设计了新状态码,需要按照对照表做数据值映射
三表合一逻辑结构为
源表主键/唯一建 | 类别 | 需要的字段 | 源表 | 目标表 | 目标表主键 |
---|---|---|---|---|---|
id | 事实主表 | 10+ | old_orders | new_orders | id(订单号) |
order_id | 业务延迟产生数据 | 2+ | old_orders_extend1 | ||
order_id | 业务延迟产生数据 | 1个 | old_orders_extend2 |
需要实现,三张源表的的insert和update操作准实时的方式同步到同目标目标表中
2.设计
2.1 功能性设计
准实时,必须使用MySQL binlog订阅CDC的方式,使用CloudCanal作为准实时同步数据平台
字段类型和数据值的转化映射,使用CloudCanal的自定义数据处理插件功能,自编写业务处理代码(java)实现
2.2 鲁棒性设计
建表
null处理,目标表字段设置为允许为null,虽然在数据库上常规来讲设置为not null更性能,但是考虑综合收益新表就尽量不使用 not null,这样有更好的数据导入容错性
容量,目标表表字段容量大小必须>= 源表对应的字段,一般情况保持原有大小一致
数据转换
格式化,时间字段格式化,文本格式的字段类型转换为datatime类型时做好异常捕获,确保格式化为正确的日期,保障数据写入更新不出问题
脏数据,其他脏数据的预处理
2.3 CloudCanal任务设计
事实主表建立一个任务实例,这样做保障事实主表独立
业务延迟产生数据的表建立一个任务实例,这样做和事实主表的同步独立,互不影响
3.实现
3.1 目标表建表
注意符合鲁棒性设计中的建表设计
具体内容略…
3.2 编写自定义数据处理插件
常规的一些转换操作看CloudCanal提供的示例代码即可
处理技术重点在,业务延迟产生数据的时候,需要把insert和update都转换为update
处理insert时候,需要同时保留getAfterColumnMap和getBeforeColumnMap,由于insert是没有变更前数据的,所以设置getBeforeColumnMap为getAfterColumnMap即可
核心代码为
// 本文来自 www.iamle.com 流水理鱼
public static void changeOrderExtend1FillColumnMap(CustomFieldV2 oriBf, LinkedHashMap<String, CustomFieldV2> target) {
CustomFieldV2 bf = CustomFieldV2.buildField(oriBf.getFieldName(), oriBf.getValue(), oriBf.getSqlType(), oriBf.isKey(), oriBf.isNull(), oriBf.isUpdated());
target.put(oriBf.getFieldName(), bf);
}
private static void processOrderExtend1InsertAndUpdateToUpdate(CustomData data, List<CustomData> re, EventTypeInSdk eventType) {
for (CustomRecordV2 oriRecord : data.getRecords()) {
CustomRecordV2 updateRecord = new CustomRecordV2();
if (eventType.equals(EventTypeInSdk.INSERT)) {
// 需要处理 afterColumnMap > beforeColumnMap
for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
}
for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
}
}
if (eventType.equals(EventTypeInSdk.UPDATE)) {
// 需要处理 beforeColumnMap
for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeColumnMap().entrySet()) {
changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
}
for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeKeyColumnMap().entrySet()) {
changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
}
}
for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterColumnMap());
}
for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterKeyColumnMap());
}
List<CustomRecordV2> updateRecords = new ArrayList<>();
updateRecords.add(updateRecord);
SchemaInfo updateSchemaInfo = data.cloneSchemaInfo(data.getSchemaInfo());
CustomData updateData = new CustomData(updateSchemaInfo, EventTypeInSdk.UPDATE, updateRecords);
re.add(updateData);
}
}
工程项目代码参考:
cloudcanal-data-process 本工程汇集了 CloudCanal 数据处理插件,以达成数据自定义 transformation 目标
https://gitee.com/clougence/cloudcanal-data-process
问题!
insert改写为update在增量同步阶段工作正常,在全量迁移阶段会报错!
截止2023年3月代码中没有办法判断任务阶段是在 全量迁移还是增量同步,所以只能在第一次建立任务的时候不要改写insert成为update,待任务正常执行时候,再重新上传激活代码处理包,
让insert改写update在增量同步阶段工作
由于全量迁移阶段的insert数据没同步,这个暂时只有单独手动处理更新到目标表去
2023-03-14 14:22:50.791 [full-task-executor-7-thd-1] INFO c.c.c.b.service.task.parser.full.RdbSinglePkPageScanner - [FINISH SCAN!]null.你的库.old_orders_extend1, cost time 222 ms.migrated count is 1774
2023-03-14 14:22:51.660 [full-apply-disruptor-6-thd-3] ERROR c.c.c.task.applier.full.FullDisruptorExceptionHandler - disruptor process full event error,msg:IllegalArgumentException: unsupported one FullEvent with multi CustomData.
java.lang.IllegalArgumentException: unsupported one FullEvent with multi CustomData.
at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:59)
at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:18)
at com.clougence.cloudcanal.task.data.process.pkg.V2PkgProcessor.process(V2PkgProcessor.java:125)
at com.clougence.cloudcanal.task.data.process.pkg.CustomPkgProcessor.process(CustomPkgProcessor.java:73)
at com.clougence.cloudcanal.task.data.process.DataProcHandlerImpl.process(DataProcHandlerImpl.java:19)
at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:32)
at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:12)
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143)
at java.lang.Thread.run(Thread.java:748)
3.3 CloudCanal任务建立
事实主表建立一个任务实例
old_orders 到 new_orders,并配置好字段映射
业务延迟产生数据的表建立一个任务实例
old_orders_extend1 到 new_orders,并配置好字段映射
old_orders_extend2 到 new_orders,并配置好字段映射
建立任务时在配置时候 数据处理 界面上传自定义代码的jar包
自定义代码jar包更新
激活后需要重启任务(PS:需要重启至少2次才生效,这个是CloudCanal bug,截止2023-3-10 还未解决)
3.4 实现结果
源表的insert和update都会准实时的同步到目标表达到设计目标