- Logstash
- Logstash 配置文件结构
- Logstash 导入数据到 ES
- 同步数据库数据到 ES
- FileBeat
- ELK(采集 Tomcat 服务器日志)
- 使用FileBeats将日志发送到Logstash
- Logstash输出数据到Elasticsearch(logstash开头的索引)
- 利用Logstash过滤器解析日志
- 使用Grok插件通过模式匹配的方式来识别日志中的数据
- 使用mutate插件过滤掉不需要的字段
- 使用Date插件将日期格式进行转换
- 利用Logstash过滤器解析日志
- 输出到Elasticsearch指定索引
- 完整的Logstash配置文件
Logstash
- Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到存储库
- Pipeline
- 包含了 input-filter-output 三个阶段的处理流程
- 插件生命周期管理
- 队列管理
- Logstash Event
- 数据在内部流转时的具体表现形式:数据在 input 阶段被转换为 Event,在 output 被转化成目标格式数据
- Event 其实是一个 Java Object,在配置文件中,对 Event 的属性进行增删改查
- Codec (Code / Decode):将原始数据decode成Event;将Event encode成目标数据
- Logstash数据传输原理
- 数据采集与输入:Logstash支持各种输入选择,能够以连续的流式传输方式,轻松地从日志、指标、Web应用以及数据存储中采集数据
- 实时解析和数据转换:通过Logstash过滤器解析各个事件,识别已命名的字段来构建结构,并将它们转换成通用格式,最终将数据从源端传输到存储库中
- 存储与数据导出:Logstash提供多种输出选择,可以将数据发送到指定的地方
- Logstash通过管道完成数据的采集与处理,管道配置中包含input、output和filter(可选)插件
- input和output用来配置输入和输出数据源、filter用来对数据进行过滤或预处理
- Logstash 配置文件结构
- Logstash的管道配置文件对每种类型的插件都提供了一个单独的配置部分,用于处理管道事件
- 每个配置部分可以包含一个或多个插件
- 指定多个filter插件,Logstash会按照它们在配置文件中出现的顺序进行处理
bin/logstash ‐f logstash‐demo.conf
运行
input {
stdin { }
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch { hosts => ["localhost:9200"]}
stdout { codec => rubydebug }
}
-
Logstash Queue
-
- In Memory Queue: 进程Crash,机器宕机,都会引起数据的丢失
- Persistent Queue: 机器宕机,数据也不会丢失; 数据保证会被消费; 可以替代 Kafka等消息队列缓冲区的作用
queue.type: persisted
默认是 memoryqueue.max_bytes: 4gb
-
-
Codec Plugin - Multiline
- pattern: 设置行匹配的正则表达式
- what : 如果匹配成功,那么匹配行属于上一个事件还是下一个事件
- previous / next
- negate : 是否对pattern结果取反
- true / false
input {
stdin {
codec => multiline {
pattern => "^\s"
what => "previous"
}
}
}
filter { }
output {
stdout { codec => rubydebug }
}
- Input Plugin - File
- 支持从文件中读取数据,如日志文件
- 文件读取需要解决的问题:只被读取一次。重启后需要从上次读取的位置继续 (通过 sincedb 实现)
- 读取到文件新内容,发现新文件
- 文件发生归档操作 (文档位置发生变化,日志 rotation),不能影响当前的内容读取
- Filter Plugin:Filter Plugin可以对Logstash Event进行各种处理,例如解析,删除字段,类型转换
- Date: 日期解析
- Dissect: 分割符解析
- Grok: 正则匹配解析
- Mutate: 处理字段。重命名,删除,替换
- Ruby: 利用Ruby 代码来动态修改Event
- Filter Plugin - Mutate
- Convert : 类型转换
- Gsub : 字符串替换
- Split / Join /Merge: 字符串切割,数组合并字符串,数组合并数组
- Rename: 字段重命名
- Update / Replace: 字段内容更新替换
- Remove_field: 字段删除
- Logstash 导入数据到 ES
input {
file {
path => "/home/es/logstash‐7.17.3/dataset/movies.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
columns => ["id","content","genre"]
}
mutate {
split => { "genre" => "|" }
remove_field => ["path", "host","@timestamp","message"]
}
mutate {
split => ["content", "("]
add_field => { "title" => "%{[content][0]}"}
add_field => { "year" => "%{[content][1]}"}
}
mutate {
convert => {
"year" => "integer"
}
strip => ["title"]
remove_field => ["path", "host","@timestamp","message","content"]
}
}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "movies"
document_id => "%{id}"
user => "elastic"
password => "123456"
}
stdout { }
}
- 同步数据库数据到 ES: 借助 JDBC Input Plugin 将数据从数据库读到 Logstash
- 需要自己提供所需的 JDBC Driver
- JDBC Input Plugin 支持定时任务 Scheduling,其语法来自 Rufus-scheduler
- 其扩展了 Cron,使用 Cron 的语法可以完成任务的触发
- JDBC Input Plugin 支持通过 Tracking_column / sql_last_value 的方式记录 State,最终实现增量的更新
- 需要拷贝 jdbc 依赖到 logstash/drivers 目录下
- ES 创建 alias,只显示没有被标记 deleted 的用户
- 需要数据库表中有 deleted 字段
input {
jdbc {
jdbc_driver_library => "/home/es/logstash‐7.17.3/drivers/mysql‐connector-java‐5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useSSL=false"
jdbc_user => "root"
jdbc_password => "123456"
# 启用追踪,如果为true,则需要指定tracking_column
use_column_value => true
# 指定追踪的字段,
tracking_column => "last_updated"
# 追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型
tracking_column_type => "numeric"
# 记录最后一次运行的结果
record_last_run => true
# 上面运行结果的保存位置
last_run_metadata_path => "jdbc‐position.txt"
statement => "SELECT * FROM user where last_updated >:sql_last_value;"
schedule => " * * * * * *"
}
}
output {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "users"
hosts => ["http://localhost:9200"]
user => "elastic"
password => "123456"
}
stdout{
codec => rubydebug
}
}
# 创建 alias,只显示没有被标记 deleted 的用户
POST /_aliases
{"actions":[{"add":{
"index":"users",
"alias":"view_users",
"filter":{"term":{"is_deleted":0}}}}]}
FileBeat
- Beats 轻量型数据采集器是一个免费且开放的平台,集合了多种单一用途的数据采集器
- FileBeat 专门用于转发和收集日志数据的轻量级采集工具
- 它可以作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据
- 并将收集到的日志转发到Elasticsearch或者Logstash
- FileBeat 的工作原理
- ![[…/…/assets/Attachment/Pasted image 20240527042649.png | 400]]
- 启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置
- FileBeat会针对每一个文件启动一个Harvester(收割机)
- Harvester读取每一个文件的日志,将新的日志发送到libbeat
- libbeat将数据收集到一起,并将数据发送给输出(Output)
- logstash vs FileBeat
- Logstash是在jvm上运行的,资源消耗比较大。而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级
- Logstash 和Filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少
- Logstash 具有Filter功能,能过滤分析日志
- 一般结构都是Filebeat采集日志,然后发送到消息队列、Redis、MQ中,然后Logstash去获取,利用Filter功能过滤分析,然后存储到Elasticsearch中
- FileBeat和Logstash配合,实现背压机制
- 当将数据发送到Logstash或 Elasticsearch时,Filebeat使用背压敏感协议,以应对更多的数据量
- 如果Logstash正在忙于处理数据,则会告诉Filebeat 减慢读取速度
- 一旦拥堵得到解决,Filebeat就会恢复到原来的步伐并继续传输数据
filebeat.yml
output.elasticsearch:
hosts: ["192.168.65.174:9200","192.168.65.192:9200","192.168.65.204:9200"]
username: "elastic"
password: "123456"
setup.kibana:
host: "192.168.65.174:5601"
- 启用和配置数据收集模块
# 查看可以模块列表
./filebeat modules list
# 启用 nginx 模块
./filebeat modules enable nginx
# 启用 Logstash 模块
./filebeat modules enable logstash
# setup 命令加载 Kibana 仪表板。 如果仪表板已经设置,则忽略此命令。
./filebeat setup
# 启动 Filebeat
./filebeat ‐e
# 如果需要更改 nginx 日志路径,修改 modules.d/nginx.yml
‐ module: nginx
access:
var.paths: ["/var/log/nginx/access.log*"]
# 在 modules.d/logstash.yml 文件中修改设置
‐ module: logstash
log:
enabled: true
var.paths: ["/home/es/logstash‐7.17.3/logs/*.log"]
ELK
- 集中化日志管理思路:日志收集 -> 格式化分析 -> 检索和可视化 -> 风险告警
- ELK架构分为两种,一种是经典的ELK,另外一种是加上消息队列(Redis或Kafka或RabbitMQ)和Nginx结构
- 经典的ELK主要是由Filebeat + Logstash + Elasticsearch + Kibana组成
- 适用于数据量小的开发环境,存在数据丢失的危险
- 整合消息队列+Nginx架构:主要加上了Redis或Kafka或RabbitMQ做消息队列,保证了消息的不丢失
- 主要用在生产环境,可以处理大数据量,并且不会丢失数据
- 经典的ELK主要是由Filebeat + Logstash + Elasticsearch + Kibana组成
采集 Tomcat 服务器日志
- 使用FileBeats将日志发送到Logstash
- 因为Tomcat的web log日志都是以IP地址开头的,所以我们需要把不以ip地址开头的行追加到上一行
- multiline 多行日志
- pattern:正则表达式
- negate:true 或 false
- 默认是false,匹配pattern的行合并到上一行
- true,不匹配pattern的行合并到上一行
- match:after 或 before,合并到上一行的末尾或开头
vim filebeat‐logstash.yml
chmod 644 filebeat‐logstash.yml
./filebeat ‐e ‐c filebeat‐logstash.yml
filebeat.inputs:
‐ type: log
enabled: true
paths:
‐ /home/es/apache‐tomcat‐8.5.33/logs/*access*.*
multiline.pattern: '^\\d+\\.\\d+\\.\\d+\\.\\d+ '
multiline.negate: true
multiline.match: after
output.logstash:
enabled: true
hosts: ["192.168.65.204:5044"]
- Logstash输出数据到Elasticsearch
filebeat‐elasticSearch.conf
bin/logstash ‐f config/filebeat‐elasticSearch.conf ‐‐config.reload.automatic
- ES中会生成一个以logstash开头的索引
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
user => "elastic"
password => "123456"
}
stdout{
codec => rubydebug
}
}
- 利用Logstash过滤器解析日志
bin/logstash‐plugin list
- Grok插件:Grok是一种将非结构化日志解析为结构化的插件
- 适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式
- Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式
- 它拥有更多的模式,默认Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配
- 可以使用Kibana来进行Grok开发
- Grok语法
%{SYNTAX:SEMANTIC}
- SYNTAX(语法)指的是Grok模式名称
- SEMANTIC(语义)是给模式匹配到的文本字段名
%{NUMBER:duration} %{IP:client}
- duration表示:匹配一个数字,client表示匹配一个IP地址
- 默认在Grok中,所有匹配到的的数据类型都是字符串
- 转换成int类型(目前只支持int和float):
%{NUMBER:duration:int} %{IP:client}
- 转换成int类型(目前只支持int和float):
filter {
grok {
# 192.168.65.103 ‐ ‐ [23/Jun/2022:22:37:23 +0800] "GET /docs/images/docs‐stylesheet.css HTTP/1.1" 200 5780
match => { "message" => "%{IP:ip} ‐ ‐ \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status} %{INT:length}" }
}
}
- 使用mutate插件过滤掉不需要的字段
mutate {
enable_metric => "false"
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
}
- 要将日期格式进行转换,可以使用Date插件来实现
filter {
date {
match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy‐MM‐dd HH:mm:ss"]
target => "date"
}
}
- 输出到Elasticsearch指定索引
- index来指定索引名称,默认输出的index名称为:
logstash-%{+yyyy.MM.dd}
- 要在index中使用时间格式化,filter的输出必须包含
@timestamp
字段,否则将无法解析日期
- index来指定索引名称,默认输出的index名称为:
- 注意:index名称中,不能出现大写字符
output {
elasticsearch {
index => "tomcat_web_log_%{+YYYY‐MM}"
hosts => ["http://localhost:9200"]
user => "elastic"
password => "123456"
}
stdout{
codec => rubydebug
}
}
- 完整的Logstash配置文件
bin/logstash ‐f config/filebeat‐filter‐es.conf ‐‐config.reload.automatic
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => "%{IP:ip} ‐ ‐ \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}"
}
}
mutate {
enable_metric => "false"
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
}
date {
match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy‐MM‐dd HH:mm:ss"]
target => "date"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
index => "tomcat_web_log_%{+YYYY‐MM}"
hosts => ["http://localhost:9200"]
user => "elastic"
password => "123456"
}
}