基于FreeSWITCH自建呼叫中心中台 流水理鱼|wwek PPT分享

目录导航

  • 业务需求背景情况 – 业务需求、背景情况
  • 业务系统如何外呼 – 点呼、群呼、AI
  • 呼叫中心通话链路 – 通话序列图、呼叫线路、SIP协议介绍
  • 呼叫中心中台架构 – 呼叫中心中台架构设计(基础版)
  • FreeSWITCH-介绍 – FreeSWITCH电话软交换
  • FreeSWITCH-拨通第一个电话 – SIP Hello World
  • FreeSWITCH-集成 – FreeSWITCH系统集成设计和实现
  • FreeSWITCH-中台API封装 – 把FreeSWITCH的能力封装为中台API

业务需求背景情况

  • CRM中常见需要对ToB、ToC客户进行电话回访、电话销售
  • 在打电话这个事上,企业需求比个人需求要求更多,最基本的可系统集成、有话单、有录音等
  • 无论时代如何变,传统的基于运营商电话的接通需求是一直稳定存在的
  • 三方商业呼叫系统有很多,okcc、合力忆捷、天润、容联七陌、网易七鱼等
  • 自研呼叫解决3个主要核心问题。能力上,定制开发扩展性拉满;成本上,比购买商业呼叫系统便宜;安全上,数据在手
  • 自研呼叫需要具备条件,公司业务按年为单位长期有呼叫需求,有开发人员资源

业务系统如何外呼

  • 呼叫方式上,单个点击拨打(点呼)、批量呼通再分配排队坐席(群呼)、AI自动呼叫
  • 坐席(打电话人)软电话登录,登录呼叫中心的软电话客户端(无客户端的为网页浏览器客户端)
  • 业务系统中点击拨打、或者操作建立群呼、AI呼叫任务
  • 业务系统通过API调用呼叫中心控制发起每通电话,接下来看看通话序列图

查看文章

☎️呼叫中心通话链路 – 呼叫线路示意图

