ELFK-实时抓取WAF硬件设备日志

ELFK-实时抓取WAF硬件设备日志

1、前言

本篇博客记录我司一台山石waf设备,由于日志查看过于繁琐,需反复登录设备,缺少实时分析和预警功能,后开启设备syslog udp:514端口,通过logstash正则进行数据结构化处理,输出到elasticsearch中,最后通过kibaka进行展示的完整过程,EK的安装及配置方法,请参考:ELK-Stack简介及安装手册

2、部署环境介绍

平台 IP 用途 E版本 L版本 K版本
CentOS 6.7 64Bit 192.168.1.241 ES+Cerebro+Kibana+Logstash 6.7.0 6.7.0 6.7.0

3、配置硬件WAF

3.1、开启山石WAF日志服务器

此处填写ES的地址,我们只针对“网页安全日志”进行收集分析

image_1dodk7i0n1b811sr21lh114chr9m9.png-41.2kB

4、Logstash的配置

本篇仅记录抓取syslog的全过程,Logstash的安装请参考:ELFK-实时抓取HAPROXY日志

4.1、waf样本数据分析与正则编写

  • 样本数据

    1
    Jun 15 22:39:26 261030KSA3454057(root) 4144c203 Web-security@SYS: CRITICAL! From 36.110.101.2:59780 to 192.168.1.133:443, site name: 130-ejf123-443, policy name: 禁止爬虫部分开启SQL, protection type: 104, protection subtype: 10400, action: pass, followed_action: NONE, block_time: N/A, rule ID: 1040010022, rule name: infoleak_srverr:http_stu, vul_id: N/A, severity: 2, domain name: ccbxjmob.ejf123.com, HTTP method: GET, HTTP url: /jfpt/static/html/template_ccbxjmob/1.0/20180718140822/images/favicon.ico, source country/region: CN, message: HTTP Status Code: Server Return HTTP Status Code 404 (Not Found), Matched Data: 404 found within RESPONSE_STATUS: 404, attack count: 1
  • 正则分拆:

    1
    (?:%{SYSLOGTIMESTAMP:syslog_timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?<服务器信息>\S+ \S+ \S+:) (?<告警级别>\S+\!?) (?<A>\S+) (?<客户端IP>%{IP}):(?<客户端端口>%{INT}) (?<B>\S+) (?<服务端IP>%{IP}):(?<服务端端口>%{INT}), (?<C>\S+ \S+) (?<站点名称>\S+) (?<D>\S+ \S+) (?<策略名称>\S+) (?<E>\S+ \S+) (?<防护类型>\S+) (?<F>\S+ \S+) (?<防护子类型>\S+) (?<G>\S+) (?<动作>\S+) (?<H>\S+) (?<后续动作>\S+) (?<J>\S+) (?<阻塞时间>\S+) (?<K>\S+ \S+) (?<规则ID>\S+) (?<L>\S+ \S+) (?<规则名称>\S+) (?<M>\S+) (?<漏洞ID>\S+) (?<I>\S+ \S+) (?<O>\S+ \S+) (?<域名>%{HOSTNAME}\S+) (?<Q>\S+ \S+) (?<HTTP请求方法>\S+) (?<P>\S+ \S+) (?<HTTPURL>\S+) (?<R>\S+ \S+) (?<攻击源>\S+) (?<X>\S+) (?<消息>.*,) (?<T>\S+ \S+) (?<攻击次数>%{BASE16NUM})
  • Kibana Grok Debug:

Simulate后,如无报错则表示规则生效,数据结构化详情见Structured Data一栏

image_1dodl7665p2n1ta1ol63utvag9.png-102kB

4.2、导入waf_patterns规则库

1
2
3
4
5
[root@elk patterns]# pwd
/usr/share/logstash/patterns
[root@elk patterns]# cat waf_patterns
#20190615 wafaccess log from 192.168.1.133
WAFACCESS (?:%{SYSLOGTIMESTAMP:syslog_timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?<服务器信息>\S+ \S+ \S+:) (?<告警级别>\S+\!?) (?<A>\S+) (?<客户端IP>%{IP}):(?<客户端端口>%{INT}) (?<B>\S+) (?<服务端IP>%{IP}):(?<服务端端口>%{INT}), (?<C>\S+ \S+) (?<站点名称>\S+) (?<D>\S+ \S+) (?<策略名称>\S+) (?<E>\S+ \S+) (?<防护类型>\S+) (?<F>\S+ \S+) (?<防护子类型>\S+) (?<G>\S+) (?<动作>\S+) (?<H>\S+) (?<后续动作>\S+) (?<J>\S+) (?<阻塞时间>\S+) (?<K>\S+ \S+) (?<规则ID>\S+) (?<L>\S+ \S+) (?<规则名称>\S+) (?<M>\S+) (?<漏洞ID>\S+) (?<I>\S+ \S+) (?<O>\S+ \S+) (?<域名>%{HOSTNAME}\S+) (?<Q>\S+ \S+) (?<HTTP请求方法>\S+) (?<P>\S+ \S+) (?<HTTPURL>\S+) (?<R>\S+ \S+) (?<攻击源>\S+) (?<X>\S+) (?<消息>.*,) (?<T>\S+ \S+) (?<攻击次数>%{BASE16NUM})

4.3、导入waf.conf配置文件

提示:本次收集山石WAF的日志规则库,均为自行编写,时间字段为logstsh自带正则实现;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
[root@elk conf.d]# pwd
/etc/logstash/conf.d
[root@elk conf.d]# cat waf.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
syslog {
type => "waf-accesslog"
host => "192.168.1.241"
port => "514"
# tags => "waf"
facility_labels => "local5"
}
}

filter {
if [type] == "waf-accesslog" {
grok {
patterns_dir => ["/usr/share/logstash/patterns"]
match => { "message" => "%{WAFACCESS}" }
}
ruby {
code => "event.set('timestamp', event.get('@timestamp'))"
}
mutate {
rename => { "host" => "host.name" }
remove_field => ["A","B","C","D","E","F","G","H","I","J","K","L","M","O","P","Q","R","T","X"]
remove_field => ["message"]
}
}
}

output {
if [type] == "waf-accesslog" {
elasticsearch {
hosts => ["192.168.1.241:9200"]
index => "waf-accesslog-%{+YYYY.MM.dd}"
}
}

}

4.4、重启logstash

提示:日志如无ERROR级别的日志,表示数据已开始向ES导入

1
2
3
4
5
[root@elk logstash]# service logstash restart
[root@elk logstash]# ss -tlnup | grep 514
udp UNCONN 0 0 192.168.1.241:514 *:* users:(("java",10643,101))
tcp LISTEN 0 50 192.168.1.241:514 *:* users:(("java",10643,86))
[root@elk logstash]# tail -f /var/log/logstash/logstash-plain.log

4.5、登录kibana查看数据情况

  • 建立WAF索引
    image_1dodm1cgoqomu1lghs1vmpbv41f.png-126.2kB

  • 通过fields及filter对数据进行过滤展示
    image_1dodm06q43eohbb6e97bnehq12.png-125.4kB

-------------本文结束感谢您的阅读-------------
LiGuanCheng wechat
如有问题,请与我微信交流或通过右下角“daovoice”与我联系~。
请我喝一杯咖啡~