1. 需求

业务上对业务表就行重构,重新设计了新表,需要原有的数据准实时同步到新表(目标表)

字段类型和数据值等做了修改需要处理映射

例如

1是,2否需要映射为 1是,0否

varchar存储的时间转换为datetime

int存储的时间戳要转换为datetime

新表设计了新状态码,需要按照对照表做数据值映射

三表合一逻辑结构为

源表主键/唯一建 类别 需要的字段 源表 目标表 目标表主键
id 事实主表 10+ old_orders new_orders id(订单号)
order_id 业务延迟产生数据 2+ old_orders_extend1
order_id 业务延迟产生数据 1个 old_orders_extend2

需要实现,三张源表的的insert和update操作准实时的方式同步到同目标目标表中

2.设计

2.1 功能性设计

准实时,必须使用MySQL binlog订阅CDC的方式,使用CloudCanal作为准实时同步数据平台

字段类型和数据值的转化映射,使用CloudCanal的自定义数据处理插件功能,自编写业务处理代码(java)实现

2.2 鲁棒性设计

建表

null处理,目标表字段设置为允许为null,虽然在数据库上常规来讲设置为not null更性能,但是考虑综合收益新表就尽量不使用 not null,这样有更好的数据导入容错性

容量,目标表表字段容量大小必须>= 源表对应的字段,一般情况保持原有大小一致

数据转换

格式化,时间字段格式化,文本格式的字段类型转换为datatime类型时做好异常捕获,确保格式化为正确的日期,保障数据写入更新不出问题

脏数据,其他脏数据的预处理

2.3 CloudCanal任务设计

事实主表建立一个任务实例,这样做保障事实主表独立

业务延迟产生数据的表建立一个任务实例,这样做和事实主表的同步独立,互不影响

3.实现

3.1 目标表建表

注意符合鲁棒性设计中的建表设计

具体内容略…

3.2 编写自定义数据处理插件

常规的一些转换操作看CloudCanal提供的示例代码即可

处理技术重点在,业务延迟产生数据的时候,需要把insert和update都转换为update

处理insert时候,需要同时保留getAfterColumnMap和getBeforeColumnMap,由于insert是没有变更前数据的,所以设置getBeforeColumnMap为getAfterColumnMap即可

核心代码为

//  本文来自 www.iamle.com 流水理鱼
public static void changeOrderExtend1FillColumnMap(CustomFieldV2 oriBf, LinkedHashMap<String, CustomFieldV2> target) {
    CustomFieldV2 bf = CustomFieldV2.buildField(oriBf.getFieldName(), oriBf.getValue(), oriBf.getSqlType(), oriBf.isKey(), oriBf.isNull(), oriBf.isUpdated());
    target.put(oriBf.getFieldName(), bf);
}

private static void processOrderExtend1InsertAndUpdateToUpdate(CustomData data, List<CustomData> re, EventTypeInSdk eventType) {
    for (CustomRecordV2 oriRecord : data.getRecords()) {
        CustomRecordV2 updateRecord = new CustomRecordV2();

        if (eventType.equals(EventTypeInSdk.INSERT)) {
            // 需要处理 afterColumnMap > beforeColumnMap
            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
            }

            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
            }
        }

        if (eventType.equals(EventTypeInSdk.UPDATE)) {
            // 需要处理 beforeColumnMap
            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeColumnMap());
            }

            for (Map.Entry<String, CustomFieldV2> f : oriRecord.getBeforeKeyColumnMap().entrySet()) {
                changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getBeforeKeyColumnMap());
            }
        }

        for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterColumnMap().entrySet()) {
            changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterColumnMap());
        }

        for (Map.Entry<String, CustomFieldV2> f : oriRecord.getAfterKeyColumnMap().entrySet()) {
            changeOrderExtend1FillColumnMap(f.getValue(), updateRecord.getAfterKeyColumnMap());
        }

        List<CustomRecordV2> updateRecords = new ArrayList<>();
        updateRecords.add(updateRecord);

        SchemaInfo updateSchemaInfo = data.cloneSchemaInfo(data.getSchemaInfo());
        CustomData updateData = new CustomData(updateSchemaInfo, EventTypeInSdk.UPDATE, updateRecords);

        re.add(updateData);
    }
}

工程项目代码参考:

cloudcanal-data-process 本工程汇集了 CloudCanal 数据处理插件,以达成数据自定义 transformation 目标

https://gitee.com/clougence/cloudcanal-data-process

问题!
insert改写为update在增量同步阶段工作正常,在全量迁移阶段会报错!
截止2023年3月代码中没有办法判断任务阶段是在 全量迁移还是增量同步,所以只能在第一次建立任务的时候不要改写insert成为update,待任务正常执行时候,再重新上传激活代码处理包,
让insert改写update在增量同步阶段工作
由于全量迁移阶段的insert数据没同步,这个暂时只有单独手动处理更新到目标表去

2023-03-14 14:22:50.791 [full-task-executor-7-thd-1] INFO  c.c.c.b.service.task.parser.full.RdbSinglePkPageScanner - [FINISH SCAN!]null.你的库.old_orders_extend1, cost time 222 ms.migrated count is 1774
2023-03-14 14:22:51.660 [full-apply-disruptor-6-thd-3] ERROR c.c.c.task.applier.full.FullDisruptorExceptionHandler - disruptor process full event error,msg:IllegalArgumentException: unsupported one FullEvent with multi CustomData.
java.lang.IllegalArgumentException: unsupported one FullEvent with multi CustomData.
 at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:59)
 at com.clougence.cloudcanal.task.data.process.pkg.editorv2.FullDataDeSerializer.deserialize(FullDataDeSerializer.java:18)
 at com.clougence.cloudcanal.task.data.process.pkg.V2PkgProcessor.process(V2PkgProcessor.java:125)
 at com.clougence.cloudcanal.task.data.process.pkg.CustomPkgProcessor.process(CustomPkgProcessor.java:73)
 at com.clougence.cloudcanal.task.data.process.DataProcHandlerImpl.process(DataProcHandlerImpl.java:19)
 at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:32)
 at com.clougence.cloudcanal.task.data.process.FullDataProcWorkerHandler.onEvent(FullDataProcWorkerHandler.java:12)
 at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143)
 at java.lang.Thread.run(Thread.java:748)