“`mermaid {theme: ‘neutral’, scale: 0.66}
graph LR
A[呼叫中心系统] –>|呼叫线路选择| B(选择线路网关1)
B –> |SIP|C{呼叫线路商1线路路由}
A[呼叫中心系统] –>|呼叫线路选择| Bn(选择线路网关N)
C –>|电信运营商落地1| D[A地固话号]
C –>|电信运营商落地2| E[B地固话号]
C –>|电信运营商落地3| F[C地手机号]
Bn –> |SIP|Cn{呼叫线路商N线路路由}
Cn –>|电信运营商落地1| Dn[A地固话号]
Cn –>|电信运营商落地2| En[B地手机号]
Cn –>|电信运营商落地3| Fn[C地手机号]

<pre><code><br /><br /># 呼叫中心通话链路 – 呼叫线路介绍
– 呼叫线路商提供的线路在 呼叫中心系统 有多个称呼,线路网关、SIP中继、中继网关、落地线路
– 呼叫线路的称呼,不论叫什么,他都是电话的通道,使用SIP协议对接
– 呼叫线路商使用VOS语言运营系统作为支持软件,VOS处于VOIP运营垄断地位
– 不使用呼叫线路商,直找电信运营商对接线路,用SIP协议对接
– 这样就相当于只有一条落地线路了,没有丰富的线路资源来优化线路路由,需按业务需求选择
– **呼叫线路商的本质是聚合多条、多地、多类型电信运营商线路资源**
一个典型的线路对接信息
</code></pre>

线路备注:xx线路
IP:8.8.8.8
UDP端口:5060 (SIP协议)
主叫送:20220601
被叫加前缀:6
并发:500
限制:一天一次
盲区:北、新、西

<pre><code><br /># SIP协议介绍
SIP(Session Initiation Protocol,会话初始协议)[^1]是由IETF(Internet Engineering Task Force,因特网工程任务组)制定的多媒体通信协议。广泛应用于CS(Circuit Switched,电路交换)、NGN(Next Generation Network,下一代网络)以及IMS(IP Multimedia Subsystem,IP多媒体子系统)的网络中,可以支持并应用于语音、视频、数据等多媒体业务,同时也可以应用于Presence(呈现)、Instant Message(即时消息)等特色业务。可以说,有IP网络的地方就有SIP协议的存在。SIP类似于HTTP
– **说人话!SIP协议用来在IP网络上做电话通讯**
[^1]: [一文详解 SIP 协议](https://www.cnblogs.com/xiaxveliang/p/12434170.html)

# SIP协议介绍-SIP Server
![](https://static.iamle.com/note/202207011111385.png)
– FreeSWITCH就是一个SIP Server,也是一个B2BUA,后面具体讲 FreeSWITCH
– **呼叫中心的SIP Server桥接 A leg 和 B leg,这样A和B就建立通话了**
– B2BUA看起来唬人,靠背嘛,就是A leg 和 B leg靠背桥接起来
– 支持SIP协议的软电话客户端常用的有:Linphone、MicroSIP、Eyebeam等

# SIP协议介绍-SIP Server 和 A leg 通讯
<img class="m-1 h-110 rounded" src="https://static.iamle.com/note/202207011131334.png" />

# SIP协议介绍-SIP Server 和 B leg 通讯
<img class="m-1 h-110 rounded" src="https://static.iamle.com/note/202207011124286.png" />

# 呼叫中心中台架构 – 呼叫中心中台架构设计(基础版)目标
– 基础版设计目标,实现点击拨打(点呼)
– **呼叫能力的高级抽象**
## 呼叫(主叫号码,被叫号码,[线路网关], [拓展数据])

– 主被叫号码既可以是内部的坐席分机号码,也可以是手机号码
– 线路网关是选填参数,支持多个逗号分割,为空使用系统默认配置网关
– 拓展数据是选填参数,呼叫系统在后续通话技术后原样传回业务系统
– 拓展数据,用于业务自身逻辑

layout: center

# 呼叫中心中台架构 – 呼叫中心中台架构设计(基础版)架构图
<img class="m-1 h-110 rounded" src="https://static.iamle.com/note/202207011731635.png" />

# FreeSWITCH – 介绍
– FreeSWITCH 是一个作为背靠背用户代理实现的开源运营商级电话平台。由于这种设计,它可以执行大量不同的任务,从PBX到传输交换机、TTS(文本到语音)转换、音频和视频会议主机,甚至是VoIP电话等等。
– 是一款非常好用的电话软交换框架,支持跨平台,扩展性良好,配置灵活。
– 可以在很多平台上运行,包括 Linux、Mac OS X、BSD、Solaris,甚至 Windows。
– 可以处理来自 IP 网络 (VoIP) 和 PSTN(普通的固定电话)的语音、视频和文本通信。
– 支持所有流行的 VoIP 协议以及与 PRIs 的接口。
– 支持 OPUS、iLBC、Speex、GSM、G711、G722 等多种语音编解码,支持 G723、G729 等语音编解码的透传模式。
– 可以当作 PBX、SBC、媒体服务器、业务服务器等不同的通信节点来使用
– 本身是在 MPL 1.1 (Mozilla 公共许可证) 下许可的,但是一些单独的模块可能使用其他许可证。
– **说人话 FreeSWITCH 是一个软件实现的电话交换平台,开源、模块化、功能丰富**
– **市面上绝大多商业呼叫中心都是基于 FreeSWITCH 为核心开发的**

layout: center

# FreeSWITCH – 总体结构
<img class="m-1 h-120 rounded" src="https://static.iamle.com/note/202207011403991.png" />

layout: center

# FreeSWITCH – 总体结构
<img class="m-1 h-120 rounded" src="https://static.iamle.com/note/202207011405842.png" />

layout: center

# FreeSWITCH – 配置文件目录结构
/etc/freeswitch# tree -Ld 3
</code></pre>

├── freeswitch.xml 主xml文件,就是它将所有配置文件“粘”到一起,生成一个大的xml文件
├── vars.xml 常用变量
├── autoload_configs 一般都是模块级的配置文件,每个模块对应一个。文件名一般以 module_name.conf.xml 方式命名。
│   ├── *.conf.xml
├── chatplan 聊天计划
├── dialplan 拨号计划
├── directory 用户目录,分级用户账号
│   ├── default 默认的用户目录配置
│   │   ├── *.xml SIP用户,每用户一个文件
├── sip_profiles
│   ├── external SIP中继网关配置
│   │   ├── *.xml
│   ├── external.xml
│   └── internal.xml
├── ivr_menus IVR 菜单
├── jingle_profiles 连接Google Talk的相关配置
├── lang 多语言支持
├── mrcp_profiles MRCP的相关配置, 用于跟第三方语音合成和语音识别系统对接
├── skinny_profiles 思科SCCP协议话机的配置文件
├── tls tls证书
├── extensions.conf

<pre><code>- FreeSWITCH的配置文件由众多XML配置文件构建

layout: two-cols

# FreeSWITCH – 控制客户端和开发者接口
FreeSWITCH如何操作控制和开发对接?[^1]
– ① fs_cli 为命令行控制接口,也就是敲命令控制[^2]
– ② ESL(Event Socket Library) 通过事件接口和FreeSWITCH交互控制,fs_cli本质上也是走的ESL一样的底层流程
– ③ mod_xml_curl、mod_xml_rpc、lua脚本语言、自编写模块等更多方式实现和FreeSWITCH交互控制
::right::
</code></pre>

输入 fs_cli 进入命令行控制
.=======================================================.
| _____ ____ ____ _ ___ |
| | <strong><em>/ ___| / ___| | |</em> <em>| |
| | |</em> _</strong> \ | | | | | | |
| | <em>| <strong><em>) | | |</em></strong>| |___ | | |
| |</em>| |____/ ____|_____|___| |
| |
.=======================================================.
| Anthony Minessale II, Ken Rice, |
| Michael Jerris, Travis Cross |
| FreeSWITCH (http://www.freeswitch.org) |
| Paypal Donations Appreciated: paypal@freeswitch.org |
| Brought to you by ClueCon http://www.cluecon.com/ |
.=======================================================.
Type /help to see a list of commands
+OK log level [7]
freeswitch@callcenter>

<pre><code>[^1]: [FreeSWITCH Client and Developer Interfaces](https://freeswitch.org/confluence/display/FREESWITCH/Client+and+Developer+Interfaces)
[^2]: [Command Line Interface (fs_cli)](https://freeswitch.org/confluence/pages/viewpage.action?pageId=1048948)

# FreeSWITCH – 拨通第一个电话 – 分机和分机
<img class="m-1 h-90 rounded" src="https://static.iamle.com/note/202207011645715.jpg" />
* fs_cli 命令行让分机1000和分机1002通话&gt; originate user/1000 'bridge:user/1002' inline
* 分机1000使用软电话客户端Linphone,分机1002使用软电话客户端MicroSIP注册在了FreeSWITCH服务器

# FreeSWITCH – 拨通第一个电话 – 分机和手机
<img class="m-1 h-90 rounded" src="https://static.iamle.com/note/202207041015836.png" />
* fs_cli 命令行让分机1000通过网关和手机通话&gt; originate user/1000 'bridge:{origination_caller_id_number=网关主叫}sofia/gateway/网关名/被叫前缀+手机号码' inline
* 分机1000使用软电话客户端Linphone,被叫为手机号

# FreeSWITCH – FreeSWITCH系统集成设计
FreeSWITCH 的配置都是 XML的,最朴素的想法,如何实现动态配置能力?

和 FreeSWITCH 集成需要解决下列问题
|问题|方案|
| —- | —- |
| 分机动态配置 | mod_xml_curl 提供分机动态配置能力,开发对应的API输出分机XML配置文件 |
| 拨号计划动态配置 | mod_xml_curl 提供拨号计划动态配置能力,开发对应的API输出拨号计划XML配置文件 |
| 网关动态配置 | 低频需求暂时手工加载配置文件,开发对应的API可以一键生成网关XML配置文件 |
| CDR话单存储 | mod_xml_cdr 提供话单推送能力,开发对应的API接收话单推送 |
| Record录音对象存储生成URL | api_hangup_hook挂机后回调处理上传录音文件,开发对应API接收上传的录音文件,并上传到对象存储 |
| WebHook回调CDR和Record录音到业务系统 | 在CDR和Record都有了的时候执行回调 |

# FreeSWITCH – FreeSWITCH系统集成实现
集成实现采用golang编程语言开发,框架采用goframe v2

实现如下API,提供给FreeSWITCH集成
|API|实现方式| 说明 |
| —- | —- | —- |
| /fsapi/xml_curl | 读取数据库分机表生成分机XML配置文件 | mod_xml_curl模块对接 |
| /fsapi/cdr | 接收CDR话单并存储数据库CDR话单表 | mod_xml_cdr模块对接 |
| /fsapi/upload_audio | 接收录音文件上传,并上传到对象存储生成录音URL | api_hangup_hook挂机后回调处理上传录音文件 |


layout: full

# FreeSWITCH – 呼叫中心中台API封装实现
中台API和 FreeSWITCH系统集成API放同一个golang项目

|API|实现方式| 说明 |
| —- | —- | —- |
| /v1/call/callback| 调用ESL接口发送事件命令 |呼叫-回拨 参数:主叫号码、被叫号码、网关名称(多个逗号分割)、拓展数据|
| /v1/extension/list| 读取数据库分机表 |分机列表|
| /v1/extension/detail| 读取数据库分机表 |分机详情 参数:分机号|
| /v1/gateway/list| 读取数据库分机表 | 网关列表|
| /v1/gateway/detail| 读取数据库分机表 | 网关详情|
| /v1/gateway/detail_xml| 读取数据库分机表 | 网关详情XML配置文件|
| /v1/extension/online/list| 调用ESL接口 |在线分机列表|
| /v1/gateway/online/list| 调用ESL接口 |在线网关列表|
<!–

<style>
h1 {<br />
font-size: 20px;<br />
}<br />
</style> –>

# FreeSWITCH – 呼叫中心中台API封装实现-呼叫实现
– **呼叫实现核心逻辑**
使用拨号计划组合多个"APP"实现录音、录音采样率设置、挂机后上传录音等能力
拨号计划使用inline即内联模式[^1],ESL接口发送一条实践命令即可完成
</code></pre>

'app1:arg1,app2:arg2,app3:arg3' inline

<pre><code>呼叫模板
</code></pre>

"originate [参数]user/分机号
'
set:media_bug_answer_req=true,
set:record_sample_rate=8000,
set:RECORD_STEREO=true,
set:cc_record_filename=$${recordings_dir}/${strftime(%%Y-%%m-%%d)}/${uuid}.mp3,
export:nolocal:execute_on_answer=record_session:${cc_record_filename},
set:curl_sendfile_url=%s,
set:api_hangup_hook=system curl -XPOST -F \"files=@${cc_record_filename}\" ${curl_sendfile_url}?uuid=${uuid} &,
set:continue_on_failure=true,
set:hangup_after_bridge=true,
set:session_in_hangup_hook=true,
set:ringback=$${sounds_dir}/music/8000/ponce-preludio-in-e-major.wav,
bridge:[参数]sofia/gateway/网关名/前缀+被叫号码,
playback:$${sounds_dir}/zh/cn/link/misc/misc-your_call_has_been_terminated.wav,
info:,
hangup:
'
inline"
“`

