[TOC]

0、前言

image-20210606160820147

为什么使用Elasticsearch (以下简称 ES ) ?

全文搜索能力

ES本身就是一个搜索引擎,技术上基于倒排索引实现的搜索系统。我们做业务不必自己去开发一个搜索引擎,

ES已经满足绝大多数企业的搜索场景的需求。拼多多电商在发展前期,搜索业务使用ES支撑。

复杂查询(多维度组合筛选)

假如某个实体表,例如订单日订单量几十万,运营管理后台通常有十多个维度的组合筛选搜索。你可能会通过缩小查询时间范围来把数据量级降下来,MySQL也许也能一战,但是实际业务场景业务人员不太能接受这样一个小时间段筛选。如果稍微扩大时间段,MySQL这时候就无能为力了。那么把订单表做成“宽表”存入ES,十多个维度的搜索对于ES俩说是毫无压力的。ES群集能轻松支持亿级的宽表多维度组合筛选。这种需求在CRM、BOSS等场景是刚需。

滴滴司机端早期日订单量几十万,运营管理后台大时间跨度且多维度组合筛选就是用ES来实现的。

一个产品是否成熟,公有云是否提供基于该产品的服务是一个风向标。那么ES毫无疑问是成熟的产品。

虽然ES这么好,但是我的数据都在MySQL中那么怎么让数据实时同步到ES中呢?

1、基于应用程序多写

直接通过应用程序数据双写到MySQL和ES

image-20210606174354578

记录删除机制:直接删除

一致性: 需要自行处理,需要对失败错误做好日志记录,做好异常告警并人工补偿

优点: 直接明了,能够灵活控制数据写入,延迟最低

缺点: 与业务耦合严重,逻辑要写在业务系统中

应用双写(同步)

直接通过ES API将数据写入到ES集群中,也就是写入数据库的同时调用ES API写入到ES中

这个过程是同步的

应用双写(MQ异步解耦)

对 应用双写(同步)的改进,引入MQ中间件。

把同步变为异步,做了解耦。

同时引入MQ后双写性能提高,解决数据一致性问题。

缺点是会增加延迟性,业务系统增加mq代码,而且多一个MQ中间件要维护

2、基于binlog订阅

binlog订阅的原理很简单,模拟一个MySQL slave 订阅binlog日志,从而实现CDC(change data capture)

CDC,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。

记录删除机制:直接删除

一致性: 最终一致性

优点: 对业务系统无任何侵入

缺点: 需要维护额外增加的一套数据同步平台;有分钟级的延迟

Canal

img

https://github.com/alibaba/canal/

阿里巴巴 MySQL binlog 增量订阅&消费组件

Databus

https://github.com/linkedin/databus

Linkedin Databus 分布式数据库同步系统

Maxwell

官网:http://maxwells-daemon.io/

https://github.com/zendesk/maxwell

Flink CDC

https://github.com/ververica/flink-cdc-connectors

flink-cdc-connectors 中文教程

基于flink数据计算平台实现 MySQL binlog订阅直接写入es

Flink CDC抛弃掉其他中间件,实现 MySQL 》Flink CDC》ES 非常简洁的数据同步架构

该方式比较新2020年开始的项目,目前在一些实时数仓上有应用

DTS(阿里云)

阿里云的商业产品,具备好的易用性,省运维成本。

CloudCanal

CloudCanal官网

CloudCannal数据同步迁移系统,商业产品

3、基于SQL抽取

基于SQL查询的数据抽取同步

这种方式需要满足2个基本条件

1、MySQL的表必须有唯一键字段(和ES中_id对应)

2、MySQL的表必须有一个“修改时间”字段,该记录任何一个字段修改都需要更新“修改时间”

有了唯一键字段就可以知道修改某条记录后同步哪条ES记录,有了修改时间字段就可以知道同步到哪儿了。

满足了这2个基本条件这样就可实现增量实时同步。

记录删除机制:逻辑删除,在MySQL中增加逻辑删除字段,ES搜索时过滤状态

一致性: 依赖修改时间字段;延迟时间等于计划任务多久执行一次

优点: 对业务系统无任何侵入,简单方便;课用JOIN打宽表

