1. 数据平台数仓平台架构设计大图

1.1 基于Apache Doris(以下简称Doris)的实时数仓架构大图

基于Apache Doris的实时数仓架构图 流水理鱼 wwek

1.1 架构图说明

数据源
主要是业务数据库MySQL,当然也可以是其他的关系型数据库

数据集成和处理
实时,原封不动同步的数据使用CloudCanal;需要复杂的数据加工处理,使用Flink SQL ,并用Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线,使用DataX、SeaTunel,并用海豚调度(DolphinScheduler)编排任务调度进行数据集成和处理

数据仓库
主体是使用Doris作为数据仓库
ES、MySQL、Redis作为辅助数仓,同时ES也作为搜索引擎使用,数据同步复用该套架构

数据应用
总体分类2大类
自己开发API、BI等数据应用服务
三方BI ,商业Tableau、帆软BI、乾坤,开源Superset、Metabase等之上构建数据应用服务

2. 为什么是Doris?

2.1 Doris核心优势

Apache Doris 简单易用、高性能和统一的分析数据库,他是开源的!

简单易用
部署只需两个进程,不依赖其他系统;在线集群扩缩容,自动副本修复;兼容 MySQL 协议,并且使用标准 SQL

高性能
依托列式存储引擎、现代的 MPP 架构、向量化查询引擎、预聚合物化视图、数据索引的实现,在低延迟和高吞吐查询上, 都达到了极速性能

统一数仓
单一系统,可以同时支持实时数据服务、交互数据分析和离线数据处理场景

联邦查询
支持对 Hive、Iceberg、Hudi 等数据湖和 MySQL、Elasticsearch 等数据库的联邦查询分析

多种导入
支持从 HDFS/S3 等批量拉取导入和 MySQL Binlog/Kafka 等流式拉取导入;支持通过HTTP接口进行微批量推送写入和 JDBC 中使用 Insert 实时推送写入

生态丰富
Spark 利用 Spark Doris Connector 读取和写入 Doris;Flink Doris Connector 配合 Flink CDC 实现数据 Exactly Once 写入 Doris;利用 DBT Doris Adapter,可以很容易的在 Doris 中完成数据转化

2.2 实际使用体验

查询性能满足需求
实战下来,千万级数据主表,再join几个小表,聚合查询,8C16G硬件配置单机运行Doris,查询能在3s内
join支持友好,一个查询关联10+个表后查询也毫无压力

文本作者为 流水理鱼 wwek https://www.iamle.com

2.3 杀鸡用牛刀Hadoop、Hive

Hadoop、Hive大象固然好,但是对于大部分中小型企业来说,这个就是牛刀,现在的主流数据湖也是牛刀,大部分中小公司研发都会面临杀鸡难道用牛刀的情况。
传统的大数据基本都是玩离线的,业务要求的实时性如何做到
所以TB级别的数据应该用对应的解决方案,那就是MPP架构的Doris

2.4 平替商业分析型数据库

能够平替阿里云ADB(阿里云的分析型数据库)
和相同的数据库表,想同的服务器配置,在体验上大部分Doris比阿里云ADB更快,小部分相当或更慢(非严格的测试对比,仅仅是自己特定的场景下的结果)

3、让架构实际落地的渐进式方案

3.1 V1.0 解决MySQL不能做OLAP分析查询的问题

绝大多数中小公司都有朴实的需求,我要实时大屏,业务数据统计报表
而这个时候你发现MySQL已经不支持当前的报表统计查询了,已经卡爆了,加从库也不行
这就是V1.0 落地方案要解决的问题

MySQL是行存数据库也即是面向OLTP的,不是面向OLAP分析查询的,所以不适合做数据报表等数据应用,数据量不大,时间跨度不大用MySQL从库还能一战,但终归你会遇到MySQL已经不支持你的报表查询SQL的时候

如何最低成本的让各类数据应用能实时、高性能的查询,答案是Doirs
V1.0 落地数仓核心是:原封不动的实时同步了一份业务数据库MySQL中的表到 Doris

对应架构图中
数据源 数据集成 数据仓库 数据应用
业务MySQL 》 CloudCanal 》 Doris 》 数据查询应用
其实这就相当于完成了数据仓库的ODS层,直接用ODS层,在数仓中使用原始业务库表,是最简单的开始,这个时候已经支持绝大多数据应用了,业务需求大多数都能得到满足。