⏬基于FreeSWITCH自建呼叫中心中台 PDF

背景

MySQL库A 到 MySQL库B的增量数据同步需求

DolphinScheduler中配置DataX MySQL To MySQL工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To MySQL'

DATAX组件(t_article)
用到2个插件mysqlreader^[1]、mysqlwriter^[2]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "`id`",
                            "`title`",
                            "`content`",
                            "`is_delete`",
                            "`delete_date`",
                            "`create_date`",
                            "`update_date`"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                                "table": [
                                    "t_article"
                                ]
                            }
                        ],
                        "writeMode": "replace",
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate}
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码

# 本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以再新增加参数定义例如 biz_mysql_host_b,在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer mysqlwriter插件中关键配置: “

"parameter": {
    "writeMode": "replace",
    ......
}

writeMode为replace,相同主键id重复写入数据,就会更新数据。sql本质上执行的是 replace into

保存工作流

全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[3]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

爬坑记录

  • 官网下载的DataX不包含ElasticSearchWriter写插件
    默认不带该插件,需要自己编译ElasticSearchWriter插件。
git clone https://github.com/alibaba/DataX.git

为了加快编译速度,可以只编译<module>elasticsearchwriter</module>
项目根目录的pom.xml
<!-- reader --> 全注释掉,<!-- writer -->下只保留<module>elasticsearchwriter</module>其他注释掉,另外<!-- common support module -->也需要保留

