深入理解 ELK 中 Logstash 的底层原理 + 填坑指南

2022-06-09 00:00:00 字段 配置 日志 插件 匹配

本文目录如下:

  • 前言

  • 一、部署架构图

  • 二、Logstash 用来做什么?

  • 三、Logstash 的原理

    • 3.1 从 Logstash 自带的配置说起

    • 3.2 Input 插件

    • 3.3 Filter 插件

    • 3.4 Output 插件

    • 3.5 完整配置

  • 四、Logstash 怎么跑起来的

    • 4.1 Logstash 如何运行的

    • 4.2 Logstash 的架构原理

  • 五、Logstash 宕机风险

    • 5.1 Logstash 单点部署的风险

    • 5.2 开机启动 Logstash

  • 六、总结

前言

通过本篇内容,你可以学到如何解决 Logstash 的常见问题、理解 Logstash 的运行机制、集群环境下如何部署 ELK Stack。

在使用 Logstash 遇到了很多坑,本篇也会讲解解决方案。

  • 日志记录的格式复杂,正则表达式非常磨人。
  • 服务日志有多种格式,如何匹配。
  • 错误日志打印了堆栈信息,包含很多行,如何合并。
  • 日志记录行数过多(100多行),被拆分到了其他的日志记录中。
  • 输出到 ES 的日志包含很多无意义字段。
  • 输出到 ES 的日志时间和本来的日志时间相差 8 小时。
  • 如何优化 Logstash 的性能
  • Logstash 单点故障如何处理。

一、部署架构图

上次我们聊到了 ELK Stack 的搭建:

一文带你搭建一套 ELK Stack 日志平台

近悟空正在我们的测试环境部署这一套 ELK,发现还是有很多内容需要再单独拎几篇出来详细讲讲的,这次我会带着大家一起来看下 ELK 中的 Logstash 组件的落地玩法和踩坑之路。

测试环境目前有 12 台机器,其中 有 4 台给后端微服务、Filebeat、Logstash 使用,3 台给 ES 集群和 Kibana 使用。

部署拓扑图如下:

部署说明

  • 4 台服务器给业务微服务服务使用,微服务的日志会存放本机上。

  • 4 台服务器都安装 Filebeat 日志采集器,采集本机的微服务日志,

  • 其中一台服务器安装 Logstash ,Filebeat 发送日志给 Logstash。Logstash 将日志输出到 Elasticsearch 集群中。

  • 3 台服务器都安装有 Elasticsearch 服务,组成 ES 集群。其中一台安装 Kibana 服务,查询 ES 集群中的日志信息。

二、Logstash 用来做什么?

你是否还在苦恼每次生产环境出现问题都需要远程到服务器查看日志文件?

你是否还在为了没有统一的日志搜索入口而烦心?

你是否还在为从几十万条日志中搜索关键信息而苦恼?

没错,Logstash 它来啦,带着所有的日志记录来啦。

Logstash 它是帮助我们收集、解析和转换日志的。作为 ELK 中的一员,发挥着很大的作用。

当然 Logstash 不仅仅用在收集日志方面,还可以收集其他内容,我们熟悉的还是用在日志方面。

三、Logstash 的原理

3.1 从 Logstash 自带的配置说起

Logstash 的原理其实还比较简单,一个输入,一个输出,中间有个管道(不是必须的),这个管道用来收集、解析和转换日志的。如下图所示:

Logstash 组件

Logstash 运行时,会读取 Logstash 的配置文件,配置文件可以配置输入源、输出源、以及如何解析和转换的。

Logstash 配置项中有两个必需元素,输入(inputs)和输出(ouputs),以及一个可选元素 filters 过滤器插件。input 可以配置来源数据,过滤器插件在你指定时修改数据,output 将数据写入目标。

我们来看下 Logstash 软件自带的一个示例配置,文件路径:\logstash-7.6.2\config\logstash-sample.conf

是不是很简单,一个 input 和 一个 output 就搞定了。如下图所示:

但是这种配置其实意义不大,没有对日志进行解析,传到 ES 中的数据是原始数据,也就是一个 message 字段包含一整条日志信息,不便于根据字段搜索。

3.2 Input 插件

配置文件中 input 输入源指定了 beats,而 beats 是一个大家族,Filebeat 只是其中之一。对应的端口 port = 5044,表示 beats 插件可以往 5044 端口发送日志,logstash 可以接收到通过这个端口和 beats 插件通信。