缺点: MySQL承受查询压力;需要业务中满足2个基本条件

logstash

我们使用 Logstash 和 JDBC 输入插件来让 Elasticsearch 与 MySQL 保持同步。从概念上讲,Logstash 的 JDBC 输入插件会运行一个循环来定期对 MySQL 进行轮询,从而找出在此次循环的上次迭代后插入或更改的记录。如要让其正确运行,必须满足下列条件:

  1. 在将 MySQL 中的文档写入 Elasticsearch 时,Elasticsearch 中的 “_id” 字段必须设置为 MySQL 中的 “id” 字段。这可在 MySQL 记录与 Elasticsearch 文档之间建立一个直接映射关系。如果在 MySQL 中更新了某条记录,那么将会在 Elasticsearch 中覆盖整条相关记录。请注意,在 Elasticsearch 中覆盖文档的效率与更新操作的效率一样高,因为从内部原理上来讲,更新便包括删除旧文档以及随后对全新文档进行索引。
  2. 当在 MySQL 中插入或更新数据时,该条记录必须有一个包含更新或插入时间的字段。通过此字段,便可允许 Logstash 仅请求获得在轮询循环的上次迭代后编辑或插入的文档。Logstash 每次对 MySQL 进行轮询时,都会保存其从 MySQL 所读取最后一条记录的更新或插入时间。在下一次迭代时,Logstash 便知道其仅需请求获得符合下列条件的记录:更新或插入时间晚于在轮询循环中的上一次迭代中所收到的最后一条记录。
input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  # stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

配置Logstash的计划任务,定时执行

4、总结

前期建议采用 基于SQL抽取的方式做同步,后期数据量大了建议采用基于binlog订阅的方式同步。

如果本身有现成的Flink平台可用,推荐使用Flink CDC。

什么是最佳的 MySQL 同步 ElasticSearch 方案?

答案是选择缺点可以接受,又满足需求,拥有成本最低的方案。

“完美”的方案往往拥有成本会比较高,所以需要结合业务环境的上下文去选择。

流水理鱼觉得没有一招鲜的方案,因为每种方案都有利弊,所以选取适合你当下业务环境的方案。那么这样的方案就是最佳方案。

5、参考

MySQL 数据实时同步到 Elasticsearch 的技术方案选型和思考 by 万凯明

如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步

监听mysql的binlog日志工具分析:canal、Maxwell、Databus、DTS

需求

当前php-fpm进程是不是不够了,一通命令一打,也只能看当前情况。
熟话说无监控无度量就无诊断。
市面上有成熟的php-fpm status状态采集,展示工具那就是elasti stack全家桶
目标采集分析php-fpm status 状态数据

选型

Elastic公司全家桶之Beats(PHP-FPM Module@Metricbeat) + Elasticsearch + Kibana

Metricbeat 是 Beats 的一个采集组件
PHP-FPM Module 是 Metricbeat 的一个采集模块

整体的数据流方式

方式1 : Beats (Metricbeat )>Elasticsearch>Kibana
方式2: Beats (Metricbeat )>Logstash>[直连,redis队列,Kafka队列]>Elasticsearch>Kibana

先讲方式1

配置php-fpm

php-fpm.conf

[www]
...
pm.status_path = /phpfpm-status-www

[u]
...
pm.status_path = /phpfpm-status-u

[www] [u] 是独立的php-fpm pool 进程池,每个池子的pm.status_path是需要单独设置的

测试配置文件是否正确

/usr/local/php/sbin/php-fpm -t
[29-Mar-2017 16:15:08] NOTICE: configuration file /usr/local/php/etc/php-fpm.conf test is successful

重启php-fpm

/etc/init.d/php-fpm reload
Reload service php-fpm done

配置nginx

phpfpm.conf

server
{
  listen 80;
  server_name localhost;
  location ~ ^/(phpfpm-status-www|phpstatuswww)$
  {
    fastcgi_pass unix:/tmp/php-cgi.sock;
    include fastcgi.conf;
    fastcgi_param SCRIPT_FILENAME $fastcgi_script_name;
  }
  location ~ ^/(phpfpm-status-u|phpstatusu)$
  {
    fastcgi_pass unix:/tmp/u-php-cgi.sock;
    include fastcgi.conf;
    fastcgi_param SCRIPT_FILENAME $fastcgi_script_name;
  }
}