如果自己不想编译或者编译失败请搜索🔍 “流水理鱼”微信公众号,或者加我私人微信我给你已经编译好的插件包

by 流水理鱼|wwek

参考

1. DataX MysqlReader 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
2. DataX MysqlWriter 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
3. Apache DolphinScheduler 内置参数 https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/built-in.html

背景

MySQL库A 到 MySQL库B的增量数据同步需求

DolphinScheduler中配置DataX MySQL To MySQL工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To MySQL'

DATAX组件(t_article)
用到2个插件mysqlreader^[1]、mysqlwriter^[2]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "`id`",
                            "`title`",
                            "`content`",
                            "`is_delete`",
                            "`delete_date`",
                            "`create_date`",
                            "`update_date`"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                                "table": [
                                    "t_article"
                                ]
                            }
                        ],
                        "writeMode": "replace",
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate}
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码

# 本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以再新增加参数定义例如 biz_mysql_host_b,在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer mysqlwriter插件中关键配置: “

"parameter": {
    "writeMode": "replace",
    ......
}

writeMode为replace,相同主键id重复写入数据,就会更新数据。sql本质上执行的是 replace into

保存工作流

全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[3]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

爬坑记录

  • 官网下载的DataX不包含ElasticSearchWriter写插件
    默认不带该插件,需要自己编译ElasticSearchWriter插件。
git clone https://github.com/alibaba/DataX.git