在部署架构图中,input 输入源是 Filebeat,它专门监控日志的变化,然后将日志传给 Logstash。在早期,Logstash 是自己来采集的日志文件的。所以早期的日志检索方案才叫做 ELK,Elasticsearch + Logstash + Kibana,而现在加入了 Filebeat 后,这套日志检索方案属于 ELK Stack,不是 ELKF,摒弃了用首字母缩写来命名。

另外 input 其实有很多组件可以作为输入源,不限于 Filebeat,比如我们可以用 Kafka 作为输入源,将消息传给 Logstash。具体有哪些插件列表,可以参考这个 input 插件列表[1]

3.3 Filter 插件

而对于 Logstash 的 Filter,这个才是 Logstash 强大的地方。Filter 插件也非常多,我们常用到的 grok、date、mutate、mutiline 四个插件。

对于 filter 的各个插件执行流程,可以看下面这张图:

图片来自 Elasticsearch 官网

3.3.1 日志示例

我以我们后端服务打印的日志为例,看是如何用 filter 插件来解析和转换日志的。

logback.xml 配置的日志格式如下:

<encoder>
    <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
</encoder>

日志格式解释如下:

  • 记录日志时间:%d{yyyy-MM-dd HH:mm:ss.SSS}

  • 记录是哪个线程打印的日志:[%thread]

  • 记录日志等级:%-5level

  • 打印日志的类:%logger

  • 记录具体日志信息:%msg%n,这个 msg 的内容就是 log.info("abc") 中的 abc。

通过执行代码 log.info("xxx") 后,就会在本地的日志文件中追加一条日志。

3.3.2 打印的日志内容

从服务器拷贝出了一条日志,看下长什么样,有部分敏感信息我已经去掉了。

2022-06-16 15:50:00.070 [XNIO-1 task-1] INFO  com.passjava.config - 方法名为:MemberController-,请求参数:{省略}

那么 Logstash 如何针对上面的信息解析出对应的字段呢?比如如何解析出打印日志的时间、日志等级、日志信息?

3.3.3 grok 插件

这里就要用到 logstash 的 filter 中的 grok 插件。filebeat 发送给 logstash 的日志内容会放到message 字段里面,logstash 匹配这个 message 字段就可以了。配置项如下所示:

filter {
 grok {
      match => [ "message""(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s+\[(?<thread>.*)\]\s+(?<level>\w*)\s{1,2}+(?<class>\S*)\s+-\s+(?<content>.*)\s*"]
  }
}

:日志记录的格式复杂,正则表达式非常磨人。

大家发现没,上面的 匹配 message 的正则表达式还是挺复杂的,这个是我一点一点试出来的。Kibana 自带 grok 的正则匹配的工具,路径如下:

http://<your kibana IP>:5601/app/kibana#/dev_tools/grokdebugger

我们把日志和正则表达式分别粘贴到上面的输入框,点击 Simulate 就可以测试是否能正确匹配和解析出日志字段。如下图所示:

Grok Debugger 工具

有没有常用的正则表达式呢?有的,logstash 官方也给了一些常用的常量来表达那些正则表达式,可以到这个 Github 地址查看有哪些常用的常量。

https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/ecs-v1/grok-patterns

比如可以用 IP 常量来代替正则表达式 IP (?:%{IPV6}|%{IPV4})

好了,经过正则表达式的匹配之后,grok 插件会将日志解析成多个字段,然后将多个字段存到了 ES 中,这样我们可以在 ES 通过字段来搜索,也可以在 kibana 的 Discover 界面添加列表展示的字段。


:我们后端项目的不同服务打印了两种不同格式的日志,那这种如何匹配?

再加一个 match 就可以了。

filter {
 grok {
      match => [ "message""(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s+\[(?<thread>.*)\]\s+(?<level>\w*)\s{1,2}+(?<class>\S*)\s+-\s+(?<content>.*)\s*"]
      match => [ "message""(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s{1,2}+(?<level>\w*)\s{1,2}+.\s---+\s\[(?<thread>.*)\]+\s(?<class>\S*)\s*:+\s(?<content>.*)\s*"]
  }
}

当任意一个 message 匹配上了这个正则,则 grok 执行完毕。假如还有第三种格式的 message,那么虽然 grok 没有匹配上,但是 message 也会输出到 ES,只是这条日志在 ES 中不会展示 logTime、level 等字段。

3.3.4 multiline 插件

还有一个坑的地方是错误日志一般都是很多行的,会把堆栈信息打印出来,当经过 logstash 解析后,每一行都会当做一条记录存放到 ES,那这种情况肯定是需要处理的。这里就需要使用 multiline 插件,对属于同一个条日志的记录进行拼接。

