数据同步的方式

数据同步的2大方式

  • 基于SQL查询的 CDC(Change Data Capture):
    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据。也就是我们说的基于SQL查询抽取;
    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
    • 不保障实时性,基于离线调度存在天然的延迟;
    • 工具软件以Kettle(Apache Hop最新版)、DataX为代表,需要结合任务调度系统使用。
  • 基于日志的 CDC:
    • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据;
    • 工具软件以Flink CDC、阿里巴巴Canal、Debezium为代表。

基于SQL查询增量数据同步原理

我们考虑用SQL如何查询增量数据? 数据有增加、需改、删除
删除数据采用逻辑删除的方式,比如定义一个is_deleted字段标识逻辑删除
如果数据是 UPDATE的,也就是会被修改的,那么 where update_datetime >= last_datetime(调度滚动时间)就是增量数据
如果数据是 APPEND ONLY 的除了用更新时间还可以用where id >= 调度上次last_id

结合任务调度系统
调度时间是每日调度执行一次,那么 last_datetime = 当前调度开始执行时间 – 24小时,延迟就是1天
调度时间是15分钟一次,那么 last_datetime = 当前调度开始执行时间 – 15分钟,延迟就是15分钟

这样就实现了捕获增量数据,从而实现增量同步

DolphinScheduler + Datax 构建离线增量数据同步平台

本实践使用
单机8c16g
DataX 2022-03-01 官网下载
DolphinScheduler 2.0.3(DolphinScheduler的安装过程略,请参考官网)

DolphinScheduler 中设置好DataX环境变量
DolphinScheduler 提供了可视化的作业流程定义,用来离线定时调度DataX Job作业,使用起来很是顺滑

基于SQL查询离线数据同步的用武之地
为什么不用基于日志实时的方式?不是不用,而是根据场合用。考虑到业务实际需求情况,基于SQL查询这种离线的方式也并非完全淘汰了
特别是业务上实时性要求不高,每次调度增量数据没那么大的情况下,不需要分布式架构来负载,这种情况下是比较合适的选择
场景举例:
网站、APP的百万级、千万级的内容搜索,每天几百篇内容新增+修改,搜索上会用到ES(ElasticSearch),那么就需要把 MySQL内容数据增量同步到ES
DataX就能满足需求!

DolphinScheduler中配置DataX MySQL To ElasticSearch工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To ElasticSearch'