数据应用方面可以很易用,因为Doirs支持MySQL协议,又支持标准SQL
所以不管是商业或开源BI软件,自己程序开发API都能快速进行数据应用服务支持

在落地成本方面,1-2人开发者(甚至还不是数据开发工程师),服务器16C32G * 3 人、机资源即可落地

3.2 V2.0 解决更大的数据、更复杂的数据加工的问题

如果有更多的数据集成的需求,还有更复杂的ETL数据加工的需求
推荐使用:
实时 Flink SQL 使用 Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线 DataX、SeaTunnel 使用海豚(DolphinScheduler)调度编排和调度

实时场景下一般使用CDC,离线场景下一般使用SQL查询抽取

3.3 V3.0 走向成熟的数仓分层、数据治理

该阶段建立 数仓分层模型
建立数仓分层模型的好处:数据结构化更清晰、数据血缘追踪、增强数据复用能力、简化复杂问题、减少业务影响、统一数据口径

ADS层
数据集市

DWS层
分析主题域,“轻粒度汇总表”

DWD层
业务主题划分域,并打成“事实明细宽表”

ODS层
贴源层,也就是业务数据原封不动的同步过来存储到数仓

DIM层
维度数据

具体的细节怎么设计、规范怎么定,这已超出了本文的范围,到了这个阶段您也不需要看本文了

背景

MySQL库A 到 MySQL库B的增量数据同步需求

DolphinScheduler中配置DataX MySQL To MySQL工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To MySQL'

DATAX组件(t_article)
用到2个插件mysqlreader^[1]、mysqlwriter^[2]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "`id`",
                            "`title`",
                            "`content`",
                            "`is_delete`",
                            "`delete_date`",
                            "`create_date`",
                            "`update_date`"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                                "table": [
                                    "t_article"
                                ]
                            }
                        ],
                        "writeMode": "replace",
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate}
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码

# 本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以再新增加参数定义例如 biz_mysql_host_b,在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer mysqlwriter插件中关键配置: “

"parameter": {
    "writeMode": "replace",
    ......
}

writeMode为replace,相同主键id重复写入数据,就会更新数据。sql本质上执行的是 replace into

保存工作流

全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[3]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

爬坑记录

  • 官网下载的DataX不包含ElasticSearchWriter写插件
    默认不带该插件,需要自己编译ElasticSearchWriter插件。
git clone https://github.com/alibaba/DataX.git

为了加快编译速度,可以只编译<module>elasticsearchwriter</module>
项目根目录的pom.xml
<!-- reader --> 全注释掉,<!-- writer -->下只保留<module>elasticsearchwriter</module>其他注释掉,另外<!-- common support module -->也需要保留

如果自己不想编译或者编译失败请搜索🔍 “流水理鱼”微信公众号,或者加我私人微信我给你已经编译好的插件包

by 流水理鱼|wwek

参考

1. DataX MysqlReader 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
2. DataX MysqlWriter 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
3. Apache DolphinScheduler 内置参数 https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/built-in.html

背景

MySQL库A 到 MySQL库B的增量数据同步需求

DolphinScheduler中配置DataX MySQL To MySQL工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To MySQL'

DATAX组件(t_article)
用到2个插件mysqlreader^[1]、mysqlwriter^[2]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "`id`",
                            "`title`",
                            "`content`",
                            "`is_delete`",
                            "`delete_date`",
                            "`create_date`",
                            "`update_date`"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                                "table": [
                                    "t_article"
                                ]
                            }
                        ],
                        "writeMode": "replace",
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate}
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码

# 本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以再新增加参数定义例如 biz_mysql_host_b,在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer mysqlwriter插件中关键配置: “

"parameter": {
    "writeMode": "replace",
    ......
}

writeMode为replace,相同主键id重复写入数据,就会更新数据。sql本质上执行的是 replace into

保存工作流

全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[3]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

爬坑记录

  • 官网下载的DataX不包含ElasticSearchWriter写插件
    默认不带该插件,需要自己编译ElasticSearchWriter插件。
git clone https://github.com/alibaba/DataX.git

为了加快编译速度,可以只编译<module>elasticsearchwriter</module>
项目根目录的pom.xml
<!-- reader --> 全注释掉,<!-- writer -->下只保留<module>elasticsearchwriter</module>其他注释掉,另外<!-- common support module -->也需要保留