3.3.4.1 安装 multiline 插件

multiline 不是 logstash 自带的,需要单独进行安装。我们的环境是没有外网的,所以需要进行离线安装。

介绍在线和离线安装 multiline 的方式:

  • 在线安装插件。

在 logstash 根目录执行以下命令进行安装。

bin/logstash-plugin install logstash-filter-multiline

  • 离线安装插件。

在有网的机器上在线安装插件,然后打包。

bin/logstash-plugin install logstash-filter-multiline
bin/logstash-plugin prepare-offline-pack logstash-filter-multiline

拷贝到服务器,执行安装命令。

bin/logstash-plugin install file:///home/software/logstash-offline-plugins-7.6.2.zip

安装插件需要等待 5 分钟左右的时间,控制台界面会被 hang 住,当出现 Install successful 表示安装成功。

检查下插件是否安装成功,可以执行以下命令查看插件列表。当出现 multiline 插件时则表示安装成功。

bin/logstash-plugin list
3.3.4.2 使用 multiline 插件

如果要对同一条日志的多行进行合并,你的思路是怎么样的?比如下面这两条异常日志,如何把文件中的 8 行日志合并成两条日志?

多行日志示例

思路是这样的:

  • 步:每一条日志的行开头都是一个时间,可以用时间的正则表达式匹配到行。
  • 第二步:然后将后面每一行的日志与行合并。
  • 第三步:当遇到某一行的开头是可以匹配正则表达式的时间的,就停止条日志的合并,开始合并第二条日志。
  • 第四步:重复第二步和第三步

按照这个思路,multiline 的配置如下:

filter {
  multiline {
    pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}"
    negate => true
    what => "previous"
  }
}

时间的正则表达式就是这个 pattern 字段,大家可以根据自己项目中的日志的时间来定义正则表达式。

  • pattern: 这个是用来匹配文本的表达式,也可以是grok表达式

  • what: 如果pattern匹配成功的话,那么匹配行是归属于上一个事件,还是归属于下一个事件。

    previous: 归属于上一个事件,向上合并。

    next: 归属于下一个事件,向下合并

  • negate:是否对 pattern 的结果取反

    false: 不取反,是默认值

    true: 取反。将多行事件扫描过程中的行匹配逻辑取反(如果pattern匹配失败,则认为当前行是多行事件的组成部分)

参考 multiline 官方文档[2]

3.3.5 多行被拆分

坑:Java 堆栈日志太长了,有 100 多行,被拆分了两部分,一部分被合并到了原来的那一条日志中,另外一部分被合并到了不相关的日志中。

如下图所示,第二条日志有 100 多行,其中后一行被错误地合并到了第三条日志中。

日志合并错乱

为了解决这个问题,我是通过配置 filebeat 的 multiline 插件来截断日志的。为什么不用 logstash 的 multiline 插件呢?因为在 filter 中使用 multiline 没有截断的配置项。filebeat 的 multiline 配置项如下:

multiline.type: pattern
multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}'
multiline.negate: true
multiline.match: after
multiline.max_lines: 50

配置项说明:

  • multiline.pattern:希望匹配到的结果(正则表达式)
  • multiline.negate:值为 true 或 false。使用 false 代表匹配到的行合并到上一行;使用 true 代表不匹配的行合并到上一行
  • multiline.match:值为 after 或 before。after 代表合并到上一行的末尾;before 代表合并到下一行的开头
  • multiline.max_lines:合并的大行数,默认 500
  • multiline.timeout:一次合并事件的超时时间,默认为 5s,防止合并消耗太多时间导致 filebeat 进程卡死

我们重点关注 max_lines 属性,表示多保留多少行后执行截断,这里配置 50 行。

注意:filebeat 和 logstash 我都配置了 multiline,没有验证过只配置 filebeat 的情况。参考 Filebeat 官方文档[3]

3.3.6 mutate 插件

当我们将日志解析出来后,Logstash 自身会传一些不相关的字段到 ES 中,这些字段对我们排查线上问题帮助不大。可以直接剔除掉。

坑:输出到 ES 的日志包含很多无意义字段。

这里我们就要用到 mutate 插件了。它可以对字段进行转换,剔除等。

比如我的配置是这样的,对很多字段进行了剔除。

mutate {
    remove_field => ["agent","message","@version""tags""ecs""input""[log][offset]"]
}

注意:一定要把 log.offset 字段去掉,这个字段可能会包含很多无意义内容。

关于 Mutate 过滤器它有很多配置项可供选择,如下表格所示:

Mutate  过滤器配置选项

参考 Mutate 参考文章[4]