DATAX组件(t_article)
用到2个插件mysqlreader、elasticsearchwriter^[1]
选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id as pk,a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "${biz_es_host}",
                        "accessId": "${biz_es_username}",
                        "accessKey": "${biz_es_password}",
                        "index": "t_article",
                        "type": "_doc",
                        "batchSize": 1000,
                        "cleanup": false,
                        "discovery": false,
                        "dynamic": true,
                        "settings": {
                            "index": {
                                "number_of_replicas": 0,
                                "number_of_shards": 1
                            }
                        },
                        "splitter": ",",
                        "column": [
                            {
                                "name": "pk",
                                "type": "id"
                            },
                            {
                                "name": "id",
                                "type": "long"
                            },
                            {
                                "name": "title",
                                "type": "text"
                            },
                            {
                                "name": "content",
                                "type": "text"
                            }
                            {
                                "name": "is_delete",
                                "type": "text"
                            },
                            {
                                "name": "delete_date",
                                "type": "date"
                            },
                            {
                                "name": "create_date",
                                "type": "date"
                            },
                            {
                                "name": "update_date",
                                "type": "date"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate} 
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码
biz_es_host: 你的es地址带协议和端口 http://127.0.0.1:9200
biz_es_username: 你的es账号
biz_es_password: 你的es密码

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer elasticsearchwriter插件中关键配置: “

"column": [
    {
        "name": "pk",
        "type": "id"
    },
    ......
]

type = id 这样配置,就把文章主键映射到es主键 _id 从而实现相同主键id重复写入数据,就会更新数据。如果不这样配置数据将会重复导入es中

保存工作流

全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[2]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

by 流水理鱼|wwek

参考

1. DataX ElasticSearchWriter 插件文档
2. Apache DolphinScheduler 内置参数

[TOC]

0、前言

image-20210606160820147

为什么使用Elasticsearch (以下简称 ES ) ?

全文搜索能力

ES本身就是一个搜索引擎,技术上基于倒排索引实现的搜索系统。我们做业务不必自己去开发一个搜索引擎,

ES已经满足绝大多数企业的搜索场景的需求。拼多多电商在发展前期,搜索业务使用ES支撑。

复杂查询(多维度组合筛选)

假如某个实体表,例如订单日订单量几十万,运营管理后台通常有十多个维度的组合筛选搜索。你可能会通过缩小查询时间范围来把数据量级降下来,MySQL也许也能一战,但是实际业务场景业务人员不太能接受这样一个小时间段筛选。如果稍微扩大时间段,MySQL这时候就无能为力了。那么把订单表做成“宽表”存入ES,十多个维度的搜索对于ES俩说是毫无压力的。ES群集能轻松支持亿级的宽表多维度组合筛选。这种需求在CRM、BOSS等场景是刚需。

滴滴司机端早期日订单量几十万,运营管理后台大时间跨度且多维度组合筛选就是用ES来实现的。

一个产品是否成熟,公有云是否提供基于该产品的服务是一个风向标。那么ES毫无疑问是成熟的产品。

虽然ES这么好,但是我的数据都在MySQL中那么怎么让数据实时同步到ES中呢?

1、基于应用程序多写

直接通过应用程序数据双写到MySQL和ES

image-20210606174354578

记录删除机制:直接删除

一致性: 需要自行处理,需要对失败错误做好日志记录,做好异常告警并人工补偿

优点: 直接明了,能够灵活控制数据写入,延迟最低

缺点: 与业务耦合严重,逻辑要写在业务系统中

应用双写(同步)

直接通过ES API将数据写入到ES集群中,也就是写入数据库的同时调用ES API写入到ES中

这个过程是同步的

应用双写(MQ异步解耦)

对 应用双写(同步)的改进,引入MQ中间件。

把同步变为异步,做了解耦。

同时引入MQ后双写性能提高,解决数据一致性问题。

缺点是会增加延迟性,业务系统增加mq代码,而且多一个MQ中间件要维护

2、基于binlog订阅

binlog订阅的原理很简单,模拟一个MySQL slave 订阅binlog日志,从而实现CDC(change data capture)

CDC,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。

记录删除机制:直接删除

一致性: 最终一致性

优点: 对业务系统无任何侵入

缺点: 需要维护额外增加的一套数据同步平台;有分钟级的延迟

Canal

img

https://github.com/alibaba/canal/

阿里巴巴 MySQL binlog 增量订阅&消费组件

Databus

https://github.com/linkedin/databus

Linkedin Databus 分布式数据库同步系统

Maxwell

官网:http://maxwells-daemon.io/

https://github.com/zendesk/maxwell

Flink CDC

https://github.com/ververica/flink-cdc-connectors

flink-cdc-connectors 中文教程

基于flink数据计算平台实现 MySQL binlog订阅直接写入es

Flink CDC抛弃掉其他中间件,实现 MySQL 》Flink CDC》ES 非常简洁的数据同步架构

该方式比较新2020年开始的项目,目前在一些实时数仓上有应用

DTS(阿里云)

阿里云的商业产品,具备好的易用性,省运维成本。

CloudCanal

CloudCanal官网

CloudCannal数据同步迁移系统,商业产品

3、基于SQL抽取

基于SQL查询的数据抽取同步

这种方式需要满足2个基本条件

1、MySQL的表必须有唯一键字段(和ES中_id对应)

2、MySQL的表必须有一个“修改时间”字段,该记录任何一个字段修改都需要更新“修改时间”

有了唯一键字段就可以知道修改某条记录后同步哪条ES记录,有了修改时间字段就可以知道同步到哪儿了。

满足了这2个基本条件这样就可实现增量实时同步。

记录删除机制:逻辑删除,在MySQL中增加逻辑删除字段,ES搜索时过滤状态

一致性: 依赖修改时间字段;延迟时间等于计划任务多久执行一次

优点: 对业务系统无任何侵入,简单方便;课用JOIN打宽表

缺点: MySQL承受查询压力;需要业务中满足2个基本条件

logstash

我们使用 Logstash 和 JDBC 输入插件来让 Elasticsearch 与 MySQL 保持同步。从概念上讲,Logstash 的 JDBC 输入插件会运行一个循环来定期对 MySQL 进行轮询,从而找出在此次循环的上次迭代后插入或更改的记录。如要让其正确运行,必须满足下列条件:

  1. 在将 MySQL 中的文档写入 Elasticsearch 时,Elasticsearch 中的 “_id” 字段必须设置为 MySQL 中的 “id” 字段。这可在 MySQL 记录与 Elasticsearch 文档之间建立一个直接映射关系。如果在 MySQL 中更新了某条记录,那么将会在 Elasticsearch 中覆盖整条相关记录。请注意,在 Elasticsearch 中覆盖文档的效率与更新操作的效率一样高,因为从内部原理上来讲,更新便包括删除旧文档以及随后对全新文档进行索引。
  2. 当在 MySQL 中插入或更新数据时,该条记录必须有一个包含更新或插入时间的字段。通过此字段,便可允许 Logstash 仅请求获得在轮询循环的上次迭代后编辑或插入的文档。Logstash 每次对 MySQL 进行轮询时,都会保存其从 MySQL 所读取最后一条记录的更新或插入时间。在下一次迭代时,Logstash 便知道其仅需请求获得符合下列条件的记录:更新或插入时间晚于在轮询循环中的上一次迭代中所收到的最后一条记录。
input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  # stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

配置Logstash的计划任务,定时执行

4、总结

前期建议采用 基于SQL抽取的方式做同步,后期数据量大了建议采用基于binlog订阅的方式同步。

如果本身有现成的Flink平台可用,推荐使用Flink CDC。

什么是最佳的 MySQL 同步 ElasticSearch 方案?

答案是选择缺点可以接受,又满足需求,拥有成本最低的方案。

“完美”的方案往往拥有成本会比较高,所以需要结合业务环境的上下文去选择。

流水理鱼觉得没有一招鲜的方案,因为每种方案都有利弊,所以选取适合你当下业务环境的方案。那么这样的方案就是最佳方案。

5、参考

MySQL 数据实时同步到 Elasticsearch 的技术方案选型和思考 by 万凯明

如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步

监听mysql的binlog日志工具分析:canal、Maxwell、Databus、DTS

介绍

ELK是业界标准的日志采集,存储索引,展示分析系统解决方案
logstash提供了灵活多样的插件支持不同的input/output
主流使用redis/kafka作为日志/消息的中间环节
如果已有kafka的环境了,使用kafka比使用redis更佳
以下是一个最简化的配置做个笔记,elastic官网提供了非常丰富的文档
不要用搜索引擎去搜索,没多少结果的,请直接看官网文档

采用的ELK/kafka版本

elasticsearch-2.x
logstash-2.3
kibana-4.5.1

Kafka 0.9.0.1

应用/网络 环境

Nginx机
10.0.0.1

Kafka群集
10.0.0.11
10.0.0.12
10.0.0.13

ElasticSearch机
10.0.0.21

整体说明

数据流向

日志/消息整体流向
logstash => kafka => logstash => elasticsearch => kibana

安装

elk所有安装都可以使用rpm二进制包的方式,增加elastic官网的仓库repo就可以用yum安装了

elasticsearch看这里
https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html

logstash看这里
https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

kibana看这里
https://www.elastic.co/guide/en/kibana/current/setup.html

安装概览

nginx机 10.0.0.1
运行nginx的日志格式化为json
运行logstash输入input从nginx json,输出output到kafka

kafka群集 10.0.0.11 10.0.0.12 10.0.0.13
kafka群集Topic为logstash

elasticsearch机10.0.0.21
运行elasticsearch
运行logstash输入input从kafka,输出output到elasticsearch

Nginx机

nginx日志格式化为json

在nginx的 http{} 中定义一个名为logstash_json格式化,格式化日志为json

log_format logstash_json '{ "@timestamp": "$time_local", '
'"@fields": { '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"request": "$request", '
'"request_method": "$request_method", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" } }';

在server{} 中增加记录logstash_json日志,可以用原有的日志输出共存

access_log /data/wwwlogs/iamle.log log_format;
access_log /data/wwwlogs/nginx_json.log logstash_json;

logstash日志采集配置

/etc/logstash/conf.d/nginx.conf

input {
file {
path => "/data/wwwlogs/nginx_json.log"
codec => "json"
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
output {
kafka {
bootstrap_servers => "10.0.0.11:9092"
topic_id => "logstash"
compression_type => "gzip"
}
}

Kafka群集

新建一个Topic

新建一个Topic叫做
logstash

Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)

ElasticSearch机

logstash把数据从kafka存到elasticsearch的配置

其中选取kafka群集任意一个有zk的ip做连接使用
topic_id就是kafka中设置的topic logstash
/etc/logstash/conf.d/logstashes.conf

input {
kafka {
zk_connect => "10.0.0.13:2181"
topic_id => "logstash"
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
output {
elasticsearch {
hosts => ["10.0.0.21"]
index => "logstash-iamle-%{+YYYY.MM.dd}"
document_type => "iamle"
workers => 5
template_overwrite => true
}
}

补充说明

以上是主要的配置,就差kibana的查看/展示了

kibana

我这里kibana和elasticsearch是同一台机器
官方yum方式安装的kibana配置文件在
/opt/kibana/config/kibana.yml
需要改2个地方,监听端口和es的连接信息

server.host: "10.0.0.21"
elasticsearch.url: "http://10.0.0.21:9200"

启动kibana /etc/init.d/kibana start后可以通过 http://10.0.0.21:5601 访问

kibana的使用的多看官网文档,网上中文的资料不多,关于elk有一本饶琛琳写的
ELKstack 中文指南
https://www.gitbook.com/book/chenryn/kibana-guide-cn/details
kibana Discover 过滤静态文件
NOT \/static AND NOT \/upload\/

elasticsearch

官方yum方式安装的elasticsearch配置文件在
/etc/elasticsearch/elasticsearch.yml
需要配置下监听ip,默认是127.0.0.1

network.host: 10.0.0.21
path.data: /data

elasticsearch安装了head插件后可以看到es状态
http://10.0.0.21:9200/_plugin/head/

安全问题

特别要注意elk所有软件的端口监听,切勿暴露监听到公网上去,另外即便是内网你也得注意配置内网的访问限制

Nginx+Logstash+Elasticsearch+Kibana搭建网站日志分析系统笔记

前言

流程,nignx格式化日志成json,通过logstash直接采集到elasticsearch,然后通过kibana gui界面展示分析

要点nignx日志成json格式,避免nignx默认日志是空格,需要正则匹配,导致logstash占过多cpu
elasticsearch机配置防火墙,只让指定的logstash机访问
kibana只监听本地127.0.0.1使用nignx方向代理,nginx中配置Http Basic Auth账号密码登陆

比较粗略的笔记,备忘
安装java
yum install java-1.8.0-openjdk*

nginx配置

为了让nignx机跑logstash采集日志负载最低,建议直接生成json的方式,直接就可以用logstash读取写入到Elasticsearch

http{} 中定义 格式化日志成json

log_format logstash_json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"http_x_forwarded_for":"$http_x_forwarded_for",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"request":"$request",'
'"url":"$uri",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}';

server内输出日志 access_log可以配置多个同时输出,可以保留你以前的

access_log /data/wwwlogs/www.iamle.log iamle.com;
access_log /data/wwwlogs/www.iamle.com.logstash_json.log logstash_json;

nginx机安装Logstash1.5.x

rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-1.5]
name=logstash repository for 1.5.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.5/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
EOF
yum clean all
yum install logstash

 

在目录 /etc/logstash/conf.d/
建立配置文件 nginx_json.conf

input {
file {
path => "/data/wwwlogs/www.iamle.com.logstash_json.log"
codec => json
}
}
filter {
mutate {
split => [ "upstreamtime", "," ]
}
mutate {
convert => [ "upstreamtime", "float" ]
}
}
output {
elasticsearch {
host => "elk.server.iamle.com"
protocol => "http"
index => "logstash-%{type}-%{+YYYY.MM.dd}"
index_type => "%{type}"
workers => 5
template_overwrite => true
}
}

service logstash start

日志存储机安装Elasticsearch1.7.x提供数据底层支持

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/elasticsearch.repo <<EOF
[elasticsearch-1.7]
name=Elasticsearch repository for 1.7.x packages
baseurl=http://packages.elastic.co/elasticsearch/1.7/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
EOF
yum clean all
yum install elasticsearch

配置文件
配置数据保存位置

vim /etc/elasticsearch/elasticsearch.yml
# Can optionally include more than one location, causing data to be striped across
# the locations (a la RAID 0) on a file level, favouring locations with most free
# space on creation. For example:
#
path.data: /data
目录会自动生成,只需要指定一个空目录就可以了

service elasticsearch start

centos7
systemctl start elasticsearch
systemctl status elasticsearch
elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled)
Active: active (running) since Fri 2015-09-04 15:37:08 CST; 1s ago
Docs: http://www.elastic.co
Main PID: 19376 (java)
CGroup: /system.slice/elasticsearch.service
└─19376 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -X...
Sep 04 15:37:08 elk systemd[1]: Starting Elasticsearch...
Sep 04 15:37:08 elk systemd[1]: Started Elasticsearch.
检查是否已经成功开启
ss -ltnp |grep 9200

centos7配置firewalld固定ip可访问elasticsearch
systemctl start firewalld.service
systemctl status firewalld.service

 

只允许nignx机访问elasticsearch机9200 9300端口

firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="10.8.8.2" \
port protocol="tcp" port="9200" accept"

firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="10.8.8.2" \
port protocol="tcp" port="9300" accept"
firewall-cmd --reload

iptables -L -n |grep 9200
ACCEPT tcp -- 10.8.8.2 0.0.0.0/0 tcp dpt:9200 ctstate NEW

 

安装Kibana4展示Elasticsearch中的数据

 

wget https://download.elastic.co/kibana/kibana/kibana-4.1.1-linux-x64.tar.gz
tar zxvf kibana-4.1.1-linux-x64.tar.gz
cd kibana-4.1.1-linux-x64
修改配置文件
vim /usr/local/kibana-4.1.1-linux-x64/config/kibana.yml
# Kibana is served by a back end server. This controls which port to use.
port: 5601

# The host to bind the server to.
#监听本地地址 用nignx反向代理
host: "127.0.0.1"

nohup ./bin/kibana &

检查是否已经成功开启
ss -ltnp |grep 5601

 

使用nignx反向代理kibana
nginx配置Http Basic Auth账号密码登陆
http://trac.edgewall.org/export/10770/trunk/contrib/htpasswd.py (nginx wiki里推荐的)
运行示例
chmod 777 htpasswd.py
./htpasswd.py -c -b htpasswd username password
#-c为生成文件 htpasswd为文件名

server
{
listen 80;
#listen [::]:80;
server_name elk.server.iamle.com;

location / {
auth_basic "Password please";
auth_basic_user_file /usr/local/nginx/conf/htpasswd;
proxy_pass http://127.0.0.1:5601/;
proxy_redirect off;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}

老版本
https://download.elastic.co/kibana/kibana/kibana-3.1.3.tar.gz
https://www.elastic.co/downloads/past-releases/kibana-3-1-3

参考

http://kibana.logstash.es/
https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html

扩展centos7 firewall的使用

 

检查防火墙状态
firewall-cmd --stat

临时开放ftp服务
firewall-cmd --add-service=ftp
永久开放ftp服务
firewall-cmd --add-service=ftp --permanent
关闭ftp服务
firewall-cmd --remove-service=ftp --permanent
配置防火墙在public区域永久开放http服务
firewall-cmd --permanent --zone=public --add-service=http
加入指定开放端口
firewall-cmd --add-port=1324/tcp

为了让之前的设定生效当然要重启服务咯
systemctl restart firewalld
或者使用下面的命令免去重启服务(防火墙策略配置后重新载入)
firewall-cmd --complete-reload
firewall-cmd --reload (这两句功能相同)

检查ftp服务的21端口是否开放
iptables -L -n | grep 21
ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:21 ctstate NEW

查询ftp服务启用状态
firewall-cmd --query-service ftp

查看当前规则
firewall-cmd --list-all

仅允许部分IP访问本机服务配置
firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="192.168.0.4/24" service name="http" accept"

仅允许部分IP访问本机端口配置
firewall-cmd --permanent --zone=public --add-rich-rule="rule family="ipv4" \
source address="192.168.0.4/24" \
port protocol="tcp" port="8080" accept"