只给本地访问,这里监听的是localhost,也可用nginx allow deny的方式

测试配置文件是否正确

/usr/local/nginx/sbin/nginx -t
nginx: the configuration file /usr/local/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /usr/local/nginx/conf/nginx.conf test is successful

重启nginx

/etc/init.d/nginx reload
Reloading nginx daemon configuration....

用curl测试php-fpm status状态是否可以获取

curl http://localhost/phpfpm-status-www
pool: www
process manager: static
start time: 29/Mar/2017:16:19:09 +0800
start since: 75
accepted conn: 900
listen queue: 0
max listen queue: 0
listen queue len: 0
idle processes: 696
active processes: 4
total processes: 700
max active processes: 60
max children reached: 0
slow requests: 1

配置Metricbeat

Metricbeat Reference [5.3] » Modules » PHP-FPM Module

/etc/metricbeat/metricbeat.yml

#========================== Modules configuration ============================
metricbeat.modules:

#------------------------------- PHP-FPM Module -------------------------------
- module: php_fpm
  metricsets: ["pool"]
  enabled: true
  period: 10s
  status_path: "/phpfpm-status-www"
  hosts: ["localhost:80"]

- module: php_fpm
  metricsets: ["pool"]
  enabled: true
  period: 10s
  status_path: "/phpfpm-status-u"
  hosts: ["localhost:80"]

#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch:
# Array of hosts to connect to.
  hosts: ["localhost:9200"]

测试配置文件是否正确

/usr/share/metricbeat/bin/metricbeat -configtest
Config OK

给metricbeat创建Elasticsearch的Index Template
Loading the Index Template in Elasticsearch

curl -XPUT 'http://localhost:9200/_template/metricbeat' -d@/etc/metricbeat/metricbeat.template.json

给metricbeat创建Kibana Dashboards
Loading Sample Kibana Dashboards

./scripts/import_dashboards -es http://localhost:9200

metricbeat中phpfpm模块,没有elastic官方的 Sample Kibana Dashboard
所以需要分析什么数据的自己在Kibana中自己创建visualize然后做成Dashboard

启动metricbeat

/etc/init.d/metricbeat start

配置Kibana

Management / Kibana / Indices > Add New
Index name or pattern 填写 “metricbeat-*”
Time-field name 选 “@timestamp”
Alt text

分享我做的 metricbeat phpfpm kibana dashboard (不完整)
export.json