3.3.7 date 插件

到 kibana 查询日志时,发现排序和过滤字段 @timestamp 是 ES 插入日志的时间,而不是打印日志的时间。

这里我们就要用到 date 插件了。

上面的 grok 插件已经成功解析出了打印日志的时间,赋值到了 logTime 变量中,现在用 date 插件将 logTime 匹配下,如果能匹配,则会赋值到 @timestamp 字段,写入到 ES 中的 @timestamp 字段就会和日志时间一致了。配置如下所示:

date {
    match => ["logTime""MMM d HH:mm:ss""MMM dd HH:mm:ss""ISO8601"]
}

但是经过测试写入到 ES 的 @timestamp 日志时间和打印的日志时间相差 8 小时。如下图所示:

我们到 ES 中查询记录后,发现 @timestamp 字段时间多了一个字母 Z,代表 UTC 时间,也就是说 ES 中存的时间比日志记录的时间晚 8 个小时。

我们可以通过增加配置  timezone => "Asia/Shanghai" 来解决这个问题。修改后的配置如下所示:

date {
    match => ["logTime""MMM d HH:mm:ss""MMM dd HH:mm:ss""ISO8601"]
    timezone => "Asia/Shanghai"
  }

调整后,再加一条日志后查看结果,Kibana 显示 @timestamp 字段和日志的记录时间一致了。

3.4 Output 插件

Logstash 解析和转换后的日志后输出到了 Elasticsearch 中,由于我们 ES 是集群部署的,所以需要配置多个  ES 节点地址。

output {
  stdout { }

  elasticsearch {
    hosts => ["10.2.1.64:9200","10.2.1.65:9200","10.27.2.1:9200"]
    index => "qa_log"
  }
}

注意这里的 index 名称 qa_log 必须是小写,不然写入 es 时会报错。

3.5 完整配置

logstah 配置文件内容如下:

input {
  beats {
    port => 9900
  }
}

filter {

  multiline {
    pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}"
    negate => true
    what => "previous"
  }

  grok {
      match => [ "message""(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s+\[(?<thread>.*)\]\s+(?<level>\w*)\s{1,2}+(?<class>\S*)\s+-\s+(?<content>.*)\s*"]
      match => [ "message""(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s{1,2}+(?<level>\w*)\s{1,2}+.\s---+\s\[(?<thread>.*)\]+\s(?<class>\S*)\s*:+\s(?<content>.*)\s*"]
      match => [
           "source""/home/passjava/logs/(?<logName>\w+)/.*.log"
       ]
      overwrite => [ "source"]
      break_on_match => false
  }

  mutate {
    convert => {
      "bytes" => "integer"
    }
    remove_field => ["agent","message","@version""tags""ecs""_score""input""[log][offset]"]
  }

  useragent {
    source => "user_agent"
    target => "useragent"
  }

  date {
    match => ["logTime""MMM d HH:mm:ss""MMM dd HH:mm:ss""ISO8601"]
    timezone => "Asia/Shanghai"
  }
}

output {
  stdout { }

  elasticsearch {
    hosts => ["10.2.1.64:9200","10.2.1.65:9200","10.2.1.66:9200"]
    index => "qa_log"
  }
}

四、Logstash 怎么跑起来的

4.1 Logstash 如何运行的

你会好奇 Logstash 是怎么运行起来的吗?

官方提供的启动方式是执行 logstash -f weblog.conf 命令来启动,当执行这个命令的时候其实会调用 Java 命令,以及设置 java 启动参数,然后传入了一个配置文件 weblog.conf 来启动 Logstash。

cd /home/logstash-7.6.2
sudo ./bin/logstash -f weblog.conf

当启动完之后,我们通过命令来看下 Logstash 的运行状态

ps -ef | grep logstash

执行结果如下图所示,可以看到用到了 Java 命令,设置了 JVM 参数,用到了 Logstash 的JAR 包,传入了参数。

所以建议 Logstash 单独部署到一台服务器上,避免服务器的资源被 Logstash 占用。

Logstash 默认的 JVM 配置是 -Xms1g -Xmx1g,表示分配的小和大堆内存大小为 1 G。

那么这个参数是在哪里配置的呢?全局搜索下 Xms1g,找到是在这个文件里面配置的,config\jvm.options,我们可以修改这里面的 JVM 配置。

我们可以调整 Logstash 的 JVM 启动参数,来优化 Logstash 的性能

另外 Kibana 上面还可以监控 Logstash 的运行状态(不在本篇讨论范围)。

4.2 Logstash 的架构原理

