ELK Setup Guide
Environment Preparation
OS: CentOS 6.8
IP addresses:
192.168.137.52  node1
192.168.137.48  node2

[root@node2 ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.137.48 node2
192.168.137.52 node1

node1 gets the same hosts entries; the ELK preparation steps below are identical on both machines.
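As an optional sanity check before going further (an illustrative extra, not a required step), confirm that the two hosts resolve each other by name:

[root@node2 ~]# ping -c 1 node1
[root@node1 ~]# ping -c 1 node2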
I. Installing elasticsearch

Import the GPG key and add the yum repository:

[root@node2 ~]# rpm --import 
[root@node2 ~]# vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=
gpgcheck=1
gpgkey=
enabled=1

1. Install elasticsearch
[root@node2 ~]# yum install -y elasticsearch

2. logstash: download and install the GPG key, then add the yum repository
[root@node2 ~]# rpm --import 
[root@node2 ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=
gpgcheck=1
gpgkey=
enabled=1

3. Install logstash
[root@node2 ~]# yum install -y logstash

4. Install kibana
[root@node2 ~]# cd /usr/local/src
[root@node2 src]# wget 
[root@node2 src]# tar zxf kibana-4.3.1-linux-x64.tar.gz
[root@node2 src]# mv kibana-4.3.1-linux-x64 /usr/local/
[root@node2 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana

5. Install Redis, nginx and java
[root@node2 src]# yum install epel-release -y
[root@node2 src]# yum install -y redis nginx java

II. Configuring and managing elasticsearch

Configure elasticsearch on node1 and set ownership on the data directory:

[root@node1 ~]# grep -n '^[a-Z]' /etc/elasticsearch/elasticsearch.yml 
17:cluster.name: check-cluster          # nodes with the same cluster name join the same cluster
23:node.name: node1                     # the node's hostname
33:path.data: /data/es-data             # data directory
37:path.logs: /var/log/elasticsearch/   # log directory
43:bootstrap.memory_lock: true          # lock memory so it is never swapped out
54:network.host: 0.0.0.0                # IPs allowed to access the node
58:http.port: 9200                      # HTTP port
[root@node1 ~]# mkdir -p /data/es-data
[root@node1 ~]# chown elasticsearch.elasticsearch /data/es-data/
[root@node1 ~]# /etc/init.d/elasticsearch start
[root@node1 ~]# chkconfig elasticsearch on
[root@node1 ~]# netstat -lntup|grep 9200
tcp        0      0 :::9200    :::*    LISTEN    2443/java 

Open the node's IP on port 9200 in a browser and its information is displayed; if not, check that the firewall allows port 9200. elasticsearch is driven through a RESTful API. To look at the current indices and shards (a plugin will visualize this later), run:

[root@node1 ~]# curl -i -XGET '' -d '{"query": {"match_all": {}}}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95
{
  "count" : 0,            # 0 documents indexed
  "_shards" : {           # shard summary
    "total" : 0,
    "successful" : 0,     # 0 successful
    "failed" : 0          # 0 failed
  }
}

Use the head plugin to display indices and shards:

[root@node1 ~]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head

In the plugin, create an index-demo/test index: open the "复合索引" (compound request) tab, enter index_demo/test on the second line, fill some document content into the blank body, and submit the request. Then send a GET request (other request types work too) for that document: put the returned id after the type in the index field, select GET, and submit. In the "基本查询" (basic query) tab, click search and the documents of the index you created are listed.

III. Managing elasticsearch on node2

Copy node1's configuration file to node2, change node.name, and set directory ownership. cluster.name must be identical on both nodes; when the nodes start they look for cluster members via multicast by default.

[root@node1 ~]# scp /etc/elasticsearch/elasticsearch.yml 192.168.137.48:/etc/elasticsearch/elasticsearch.yml
[root@node2 ~]# grep -n '^[a-Z]' /etc/elasticsearch/elasticsearch.yml 
17:cluster.name: check-cluster
23:node.name: node2
33:path.data: /data/es-data
37:path.logs: /var/log/elasticsearch/
43:bootstrap.memory_lock: true
54:network.host: 0.0.0.0
58:http.port: 9200
[root@node2 ~]# mkdir -p /data/es-data
[root@node2 ~]# chown elasticsearch.elasticsearch /data/es-data/

Start elasticsearch on node2:

[root@node2 ~]# /etc/init.d/elasticsearch start 
[root@node2 ~]# chkconfig elasticsearch on

Add the following to both nodes so they use unicast discovery (multicast was tried but did not work):

[root@node1 ~]# grep -n "^discovery" /etc/elasticsearch/elasticsearch.yml
68:discovery.zen.ping.unicast.hosts: ["node1", "node2"]
[root@node2 ~]# grep -n "^discovery" /etc/elasticsearch/elasticsearch.yml
68:discovery.zen.ping.unicast.hosts: ["node1", "node2"]
[root@node2 ~]# /etc/init.d/elasticsearch restart 
[root@node1 ~]# /etc/init.d/elasticsearch restart

In the browser you can now inspect shard placement. By default an index is split into 5 shards (the number is tunable). Shards drawn with a bold green border are primary shards; the unbordered ones are replicas. If a primary shard is lost, its replica is promoted to primary, which gives high availability; primary and replica shards can also be load-balanced to speed up queries. If both a primary and its replica are lost, that part of the index is gone for good.

Use the kopf plugin to monitor elasticsearch:

[root@node1 ~]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf

Click "nodes" to see each node's load, CPU usage, JVM heap usage, disk usage and uptime. kopf also exposes a REST API and more. bigdesk is a similar plugin, but it does not yet support elasticsearch 2.1; it would be installed like this:

[root@node1 ~]# /usr/share/elasticsearch/bin/plugin install hlstudio/bigdesk

IV. Configuring logstash

Start a logstash instance. -e runs the pipeline given on the command line; input/stdin is an input plugin reading standard input; output/stdout writes to standard output.

[root@node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Settings: Default filter workers: 1
Logstash startup completed
check                                            # typed in
2018-05-22T12:43:12.064Z node1 check             # printed automatically
www.baidu.com                                    # typed in
2018-05-22T12:43:27.696Z node1 www.baidu.com     # printed

Use rubydebug for verbose output; a codec is an encoder/decoder:

[root@node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Settings: Default filter workers: 1
Logstash startup completed
check                                            # typed in
{
       "message" => "check",
      "@version" => "1",
    "@timestamp" => "2018-05-22T12:50:07.161Z",
          "host" => "node1"
}

Each piece of output above is called an event; several related lines of output merged together also count as one event (for example, the consecutive lines of a single log entry form one event).
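Before pointing logstash at elasticsearch, it is worth confirming that the two-node cluster formed correctly. A minimal check, assuming the standard cluster-health endpoint on node1:

[root@node1 ~]# curl 'http://192.168.137.52:9200/_cluster/health?pretty'

With both nodes joined, the response should report "cluster_name" : "check-cluster", "status" : "green" and "number_of_nodes" : 2.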
Use logstash to write events into elasticsearch:

[root@node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.137.52:9200"] } }'
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Settings: Default filter workers: 1
Logstash startup completed
maliang
check
baidu.com
www.baidu.com

Look at the new logstash index in elasticsearch. Next, write to elasticsearch and at the same time print a copy locally, i.e. keep a plain-text copy on disk, so there is no need to schedule backups from elasticsearch to a remote host. Keeping a plain-text copy has three big advantages: 1) text is the simplest format; 2) text can be reprocessed later; 3) text compresses best.

[root@node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.137.52:9200"] } stdout{ codec => rubydebug } }'
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Settings: Default filter workers: 1
Logstash startup completed
www.google.com
{
       "message" => "www.google.com",
      "@version" => "1",
    "@timestamp" => "2018-05-22T13:03:48.940Z",
          "host" => "node1"
}
www.elastic.co
{
       "message" => "www.elastic.co",
      "@version" => "1",
    "@timestamp" => "2018-05-22T13:04:06.880Z",
          "host" => "node1"
}

# Start logstash from a configuration file; events are written to elasticsearch as well
[root@node1 ~]# vim normal.conf
[root@node1 ~]# cat normal.conf 
input { stdin { } }
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}
[root@node1 ~]# /opt/logstash/bin/logstash -f normal.conf
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Settings: Default filter workers: 1
Logstash startup completed
123                # typed in
{
       "message" => "123",
      "@version" => "1",
    "@timestamp" => "2018-05-22T13:33:38.994Z",
          "host" => "node1"
}

Learning to write conf files. Input plugin configuration, using file as the example; several inputs can be defined:

input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }
  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}

There are several ways to point at the files to collect: an array, glob (*) matching, or multiple path entries:

path => ["/var/log/messages","/var/log/*.log"]
path => ["/data/mysql/mysql.log"]

Boolean values:
ssl_enable => true

File sizes (with units):
my_bytes => "1113"     # 1113 bytes
my_bytes => "10MiB"    # 10485760 bytes
my_bytes => "100kib"   # 102400 bytes
my_bytes => "180 mb"   # 180000000 bytes

JSON codec:
codec => "json"

Hashes:
match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}

Ports:
port => 33

Passwords:
my_password => "password"

A conf file for collecting the system log, built from the input and output plugins:

[root@node1 ~]# cat system.conf 
input {
  file {
    path => "/var/log/messages"
    type => "system"
    start_position => "beginning"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.137.52:9200"]
    index => "system-%{+YYYY.MM.dd}"
  }
}

Click "数据浏览" (data browse) in the head plugin to see the collected entries.

Collecting the elasticsearch error log. Here the system log from above and this error log (a Java application log) are handled in one file. An if test on type writes the two logs to different indices. Note that type is fixed (the name cannot be changed) and must not clash with any field of the log itself; in other words, the log must not already contain a field called type.

[root@node1 ~]# cat all.conf 
input {
  file {
    path => "/var/log/messages"
    type => "system"
    start_position => "beginning"
  }
  file {
    path => "/var/log/elasticsearch/check-cluster.log"
    type => "es-error"
    start_position => "beginning"
  }
}
output {
  if [type] == "system" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "system-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "es-error" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "es-error-%{+YYYY.MM.dd}"
    }
  }
}
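After starting logstash with this file, a quick way to confirm the indices exist is elasticsearch's standard _cat API; this check is an extra convenience rather than part of the procedure:

[root@node1 ~]# curl '192.168.137.52:9200/_cat/indices?v'

The listing should contain the dated system-YYYY.MM.dd and es-error-YYYY.MM.dd indices along with their document counts.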
Collecting a whole multi-line error into one event, with an example.

Lines beginning with "at org." all belong to the same error, but they appear on separate lines, which makes the log awkward to read. They need to be merged into a single event, which is what the multiline codec is for. The official documentation gives:

input {
  stdin {
    codec => multiline {
      pattern => "pattern, a regexp"
      negate => "true" or "false"
      what => "previous" or "next"
    }
  }
}

pattern: the regular expression that decides when lines are merged; negate: match normally or invert the match; what: merge into the current event or the next one. Test on standard input and output to prove that multiple lines are collected into one event:

[root@node1 ~]# cat muliline.conf 
input {
  stdin {
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}
[root@node1 ~]# /opt/logstash/bin/logstash -f muliline.conf 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Settings: Default filter workers: 1
Logstash startup completed
[1
[2
{
    "@timestamp" => "2018-05-22T15:53:07.756Z",
       "message" => "[1",
      "@version" => "1",
          "host" => "node1"
}
[3
{
    "@timestamp" => "2018-05-22T15:53:14.942Z",
       "message" => "[2",
      "@version" => "1",
          "host" => "node1"
}

Now put the same multiline codec into all.conf so the es-error index receives merged events:

[root@node1 ~]# cat all.conf 
input {
  file {
    path => "/var/log/messages"
    type => "system"
    start_position => "beginning"
  }
  file {
    path => "/var/log/elasticsearch/check-cluster.log"
    type => "es-error"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}
output {
  if [type] == "system" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "system-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "es-error" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "es-error-%{+YYYY.MM.dd}"
    }
  }
}
[root@node1 ~]# /opt/logstash/bin/logstash -f all.conf 
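To see that the merged multi-line events really reach elasticsearch, a document count against the dated es-error index works; the wildcard below is illustrative and simply matches whatever date suffix was generated:

[root@node1 ~]# curl '192.168.137.52:9200/es-error-*/_count?pretty'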
V. Getting familiar with kibana

Edit the kibana configuration file so it takes effect:

[root@node1 ~]# vim /usr/local/kibana/config/kibana.yml 
[root@node1 ~]# grep '^[a-Z]' /usr/local/kibana/config/kibana.yml
server.port: 5601              # port
server.host: "0.0.0.0"         # address the service listens on
elasticsearch.url: ""          # the URL kibana uses to reach elasticsearch
kibana.index: ".kibana"        # the .kibana index created in elasticsearch
[root@node1 ~]# screen
[root@node1 ~]# /usr/local/kibana/bin/kibana 

Detach from screen with Ctrl+a, d. Open 192.168.137.52:5601 in a browser to verify that logstash's multiline plugin took effect, and add a logstash index in kibana. If Discover shows nothing after adding the index, delete it, add it again and untick both checkboxes shown during creation. The default fields are then visible in Discover.

Collecting nginx, syslog and tcp logs with logstash

Collect the nginx access log. Here the json codec is used to split the log into fields, key-value style, which makes the log clearer, easier to search and cheaper on CPU. Change the log format in the nginx configuration file to json:

[root@node1 ~]# sed -n '19,37p' /etc/nginx/nginx.conf 
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
    log_format json '{ "@timestamp": "$time_local", '
                    '"@fields": { '
                    '"remote_addr": "$remote_addr", '
                    '"remote_user": "$remote_user", '
                    '"body_bytes_sent": "$body_bytes_sent", '
                    '"request_time": "$request_time", '
                    '"status": "$status", '
                    '"request": "$request", '
                    '"request_method": "$request_method", '
                    '"http_referrer": "$http_referer", '
                    '"body_bytes_sent":"$body_bytes_sent", '
                    '"http_x_forwarded_for": "$http_x_forwarded_for", '
                    '"http_user_agent": "$http_user_agent" } }';
    access_log  /var/log/nginx/access.log  json;
[root@node1 ~]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
[root@node1 ~]# service nginx restart
[root@node1 ~]# netstat -ntupl |grep nginx
tcp        0      0 0.0.0.0:80    0.0.0.0:*    LISTEN    4091/nginx 
tcp        0      0 :::80         :::*         LISTEN    4091/nginx 

The log entries now come out as json. Open 192.168.137.52 in a browser and keep refreshing to generate entries. Then collect the nginx access log with logstash by extending all.conf:

[root@node1 ~]# cat all.conf 
input {
  file {
    path => "/var/log/messages"
    type => "system"
    start_position => "beginning"
  }
  file {
    path => "/var/log/elasticsearch/check-cluster.log"
    type => "es-error"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
  file {
    path => "/var/log/nginx/access_json.log"
    codec => json
    start_position => "beginning"
    type => "nginx-log"
  }
}
output {
  if [type] == "system" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "system-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "es-error" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "es-error-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx-log" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "nginx-log-%{+YYYY.MM.dd}"
    }
  }
}
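The nginx-log index only grows when nginx serves requests. Instead of refreshing a browser, a short curl loop against node1 generates test entries just as well (an illustrative helper; it assumes the access log path matches the access_log directive in nginx.conf):

[root@node1 ~]# for i in `seq 20`; do curl -s http://192.168.137.52/ > /dev/null; done
[root@node1 ~]# tail -1 /var/log/nginx/access_json.log    # each line should now be one json object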
Add the nginx-log index to kibana and display it.

Collecting the system syslog. Earlier the system log /var/log/messages was collected with the file input, but in production the syslog input plugin is normally used to receive it directly. Modify the rsyslog configuration so messages are forwarded to port 514:

[root@node1 ~]# vim /etc/rsyslog.conf
*.* @@192.168.137.52:514
[root@node1 ~]# /etc/init.d/rsyslog restart
[root@node1 ~]# cat all.conf 
input {
  syslog {
    type => "system-syslog"
    host => "192.168.137.52"
    port => "514"
  }
  file {
    path => "/var/log/messages"
    type => "system"
    start_position => "beginning"
  }
  file {
    path => "/var/log/elasticsearch/check-cluster.log"
    type => "es-error"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
  file {
    path => "/var/log/nginx/access_json.log"
    codec => json
    start_position => "beginning"
    type => "nginx-log"
  }
}
output {
  if [type] == "system" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "system-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "es-error" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "es-error-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx-log" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "nginx-log-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "system-syslog" {
    elasticsearch {
      hosts => ["192.168.137.52:9200"]
      index => "system-syslog-%{+YYYY.MM.dd}"
    }
  }
}
[root@node1 ~]# /opt/logstash/bin/logstash -f all.conf

The new system-syslog index can now be seen in the elasticsearch head plugin.
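A simple way to exercise the syslog input is the logger command, which writes a message through rsyslog and therefore through the *.* forwarding rule added above; this check is illustrative rather than part of the procedure:

[root@node1 ~]# logger "syslog test from node1"
[root@node1 ~]# curl -s '192.168.137.52:9200/_cat/indices?v' | grep system-syslog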
Using redis to buffer logstash messages. Modify the redis configuration file and start redis:

[root@node1 ~]# vim /etc/redis.conf
bind 192.168.137.52
daemonize yes
[root@node1 ~]# /etc/init.d/redis start
[root@node1 ~]# netstat -ntupl|grep redis
tcp        0      0 192.168.137.52:6379    0.0.0.0:*    LISTEN    5031/redis-server 

Write redis-out.conf:

[root@node1 ~]# cat redis-out.conf 
input{
  stdin{ }
}
output{
  redis{
    host => "192.168.137.52"
    port => "6379"
    db => "6"
    data_type => "list"    # the data type is a list
    key => "demo"
  }
}

Start logstash with this file and type some input:

[root@node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Settings: Default filter workers: 1
Logstash startup completed
check
www.uc123.com

Connect with redis-cli and look at what was written:

[root@node1 ~]# redis-cli -h 192.168.137.52
192.168.137.52:6379> select 6
OK
192.168.137.52:6379[6]> keys *
1) "demo"
192.168.137.52:6379[6]> lindex demo -2

Now write a shipper configuration that sends every log type into redis:

[root@node1 ~]# vim shipper.conf
input {
  syslog {
    type => "system-syslog"
    host => "192.168.137.52"
    port => "514"
  }
  tcp {
    type => "tcp-6666"
    host => "192.168.137.52"
    port => "6666"
  }
  file {
    path => "/var/log/messages"
    type => "system"
    start_position => "beginning"
  }
  file {
    path => "/var/log/nginx/access_json.log"
    codec => json
    start_position => "beginning"
    type => "nginx-log"
  }
  file {
    path => "/var/log/elasticsearch/check-cluster.log"
    type => "es-error"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}
output {
  if [type] == "system" {
    redis {
      host => "192.168.137.52"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "system"
    }
  }
  if [type] == "es-error" {
    redis {
      host => "192.168.137.52"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "es-error"
    }
  }
  if [type] == "nginx-log" {
    redis {
      host => "192.168.137.52"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "nginx-log"
    }
  }
  if [type] == "system-syslog" {
    redis {
      host => "192.168.137.52"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "system-syslog"
    }
  }
  if [type] == "tcp-6666" {
    redis {
      host => "192.168.137.52"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "tcp-6666"
    }
  }
}

Check the keys in redis:

[root@node1 ~]# redis-cli -h 192.168.137.52
192.168.137.52:6379> select 6
OK
192.168.137.52:6379[6]> keys *
1) "system-syslog"
2) "es-error"
3) "system"

Write indexer.conf, the configuration that reads from redis and sends to elasticsearch:

[root@node1 ~]# cat indexer.conf 
input {
  redis {
    type => "system-syslog"
    host => "192.168.137.52"
    port => "6379"
    db => "6"
    data_type => "list"
    key => "system-syslog"
  }
  redis {
    type => "tcp-6666"
    host => "192.168.137.52"
    port => "6379"
    db => "6"
    data_type => "list"
    key => "tcp-6666"
  }
  redis {
    type => "system"
    host => "192.168.137.52"
    port => "6379"
    db => "6"
    data_type => "list"
    key => "system"
  }
  redis {
    type => "nginx-log"
    host => "192.168.137.52"
    port => "6379"
    db => "6"
    data_type => "list"
    key => "nginx-log"
  }
  redis {
    type => "es-error"
    host => "192.168.137.52"
    port => "6379"
    db => "6"
    data_type => "list"
    key => "es-error"
  }
}
output {
  if [type] == "system" {
    elasticsearch {
      hosts => "192.168.137.52"
      index => "system-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "es-error" {
    elasticsearch {
      hosts => "192.168.137.52"
      index => "es-error-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx-log" {
    elasticsearch {
      hosts => "192.168.137.52"
      index => "nginx-log-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "system-syslog" {
    elasticsearch {
      hosts => "192.168.137.52"
      index => "system-syslog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tcp-6666" {
    elasticsearch {
      hosts => "192.168.137.52"
      index => "tcp-6666-%{+YYYY.MM.dd}"
    }
  }
}

Start shipper.conf:

[root@node1 ~]# /opt/logstash/bin/logstash -f shipper.conf 
Settings: Default filter workers: 1

The log volume is small, so everything is pushed to elasticsearch almost immediately and the keys empty out; write some extra data into the logs so there is something to watch:

[root@node1 ~]# for n in `seq 10000`;do echo $n >>/var/log/elasticsearch/check-cluster.log;done
[root@node1 ~]# for n in `seq 10000`;do echo $n >>/var/log/nginx/access.log;done
[root@node1 ~]# for n in `seq 10000`;do echo $n >>/var/log/messages;done

Check the length of a key and watch it grow:

192.168.137.52:6379[6]> llen system
(integer) 24546
192.168.137.52:6379[6]> llen system
(integer) 30001

Start indexer.conf:

[root@node1 ~]# /opt/logstash/bin/logstash -f indexer.conf
# check the key length again and watch it shrink
192.168.137.52:6379[6]> llen system
(integer) 29990
192.168.137.52:6379[6]> llen system
(integer) 29958
192.168.137.52:6379[6]> llen system
(integer) 29732

Learning logstash filter plugins: grok

There are many filter plugins; only grok is covered here. grok uses regular-expression matching to split a log line into fields. In production, apache logs cannot be emitted as json, so grok matching is the only option; the same goes for mysql slow query logs, which also cannot be split any other way. GitHub hosts many ready-made grok templates that can be referenced directly, and the logstash installation ships its own matching rules, found at this path:

[root@node1 patterns]# pwd
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns

Write grok.conf following the official documentation:

[root@node1 ~]# cat grok.conf 
input {
  stdin {}
}
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}

Start logstash, feed it the sample input from the official documentation, and the split result is printed.
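The sample line from the official grok documentation matches the pattern above; fed in on stdin, it is split roughly as follows (standard fields such as @timestamp, @version and host are omitted here for brevity):

55.3.244.1 GET /index.html 15824 0.043
{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
        "client" => "55.3.244.1",
        "method" => "GET",
       "request" => "/index.html",
         "bytes" => "15824",
      "duration" => "0.043"
}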
Using logstash to collect the mysql slow query log

Enable the slow query log:

mysql> set global slow_query_log=ON;
mysql> set global long_query_time=2;

Check it with: show variables like "%slow%";

Import a slow log from a production mysql; an entry ends like this:

SET timestamp=1527172087;
select sleep(20);

Handle the multi-line entries with the multiline codec and write mysql-slow.conf:

[root@node1 ~]# cat mysql-slow.conf 
input{
  file {
    path => "/var/lib/mysql/node1-slow.log"
    type => "mysql-slow-log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^# User@Host:"
      negate => true
      what => "previous"
    }
  }
}
filter {
  grok {
    match => { "message" => "SELECT SLEEP" }
    add_tag => [ "sleep_drop" ]
    tag_on_failure => []    # prevent default _grokparsefailure tag on real records
  }
  if "sleep_drop" in [tags] {
    drop {}
  }
  grok {
    match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s+# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s+(?:use %{DATA:database};\s+)?SET timestamp=%{NUMBER:timestamp};\s+(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
  }
  date {
    match => [ "timestamp", "UNIX" ]
    remove_field => [ "timestamp" ]
  }
}
output {
  stdout{
    codec => "rubydebug"
  }
}

Run the configuration file and look at the result of the grok match:

[root@node1 ~]# /opt/logstash/bin/logstash -f mysql-slow.conf

Rolling ELK out in production

Log classification:
  system logs    rsyslog    logstash syslog plugin
  access logs    nginx      logstash codec json
  error logs     file       logstash file + multiline
  runtime logs   file       logstash codec json
  device logs    syslog     logstash syslog plugin
  debug logs     file       logstash json or multiline

Log standardization:
  1) fix the log paths and keep them consistent
  2) use json for the log format wherever possible

Collection order: system logs first -> error logs -> runtime logs -> access logs

Note: logstash configuration files must not contain special characters.

Reposted from: https://blog.51cto.com/8999a/2119865