为了加快编译速度,可以只编译<module>elasticsearchwriter</module>
项目根目录的pom.xml
<!-- reader --> 全注释掉,<!-- writer -->下只保留<module>elasticsearchwriter</module>其他注释掉,另外<!-- common support module -->也需要保留

如果自己不想编译或者编译失败请搜索🔍 “流水理鱼”微信公众号,或者加我私人微信我给你已经编译好的插件包

by 流水理鱼|wwek

参考

1. DataX MysqlReader 插件文档
2. DataX MysqlWriter 插件文档
3. Apache DolphinScheduler 内置参数

数据同步的方式

数据同步的2大方式

  • 基于SQL查询的 CDC(Change Data Capture):
    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据。也就是我们说的基于SQL查询抽取;
    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
    • 不保障实时性,基于离线调度存在天然的延迟;
    • 工具软件以Kettle(Apache Hop最新版)、DataX为代表,需要结合任务调度系统使用。
  • 基于日志的 CDC:
    • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据;
    • 工具软件以Flink CDC、阿里巴巴Canal、Debezium为代表。

基于SQL查询增量数据同步原理

我们考虑用SQL如何查询增量数据? 数据有增加、需改、删除
删除数据采用逻辑删除的方式,比如定义一个is_deleted字段标识逻辑删除
如果数据是 UPDATE的,也就是会被修改的,那么 where update_datetime >= last_datetime(调度滚动时间)就是增量数据
如果数据是 APPEND ONLY 的除了用更新时间还可以用where id >= 调度上次last_id

结合任务调度系统
调度时间是每日调度执行一次,那么 last_datetime = 当前调度开始执行时间 – 24小时,延迟就是1天
调度时间是15分钟一次,那么 last_datetime = 当前调度开始执行时间 – 15分钟,延迟就是15分钟

这样就实现了捕获增量数据,从而实现增量同步

DolphinScheduler + Datax 构建离线增量数据同步平台

本实践使用
单机8c16g
DataX 2022-03-01 官网下载
DolphinScheduler 2.0.3(DolphinScheduler的安装过程略,请参考官网)

DolphinScheduler 中设置好DataX环境变量
DolphinScheduler 提供了可视化的作业流程定义,用来离线定时调度DataX Job作业,使用起来很是顺滑

基于SQL查询离线数据同步的用武之地
为什么不用基于日志实时的方式?不是不用,而是根据场合用。考虑到业务实际需求情况,基于SQL查询这种离线的方式也并非完全淘汰了
特别是业务上实时性要求不高,每次调度增量数据没那么大的情况下,不需要分布式架构来负载,这种情况下是比较合适的选择
场景举例:
网站、APP的百万级、千万级的内容搜索,每天几百篇内容新增+修改,搜索上会用到ES(ElasticSearch),那么就需要把 MySQL内容数据增量同步到ES
DataX就能满足需求!

DolphinScheduler中配置DataX MySQL To ElasticSearch工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To ElasticSearch'

