[TOC]
0、前言
为什么使用Elasticsearch (以下简称 ES ) ?
全文搜索能力
ES本身就是一个搜索引擎,技术上基于倒排索引实现的搜索系统。我们做业务不必自己去开发一个搜索引擎,
ES已经满足绝大多数企业的搜索场景的需求。拼多多电商在发展前期,搜索业务使用ES支撑。
复杂查询(多维度组合筛选)
假如某个实体表,例如订单日订单量几十万,运营管理后台通常有十多个维度的组合筛选搜索。你可能会通过缩小查询时间范围来把数据量级降下来,MySQL也许也能一战,但是实际业务场景业务人员不太能接受这样一个小时间段筛选。如果稍微扩大时间段,MySQL这时候就无能为力了。那么把订单表做成“宽表”存入ES,十多个维度的搜索对于ES俩说是毫无压力的。ES群集能轻松支持亿级的宽表多维度组合筛选。这种需求在CRM、BOSS等场景是刚需。
滴滴司机端早期日订单量几十万,运营管理后台大时间跨度且多维度组合筛选就是用ES来实现的。
一个产品是否成熟,公有云是否提供基于该产品的服务是一个风向标。那么ES毫无疑问是成熟的产品。
虽然ES这么好,但是我的数据都在MySQL中那么怎么让数据实时同步到ES中呢?
1、基于应用程序多写
直接通过应用程序数据双写到MySQL和ES
记录删除机制:直接删除
一致性: 需要自行处理,需要对失败错误做好日志记录,做好异常告警并人工补偿
优点: 直接明了,能够灵活控制数据写入,延迟最低
缺点: 与业务耦合严重,逻辑要写在业务系统中
应用双写(同步)
直接通过ES API将数据写入到ES集群中,也就是写入数据库的同时调用ES API写入到ES中
这个过程是同步的
应用双写(MQ异步解耦)
对 应用双写(同步)的改进,引入MQ中间件。
把同步变为异步,做了解耦。
同时引入MQ后双写性能提高,解决数据一致性问题。
缺点是会增加延迟性,业务系统增加mq代码,而且多一个MQ中间件要维护
2、基于binlog订阅
binlog订阅的原理很简单,模拟一个MySQL slave 订阅binlog日志,从而实现CDC(change data capture)
CDC,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。
记录删除机制:直接删除
一致性: 最终一致性
优点: 对业务系统无任何侵入
缺点: 需要维护额外增加的一套数据同步平台;有分钟级的延迟
Canal
https://github.com/alibaba/canal/
阿里巴巴 MySQL binlog 增量订阅&消费组件
Databus
https://github.com/linkedin/databus
Linkedin Databus 分布式数据库同步系统
Maxwell
https://github.com/zendesk/maxwell
Flink CDC
https://github.com/ververica/flink-cdc-connectors
基于flink数据计算平台实现 MySQL binlog订阅直接写入es
Flink CDC抛弃掉其他中间件,实现 MySQL 》Flink CDC》ES 非常简洁的数据同步架构
该方式比较新2020年开始的项目,目前在一些实时数仓上有应用
DTS(阿里云)
阿里云的商业产品,具备好的易用性,省运维成本。
CloudCanal
CloudCannal数据同步迁移系统,商业产品
3、基于SQL抽取
基于SQL查询的数据抽取同步
这种方式需要满足2个基本条件
1、MySQL的表必须有唯一键字段(和ES中_id对应)
2、MySQL的表必须有一个“修改时间”字段,该记录任何一个字段修改都需要更新“修改时间”
有了唯一键字段就可以知道修改某条记录后同步哪条ES记录,有了修改时间字段就可以知道同步到哪儿了。
满足了这2个基本条件这样就可实现增量实时同步。
记录删除机制:逻辑删除,在MySQL中增加逻辑删除字段,ES搜索时过滤状态
一致性: 依赖修改时间字段;延迟时间等于计划任务多久执行一次
优点: 对业务系统无任何侵入,简单方便;课用JOIN打宽表
缺点: MySQL承受查询压力;需要业务中满足2个基本条件
logstash
我们使用 Logstash 和 JDBC 输入插件来让 Elasticsearch 与 MySQL 保持同步。从概念上讲,Logstash 的 JDBC 输入插件会运行一个循环来定期对 MySQL 进行轮询,从而找出在此次循环的上次迭代后插入或更改的记录。如要让其正确运行,必须满足下列条件:
- 在将 MySQL 中的文档写入 Elasticsearch 时,Elasticsearch 中的 “_id” 字段必须设置为 MySQL 中的 “id” 字段。这可在 MySQL 记录与 Elasticsearch 文档之间建立一个直接映射关系。如果在 MySQL 中更新了某条记录,那么将会在 Elasticsearch 中覆盖整条相关记录。请注意,在 Elasticsearch 中覆盖文档的效率与更新操作的效率一样高,因为从内部原理上来讲,更新便包括删除旧文档以及随后对全新文档进行索引。
- 当在 MySQL 中插入或更新数据时,该条记录必须有一个包含更新或插入时间的字段。通过此字段,便可允许 Logstash 仅请求获得在轮询循环的上次迭代后编辑或插入的文档。Logstash 每次对 MySQL 进行轮询时,都会保存其从 MySQL 所读取最后一条记录的更新或插入时间。在下一次迭代时,Logstash 便知道其仅需请求获得符合下列条件的记录:更新或插入时间晚于在轮询循环中的上一次迭代中所收到的最后一条记录。
input {
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => <my username>
jdbc_password => <my password>
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output {
# stdout { codec => "rubydebug"}
elasticsearch {
index => "rdbms_sync_idx"
document_id => "%{[@metadata][_id]}"
}
}
配置Logstash的计划任务,定时执行
4、总结
前期建议采用 基于SQL抽取的方式做同步,后期数据量大了建议采用基于binlog订阅的方式同步。
如果本身有现成的Flink平台可用,推荐使用Flink CDC。
什么是最佳的 MySQL 同步 ElasticSearch 方案?
答案是选择缺点可以接受,又满足需求,拥有成本最低的方案。
“完美”的方案往往拥有成本会比较高,所以需要结合业务环境的上下文去选择。
流水理鱼觉得没有一招鲜的方案,因为每种方案都有利弊,所以选取适合你当下业务环境的方案。那么这样的方案就是最佳方案。
5、参考
MySQL 数据实时同步到 Elasticsearch 的技术方案选型和思考 by 万凯明
为啥不推荐logstash呢
如果同步任务不是太多10来个,而且也不需要经常改,那么logstash这种手工编辑配置文件的方式也能接受
如果任务较多,也有比较多的变更,那么有Web GUI的调度系统来操作会更容易维护
学习了,给朋友看看