3.3 CloudCanal任务建立

事实主表建立一个任务实例

old_orders 到 new_orders,并配置好字段映射

业务延迟产生数据的表建立一个任务实例

old_orders_extend1 到 new_orders,并配置好字段映射

old_orders_extend2 到 new_orders,并配置好字段映射

建立任务时在配置时候 数据处理 界面上传自定义代码的jar包

自定义代码jar包更新

激活后需要重启任务(PS:需要重启至少2次才生效,这个是CloudCanal bug,截止2023-3-10 还未解决)

3.4 实现结果

源表的insert和update都会准实时的同步到目标表达到设计目标

1. 数据平台数仓平台架构设计大图

1.1 基于Apache Doris(以下简称Doris)的实时数仓架构大图

基于Apache Doris的实时数仓架构图 流水理鱼 wwek

1.1 架构图说明

数据源
主要是业务数据库MySQL,当然也可以是其他的关系型数据库

数据集成和处理
实时,原封不动同步的数据使用CloudCanal;需要复杂的数据加工处理,使用Flink SQL ,并用Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线,使用DataX、SeaTunel,并用海豚调度(DolphinScheduler)编排任务调度进行数据集成和处理

数据仓库
主体是使用Doris作为数据仓库
ES、MySQL、Redis作为辅助数仓,同时ES也作为搜索引擎使用,数据同步复用该套架构

数据应用
总体分类2大类
自己开发API、BI等数据应用服务
三方BI ,商业Tableau、帆软BI、乾坤,开源Superset、Metabase等之上构建数据应用服务

2. 为什么是Doris?

2.1 Doris核心优势

Apache Doris 简单易用、高性能和统一的分析数据库,他是开源的!

简单易用
部署只需两个进程,不依赖其他系统;在线集群扩缩容,自动副本修复;兼容 MySQL 协议,并且使用标准 SQL

高性能
依托列式存储引擎、现代的 MPP 架构、向量化查询引擎、预聚合物化视图、数据索引的实现,在低延迟和高吞吐查询上, 都达到了极速性能

统一数仓
单一系统,可以同时支持实时数据服务、交互数据分析和离线数据处理场景

联邦查询
支持对 Hive、Iceberg、Hudi 等数据湖和 MySQL、Elasticsearch 等数据库的联邦查询分析

多种导入
支持从 HDFS/S3 等批量拉取导入和 MySQL Binlog/Kafka 等流式拉取导入;支持通过HTTP接口进行微批量推送写入和 JDBC 中使用 Insert 实时推送写入

生态丰富
Spark 利用 Spark Doris Connector 读取和写入 Doris;Flink Doris Connector 配合 Flink CDC 实现数据 Exactly Once 写入 Doris;利用 DBT Doris Adapter,可以很容易的在 Doris 中完成数据转化

2.2 实际使用体验

查询性能满足需求
实战下来,千万级数据主表,再join几个小表,聚合查询,8C16G硬件配置单机运行Doris,查询能在3s内
join支持友好,一个查询关联10+个表后查询也毫无压力

文本作者为 流水理鱼 wwek https://www.iamle.com

2.3 杀鸡用牛刀Hadoop、Hive

Hadoop、Hive大象固然好,但是对于大部分中小型企业来说,这个就是牛刀,现在的主流数据湖也是牛刀,大部分中小公司研发都会面临杀鸡难道用牛刀的情况。
传统的大数据基本都是玩离线的,业务要求的实时性如何做到
所以TB级别的数据应该用对应的解决方案,那就是MPP架构的Doris

2.4 平替商业分析型数据库

能够平替阿里云ADB(阿里云的分析型数据库)
和相同的数据库表,想同的服务器配置,在体验上大部分Doris比阿里云ADB更快,小部分相当或更慢(非严格的测试对比,仅仅是自己特定的场景下的结果)

3、让架构实际落地的渐进式方案

3.1 V1.0 解决MySQL不能做OLAP分析查询的问题

绝大多数中小公司都有朴实的需求,我要实时大屏,业务数据统计报表
而这个时候你发现MySQL已经不支持当前的报表统计查询了,已经卡爆了,加从库也不行
这就是V1.0 落地方案要解决的问题

MySQL是行存数据库也即是面向OLTP的,不是面向OLAP分析查询的,所以不适合做数据报表等数据应用,数据量不大,时间跨度不大用MySQL从库还能一战,但终归你会遇到MySQL已经不支持你的报表查询SQL的时候

如何最低成本的让各类数据应用能实时、高性能的查询,答案是Doirs
V1.0 落地数仓核心是:原封不动的实时同步了一份业务数据库MySQL中的表到 Doris

对应架构图中
数据源 数据集成 数据仓库 数据应用
业务MySQL 》 CloudCanal 》 Doris 》 数据查询应用
其实这就相当于完成了数据仓库的ODS层,直接用ODS层,在数仓中使用原始业务库表,是最简单的开始,这个时候已经支持绝大多数据应用了,业务需求大多数都能得到满足。

数据应用方面可以很易用,因为Doirs支持MySQL协议,又支持标准SQL
所以不管是商业或开源BI软件,自己程序开发API都能快速进行数据应用服务支持

在落地成本方面,1-2人开发者(甚至还不是数据开发工程师),服务器16C32G * 3 人、机资源即可落地

3.2 V2.0 解决更大的数据、更复杂的数据加工的问题

如果有更多的数据集成的需求,还有更复杂的ETL数据加工的需求
推荐使用:
实时 Flink SQL 使用 Dinky FlinkSQL Studio 实时计算平台来开发、管理、运行Flink SQL
离线 DataX、SeaTunnel 使用海豚(DolphinScheduler)调度编排和调度