DATAX组件(t_article)
用到2个插件mysqlreader、elasticsearchwriter^[1]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id as pk,a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "${biz_es_host}",
                        "accessId": "${biz_es_username}",
                        "accessKey": "${biz_es_password}",
                        "index": "t_article",
                        "type": "_doc",
                        "batchSize": 1000,
                        "cleanup": false,
                        "discovery": false,
                        "dynamic": true,
                        "settings": {
                            "index": {
                                "number_of_replicas": 0,
                                "number_of_shards": 1
                            }
                        },
                        "splitter": ",",
                        "column": [
                            {
                                "name": "pk",
                                "type": "id"
                            },
                            {
                                "name": "id",
                                "type": "long"
                            },
                            {
                                "name": "title",
                                "type": "text"
                            },
                            {
                                "name": "content",
                                "type": "text"
                            }
                            {
                                "name": "is_delete",
                                "type": "text"
                            },
                            {
                                "name": "delete_date",
                                "type": "date"
                            },
                            {
                                "name": "create_date",
                                "type": "date"
                            },
                            {
                                "name": "update_date",
                                "type": "date"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate} 
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码
biz_es_host: 你的es地址带协议和端口 http://127.0.0.1:9200
biz_es_username: 你的es账号
biz_es_password: 你的es密码

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer elasticsearchwriter插件中关键配置: “

"column": [
    {
        "name": "pk",
        "type": "id"
    },
    ......
]

type = id 这样配置,就把文章主键映射到es主键 _id 从而实现相同主键id重复写入数据,就会更新数据。如果不这样配置数据将会重复导入es中

保存工作流

全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[2]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

by 流水理鱼|wwek

参考

1. DataX ElasticSearchWriter 插件文档
2. Apache DolphinScheduler 内置参数

[TOC]

0、前言

image-20210606160820147

为什么使用Elasticsearch (以下简称 ES ) ?

全文搜索能力

ES本身就是一个搜索引擎,技术上基于倒排索引实现的搜索系统。我们做业务不必自己去开发一个搜索引擎,

ES已经满足绝大多数企业的搜索场景的需求。拼多多电商在发展前期,搜索业务使用ES支撑。

复杂查询(多维度组合筛选)

假如某个实体表,例如订单日订单量几十万,运营管理后台通常有十多个维度的组合筛选搜索。你可能会通过缩小查询时间范围来把数据量级降下来,MySQL也许也能一战,但是实际业务场景业务人员不太能接受这样一个小时间段筛选。如果稍微扩大时间段,MySQL这时候就无能为力了。那么把订单表做成“宽表”存入ES,十多个维度的搜索对于ES俩说是毫无压力的。ES群集能轻松支持亿级的宽表多维度组合筛选。这种需求在CRM、BOSS等场景是刚需。

滴滴司机端早期日订单量几十万,运营管理后台大时间跨度且多维度组合筛选就是用ES来实现的。

一个产品是否成熟,公有云是否提供基于该产品的服务是一个风向标。那么ES毫无疑问是成熟的产品。

虽然ES这么好,但是我的数据都在MySQL中那么怎么让数据实时同步到ES中呢?

1、基于应用程序多写

直接通过应用程序数据双写到MySQL和ES

image-20210606174354578

记录删除机制:直接删除

一致性: 需要自行处理,需要对失败错误做好日志记录,做好异常告警并人工补偿

优点: 直接明了,能够灵活控制数据写入,延迟最低

缺点: 与业务耦合严重,逻辑要写在业务系统中

应用双写(同步)

直接通过ES API将数据写入到ES集群中,也就是写入数据库的同时调用ES API写入到ES中

这个过程是同步的

应用双写(MQ异步解耦)

对 应用双写(同步)的改进,引入MQ中间件。

把同步变为异步,做了解耦。

同时引入MQ后双写性能提高,解决数据一致性问题。

缺点是会增加延迟性,业务系统增加mq代码,而且多一个MQ中间件要维护

2、基于binlog订阅

binlog订阅的原理很简单,模拟一个MySQL slave 订阅binlog日志,从而实现CDC(change data capture)

CDC,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。

记录删除机制:直接删除

一致性: 最终一致性

优点: 对业务系统无任何侵入

缺点: 需要维护额外增加的一套数据同步平台;有分钟级的延迟

Canal

img

https://github.com/alibaba/canal/

阿里巴巴 MySQL binlog 增量订阅&消费组件

Databus

https://github.com/linkedin/databus

Linkedin Databus 分布式数据库同步系统

Maxwell

官网:http://maxwells-daemon.io/

https://github.com/zendesk/maxwell

Flink CDC

https://github.com/ververica/flink-cdc-connectors

flink-cdc-connectors 中文教程

基于flink数据计算平台实现 MySQL binlog订阅直接写入es

Flink CDC抛弃掉其他中间件,实现 MySQL 》Flink CDC》ES 非常简洁的数据同步架构

该方式比较新2020年开始的项目,目前在一些实时数仓上有应用

DTS(阿里云)

阿里云的商业产品,具备好的易用性,省运维成本。

CloudCanal

CloudCanal官网

CloudCannal数据同步迁移系统,商业产品

3、基于SQL抽取

基于SQL查询的数据抽取同步

这种方式需要满足2个基本条件

1、MySQL的表必须有唯一键字段(和ES中_id对应)

2、MySQL的表必须有一个“修改时间”字段,该记录任何一个字段修改都需要更新“修改时间”

有了唯一键字段就可以知道修改某条记录后同步哪条ES记录,有了修改时间字段就可以知道同步到哪儿了。

满足了这2个基本条件这样就可实现增量实时同步。

记录删除机制:逻辑删除,在MySQL中增加逻辑删除字段,ES搜索时过滤状态

一致性: 依赖修改时间字段;延迟时间等于计划任务多久执行一次

优点: 对业务系统无任何侵入,简单方便;课用JOIN打宽表

缺点: MySQL承受查询压力;需要业务中满足2个基本条件

logstash

我们使用 Logstash 和 JDBC 输入插件来让 Elasticsearch 与 MySQL 保持同步。从概念上讲,Logstash 的 JDBC 输入插件会运行一个循环来定期对 MySQL 进行轮询,从而找出在此次循环的上次迭代后插入或更改的记录。如要让其正确运行,必须满足下列条件:

  1. 在将 MySQL 中的文档写入 Elasticsearch 时,Elasticsearch 中的 “_id” 字段必须设置为 MySQL 中的 “id” 字段。这可在 MySQL 记录与 Elasticsearch 文档之间建立一个直接映射关系。如果在 MySQL 中更新了某条记录,那么将会在 Elasticsearch 中覆盖整条相关记录。请注意,在 Elasticsearch 中覆盖文档的效率与更新操作的效率一样高,因为从内部原理上来讲,更新便包括删除旧文档以及随后对全新文档进行索引。
  2. 当在 MySQL 中插入或更新数据时,该条记录必须有一个包含更新或插入时间的字段。通过此字段,便可允许 Logstash 仅请求获得在轮询循环的上次迭代后编辑或插入的文档。Logstash 每次对 MySQL 进行轮询时,都会保存其从 MySQL 所读取最后一条记录的更新或插入时间。在下一次迭代时,Logstash 便知道其仅需请求获得符合下列条件的记录:更新或插入时间晚于在轮询循环中的上一次迭代中所收到的最后一条记录。
input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  # stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

配置Logstash的计划任务,定时执行

4、总结

前期建议采用 基于SQL抽取的方式做同步,后期数据量大了建议采用基于binlog订阅的方式同步。

如果本身有现成的Flink平台可用,推荐使用Flink CDC。

什么是最佳的 MySQL 同步 ElasticSearch 方案?

答案是选择缺点可以接受,又满足需求,拥有成本最低的方案。

“完美”的方案往往拥有成本会比较高,所以需要结合业务环境的上下文去选择。

流水理鱼觉得没有一招鲜的方案,因为每种方案都有利弊,所以选取适合你当下业务环境的方案。那么这样的方案就是最佳方案。

5、参考

MySQL 数据实时同步到 Elasticsearch 的技术方案选型和思考 by 万凯明

如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步

监听mysql的binlog日志工具分析:canal、Maxwell、Databus、DTS

1.简介

使用微创联合M5S空气检测仪、树莓派3b+、prometheus、grafana实现空气质量持续监控告警WEB可视化

grafana dashboard效果:

image-20201031171328947

2.背景

2.1 需求:

1.新办公地址的甲醛(HCHO)、异味(TVOC)做个监测

2.能够把这些空气指标进行WEB展示

3.监控告警关心的空气指标,告警发送到钉钉上

2.2 已有硬件:

树莓派3b+ (长期吃灰中)以下简称树莓派

微创联合M5S空气检测仪(版本:M5S温湿+锂电+TVOC+数据导出 17年485元价格购买 )以下简称空气检测仪

M5S 家用 激光PM2.5检测仪 甲醛 CO2 空气质量 雾霾甲醛 检测仪

【在售价】380.00 元(基础版)

【立即下单】点击链接立即下单:https://s.click.taobao.com/zREU4vu

img

3.设计

把空气检测仪的usb插到树莓派上(树莓派上通过串口读取数据),供电也是通过树莓派供电

相关软件运行在树莓派上

3.1 硬件:

空气检测仪如何导出数据?

空气检测仪是有数据导出版本,自带串口转USB(ch340芯片)

打开空气检测仪并通过检测仪左侧 FUN 按钮,把屏幕切换到第 3 屏,这个时候串口中就会持续输出监控数据

在linux上设备文件符为“/dev/ttyUSB0”

cat /dev/ttyUSB0
#可以看到
24.9 45.2 23 32 35 26 33 35 4419 1301 159 8 3 0 0.006 0.21

读取过程中,按一下检测仪上的 RST 键,可以显示每列数据对应的字段定义,(按完后需要再按 FUN 键切换到第 3 屏

*--------------------------------------- Data Output Format Definition ----------------------------------------*
TEMP HUMI CH_PM1.0 CH_PM2.5 CH_PM10 US_PM1.0 US_PM2.5 US_PM10 >0.3um >0.5um >1.0um >2.5um >5.0um >10um HCHO TVOC
TEMP HUMI CH_PM1.0 CH_PM2.5 CH_PM10 US_PM1.0 US_PM2.5 US_PM10 >0.3um >0.5um >1.0um >2.5um >5.0um >10um HCHO TVOC
25.1 43.3 21 29 31 22 29 31 3843 1136 170 6 2 0 0.003 0.2
25.1 43.2 22 30 31 23 30 31 3876 1156 170 5 2 0 0.005 0.16
25.1 43.3 22 30 31 23 30 31 3963 1174 164 6 2 0 0.003 0.21
25.2 43.3 22 30 31 23 30 31 3942 1163 167 6 2 0 0.007 0.21

带二氧化碳检测的版本在HCHO字段前多一个CO2字段

更多资料看空气检测仪配套的资料

3.2 软件:

软件监控采用prometheus + grafana的方案

本文为流水理鱼wwek原创 www.iamle.com

现在只需要实现一个串口(/dev/ttyUSB0) exporter即可把空气监测仪的数据打通给prometheus使用

最后配置prometheus取空气检测仪的exporter数据,并配置grafana面板

监控告警直接使用grafana带的监控告警

4.实现

*安装配置wclh_air_detector_exporter 获得空气检测仪数据

wclh_air_detector_exporter读取串口数据并把数据进行结构化,然后输出metrics

M5S Temperature and Humidity+lithium battery+CO2+TVOC PM2.5 CO2(S8)TEMP&HUMI Detector Haze PM2.5 sensors Laser PM2.5 detector

M5S 家用 激光PM2.5检测仪 甲醛 CO2 空气质量 雾霾甲醛 检测仪

WCLH_AIR_DETECTOR_EXPORTER_VERSION=0.1.3
wget https://github.com/wwek/wclh_air_detector_exporter/releases/download/v${WCLH_AIR_DETECTOR_EXPORTER_VERSION}/wclh_air_detector_exporter_${WCLH_AIR_DETECTOR_EXPORTER_VERSION}_linux_armv7.tar.gz
tar zxvf wclh_air_detector_exporter_${WCLH_AIR_DETECTOR_EXPORTER_VERSION}_linux_armv7.tar.gz
mkdir -p /data/soft/wclh_air_detector_exporter
mv wclh_air_detector_exporter /data/soft/wclh_air_detector_exporter
cd /data/soft/wclh_air_detector_exporter

#./wclh_air_detector_exporter -serial_port /dev/ttyUSB0

#自动启动&进程守护
sudo bash -c 'cat > /etc/systemd/system/wclh_air_detector_exporter.service << EOF [Unit] Description=https://github.com/wwek/wclh_air_detector_exporter Wants=network-online.target After=network-online.target [Service] Restart=on-failure #User=root ExecStart=/data/soft/wclh_air_detector_exporter/wclh_air_detector_exporter [Install] WantedBy=default.target EOF' sudo systemctl daemon-reload sudo systemctl status wclh_air_detector_exporter sudo systemctl start wclh_air_detector_exporter sudo systemctl enable wclh_air_detector_exporter sudo systemctl status wclh_air_detector_exporter curl http://localhost:9166/metrics ``` ## 安装配置prometheus ``` PROMETHEUS_VERSION=2.22.0 wget https://github.com/prometheus/prometheus/releases/download/v${PROMETHEUS_VERSION}/prometheus-${PROMETHEUS_VERSION}.linux-armv7.tar.gz tar zxvf prometheus-${PROMETHEUS_VERSION}.linux-armv7.tar.gz mkdir -p /data/soft/ mv prometheus-${PROMETHEUS_VERSION}.linux-armv7 prometheus &amp;&amp; mv prometheus /data/soft/ #自动启动&amp;进程守护 sudo bash -c 'cat &gt; /etc/systemd/system/prometheus.service &lt;&lt; EOF
[Unit]
Description=https://prometheus.io
Wants=network-online.target
After=network-online.target

[Service]
Restart=on-failure
#User=root
ExecStart=/data/soft/prometheus/prometheus --config.file="/data/soft/prometheus/prometheus.yml"

[Install]
WantedBy=default.target
EOF'

sudo systemctl daemon-reload
sudo systemctl status prometheus
sudo systemctl start prometheus
sudo systemctl enable prometheus
sudo systemctl status prometheus

curl http://localhost:9090
#<a href="/graph">Found</a>.

安装配置grafana

sudo apt-get install -y apt-transport-https
sudo apt-get install -y software-properties-common wget
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add -
echo "deb https://packages.grafana.com/oss/deb stable main" | sudo tee -a /etc/apt/sources.list.d/grafana.list
sudo apt-get update
sudo apt-get install grafana
sudo systemctl daemon-reload
sudo systemctl start grafana-server
sudo systemctl enable grafana-server
sudo systemctl status grafana-server

curl http://localhost:3000
#<a href="/login">Found</a>.

grafana 中先配置 prometheus(http://localhost:9090)数据源,然后导入 “grafana-dashboard.json”空气检测仪的dashboard

监控告警

直接使用grafana自带的告警功能,将关心的指标进行监控并告警