本内容参考这篇 Logstash 架构[5]

Logstash 有多个 input,每个 input 都会有自己的 codec。

数据会先存放到 Queue 中,Logstash 会把 Queue 中的数据分发到不同的 pipeline 中。

然后每一个 pipeline 由 Batcher、filter、output 组成

Batcher 的作用是批量地从Queue中取数据。Batcher 可以配置为一次取一百个数据。

五、Logstash 宕机风险

5.1 Logstash 单点部署的风险

因为 Logstash 是单点部署到一台服务器上,所以会存在两个风险:

  • logstash 突然崩了怎么办?
  • logstash 所在的机器宕机了怎么办?
  • Logstash 所在的机器重启了怎么办?

对于个问题,可以安装 Keepalived 软件来保证高可用。另外即使没有安装,当手动启动 Logstash 后,Logstash 也能将未及时同步的日志写入到 ES。

对于第二个问题,所在的机器宕机了,那可以通过安装两套 Logstash,通过 keepalived 提供的虚拟 IP 功能,切换流量到另外一个 Logstash。关于如何使用 Keepalived,可以参考之前的 实战 MySQL 高可用架构

对于第三个问题,就是把启动 Logstash 的命令放到开机启动脚本中就可以了,但是存在以下问题:

  • Ubuntu 18.04 版本是没有开机启动文件的
  • Logstash 无法找到 Java 运行环境

接下来我们来看下怎么进行配置开机自启动 Logstash。

5.2 开机启动 Logstash

5.2.1 创建自动启动脚本

建立rc-local.service文件

sudo vim /etc/systemd/system/rc-local.service

将下列内容复制进 rc-local.service 文件

[Unit]
Description=/etc/rc.local Compatibility
ConditionPathExists=/etc/rc.local
 
[Service]
Type=forking
ExecStart=/etc/rc.local start
TimeoutSec=0
StandardOutput=tty
RemainAfterExit=yes
SysVStartPriority=99
 
[Install]
WantedBy=multi-user.target

创建文件 rc.local

sudo vim /etc/rc.local

添加启动脚本到启动文件中

#!/bin/sh -e

# 启动 logstash
#nohup /home/software/logstash-7.6.2/bin/logstash -f /home/software/logstash-7.6.2/weblog.conf &

# 启动 filebeat
nohup /home/software/filebeat-7.6.2-linux-x86_64/filebeat -e -c /home/software/filebeat-7.6.2-linux-x86_64/config.yml &

exit 0

5.2.2 修改 Java 运行环境

因在开机启动中,logstash 找不到 java 的运行环境,所以需要手动配置下 logstash。

cd /home/software/logstash-7.6.2/bin/
sudo vim logstash.lib.sh

在 setup_java() 方法的行加入 JAVA_HOME 变量,JAVA_HOME 的路径需要根据自己的 java 安装目录来。

JAVA_HOME="/opt/java/jdk1.8.0_181"

修改 Java 运行环境

5.2.3 权限问题

给 rc.local 加上权限,启用服务

sudo chmod +x /etc/rc.local
sudo systemctl enable rc-local
sudo systemctl stop rc-local.service
sudo systemctl start rc-local.service
sudo systemctl status rc-local.service

Logstash 启动成功

然后重启机器,查看 logstash进程是否正在运行,看到一大串 java 运行的命令则表示 logstash 正在运行。

ps -ef | grep logstash

六、总结

本篇讲解了 Logstash 在集群环境下的部署架构图、Logstash 遇到的几大坑、以及 Logstash 的运行机制和架构原理。

Logstash 还是非常强大的,有很多功能未在本篇进行讲解,本篇也是抛砖引玉,感兴趣的读者朋友们可以加我好友 passjava 共同探索。

更多好文请查看:

实战 MySQL 高可用架构

一文带你搭建一套 ELK Stack 日志平台

巨人的肩膀

https://blog.csdn.net/xzk9381/article/details/109571087
https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html
https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/ecs-v1/grok-patterns
https://www.runoob.com/regexp/regexp-syntax.html
https://www.elastic.co/guide/en/beats/libbeat/current/config-file-permissions.html
https://www.tutorialspoint.com/logstash/logstash_supported_outputs.htm

参考资料

[1]input 插件列表: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

[2]multiline 官方文档: https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html#plugins-codecs-multiline-negate

[3]Filebeat 官方文档: https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html

[4]Mutate 参考文章: https://blog.csdn.net/UbuntuTouch/article/details/106466873

[5]Logstash 架构: https://jenrey.blog.csdn.net/article/details/107122930

- END -


相关文章