ELFK - Real-Time Collection of Java Logs

1. Preface

This post documents the complete process of collecting Java application logs: structuring the data with Logstash grok patterns, shipping it to Elasticsearch, and finally visualizing it in Kibana. For installing and configuring Elasticsearch and Kibana, see: ELK-Stack Introduction and Installation Manual.

2. Deployment Environment

Platform            IP              Role                 ES version   Logstash version   Kibana version
CentOS 6.7 64Bit    192.168.2.245   ES+Cerebro+Kibana    6.7.0        -                  6.7.0
CentOS 6.7 64Bit    192.168.2.15    Logstash             -            6.7.0              -

3. Logstash Installation and Configuration

This post only covers the process of collecting Java logs; for Logstash installation, see: ELFK - Real-Time Collection of HAProxY Logs (ELFK-实时抓取HAPROXY日志).

3.1 Java Sample Data Analysis and Pattern Writing

  • Sample data

    2019-10-30 12:20:00,190 [pool-15038-thread-1] INFO  [com.ebring.jfpt.core.service.jfmanage.AutoBankCheckService] - 根据清算平台对账交易,该交易流水支付状态为未支付,交易流水号: 6666666666666
  • Regex breakdown:

    (?<TOMCAT_DATESTAMP>20(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):?(?:[0-5][0-9])(?::?(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))) (?<THREAD>\S+) (?<LOG_LEVEL>([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)\s?) (?<COM_RONGLIAN>\S+) - (?<MESSAGE>.*)
  • Kibana Grok Debug:

After clicking Simulate, if no errors are reported the pattern works; the structured fields appear in the Structured Data panel (a command-line way to run the same check is sketched after this list).

[Screenshot: Kibana Grok Debugger - Structured Data output]
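The same pattern can also be verified outside Kibana with a throwaway Logstash pipeline that reads from stdin and prints the parsed event. This is only a sketch: it assumes the custom patterns from section 3.2 are already in /usr/share/logstash/patterns, and the file name test_grok.conf is made up for this test.

# test_grok.conf - minimal pipeline for checking the grok pattern against the sample line
input { stdin { } }
filter {
  grok {
    patterns_dir => ["/usr/share/logstash/patterns"]
    match => { "message" => "%{TOMCAT_DATESTAMP:tomcat_datestamp} %{THREAD:thread} %{LOG_LEVEL:log_level} %{COM_RONGLIAN:com_ronglian} %{UNKNOWN:unknown} %{MESSAGE:message}" }
  }
}
output { stdout { codec => rubydebug } }

Run it with /usr/share/logstash/bin/logstash -f test_grok.conf, paste the sample line, and check that tomcat_datestamp, thread, log_level and the other fields show up in the rubydebug output.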

3.2 Import the java_patterns Pattern Library

[root@localhost patterns]# pwd
/usr/share/logstash/patterns
[root@localhost patterns]# cat java_patterns
#JAVA LOG PATTERNS
TOMCAT_DATESTAMP 20(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):?(?:[0-5][0-9])(?::?(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))
THREAD \S+
LOG_LEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)\s?
COM_RONGLIAN \S+
UNKNOWN \S+
MESSAGE .*
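If the patterns directory does not exist yet, it can be created before the file is dropped in. A rough sketch follows; the path matches the pwd output above, and the logstash user/group name is an assumption based on a package installation.

mkdir -p /usr/share/logstash/patterns
vi /usr/share/logstash/patterns/java_patterns            # paste the pattern definitions shown above
chown -R logstash:logstash /usr/share/logstash/patterns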

3.3 Import the java_log.conf Configuration File

Note: the pattern library used for this Java log collection was written in-house, and the configuration file is fairly long. Because our Tomcat applications are configured with inconsistent character sets, collected data could come out garbled, so we ended up grouping log files by character set and collecting each group with a matching codec charset.
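To decide which input group (and therefore which charset) a given log file belongs to, its encoding can be checked up front. A quick sketch, with example file paths:

file -i /data/java_log/app.log              # e.g. text/plain; charset=utf-8
file -i /data/java_log_gb2312/app.log       # GB-family files are often reported as iso-8859-1 or unknown-8bit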

[root@localhost conf.d]# pwd
/etc/logstash/conf.d
[root@localhost conf.d]# cat java_log.conf
input {
  # One file input per character-set group; the multiline codec folds continuation
  # lines (stack traces, wrapped messages) into the event that starts with a timestamp.
  file {
    path => "/data/java_log/*"
    type => "java-log-utf-8"
    exclude => "*.gz"
    start_position => "beginning"
    stat_interval => "1"
    codec => multiline {
      charset => "UTF-8"
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => "previous"
    }
  }
  file {
    path => "/data/java_log_gbk18030/*"
    type => "java-log-gbk18030"
    exclude => "*.gz"
    start_position => "beginning"
    stat_interval => "1"
    codec => multiline {
      charset => "GB18030"
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => "previous"
    }
  }
  file {
    path => "/data/java_log_binary/*"
    type => "java-log-binary"
    exclude => "*.gz"
    start_position => "beginning"
    stat_interval => "1"
    codec => multiline {
      charset => "BINARY"
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => "previous"
    }
  }
  file {
    path => "/data/java_log_iso-8859-1/*"
    type => "java-log-iso-8859-1"
    exclude => "*.gz"
    start_position => "beginning"
    stat_interval => "1"
    codec => multiline {
      charset => "ISO-8859-1"
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => "previous"
    }
  }
  file {
    path => "/data/java_log_gb2312/*"
    type => "java-log-gb2312"
    exclude => "*.gz"
    start_position => "beginning"
    stat_interval => "1"
    codec => multiline {
      charset => "GB2312"
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => "previous"
    }
  }
}

filter {
  # Structure the raw line with the custom patterns installed in section 3.2.
  grok {
    patterns_dir => ["/usr/share/logstash/patterns"]
    match => { "message" => "%{TOMCAT_DATESTAMP:tomcat_datestamp} %{THREAD:thread} %{LOG_LEVEL:log_level} %{COM_RONGLIAN:com_ronglian} %{UNKNOWN:unknown} %{MESSAGE:message}" }
    overwrite => [ "message" ]
  }
  # Use the log's own timestamp as the event's @timestamp.
  date {
    match => [ "tomcat_datestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    target => "@timestamp"
  }
  # Build a local-time (UTC+8) copy of @timestamp; after the gsub rules below it
  # becomes a yyyy.MM.dd string used in the index names.
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  mutate {
    add_field => { "remote_ip" => "192.168.2.15" }
    convert => ["timestamp", "string"]
    gsub => [ "message", "\r", "" ]
    gsub => ["timestamp", "T([\S\s]*?)Z", ""]
    gsub => ["timestamp", "-", "."]
  }
}

output {
  # Route each character-set group to its own daily index.
  if [type] == "java-log-utf-8" {
    elasticsearch {
      hosts => ["192.168.2.245:9200"]
      index => "java-log-utf-8-%{timestamp}"
    }
  }
  if [type] == "java-log-gbk18030" {
    elasticsearch {
      hosts => ["192.168.2.245:9200"]
      index => "java-log-gb18030-%{timestamp}"
    }
  }
  if [type] == "java-log-binary" {
    elasticsearch {
      hosts => ["192.168.2.245:9200"]
      index => "java-log-binary-%{timestamp}"
    }
  }
  if [type] == "java-log-iso-8859-1" {
    elasticsearch {
      hosts => ["192.168.2.245:9200"]
      index => "java-log-iso-8859-1-%{timestamp}"
    }
  }
  if [type] == "java-log-gb2312" {
    elasticsearch {
      hosts => ["192.168.2.245:9200"]
      index => "java-log-gb2312-%{timestamp}"
    }
  }
}
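Before restarting, the configuration can be syntax-checked first. A sketch assuming the default package paths:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/java_log.conf --config.test_and_exit

A valid file should end with a "Configuration OK" message.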

3.4 Restart Logstash

Note: if the Logstash log contains no ERROR-level entries, data has started flowing into Elasticsearch; log in to Kibana afterwards to check it.

[root@elk logstash]# service logstash restart
[root@elk logstash]# tail -f /var/log/logstash/logstash-plain.log
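Whether events have actually reached Elasticsearch can also be checked from the shell; a sketch using the ES host from the environment table above:

curl -s '192.168.2.245:9200/_cat/indices/java-log-*?v'

New java-log-*-yyyy.MM.dd indices with a growing docs.count indicate the pipeline is delivering data.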

4. Issues and Solutions
