ELK日志分析系统

2020-06-03 00:00:00 数据 分析 开源 日志 收集

一般我们需要进行日志分析场景,往往应运营需要, 对应用进行日志分析, 为运营推广提供有用的运营数据,使运营推广具有目的性、针对性和直接性,我不是运营出身,所以我也不知道怎么去概括这个事情,说白了,就是运营想知道,用户行为是什么样的,用户更关心的是什么等等一系列问题;那我们如何做到呢?通常我们的做法是直接对日志文件进行grep、awk等命令就可以获得想要的信息。但是,随着用户体量的增加,支撑的节点也随之增加,那么传统方法暴露出很多问题,比如:效率低下,日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询等等。这就需要我们集中化处理我们的日志,那么如何对所有服务器上的日志进行收集汇总?常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。

一个完整的集中式日志系统,需要包含以下几个主要特点:

收集-能够采集多种来源的日志数据

传输-能够稳定的把日志数据传输到中央系统

存储-如何存储日志数据

分析-可以支持 UI 分析

警告-能够提供错误报告,监控机制


ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。

这是常见的ELK的结构图

带有beats的结构图

关于安装,这里就不讲了,二进制包,解压就可用;下面我们看一下,用logstash收集系统secure日志和nginx日志,首先我们来看一下secure的logstash收集配置文件:

input {
    file {
        type => "system"
        path => [ "/var/log/secure" ]
        start_position => "beginning"
    }
}

filter {
    grok {
        add_tag => [ "valid" ]
        match => [
        "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: %{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}",
        "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: message repeated 2 times: \[ %{WORD:login} password for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}",
        "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: %{WORD:login} password for invalid user %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}",
        "message", "%{SYSLOGTIMESTAMP:syslog_date} %{SYSLOGHOST:syslog_host}%{DATA:syslog_program}(?:\[%{POSINT}\])?: %{WORD:login} %{WORD:auth_method} for %{USERNAME:username} from %{IP:ip} %{GREEDYDATA}"
        ]
    }

    if "valid" not in [tags] {
        drop { }
    }

    mutate {
        remove_tag => [ "valid" ]
        lowercase => [ "login" ]
    }


    date {
        match => [ "syslog_date", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
        timezone => "Asia/Shanghai"
    }

    geoip {
        source => "ip"
        fields => ["city_name"]
        database => "/usr/local/lnmp/logstash/GeoLite2-City.mmdb"
    }
}


output {
    # stdout { codec => rubydebug }
    elasticsearch {
        hosts => [ "10.9.117.83:9200", "10.9.165.245:9200", "10.9.13.93:9200" ]
        index => "logstash-ssh-login-%{+YYYY.MM.dd}"
    }
}

相关文章