如果自己不想编译或者编译失败请搜索🔍 “流水理鱼”微信公众号,或者加我私人微信我给你已经编译好的插件包

by 流水理鱼|wwek

参考

1. DataX MysqlReader 插件文档
2. DataX MysqlWriter 插件文档
3. Apache DolphinScheduler 内置参数

数据同步的方式

数据同步的2大方式

  • 基于SQL查询的 CDC(Change Data Capture):
    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据。也就是我们说的基于SQL查询抽取;
    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
    • 不保障实时性,基于离线调度存在天然的延迟;
    • 工具软件以Kettle(Apache Hop最新版)、DataX为代表,需要结合任务调度系统使用。
  • 基于日志的 CDC:
    • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据;
    • 工具软件以Flink CDC、阿里巴巴Canal、Debezium为代表。

基于SQL查询增量数据同步原理

我们考虑用SQL如何查询增量数据? 数据有增加、需改、删除
删除数据采用逻辑删除的方式,比如定义一个is_deleted字段标识逻辑删除
如果数据是 UPDATE的,也就是会被修改的,那么 where update_datetime >= last_datetime(调度滚动时间)就是增量数据
如果数据是 APPEND ONLY 的除了用更新时间还可以用where id >= 调度上次last_id

结合任务调度系统
调度时间是每日调度执行一次,那么 last_datetime = 当前调度开始执行时间 – 24小时,延迟就是1天
调度时间是15分钟一次,那么 last_datetime = 当前调度开始执行时间 – 15分钟,延迟就是15分钟

这样就实现了捕获增量数据,从而实现增量同步

DolphinScheduler + Datax 构建离线增量数据同步平台

本实践使用
单机8c16g
DataX 2022-03-01 官网下载
DolphinScheduler 2.0.3(DolphinScheduler的安装过程略,请参考官网)

DolphinScheduler 中设置好DataX环境变量
DolphinScheduler 提供了可视化的作业流程定义,用来离线定时调度DataX Job作业,使用起来很是顺滑

基于SQL查询离线数据同步的用武之地
为什么不用基于日志实时的方式?不是不用,而是根据场合用。考虑到业务实际需求情况,基于SQL查询这种离线的方式也并非完全淘汰了
特别是业务上实时性要求不高,每次调度增量数据没那么大的情况下,不需要分布式架构来负载,这种情况下是比较合适的选择
场景举例:
网站、APP的百万级、千万级的内容搜索,每天几百篇内容新增+修改,搜索上会用到ES(ElasticSearch),那么就需要把 MySQL内容数据增量同步到ES
DataX就能满足需求!

DolphinScheduler中配置DataX MySQL To ElasticSearch工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To ElasticSearch'

DATAX组件(t_article)
用到2个插件mysqlreader、elasticsearchwriter^[1]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id as pk,a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "${biz_es_host}",
                        "accessId": "${biz_es_username}",
                        "accessKey": "${biz_es_password}",
                        "index": "t_article",
                        "type": "_doc",
                        "batchSize": 1000,
                        "cleanup": false,
                        "discovery": false,
                        "dynamic": true,
                        "settings": {
                            "index": {
                                "number_of_replicas": 0,
                                "number_of_shards": 1
                            }
                        },
                        "splitter": ",",
                        "column": [
                            {
                                "name": "pk",
                                "type": "id"
                            },
                            {
                                "name": "id",
                                "type": "long"
                            },
                            {
                                "name": "title",
                                "type": "text"
                            },
                            {
                                "name": "content",
                                "type": "text"
                            }
                            {
                                "name": "is_delete",
                                "type": "text"
                            },
                            {
                                "name": "delete_date",
                                "type": "date"
                            },
                            {
                                "name": "create_date",
                                "type": "date"
                            },
                            {
                                "name": "update_date",
                                "type": "date"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate} 
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码
biz_es_host: 你的es地址带协议和端口 http://127.0.0.1:9200
biz_es_username: 你的es账号
biz_es_password: 你的es密码

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer elasticsearchwriter插件中关键配置: “

"column": [
    {
        "name": "pk",
        "type": "id"
    },
    ......
]

type = id 这样配置,就把文章主键映射到es主键 _id 从而实现相同主键id重复写入数据,就会更新数据。如果不这样配置数据将会重复导入es中

保存工作流

全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[2]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

by 流水理鱼|wwek

参考

1. DataX ElasticSearchWriter 插件文档
2. Apache DolphinScheduler 内置参数