1. 数据平台数仓平台架构设计大图

1.1 基于Apache Doris(以下简称Doris)的实时数仓架构大图

基于Apache Doris的实时数仓架构图 流水理鱼 wwek

1.1 架构图说明

数据源
主要是业务数据库MySQL,当然也可以是其他的关系型数据库

数据集成和处理
实时,原封不动同步的数据使用CloudCanal;需要复杂的数据加工处理,使用Flink SQL ,并用Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线,使用DataX、SeaTunel,并用海豚调度(DolphinScheduler)编排任务调度进行数据集成和处理

数据仓库
主体是使用Doris作为数据仓库
ES、MySQL、Redis作为辅助数仓,同时ES也作为搜索引擎使用,数据同步复用该套架构

数据应用
总体分类2大类
自己开发API、BI等数据应用服务
三方BI ,商业Tableau、帆软BI、乾坤,开源Superset、Metabase等之上构建数据应用服务

2. 为什么是Doris?

2.1 Doris核心优势

Apache Doris 简单易用、高性能和统一的分析数据库,他是开源的!

简单易用
部署只需两个进程,不依赖其他系统;在线集群扩缩容,自动副本修复;兼容 MySQL 协议,并且使用标准 SQL

高性能
依托列式存储引擎、现代的 MPP 架构、向量化查询引擎、预聚合物化视图、数据索引的实现,在低延迟和高吞吐查询上, 都达到了极速性能

统一数仓
单一系统,可以同时支持实时数据服务、交互数据分析和离线数据处理场景

联邦查询
支持对 Hive、Iceberg、Hudi 等数据湖和 MySQL、Elasticsearch 等数据库的联邦查询分析

多种导入
支持从 HDFS/S3 等批量拉取导入和 MySQL Binlog/Kafka 等流式拉取导入;支持通过HTTP接口进行微批量推送写入和 JDBC 中使用 Insert 实时推送写入

生态丰富
Spark 利用 Spark Doris Connector 读取和写入 Doris;Flink Doris Connector 配合 Flink CDC 实现数据 Exactly Once 写入 Doris;利用 DBT Doris Adapter,可以很容易的在 Doris 中完成数据转化

2.2 实际使用体验

查询性能满足需求
实战下来,千万级数据主表,再join几个小表,聚合查询,8C16G硬件配置单机运行Doris,查询能在3s内
join支持友好,一个查询关联10+个表后查询也毫无压力

文本作者为 流水理鱼 wwek https://www.iamle.com

2.3 杀鸡用牛刀Hadoop、Hive

Hadoop、Hive大象固然好,但是对于大部分中小型企业来说,这个就是牛刀,现在的主流数据湖也是牛刀,大部分中小公司研发都会面临杀鸡难道用牛刀的情况。
传统的大数据基本都是玩离线的,业务要求的实时性如何做到
所以TB级别的数据应该用对应的解决方案,那就是MPP架构的Doris

2.4 平替商业分析型数据库

能够平替阿里云ADB(阿里云的分析型数据库)
和相同的数据库表,想同的服务器配置,在体验上大部分Doris比阿里云ADB更快,小部分相当或更慢(非严格的测试对比,仅仅是自己特定的场景下的结果)

3、让架构实际落地的渐进式方案

3.1 V1.0 解决MySQL不能做OLAP分析查询的问题

绝大多数中小公司都有朴实的需求,我要实时大屏,业务数据统计报表
而这个时候你发现MySQL已经不支持当前的报表统计查询了,已经卡爆了,加从库也不行
这就是V1.0 落地方案要解决的问题

MySQL是行存数据库也即是面向OLTP的,不是面向OLAP分析查询的,所以不适合做数据报表等数据应用,数据量不大,时间跨度不大用MySQL从库还能一战,但终归你会遇到MySQL已经不支持你的报表查询SQL的时候

如何最低成本的让各类数据应用能实时、高性能的查询,答案是Doirs
V1.0 落地数仓核心是:原封不动的实时同步了一份业务数据库MySQL中的表到 Doris

对应架构图中
数据源 数据集成 数据仓库 数据应用
业务MySQL 》 CloudCanal 》 Doris 》 数据查询应用
其实这就相当于完成了数据仓库的ODS层,直接用ODS层,在数仓中使用原始业务库表,是最简单的开始,这个时候已经支持绝大多数据应用了,业务需求大多数都能得到满足。

数据应用方面可以很易用,因为Doirs支持MySQL协议,又支持标准SQL
所以不管是商业或开源BI软件,自己程序开发API都能快速进行数据应用服务支持

在落地成本方面,1-2人开发者(甚至还不是数据开发工程师),服务器16C32G * 3 人、机资源即可落地

3.2 V2.0 解决更大的数据、更复杂的数据加工的问题