[
  {
    "_id": "PHP-FPM导航",
    "_type": "visualization",
    "_source": {
      "title": "PHPFPM导航",
      "visState": "{\n  \"title\": \"PHP-FPM导航\",\n  \"type\": \"markdown\",\n  \"params\": {\n    \"markdown\": \"- [Overview](#/dashboard/Metricbeat-phpfpm-overview)\\n\\n\\n```\\npool:php-fpm池的名称,一般都是应该是www\\nprocess manage:进程的管理方法,php-fpm支持三种管理方法,分别是static,dynamic和ondemand,一般情况下都是dynamic\\nstart time:php-fpm启动时候的时间,不管是restart或者reload都会更新这里的时间\\nstart since:php-fpm自启动起来经过的时间,默认为秒\\naccepted conn:当前接收的连接数\\nlisten queue:在队列中等待连接的请求个数,如果这个数字为非0,那么最好增加进程的fpm个数\\nmax listen queue:从fpm启动以来,在队列中等待连接请求的最大值\\nlisten queue len:等待连接的套接字队列大小\\nidle processes:空闲的进程个数\\nactive processes:活动的进程个数\\ntotal processes:总共的进程个数\\nmax active processes:从fpm启动以来,活动进程的最大个数,如果这个值小于当前的max_children,可以调小此值\\nmax children reached:当pm尝试启动更多的进程,却因为max_children的限制,没有启动更多进程的次数。如果这个值非0,那么可以适当增加fpm的进程数\\nslow requests:慢请求的次数,一般如果这个值未非0,那么可能会有慢的php进程,一般一个不好的mysql查询是最大的祸首。\\n```\"\n  },\n  \"aggs\": [],\n  \"listeners\": {}\n}",
      "uiStateJSON": "{}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": []\n}"
      }
    }
  },
  {
    "_id": "PHPFPM-processes-active",
    "_type": "visualization",
    "_source": {
      "title": "PHPFPM processes active",
      "visState": "{\"title\":\"PHPFPM processes active\",\"type\":\"area\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"smoothLines\":false,\"scale\":\"linear\",\"interpolate\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"avg\",\"schema\":\"metric\",\"params\":{\"field\":\"php_fpm.pool.processes.active\",\"customLabel\":\"活动\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{},\"customLabel\":\"时间\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"group\",\"params\":{\"field\":\"php_fpm.pool.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"pool池\"}},{\"id\":\"4\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"split\",\"params\":{\"field\":\"beat.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"主机\",\"row\":true}}],\"listeners\":{}}",
      "uiStateJSON": "{}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"metricbeat-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "PHPFPM-connections-queued",
    "_type": "visualization",
    "_source": {
      "title": "PHPFPM connections queued",
      "visState": "{\"title\":\"PHPFPM connections queued\",\"type\":\"area\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"smoothLines\":false,\"scale\":\"linear\",\"interpolate\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"avg\",\"schema\":\"metric\",\"params\":{\"field\":\"php_fpm.pool.connections.queued\",\"customLabel\":\"队列\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{},\"customLabel\":\"时间\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"group\",\"params\":{\"field\":\"php_fpm.pool.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"pool池\"}},{\"id\":\"4\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"split\",\"params\":{\"field\":\"beat.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"主机\",\"row\":true}}],\"listeners\":{}}",
      "uiStateJSON": "{}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"metricbeat-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "PHPFPM-processes-idle",
    "_type": "visualization",
    "_source": {
      "title": "PHPFPM processes idle",
      "visState": "{\"title\":\"PHPFPM processes idle\",\"type\":\"area\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"smoothLines\":false,\"scale\":\"linear\",\"interpolate\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"avg\",\"schema\":\"metric\",\"params\":{\"field\":\"php_fpm.pool.processes.idle\",\"customLabel\":\"空闲\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{},\"customLabel\":\"时间\"}},{\"id\":\"4\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"group\",\"params\":{\"field\":\"php_fpm.pool.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"pool池\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"split\",\"params\":{\"field\":\"beat.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"主机\",\"row\":true}}],\"listeners\":{}}",
      "uiStateJSON": "{}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"metricbeat-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "PHPFPM-slow_requests",
    "_type": "visualization",
    "_source": {
      "title": "PHPFPM slow_requests",
      "visState": "{\"title\":\"PHPFPM slow_requests\",\"type\":\"area\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"smoothLines\":false,\"scale\":\"linear\",\"interpolate\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"avg\",\"schema\":\"metric\",\"params\":{\"field\":\"php_fpm.pool.slow_requests\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{},\"customLabel\":\"时间\"}},{\"id\":\"4\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"group\",\"params\":{\"field\":\"php_fpm.pool.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"pool池\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"split\",\"params\":{\"field\":\"beat.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\",\"customLabel\":\"主机\",\"row\":true}}],\"listeners\":{}}",
      "uiStateJSON": "{}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"metricbeat-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  }
]

补充logstash采集数据(方式2)

配置Metricbeat

/etc/metricbeat/metricbeat.yml

#================================ Outputs =====================================
# The Logstash hosts
output.logstash:

  hosts: ["localhost:5044"]

output输出到logstash的input tcp 5044上
input输入参考上面方式1 Metricbeat 的配置方法

配置logstash2.x

Logstash Reference [2.4] » Input plugins » beats

/etc/logstash/conf.d/beats.conf

input {
    beats {
        host => "127.0.0.1"
        port => 5044
    }
}
filter {
}
output {
    ... 输出到kafka 或者reids 或者 elasticsearch等,具体配置方法看官方文档
}

input输入tcp 5044 监听本机
output输出,根据自己环境来

测试配置文件是否正确

/opt/logstash/bin/logstash -t
Configuration OK

启动logstash

/etc/init.d/logstash start

提示

  • 使用测试配置文件的功能测试配置文件是否书写正确
  • 使用tail -f 日志的方式查错
  • 注意tcp监听的安全问题,别暴露到公网IP上

Elastic Stack5.0 5.x elk5.x部署疑问解答

如何给Kibana增加帐号认证?

Kibana5.0以下一般使用nignx反向代理,在nginx配置中增加HTTP Auth Basic
Kibana5.0起,官方提供X-Pack插件方案提供安全认证功能
在Kibana和Elasticsearch中安装X-Pack插件
官方文档
X-Pack for the Elastic Stack » Installing X-Pack
https://www.elastic.co/guide/en/x-pack/current/installing-xpack.html
默认的帐号密码
帐号:elastic
密码:changeme
可在Kibana中 Management > Users 可视化管理
修改密码后,在/etc/kibana/kibana.yml配置文件中也要同步修改
elasticsearch.username: “elastic”
elasticsearch.password: “changeme”

在Kibana5.0 和Elasticsearch5.0上安装了X-Pack插件后启用了安全帐号密码,那么logstash output如何配置?

这个太坑了,文档不太好找,官方文档藏的太深
官方文档
X-Pack for the Elastic Stack » Securing Elasticsearch and Kibana » Tribe, Clients and Integrations » Logstash and Security
https://www.elastic.co/guide/en/x-pack/current/logstash.html
帐号密码存在es中,可以直接在kibana的Management中配置
然后在/etc/logstash/conf.d/logstash.conf 配置文件中

output {
    elasticsearch {
        hosts => ["你的esip"]
        index => "logstash-%{+YYYY.MM.dd}"
        document_type => "你的type"
        template_overwrite => true
        user => 你的帐号
        password => 你的密码
    }
}

最后重启logstash
提示:如果重启了没生效,用kill把logstash强制关闭,再开启。 调试logstash配置的,可以用tail -f /var/log/logstash/xxxx日志,观察日志文件是你解决问题的开始

介绍

ELK是业界标准的日志采集,存储索引,展示分析系统解决方案
logstash提供了灵活多样的插件支持不同的input/output
主流使用redis/kafka作为日志/消息的中间环节
如果已有kafka的环境了,使用kafka比使用redis更佳
以下是一个最简化的配置做个笔记,elastic官网提供了非常丰富的文档
不要用搜索引擎去搜索,没多少结果的,请直接看官网文档

采用的ELK/kafka版本

elasticsearch-2.x
logstash-2.3
kibana-4.5.1

Kafka 0.9.0.1

应用/网络 环境

Nginx机
10.0.0.1

Kafka群集
10.0.0.11
10.0.0.12
10.0.0.13

ElasticSearch机
10.0.0.21

整体说明

数据流向

日志/消息整体流向
logstash => kafka => logstash => elasticsearch => kibana

安装

elk所有安装都可以使用rpm二进制包的方式,增加elastic官网的仓库repo就可以用yum安装了

elasticsearch看这里
https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html

logstash看这里
https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

kibana看这里
https://www.elastic.co/guide/en/kibana/current/setup.html

安装概览

nginx机 10.0.0.1
运行nginx的日志格式化为json
运行logstash输入input从nginx json,输出output到kafka

kafka群集 10.0.0.11 10.0.0.12 10.0.0.13
kafka群集Topic为logstash

elasticsearch机10.0.0.21
运行elasticsearch
运行logstash输入input从kafka,输出output到elasticsearch

Nginx机

nginx日志格式化为json

在nginx的 http{} 中定义一个名为logstash_json格式化,格式化日志为json

log_format logstash_json '{ "@timestamp": "$time_local", '
'"@fields": { '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"request": "$request", '
'"request_method": "$request_method", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" } }';

在server{} 中增加记录logstash_json日志,可以用原有的日志输出共存

access_log /data/wwwlogs/iamle.log log_format;
access_log /data/wwwlogs/nginx_json.log logstash_json;

logstash日志采集配置

/etc/logstash/conf.d/nginx.conf

input {
file {
path => "/data/wwwlogs/nginx_json.log"
codec => "json"
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
output {
kafka {
bootstrap_servers => "10.0.0.11:9092"
topic_id => "logstash"
compression_type => "gzip"
}
}

Kafka群集

新建一个Topic

新建一个Topic叫做
logstash

Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)

ElasticSearch机

logstash把数据从kafka存到elasticsearch的配置

其中选取kafka群集任意一个有zk的ip做连接使用
topic_id就是kafka中设置的topic logstash
/etc/logstash/conf.d/logstashes.conf

input {
kafka {
zk_connect => "10.0.0.13:2181"
topic_id => "logstash"
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
output {
elasticsearch {
hosts => ["10.0.0.21"]
index => "logstash-iamle-%{+YYYY.MM.dd}"
document_type => "iamle"
workers => 5
template_overwrite => true
}
}

补充说明

以上是主要的配置,就差kibana的查看/展示了

kibana

我这里kibana和elasticsearch是同一台机器
官方yum方式安装的kibana配置文件在
/opt/kibana/config/kibana.yml
需要改2个地方,监听端口和es的连接信息

server.host: "10.0.0.21"
elasticsearch.url: "http://10.0.0.21:9200"

启动kibana /etc/init.d/kibana start后可以通过 http://10.0.0.21:5601 访问

kibana的使用的多看官网文档,网上中文的资料不多,关于elk有一本饶琛琳写的
ELKstack 中文指南
https://www.gitbook.com/book/chenryn/kibana-guide-cn/details
kibana Discover 过滤静态文件
NOT \/static AND NOT \/upload\/

elasticsearch

官方yum方式安装的elasticsearch配置文件在
/etc/elasticsearch/elasticsearch.yml
需要配置下监听ip,默认是127.0.0.1

network.host: 10.0.0.21
path.data: /data

elasticsearch安装了head插件后可以看到es状态
http://10.0.0.21:9200/_plugin/head/

安全问题

特别要注意elk所有软件的端口监听,切勿暴露监听到公网上去,另外即便是内网你也得注意配置内网的访问限制

Nginx+Logstash+Elasticsearch+Kibana搭建网站日志分析系统笔记

前言

流程,nignx格式化日志成json,通过logstash直接采集到elasticsearch,然后通过kibana gui界面展示分析

要点nignx日志成json格式,避免nignx默认日志是空格,需要正则匹配,导致logstash占过多cpu
elasticsearch机配置防火墙,只让指定的logstash机访问
kibana只监听本地127.0.0.1使用nignx方向代理,nginx中配置Http Basic Auth账号密码登陆

比较粗略的笔记,备忘
安装java
yum install java-1.8.0-openjdk*

nginx配置

为了让nignx机跑logstash采集日志负载最低,建议直接生成json的方式,直接就可以用logstash读取写入到Elasticsearch

http{} 中定义 格式化日志成json

log_format logstash_json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"http_x_forwarded_for":"$http_x_forwarded_for",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"request":"$request",'
'"url":"$uri",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}';

server内输出日志 access_log可以配置多个同时输出,可以保留你以前的

access_log /data/wwwlogs/www.iamle.log iamle.com;
access_log /data/wwwlogs/www.iamle.com.logstash_json.log logstash_json;

nginx机安装Logstash1.5.x

rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-1.5]
name=logstash repository for 1.5.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.5/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
EOF
yum clean all
yum install logstash

 

在目录 /etc/logstash/conf.d/
建立配置文件 nginx_json.conf

input {
file {
path => "/data/wwwlogs/www.iamle.com.logstash_json.log"
codec => json
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
output {
elasticsearch {
host => "elk.server.iamle.com"
protocol => "http"
index => "logstash-%{type}-%{+YYYY.MM.dd}"
index_type => "%{type}"
workers => 5
template_overwrite => true
}
}

service logstash start

日志存储机安装Elasticsearch1.7.x提供数据底层支持

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/elasticsearch.repo <<EOF
[elasticsearch-1.7]
name=Elasticsearch repository for 1.7.x packages
baseurl=http://packages.elastic.co/elasticsearch/1.7/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
EOF
yum clean all
yum install elasticsearch

配置文件
配置数据保存位置

vim /etc/elasticsearch/elasticsearch.yml
# Can optionally include more than one location, causing data to be striped across
# the locations (a la RAID 0) on a file level, favouring locations with most free
# space on creation. For example:
#
path.data: /data
目录会自动生成,只需要指定一个空目录就可以了

service elasticsearch start

centos7
systemctl start elasticsearch
systemctl status elasticsearch
elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled)
Active: active (running) since Fri 2015-09-04 15:37:08 CST; 1s ago
Docs: http://www.elastic.co
Main PID: 19376 (java)
CGroup: /system.slice/elasticsearch.service
└─19376 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -X...
Sep 04 15:37:08 elk systemd[1]: Starting Elasticsearch...
Sep 04 15:37:08 elk systemd[1]: Started Elasticsearch.
检查是否已经成功开启
ss -ltnp |grep 9200

centos7配置firewalld固定ip可访问elasticsearch
systemctl start firewalld.service
systemctl status firewalld.service

 

只允许nignx机访问elasticsearch机9200 9300端口

firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="10.8.8.2" \
port protocol="tcp" port="9200" accept"

firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="10.8.8.2" \
port protocol="tcp" port="9300" accept"
firewall-cmd --reload

iptables -L -n |grep 9200
ACCEPT tcp -- 10.8.8.2 0.0.0.0/0 tcp dpt:9200 ctstate NEW

 

安装Kibana4展示Elasticsearch中的数据

 

wget https://download.elastic.co/kibana/kibana/kibana-4.1.1-linux-x64.tar.gz
tar zxvf kibana-4.1.1-linux-x64.tar.gz
cd kibana-4.1.1-linux-x64
修改配置文件
vim /usr/local/kibana-4.1.1-linux-x64/config/kibana.yml
# Kibana is served by a back end server. This controls which port to use.
port: 5601

# The host to bind the server to.
#监听本地地址 用nignx反向代理
host: "127.0.0.1"

nohup ./bin/kibana &

检查是否已经成功开启
ss -ltnp |grep 5601

 

使用nignx反向代理kibana
nginx配置Http Basic Auth账号密码登陆
http://trac.edgewall.org/export/10770/trunk/contrib/htpasswd.py (nginx wiki里推荐的)
运行示例
chmod 777 htpasswd.py
./htpasswd.py -c -b htpasswd username password
#-c为生成文件 htpasswd为文件名

server
{
listen 80;
#listen [::]:80;
server_name elk.server.iamle.com;

location / {
auth_basic "Password please";
auth_basic_user_file /usr/local/nginx/conf/htpasswd;
proxy_pass http://127.0.0.1:5601/;
proxy_redirect off;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}

老版本
https://download.elastic.co/kibana/kibana/kibana-3.1.3.tar.gz
https://www.elastic.co/downloads/past-releases/kibana-3-1-3

参考

http://kibana.logstash.es/
https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html

扩展centos7 firewall的使用

 

检查防火墙状态
firewall-cmd --stat

临时开放ftp服务
firewall-cmd --add-service=ftp
永久开放ftp服务
firewall-cmd --add-service=ftp --permanent
关闭ftp服务
firewall-cmd --remove-service=ftp --permanent
配置防火墙在public区域永久开放http服务
firewall-cmd --permanent --zone=public --add-service=http
加入指定开放端口
firewall-cmd --add-port=1324/tcp

为了让之前的设定生效当然要重启服务咯
systemctl restart firewalld
或者使用下面的命令免去重启服务(防火墙策略配置后重新载入)
firewall-cmd --complete-reload
firewall-cmd --reload (这两句功能相同)

检查ftp服务的21端口是否开放
iptables -L -n | grep 21
ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:21 ctstate NEW

查询ftp服务启用状态
firewall-cmd --query-service ftp

查看当前规则
firewall-cmd --list-all

仅允许部分IP访问本机服务配置
firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="192.168.0.4/24" service name="http" accept"

仅允许部分IP访问本机端口配置
firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="192.168.0.4/24" \
port protocol="tcp" port="8080" accept"