实时场景下一般使用CDC,离线场景下一般使用SQL查询抽取

3.3 V3.0 走向成熟的数仓分层、数据治理

该阶段建立 数仓分层模型
建立数仓分层模型的好处:数据结构化更清晰、数据血缘追踪、增强数据复用能力、简化复杂问题、减少业务影响、统一数据口径

ADS层
数据集市

DWS层
分析主题域,“轻粒度汇总表”

DWD层
业务主题划分域,并打成“事实明细宽表”

ODS层
贴源层,也就是业务数据原封不动的同步过来存储到数仓

DIM层
维度数据

具体的细节怎么设计、规范怎么定,这已超出了本文的范围,到了这个阶段您也不需要看本文了

1. 问题和解决办法

1.1 SELECT 字段中的非聚合函数包裹的字段,必须在 GROUP BY中申明

# SQLSTATE[HY000]: General error: 1064 '`u`.`name`' must be an aggregate expression or appear in GROUP BY clause

这个问题会是遇到最多的问题(实际上在大数据场景,例如hive中也有这个要求的)
因为ADB(MySQL)引擎是MySQL所以对这个没要求,导致迁移到Doris的时候这个问题最为突出
时间处理函数、字符串处理函数这种是非聚合函数,所以也是需要在GROUP BY中申明的

1.2 Doris的 GROUP_CONCAT函数中字段前不支持用distinct去重 #11932

https://github.com/StarRocks/starrocks/issues/8079
array_distinct(array_agg(str_col))
to get a distinct a array value.
And if you want to make it a value you can use the following function
array_join(array_distinct(array_agg(str_col)), ‘,’)

也就是先取出列为数组,然后去重数组,再把数组拼接字符串

ps:新版本的Doris已经支持,见
[Bug] doris的group_concat函数不支持distinct #11932

1.3 GROUP_CONCAT函数中的字段不支持int类型

需要拼接的字段如果为int类型那么必须先强转成字符串,使用CAST函数
GROUP_CONCAT(CAST( id整形字段 as STRING), ‘,’)

1.4 SUBSTRING_INDEX函数不支持

截止2022年11月11日需要使用UDF,也就是用户定义函数解决
Apache Doris 1.1 版本只支持原生UDF,也就是需要重新编译整个Doris,1.2 版本开始支持Java UDF 可以动态挂载
StarRocks 2.2.0 版本开始支持Java UDF 可以动态挂载

提供一个已经实现好的StarRocks Java UDF,可直接使用(由同事贡献)
代码

package com.starrocks.udf;  

import org.apache.commons.lang3.StringUtils;  

/**  
 * 根据下标截取  
 *  
 * @author dingyoukun  
 * @date 2022-10-26 14:30  
 **/public class SubStringByIndex {  
    public final String evaluate (String targetStr, String str, Integer index) {  
        Boolean desc = false;  
        if (index < 0 ) {  
            desc = true;  
            index = Math.abs(index);  
        }  

        String result = targetStr;  
        String partStr = str;  

        if (StringUtils.isBlank(targetStr)) {  
            return result;  
        }  

        if (index == 0) {  
            return targetStr;  
        }  

        if (desc) {  
            targetStr = new StringBuffer(targetStr).reverse().toString();  
            partStr = new StringBuffer(partStr).reverse().toString();  
        }  
        int beginIndex = 0;  
        int count = 0;  
        while ((beginIndex = targetStr.indexOf(partStr, beginIndex)) != -1) {  
            count++;  
            if (count == index) {  
                if (desc) {  
                    targetStr = new StringBuffer(targetStr).reverse().toString();  
                    result = targetStr.substring(targetStr.length() - beginIndex);  
                } else {  
                    result = targetStr.substring(0, beginIndex);  
                }  
                return result;  
            }  
            beginIndex = beginIndex + partStr.length();  
        }  
        return result;  
    }  
}

2. 参考

https://docs.starrocks.io/zh-cn/latest/introduction/StarRocks_intro
https://github.com/StarRocks/starrocks/issues

https://github.com/apache/doris/issues
https://doris.apache.org/zh-CN/docs/summary/basic-summary