如果有更多的数据集成的需求,还有更复杂的ETL数据加工的需求
推荐使用:
实时 Flink SQL 使用 Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线 DataX、SeaTunnel 使用海豚(DolphinScheduler)调度编排和调度

实时场景下一般使用CDC,离线场景下一般使用SQL查询抽取

3.3 V3.0 走向成熟的数仓分层、数据治理

该阶段建立 数仓分层模型
建立数仓分层模型的好处:数据结构化更清晰、数据血缘追踪、增强数据复用能力、简化复杂问题、减少业务影响、统一数据口径

ADS层
数据集市

DWS层
分析主题域,“轻粒度汇总表”

DWD层
业务主题划分域,并打成“事实明细宽表”

ODS层
贴源层,也就是业务数据原封不动的同步过来存储到数仓

DIM层
维度数据

具体的细节怎么设计、规范怎么定,这已超出了本文的范围,到了这个阶段您也不需要看本文了

1. 问题和解决办法

1.1 SELECT 字段中的非聚合函数包裹的字段,必须在 GROUP BY中申明

# SQLSTATE[HY000]: General error: 1064 '`u`.`name`' must be an aggregate expression or appear in GROUP BY clause

这个问题会是遇到最多的问题(实际上在大数据场景,例如hive中也有这个要求的)
因为ADB(MySQL)引擎是MySQL所以对这个没要求,导致迁移到Doris的时候这个问题最为突出
时间处理函数、字符串处理函数这种是非聚合函数,所以也是需要在GROUP BY中申明的

1.2 Doris的 GROUP_CONCAT函数中字段前不支持用distinct去重 #11932

https://github.com/StarRocks/starrocks/issues/8079
array_distinct(array_agg(str_col))
to get a distinct a array value.
And if you want to make it a value you can use the following function
array_join(array_distinct(array_agg(str_col)), ‘,’)

也就是先取出列为数组,然后去重数组,再把数组拼接字符串

ps:新版本的Doris已经支持,见
[Bug] doris的group_concat函数不支持distinct #11932

1.3 GROUP_CONCAT函数中的字段不支持int类型

需要拼接的字段如果为int类型那么必须先强转成字符串,使用CAST函数
GROUP_CONCAT(CAST( id整形字段 as STRING), ‘,’)

1.4 SUBSTRING_INDEX函数不支持

截止2022年11月11日需要使用UDF,也就是用户定义函数解决
Apache Doris 1.1 版本只支持原生UDF,也就是需要重新编译整个Doris,1.2 版本开始支持Java UDF 可以动态挂载
StarRocks 2.2.0 版本开始支持Java UDF 可以动态挂载

提供一个已经实现好的StarRocks Java UDF,可直接使用(由同事贡献)
代码

package com.starrocks.udf;  

import org.apache.commons.lang3.StringUtils;  

/**  
 * 根据下标截取  
 *  
 * @author dingyoukun  
 * @date 2022-10-26 14:30  
 **/public class SubStringByIndex {  
    public final String evaluate (String targetStr, String str, Integer index) {  
        Boolean desc = false;  
        if (index < 0 ) {  
            desc = true;  
            index = Math.abs(index);  
        }  

        String result = targetStr;  
        String partStr = str;  

        if (StringUtils.isBlank(targetStr)) {  
            return result;  
        }  

        if (index == 0) {  
            return targetStr;  
        }  

        if (desc) {  
            targetStr = new StringBuffer(targetStr).reverse().toString();  
            partStr = new StringBuffer(partStr).reverse().toString();  
        }  
        int beginIndex = 0;  
        int count = 0;  
        while ((beginIndex = targetStr.indexOf(partStr, beginIndex)) != -1) {  
            count++;  
            if (count == index) {  
                if (desc) {  
                    targetStr = new StringBuffer(targetStr).reverse().toString();  
                    result = targetStr.substring(targetStr.length() - beginIndex);  
                } else {  
                    result = targetStr.substring(0, beginIndex);  
                }  
                return result;  
            }  
            beginIndex = beginIndex + partStr.length();  
        }  
        return result;  
    }  
}

2. 参考

https://docs.starrocks.io/zh-cn/latest/introduction/StarRocks_intro
https://github.com/StarRocks/starrocks/issues

https://github.com/apache/doris/issues
https://doris.apache.org/zh-CN/docs/summary/basic-summary

[Apache Doris Java UDF https://doris.apache.org/zh-CN/docs/dev/ecosystem/udf/java-user-defined-function] (https://doris.apache.org/zh-CN/docs/dev/ecosystem/udf/java-user-defined-function)
[StarRocks Java UDF https://docs.starrocks.io/zh-cn/latest/sql-reference/sql-functions/JAVA_UDF] (https://docs.starrocks.io/zh-cn/latest/sql-reference/sql-functions/JAVA_UDF)