Logstash:将特定事件写入特定索引

在这篇博文中,您将学习如何使用 Logstash 将特定事件写入特定索引。 Logstash 是一个免费且开放的服务器端数据处理管道,它从多个来源摄取数据,对其进行转换,然后将其发送到您最喜欢的“stash”,在此示例设置中,它是一个 弹性搜索.

Logstash 可以配置为根据条件将特定事件写入特定索引。 跟随以了解如何操作。

Logstash:将特定事件写入特定索引

那么,如何配置 Logstash 将特定事件写入特定的 Elasticsearch 索引? 好吧,如果您要将事件转发到 Logstash,那么在转发到存储此类 Elasticsearch 以进行索引之前,您肯定必须有一些过滤器来解析或提取特定字段。

在我们之前的指南中,我们介绍了使用 Logstash 处理各种日志的各种教程;

如何使用 Logstash 解析 SSH 日志

在 ELK Stack 上处理和可视化 ModSecurity 日志

在 ELK Stack 上可视化 WordPress 用户活动日志

为了演示如何使用 Logstash 将特定事件写入特定索引,我们将考虑有关如何使用 Logstash 解析 SSH 日志的指南。

考虑下面解析 SSH 身份验证事件的 Logstash 配置文件;

  • 无效用户登录失败
May 20 20:18:59 elk sshd[1831]: Failed password for invalid user admin from 192.168.59.1 port 41150 ssh2
  • 接受密码
May 20 20:20:32 elk sshd[1863]: Accepted password for root from 192.168.59.1 port 41174 ssh2
  • 有效用户的密码失败
May 20 20:22:05 elk sshd[1967]: Failed password for kifarunix from 192.168.59.1 port 41190 ssh2
input {   beats {     port => 5044   } } filter {   # parses Successful login events   grok {     match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>Accepteds+password)s+fors+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" }     add_field => { "activity" => "SSH Logins" }     add_tag => "successful_login"     }   # parses Failed login events for valid users   grok {     match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>Faileds+password)s+fors+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" }     add_field => { "activity" => "SSH Logins" }     add_tag => "failed_login"     }   # parses Failed login events for invalid users   grok {     match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>Faileds+password)s+fors+invalids+users+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" }     add_field => { "activity" => "SSH Logins" }     add_tag => "invalid_users"     } } output {    elasticsearch {      hosts => ["localhost:9200"]      manage_template => false      index => "ssh_auth-%{+YYYY.MM}"  } }

根据 Logstash 输出配置部分;

output {    elasticsearch {      hosts => ["localhost:9200"]      manage_template => false      index => "ssh_auth-%{+YYYY.MM}"  } }

所有事件都发送到一个索引,称为 ssh_auth-%{+YYYY.MM}.

Logstash:将特定事件写入特定索引

现在,我们想编写事件 有效用户登录失败, 无效用户的登录失败事件, 和 成功登录事件 到个别指数。

Logstash 支持可以帮助解决此问题的使用条件。

为了轻松演示如何使用 Logstash 将特定事件写入特定索引,我们将配置每个 grok 模式正则表达式,为事件添加一个标签,将事件彼此分开。

例如,对于成功的登录事件,一个标签, 成功登录, 将被添加到事件中,同样适用于 有效用户登录失败无效用户的登录失败事件 事件增加了 登录失败invalid_users 分别标记。

因此,我们将使用 if 语句修改我们的 Logstash 输出,如下所示;

output { if "successful_login" in  {    elasticsearch {      hosts => ["localhost:9200"]      index => "ssh-success-logins-%{+YYYY.MM}"      }    }    else if "failed_login" in  {    elasticsearch {      hosts => ["localhost:9200"]      index => "ssh-failed-logins-%{+YYYY.MM}"      }    }    else if "invalid_users" in  {    elasticsearch {      hosts => ["localhost:9200"]      index => "ssh-invalid-users-%{+YYYY.MM}"      }    }    else {    elasticsearch {      hosts => ["localhost:9200"]      index => "other-ssh-events-%{+YYYY.MM}"      }    } }

这将导致 Logstash 写入 成功登录事件 到 Elasticsearch 索引调用 ssh-success-logins-*, 登录失败事件 索引调用 ssh-failed-logins-*, 无效的用户登录事件 索引调用 ssh-无效用户-*任何其他事件其他 ssh 事件-* 指数。

检查 Logstash 配置语法

Save Logstash 配置文件并使用命令运行语法检查;

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
Configuration OK [2021-05-20T21:50:10,111][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

如果你得到 配置OK 那么你就可以走了。

Logstash 现在已准备好接收来自 Filebeat 的事件。

假设您已经设置了 Filebeat 并执行了 ssh 身份验证事件,那么您导航到 Kibana 并验证您是否已创建 Elasticsearch 索引;

您现在可以继续创建 Kibana 索引, Kibana > 索引模式 > 创建索引模式.

来自发现;

好了,您已经将不同的事件写入了不同的 Elasticsearch 索引。

这标志着 Logstash 教程的结束:将特定事件写入特定索引。

其他相关教程

配置 Filebeat-Logstash SSL/TLS 连接的简单方法

如何调试 Logstash Grok 过滤器

将 Wazuh Manager 与 ELK Stack 集成