ELK日志分析系统
一般我们需要进行日志分析场景,往往应运营需要, 对应用进行日志分析, 为运营推广提供有用的运营数据,使运营推广具有目的性、针对性和直接性,我不是运营出身,所以我也不知道怎么去概括这个事情,说白了,就是运营想知道,用户行为是什么样的,用户更关心的是什么等等一系列问题;那我们如何做到呢?通常我们的做法是直接对日志文件进行grep、awk等命令就可以获得想要的信息。但是,随着用户体量的增加,支撑的节点也随之增加,那么传统方法暴露出很多问题,比如:效率低下,日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询等等。这就需要我们集中化处理我们的日志,那么如何对所有服务器上的日志进行收集汇总?常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。
一个完整的集中式日志系统,需要包含以下几个主要特点:
收集-能够采集多种来源的日志数据
传输-能够稳定的把日志数据传输到中央系统
存储-如何存储日志数据
分析-可以支持 UI 分析
警告-能够提供错误报告,监控机制
ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。
ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。
这是常见的ELK的结构图
带有beats的结构图
关于安装,这里就不讲了,二进制包,解压就可用;下面我们看一下,用logstash收集系统secure日志和nginx日志,首先我们来看一下secure的logstash收集配置文件:
input {
file {
type => "system"
path => [ "/var/log/secure" ]
start_position => "beginning"
}
}
filter {
grok {
add_tag => [ "valid" ]
match => [
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: %{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}",
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: message repeated 2 times: \[ %{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}",
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: %{WORD:login} password for invalid user %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}",
"message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: %{WORD:login} %{WORD:auth_method} for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}"
]
}
if "valid" not in [tags] {
drop { }
}
mutate {
remove_tag => [ "valid" ]
lowercase => [ "login" ]
}
date {
match => [ "syslog_date", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
timezone => "Asia/Shanghai"
}
geoip {
source => "ip"
fields => ["city_name"]
database => "/usr/local/lnmp/logstash/GeoLite2-City.mmdb"
}
}
output {
# stdout { codec => rubydebug }
elasticsearch {
hosts => [ "10.9.117.83:9200", "10.9.165.245:9200", "10.9.13.93:9200" ]
index => "logstash-ssh-login-%{+YYYY.MM.dd}"
}
}
相关文章