[Apache Doris Java UDF https://doris.apache.org/zh-CN/docs/dev/ecosystem/udf/java-user-defined-function] (https://doris.apache.org/zh-CN/docs/dev/ecosystem/udf/java-user-defined-function)
[StarRocks Java UDF https://docs.starrocks.io/zh-cn/latest/sql-reference/sql-functions/JAVA_UDF] (https://docs.starrocks.io/zh-cn/latest/sql-reference/sql-functions/JAVA_UDF)

基于FreeSWITCH自建呼叫中心中台 流水理鱼|wwek PPT分享

目录导航

  • 业务需求背景情况 – 业务需求、背景情况
  • 业务系统如何外呼 – 点呼、群呼、AI
  • 呼叫中心通话链路 – 通话序列图、呼叫线路、SIP协议介绍
  • 呼叫中心中台架构 – 呼叫中心中台架构设计(基础版)
  • FreeSWITCH-介绍 – FreeSWITCH电话软交换
  • FreeSWITCH-拨通第一个电话 – SIP Hello World
  • FreeSWITCH-集成 – FreeSWITCH系统集成设计和实现
  • FreeSWITCH-中台API封装 – 把FreeSWITCH的能力封装为中台API

业务需求背景情况

  • CRM中常见需要对ToB、ToC客户进行电话回访、电话销售
  • 在打电话这个事上,企业需求比个人需求要求更多,最基本的可系统集成、有话单、有录音等
  • 无论时代如何变,传统的基于运营商电话的接通需求是一直稳定存在的
  • 三方商业呼叫系统有很多,okcc、合力忆捷、天润、容联七陌、网易七鱼等
  • 自研呼叫解决3个主要核心问题。能力上,定制开发扩展性拉满;成本上,比购买商业呼叫系统便宜;安全上,数据在手
  • 自研呼叫需要具备条件,公司业务按年为单位长期有呼叫需求,有开发人员资源

业务系统如何外呼

  • 呼叫方式上,单个点击拨打(点呼)、批量呼通再分配排队坐席(群呼)、AI自动呼叫
  • 坐席(打电话人)软电话登录,登录呼叫中心的软电话客户端(无客户端的为网页浏览器客户端)
  • 业务系统中点击拨打、或者操作建立群呼、AI呼叫任务
  • 业务系统通过API调用呼叫中心控制发起每通电话,接下来看看通话序列图

查看文章

☎️呼叫中心通话链路 – 呼叫线路示意图

“`mermaid {theme: ‘neutral’, scale: 0.66}
graph LR
A[呼叫中心系统] –>|呼叫线路选择| B(选择线路网关1)
B –> |SIP|C{呼叫线路商1线路路由}
A[呼叫中心系统] –>|呼叫线路选择| Bn(选择线路网关N)
C –>|电信运营商落地1| D[A地固话号]
C –>|电信运营商落地2| E[B地固话号]
C –>|电信运营商落地3| F[C地手机号]
Bn –> |SIP|Cn{呼叫线路商N线路路由}
Cn –>|电信运营商落地1| Dn[A地固话号]
Cn –>|电信运营商落地2| En[B地手机号]
Cn –>|电信运营商落地3| Fn[C地手机号]

<pre><code><br /><br /># 呼叫中心通话链路 – 呼叫线路介绍
– 呼叫线路商提供的线路在 呼叫中心系统 有多个称呼,线路网关、SIP中继、中继网关、落地线路
– 呼叫线路的称呼,不论叫什么,他都是电话的通道,使用SIP协议对接
– 呼叫线路商使用VOS语言运营系统作为支持软件,VOS处于VOIP运营垄断地位
– 不使用呼叫线路商,直找电信运营商对接线路,用SIP协议对接
– 这样就相当于只有一条落地线路了,没有丰富的线路资源来优化线路路由,需按业务需求选择
– **呼叫线路商的本质是聚合多条、多地、多类型电信运营商线路资源**
一个典型的线路对接信息
</code></pre>

线路备注:xx线路
IP:8.8.8.8
UDP端口:5060 (SIP协议)
主叫送:20220601
被叫加前缀:6
并发:500
限制:一天一次
盲区:北、新、西

<pre><code><br /># SIP协议介绍
SIP(Session Initiation Protocol,会话初始协议)[^1]是由IETF(Internet Engineering Task Force,因特网工程任务组)制定的多媒体通信协议。广泛应用于CS(Circuit Switched,电路交换)、NGN(Next Generation Network,下一代网络)以及IMS(IP Multimedia Subsystem,IP多媒体子系统)的网络中,可以支持并应用于语音、视频、数据等多媒体业务,同时也可以应用于Presence(呈现)、Instant Message(即时消息)等特色业务。可以说,有IP网络的地方就有SIP协议的存在。SIP类似于HTTP
– **说人话!SIP协议用来在IP网络上做电话通讯**
[^1]: [一文详解 SIP 协议](https://www.cnblogs.com/xiaxveliang/p/12434170.html)

# SIP协议介绍-SIP Server
![](https://static.iamle.com/note/202207011111385.png)
– FreeSWITCH就是一个SIP Server,也是一个B2BUA,后面具体讲 FreeSWITCH
– **呼叫中心的SIP Server桥接 A leg 和 B leg,这样A和B就建立通话了**
– B2BUA看起来唬人,靠背嘛,就是A leg 和 B leg靠背桥接起来
– 支持SIP协议的软电话客户端常用的有:Linphone、MicroSIP、Eyebeam等

# SIP协议介绍-SIP Server 和 A leg 通讯
<img class="m-1 h-110 rounded" src="https://static.iamle.com/note/202207011131334.png" />

# SIP协议介绍-SIP Server 和 B leg 通讯
<img class="m-1 h-110 rounded" src="https://static.iamle.com/note/202207011124286.png" />

# 呼叫中心中台架构 – 呼叫中心中台架构设计(基础版)目标
– 基础版设计目标,实现点击拨打(点呼)
– **呼叫能力的高级抽象**
## 呼叫(主叫号码,被叫号码,[线路网关], [拓展数据])

– 主被叫号码既可以是内部的坐席分机号码,也可以是手机号码
– 线路网关是选填参数,支持多个逗号分割,为空使用系统默认配置网关
– 拓展数据是选填参数,呼叫系统在后续通话技术后原样传回业务系统
– 拓展数据,用于业务自身逻辑

layout: center

# 呼叫中心中台架构 – 呼叫中心中台架构设计(基础版)架构图
<img class="m-1 h-110 rounded" src="https://static.iamle.com/note/202207011731635.png" />

# FreeSWITCH – 介绍
– FreeSWITCH 是一个作为背靠背用户代理实现的开源运营商级电话平台。由于这种设计,它可以执行大量不同的任务,从PBX到传输交换机、TTS(文本到语音)转换、音频和视频会议主机,甚至是VoIP电话等等。
– 是一款非常好用的电话软交换框架,支持跨平台,扩展性良好,配置灵活。
– 可以在很多平台上运行,包括 Linux、Mac OS X、BSD、Solaris,甚至 Windows。
– 可以处理来自 IP 网络 (VoIP) 和 PSTN(普通的固定电话)的语音、视频和文本通信。
– 支持所有流行的 VoIP 协议以及与 PRIs 的接口。
– 支持 OPUS、iLBC、Speex、GSM、G711、G722 等多种语音编解码,支持 G723、G729 等语音编解码的透传模式。
– 可以当作 PBX、SBC、媒体服务器、业务服务器等不同的通信节点来使用
– 本身是在 MPL 1.1 (Mozilla 公共许可证) 下许可的,但是一些单独的模块可能使用其他许可证。
– **说人话 FreeSWITCH 是一个软件实现的电话交换平台,开源、模块化、功能丰富**
– **市面上绝大多商业呼叫中心都是基于 FreeSWITCH 为核心开发的**

layout: center

# FreeSWITCH – 总体结构
<img class="m-1 h-120 rounded" src="https://static.iamle.com/note/202207011403991.png" />

layout: center

# FreeSWITCH – 总体结构
<img class="m-1 h-120 rounded" src="https://static.iamle.com/note/202207011405842.png" />

layout: center

# FreeSWITCH – 配置文件目录结构
/etc/freeswitch# tree -Ld 3
</code></pre>

├── freeswitch.xml 主xml文件,就是它将所有配置文件“粘”到一起,生成一个大的xml文件
├── vars.xml 常用变量
├── autoload_configs 一般都是模块级的配置文件,每个模块对应一个。文件名一般以 module_name.conf.xml 方式命名。
│   ├── *.conf.xml
├── chatplan 聊天计划
├── dialplan 拨号计划
├── directory 用户目录,分级用户账号
│   ├── default 默认的用户目录配置
│   │   ├── *.xml SIP用户,每用户一个文件
├── sip_profiles
│   ├── external SIP中继网关配置
│   │   ├── *.xml
│   ├── external.xml
│   └── internal.xml
├── ivr_menus IVR 菜单
├── jingle_profiles 连接Google Talk的相关配置
├── lang 多语言支持
├── mrcp_profiles MRCP的相关配置, 用于跟第三方语音合成和语音识别系统对接
├── skinny_profiles 思科SCCP协议话机的配置文件
├── tls tls证书
├── extensions.conf

<pre><code>- FreeSWITCH的配置文件由众多XML配置文件构建

layout: two-cols

# FreeSWITCH – 控制客户端和开发者接口
FreeSWITCH如何操作控制和开发对接?[^1]
– ① fs_cli 为命令行控制接口,也就是敲命令控制[^2]
– ② ESL(Event Socket Library) 通过事件接口和FreeSWITCH交互控制,fs_cli本质上也是走的ESL一样的底层流程
– ③ mod_xml_curl、mod_xml_rpc、lua脚本语言、自编写模块等更多方式实现和FreeSWITCH交互控制
::right::
</code></pre>

输入 fs_cli 进入命令行控制
.=======================================================.
| _____ ____ ____ _ ___ |
| | <strong><em>/ ___| / ___| | |</em> <em>| |
| | |</em> _</strong> \ | | | | | | |
| | <em>| <strong><em>) | | |</em></strong>| |___ | | |
| |</em>| |____/ ____|_____|___| |
| |
.=======================================================.
| Anthony Minessale II, Ken Rice, |
| Michael Jerris, Travis Cross |
| FreeSWITCH (http://www.freeswitch.org) |
| Paypal Donations Appreciated: paypal@freeswitch.org |
| Brought to you by ClueCon http://www.cluecon.com/ |
.=======================================================.
Type /help to see a list of commands
+OK log level [7]
freeswitch@callcenter>

<pre><code>[^1]: [FreeSWITCH Client and Developer Interfaces](https://freeswitch.org/confluence/display/FREESWITCH/Client+and+Developer+Interfaces)
[^2]: [Command Line Interface (fs_cli)](https://freeswitch.org/confluence/pages/viewpage.action?pageId=1048948)

# FreeSWITCH – 拨通第一个电话 – 分机和分机
<img class="m-1 h-90 rounded" src="https://static.iamle.com/note/202207011645715.jpg" />
* fs_cli 命令行让分机1000和分机1002通话&gt; originate user/1000 'bridge:user/1002' inline
* 分机1000使用软电话客户端Linphone,分机1002使用软电话客户端MicroSIP注册在了FreeSWITCH服务器

# FreeSWITCH – 拨通第一个电话 – 分机和手机
<img class="m-1 h-90 rounded" src="https://static.iamle.com/note/202207041015836.png" />
* fs_cli 命令行让分机1000通过网关和手机通话&gt; originate user/1000 'bridge:{origination_caller_id_number=网关主叫}sofia/gateway/网关名/被叫前缀+手机号码' inline
* 分机1000使用软电话客户端Linphone,被叫为手机号

# FreeSWITCH – FreeSWITCH系统集成设计
FreeSWITCH 的配置都是 XML的,最朴素的想法,如何实现动态配置能力?

和 FreeSWITCH 集成需要解决下列问题
|问题|方案|
| —- | —- |
| 分机动态配置 | mod_xml_curl 提供分机动态配置能力,开发对应的API输出分机XML配置文件 |
| 拨号计划动态配置 | mod_xml_curl 提供拨号计划动态配置能力,开发对应的API输出拨号计划XML配置文件 |
| 网关动态配置 | 低频需求暂时手工加载配置文件,开发对应的API可以一键生成网关XML配置文件 |
| CDR话单存储 | mod_xml_cdr 提供话单推送能力,开发对应的API接收话单推送 |
| Record录音对象存储生成URL | api_hangup_hook挂机后回调处理上传录音文件,开发对应API接收上传的录音文件,并上传到对象存储 |
| WebHook回调CDR和Record录音到业务系统 | 在CDR和Record都有了的时候执行回调 |

# FreeSWITCH – FreeSWITCH系统集成实现
集成实现采用golang编程语言开发,框架采用goframe v2

实现如下API,提供给FreeSWITCH集成
|API|实现方式| 说明 |
| —- | —- | —- |
| /fsapi/xml_curl | 读取数据库分机表生成分机XML配置文件 | mod_xml_curl模块对接 |
| /fsapi/cdr | 接收CDR话单并存储数据库CDR话单表 | mod_xml_cdr模块对接 |
| /fsapi/upload_audio | 接收录音文件上传,并上传到对象存储生成录音URL | api_hangup_hook挂机后回调处理上传录音文件 |


layout: full

# FreeSWITCH – 呼叫中心中台API封装实现
中台API和 FreeSWITCH系统集成API放同一个golang项目

|API|实现方式| 说明 |
| —- | —- | —- |
| /v1/call/callback| 调用ESL接口发送事件命令 |呼叫-回拨 参数:主叫号码、被叫号码、网关名称(多个逗号分割)、拓展数据|
| /v1/extension/list| 读取数据库分机表 |分机列表|
| /v1/extension/detail| 读取数据库分机表 |分机详情 参数:分机号|
| /v1/gateway/list| 读取数据库分机表 | 网关列表|
| /v1/gateway/detail| 读取数据库分机表 | 网关详情|
| /v1/gateway/detail_xml| 读取数据库分机表 | 网关详情XML配置文件|
| /v1/extension/online/list| 调用ESL接口 |在线分机列表|
| /v1/gateway/online/list| 调用ESL接口 |在线网关列表|
<!–

<style>
h1 {<br />
font-size: 20px;<br />
}<br />
</style> –>

# FreeSWITCH – 呼叫中心中台API封装实现-呼叫实现
– **呼叫实现核心逻辑**
使用拨号计划组合多个"APP"实现录音、录音采样率设置、挂机后上传录音等能力
拨号计划使用inline即内联模式[^1],ESL接口发送一条实践命令即可完成
</code></pre>

'app1:arg1,app2:arg2,app3:arg3' inline

<pre><code>呼叫模板
</code></pre>

"originate [参数]user/分机号
'
set:media_bug_answer_req=true,
set:record_sample_rate=8000,
set:RECORD_STEREO=true,
set:cc_record_filename=$${recordings_dir}/${strftime(%%Y-%%m-%%d)}/${uuid}.mp3,
export:nolocal:execute_on_answer=record_session:${cc_record_filename},
set:curl_sendfile_url=%s,
set:api_hangup_hook=system curl -XPOST -F \"files=@${cc_record_filename}\" ${curl_sendfile_url}?uuid=${uuid} &,
set:continue_on_failure=true,
set:hangup_after_bridge=true,
set:session_in_hangup_hook=true,
set:ringback=$${sounds_dir}/music/8000/ponce-preludio-in-e-major.wav,
bridge:[参数]sofia/gateway/网关名/前缀+被叫号码,
playback:$${sounds_dir}/zh/cn/link/misc/misc-your_call_has_been_terminated.wav,
info:,
hangup:
'
inline"
“`

⏬基于FreeSWITCH自建呼叫中心中台 PDF

[TOC]

0、前言

image-20210606160820147

为什么使用Elasticsearch (以下简称 ES ) ?

全文搜索能力

ES本身就是一个搜索引擎,技术上基于倒排索引实现的搜索系统。我们做业务不必自己去开发一个搜索引擎,

ES已经满足绝大多数企业的搜索场景的需求。拼多多电商在发展前期,搜索业务使用ES支撑。

复杂查询(多维度组合筛选)

假如某个实体表,例如订单日订单量几十万,运营管理后台通常有十多个维度的组合筛选搜索。你可能会通过缩小查询时间范围来把数据量级降下来,MySQL也许也能一战,但是实际业务场景业务人员不太能接受这样一个小时间段筛选。如果稍微扩大时间段,MySQL这时候就无能为力了。那么把订单表做成“宽表”存入ES,十多个维度的搜索对于ES俩说是毫无压力的。ES群集能轻松支持亿级的宽表多维度组合筛选。这种需求在CRM、BOSS等场景是刚需。

滴滴司机端早期日订单量几十万,运营管理后台大时间跨度且多维度组合筛选就是用ES来实现的。

一个产品是否成熟,公有云是否提供基于该产品的服务是一个风向标。那么ES毫无疑问是成熟的产品。

虽然ES这么好,但是我的数据都在MySQL中那么怎么让数据实时同步到ES中呢?

1、基于应用程序多写

直接通过应用程序数据双写到MySQL和ES

image-20210606174354578

记录删除机制:直接删除

一致性: 需要自行处理,需要对失败错误做好日志记录,做好异常告警并人工补偿

优点: 直接明了,能够灵活控制数据写入,延迟最低

缺点: 与业务耦合严重,逻辑要写在业务系统中

应用双写(同步)

直接通过ES API将数据写入到ES集群中,也就是写入数据库的同时调用ES API写入到ES中

这个过程是同步的

应用双写(MQ异步解耦)

对 应用双写(同步)的改进,引入MQ中间件。

把同步变为异步,做了解耦。

同时引入MQ后双写性能提高,解决数据一致性问题。

缺点是会增加延迟性,业务系统增加mq代码,而且多一个MQ中间件要维护

2、基于binlog订阅

binlog订阅的原理很简单,模拟一个MySQL slave 订阅binlog日志,从而实现CDC(change data capture)

CDC,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。

记录删除机制:直接删除

一致性: 最终一致性

优点: 对业务系统无任何侵入

缺点: 需要维护额外增加的一套数据同步平台;有分钟级的延迟

Canal

img

https://github.com/alibaba/canal/

阿里巴巴 MySQL binlog 增量订阅&消费组件

Databus

https://github.com/linkedin/databus

Linkedin Databus 分布式数据库同步系统

Maxwell

官网:http://maxwells-daemon.io/

https://github.com/zendesk/maxwell

Flink CDC

https://github.com/ververica/flink-cdc-connectors

flink-cdc-connectors 中文教程

基于flink数据计算平台实现 MySQL binlog订阅直接写入es

Flink CDC抛弃掉其他中间件,实现 MySQL 》Flink CDC》ES 非常简洁的数据同步架构

该方式比较新2020年开始的项目,目前在一些实时数仓上有应用

DTS(阿里云)

阿里云的商业产品,具备好的易用性,省运维成本。

CloudCanal

CloudCanal官网

CloudCannal数据同步迁移系统,商业产品

3、基于SQL抽取

基于SQL查询的数据抽取同步

这种方式需要满足2个基本条件

1、MySQL的表必须有唯一键字段(和ES中_id对应)

2、MySQL的表必须有一个“修改时间”字段,该记录任何一个字段修改都需要更新“修改时间”

有了唯一键字段就可以知道修改某条记录后同步哪条ES记录,有了修改时间字段就可以知道同步到哪儿了。

满足了这2个基本条件这样就可实现增量实时同步。

记录删除机制:逻辑删除,在MySQL中增加逻辑删除字段,ES搜索时过滤状态

一致性: 依赖修改时间字段;延迟时间等于计划任务多久执行一次

优点: 对业务系统无任何侵入,简单方便;课用JOIN打宽表

缺点: MySQL承受查询压力;需要业务中满足2个基本条件

logstash

我们使用 Logstash 和 JDBC 输入插件来让 Elasticsearch 与 MySQL 保持同步。从概念上讲,Logstash 的 JDBC 输入插件会运行一个循环来定期对 MySQL 进行轮询,从而找出在此次循环的上次迭代后插入或更改的记录。如要让其正确运行,必须满足下列条件:

  1. 在将 MySQL 中的文档写入 Elasticsearch 时,Elasticsearch 中的 “_id” 字段必须设置为 MySQL 中的 “id” 字段。这可在 MySQL 记录与 Elasticsearch 文档之间建立一个直接映射关系。如果在 MySQL 中更新了某条记录,那么将会在 Elasticsearch 中覆盖整条相关记录。请注意,在 Elasticsearch 中覆盖文档的效率与更新操作的效率一样高,因为从内部原理上来讲,更新便包括删除旧文档以及随后对全新文档进行索引。
  2. 当在 MySQL 中插入或更新数据时,该条记录必须有一个包含更新或插入时间的字段。通过此字段,便可允许 Logstash 仅请求获得在轮询循环的上次迭代后编辑或插入的文档。Logstash 每次对 MySQL 进行轮询时,都会保存其从 MySQL 所读取最后一条记录的更新或插入时间。在下一次迭代时,Logstash 便知道其仅需请求获得符合下列条件的记录:更新或插入时间晚于在轮询循环中的上一次迭代中所收到的最后一条记录。
input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  # stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

配置Logstash的计划任务,定时执行

4、总结

前期建议采用 基于SQL抽取的方式做同步,后期数据量大了建议采用基于binlog订阅的方式同步。

如果本身有现成的Flink平台可用,推荐使用Flink CDC。

什么是最佳的 MySQL 同步 ElasticSearch 方案?

答案是选择缺点可以接受,又满足需求,拥有成本最低的方案。

“完美”的方案往往拥有成本会比较高,所以需要结合业务环境的上下文去选择。

流水理鱼觉得没有一招鲜的方案,因为每种方案都有利弊,所以选取适合你当下业务环境的方案。那么这样的方案就是最佳方案。

5、参考

MySQL 数据实时同步到 Elasticsearch 的技术方案选型和思考 by 万凯明

如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步

监听mysql的binlog日志工具分析:canal、Maxwell、Databus、DTS

0. 前言

关键字

号码隐私保护

虚拟号码保护

号码保护

手机小号

虚拟号码

隐私号码

定义

A、B一般代表用户真实号码

X一般代表PASS平台中的小号,通常和A进行绑定(也叫中间号、隐私号、虚拟号)

Y一般代表PASS平台中的小号,用于解决回呼,通常用于B回呼A

x小x代表分机号

1. 号码隐私保护通话模式介绍

1.1 AX (AXN)模式

只保护号码A,号码X不能复用

一对多隐私保护

呼叫前预先建立A和X的绑定关系

为用户A分配隐私号X,A对外的号码都以X替代,所有与A的通话都通过X建立,保护A号码不泄漏

隐私保护通话AX模式中,A为业务受益用户,为了保护A的真实号码不被泄露,在隐私保护通话平台为A绑定一个虚拟号码X。

  • 绑定关系建立后:
    • 所有人均可拨打X联系A,保护A的真实号码不被泄露。
    • 用户A呼叫其他用户时,企业需要通过API(AX模式设置临时被叫接口)指定呼叫对象(如B),然后A拨打X号码呼叫B。
  • AX模式下的X号码只专属于A号码,即一个虚拟号码X同时只能绑定一个A号码。但1个A号码可以同时绑定5个X号码。
  • 当X号码和A号码解除绑定关系后,该X号码可以被回收,供其他号码绑定。

AX模式下具有如下关键功能:

点击放大

1.2 AXB模式

保护号码A和B,号码X可以复用

一对一隐私保护

用户A与B之间建立一对一绑定关系,双方通过X号码联系对方,保护双方号码隐私。同一X号码可以复用在不同的绑定关系中

呼叫前A和B都必须是已知号码,预先建立AXB的绑定关系

用户A和B不知道对方真实号码,通过平台分配的临时隐私号X联系对方,保护双方号码不泄漏

隐私保护通话AXB模式中,A和B为相互保密的两个业务受益用户,A和B用户都不知道对方真实号码的存在,为了双方的真实号码不被泄露,在隐私保护通话平台为A和B用户绑定一个虚拟号码X,A和B用户对对方只呈现X号码,A和B之间的通信都是通过X号码进行转接。

  • AXB模式下X号码允许被多组号码(建议不超过1000组号码)绑定,即支持多路并发呼叫,但是多组号码中的A和B号码不能重复。例如,允许同时绑定AXB和CXD,但不允许同时绑定AXB和BXC。
  • 同一个A号码若要绑定不同B号码,需要使用不同的X号码进行绑定。如,若已存在AXB,还需绑定A和C,需使用别的X号码,如AX1C。
  • 当X号码和A、B号码解除绑定关系后,该X号码可以被回收,供其他号码绑定。

AXB模式下具有如下关键功能:

点击放大

1.3 X模式

该模式由企业自身维护绑定关系,控制力强

PASS平台仅提供呼叫和短信的管道能力,由企业管理隐私号X与用户的绑定关系,通过灵活利用绑定关系,节省号码资源,创新更丰富的应用

隐私保护通话X模式中,PASS平台对外提供X号码呼叫和短信能力,PASS平台侧不存储任何绑定关系,小号平台接收到呼叫或短信后到第三方系统获取绑定关系。

X模式下具有如下关键功能:

点击放大

点击放大

1.4 AXE (AXN分机、AXx分机)模式

AX模式上增加分机号E

一对多隐私保护

只保护号码A,号码X通过增加分机的方式复用

为用户A分配隐私号X+分机号,A对外的号码都以X+分机号替代,所有与A的通话都通过X+分机号建立,保护A号码不泄漏,提高号码利用率

隐私保护通话AXE模式中,A为业务受益用户,为了保护A的真实号码不被泄露,隐私保护通话平台为A绑定一个分机主号码X和一个分机号E。

  • 绑定关系建立后,其他用户拨打X号码再输入分机号E即可联系A用户。A用户拨打X号码可回呼之前通话用户或企业指定号码。
  • AXE模式下1个X号码可以绑定多个A号码,每个A号码分配不同的分机号E;分机号E最大4位(即0001~9999),但建议一个X号码不要绑定超过200个A号码。

AXE模式下具有如下关键功能:

点击放大

点击放大

1.5 AXG模式

一对组隐私保护

image-20200522234702286

1.6 AXYB模式

保护号码A和号码B

多关系隐私保护

隐私保护通话AXYB模式中,隐私保护通话平台为需要通话的一对或多对用户分别分配对应的隐私号码,保护通话的双方真实号码不被泄露。如隐私保护通话平台为A、B分别绑定隐私号码X和Y,建立了AXYB的绑定关系。

  • A用户对B用户只呈现X号码,与A用户建立的通信通过X号码建立;B用户对A用户呈现Y号码,与A用户建立的通信通过Y号码建立。
  • 一个X号码同时只能绑定一个A号码,一个A号码可以同时绑定5个X号码。Y号码可绑定的最大关系数量为1000(Y号码绑定一个AX关系计为一次绑定关系),但绑定的AX关系不可重复。例如,允许同时绑定A1X1Y1B1和A2X2Y1B2,但不允许同时绑定A1X1Y1B1和A1X1Y1B2。

AXYB模式下具有如下关键功能:

点击放大

1.7 AXxYB分机模式

AXxYB在AXYB上增加分机号码,小x就是分机号,通过增加分机号的方式节约号码X的使用

多关系隐私保护

2. 号码资源

目前号码资源有

虚商号码170/171

标准手机号码

固话号

95呼叫行业专用

3. 业务场景模式适配

AX (AXN)

1对多场景下的隐私保护,在不占用手机SIM卡槽的情况下为用户A增加一个第二号码,保护用户A的隐私,其他人都是通过拨打X号码接通用户A。

商业号码随身行

隐私号码作为商业号码绑定私人手机,随身接通客户电话。非工作时段可设置关机,防止骚扰

优势:

  • 客户资源不遗失

    员工离职,公司收回隐私号码,原有客户资源保持企业所有

  • 企业号码不变更

    客户只需拨打1个企业号码,不需随业务人员离职而频繁变更联系号码

  • 商业行为可追溯

    员工与客户的呼叫记录可追溯,方便管理;支持录音,促进企业服务质量提升

AXB

AXB中间号于1对1场景下的隐私保护,前置条件是A、B的联系方式已知;业务在绑定时候把A、B的联系方式通过api传递到号码隐私保护平台。应用场景:打车、短租、O2O服务等

用车出行、网约车出行

司机和乘客通过平台临时分配的隐私号码呼叫对方,不暴露自己的真实号码

在打车出行场景中,用车订单生成后,司机与乘客间,建立绑定关系,服务过程双方通过隐私号码联系对方,有效保护隐私,服务结束后,解除绑定,避免骚扰纠纷。

X

企业自行维护绑定关系

扫码挪车

车主申请隐私号码并和二维码绑定,其他人扫描二维码,拨打隐私号码联系车主

优势:

  • 隐私保护

    保护双方号码不泄漏,阻断第三方数据采集;可设置二维码关闭,防止骚扰

  • 方便快捷

    无繁琐的界面设定,手机扫码,一键通知挪车

  • 多管齐下

    电话和短信多渠道联系车主,确保通知到位

AXE (AXN分机、AXx分机)

一对多场景下的隐私保护;针对单一面单一利润较低,为了实现X号码复用,引入分机号概念大大降低了单一面单分摊的X号码接通用户A。应用场景:房产中介等

快递派送

快递员拨打隐私号,听到语音提示后,输入分机号转接至收件人

优势:

  • 隐私保护

保护收件人号码不泄露,派送结束隐私号码失效,防止后续骚扰

  • 节省号码资源

    1个隐私号可设置1万个分机号,提高号码复用率,降低号码成本

  • 灵活易用

    分机号根据业务规模灵活设置;交易结束快速回收隐私号,循环利用,无需设置号码冷却期

AXG

一对组场景下的隐私保护;主要针对有团队协作场景的行业,如招聘,通过API将A和企业的人事和用人部门等绑定,进行联系。应用场景:招聘、银行、保险等

AXYB

A和B双方保护

此模式主要针对企业严格把控企业会员隐私情况而定,适用场景较符合当前市场的房产中介类企业。此模式对于保护会员隐私效果最好

AXxYB

A和B双方保护

美团外卖典型的场景

骑手手机号B,拨打号码X 分机号x 转接到点餐人A,你的餐到了

点餐人A拨打 Y 转接到骑手B,我的餐为什么还没到

4. 参考

https://cloud.tencent.com/product/npp

https://support.huaweicloud.com/PrivateNumber/index.html

https://www.aliyun.com/product/pls

5. 提供商

如果你遇到了号码隐私保护上的技术问题,可以加流水理鱼的微信,拉你进技术群讨论