Logstash Configuration Syntax and Common Parameters

Syntax

  • Logstash supports a small set of value types:

    debug => true ### bool
    host => "hostname" ### string
    port => 514 ### number
    match => ["datetime", "UNIX", "ISO8601"] ### array
    options => { ### hash (entries are whitespace-separated, no commas)
      key1 => "value1"
      key2 => "value2"
    }
  • Conditionals supported by Logstash:

    - Conditional operators:
    - == (equal), != (not equal), < (less than), > (greater than), <= (less than or equal), >= (greater than or equal)
    - =~ (matches regex), !~ (does not match regex)
    - in (contains), not in (does not contain)
    - and, or, nand, xor
    - () (compound expression), !() (negate the result of a compound expression)
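    These operators combine inside if / else if / else blocks. A minimal sketch (the field names type and status and the tag names are illustrative, not from the original):

    ```
    filter {
      if [type] == "syslog" and [message] =~ /error/ {
        mutate { add_tag => ["syslog_error"] }
      } else if [status] in ["404", "500"] {
        mutate { add_tag => ["http_error"] }
      }
    }
    ```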
  • Official Logstash plugins: input/filter/output/codec

    input {
      stdin {}
      syslog {}
    }
    filter {}
    output {}
    ### Plugin search: https://github.com/logstash-plugins

Input plugins (input)

Standard input (stdin)

# vi config/stdin.conf
input {
  stdin {
    add_field => {"key" => "value"}
    codec => "plain"
    tags => ["add"]
    type => "std"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
# logstash -f config/stdin.conf — after it starts, type "helloworld" to see the resulting event printed
Reading files (file)

input {
  file {
    path => "/Users/liuhaogui/www/hillinsight/edu_tp/runtime/log/event/*.log" ### accepts an array of paths and glob patterns, e.g. path => ["/var/log/*.log", "/var/log/message"]
    #start_position => "beginning"
    #ignore_older => 0
    #sincedb_path => "/dev/null"
    type => "tp_event_150"
    tags => ["tp_event"]
  }
}
### Field reference
discover_interval : how often Logstash checks the watched path for new files. Default: 15 seconds.
exclude : files to exclude from watching; supports glob expansion, just like path.
sincedb_path : store the sincedb file somewhere other than the default location.
sincedb_write_interval : how often the sincedb file is written. Default: 15 seconds.
stat_interval : how often watched files are checked for updates. Default: 1 second.
start_position : where to start reading a file; the default is the end of the file (equivalent to tail -F).
ignore_older : on each scan of the file list, skip any file whose last modification time is older than this. Default: 86400 seconds (one day).
close_older : close the file handle of a watched file that has not been updated within this interval. Default: 3600 seconds (one hour).
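Putting start_position and sincedb_path together: a common sketch for re-reading an entire file from the beginning on every run, instead of tailing it (the path is illustrative):

```
input {
  file {
    path => "/var/log/message"
    start_position => "beginning" # read from the start instead of tailing
    sincedb_path => "/dev/null"   # discard the saved read position between runs
  }
}
```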
Reading network data (tcp)

input {
  tcp {
    port => 8888
    mode => "server"
    ssl_enable => false
  }
}
Reading Redis data (redis)

input {
  redis {
    data_type => "pattern_channel"
    key => "logstash-*"
    host => "192.168.0.2"
    port => 6379
    threads => 5
  }
}
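The data_type option decides which Redis command is used: "list" (BLPOP), "channel" (SUBSCRIBE), or "pattern_channel" (PSUBSCRIBE, as above). A list-based sketch (the key name is illustrative):

```
input {
  redis {
    data_type => "list"   # BLPOP events off a Redis list
    key => "logstash-queue"
    host => "192.168.0.2"
    port => 6379
  }
}
```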
Reading collectd data (udp + collectd codec)

input {
  udp {
    port => 25826
    buffer_size => 1452
    workers => 3 # Default is 2
    queue_size => 30000 # Default is 2000
    codec => collectd { }
    type => "collectd"
  }
}

Output plugins (output)

Standard output (stdout)

output {
  stdout {
    codec => rubydebug
    workers => 2
  }
}
Writing to files (file)

output {
  file {
    path => "/path/to/%{+yyyy/MM/dd/HH}/%{host}.log.gz"
    message_format => "%{message}"
    gzip => true
  }
}
Writing to Elasticsearch (elasticsearch)

output {
  elasticsearch {
    host => "192.168.0.2"
    protocol => "http"
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
    index_type => "%{type}"
    workers => 5
    template_overwrite => true
  }
}
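Note that host, protocol, and index_type are Logstash 1.x option names; from Logstash 2.x on, the elasticsearch output takes a hosts array and document_type instead. A sketch of the equivalent newer config (assuming Elasticsearch listens on port 9200):

```
output {
  elasticsearch {
    hosts => ["192.168.0.2:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
    document_type => "%{type}"
    template_overwrite => true
  }
}
```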
Writing to Redis (redis)

output {
  redis {
    data_type => "channel"
    key => "logstash-chan-%{+yyyy.MM.dd}"
  }
}
Writing to TCP (tcp)

output {
  tcp {
    host => "192.168.0.2"
    port => 8888
    codec => json_lines
  }
}
Writing to Nagios (nagios_nsca)

output {
  nagios_nsca {
    nagios_host => "%{host}"
    nagios_service => "logstash_check_%{type}"
    nagios_status => "2"
    message_format => "%{@timestamp}: %{message}"
    host => "nagiosserver.domain.com"
  }
}
Sending email (email)

output {
  email {
    to => "admin@website.com,root@website.com"
    cc => "other@website.com"
    via => "smtp"
    subject => "Warning: %{title}"
    options => {
      smtpIporHost => "localhost"
      port => 25
      domain => "localhost.localdomain"
      userName => nil
      password => nil
      authenticationType => nil # (plain, login and cram_md5)
      starttls => true
    }
    htmlbody => ""
    body => ""
    attachments => ["/path/to/filename"]
  }
}
Running external commands (exec)

output {
  exec {
    command => "sendsms.pl \"%{message}\" -t %{user}"
  }
}

Filter plugins (filter)

Grok regex capture (grok)

input { stdin {} }
filter {
  grok {
    match => {
      "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
    }
  }
}
output { stdout {} }

### Multiple patterns
match => [
  "message", "(?<request_time>\d+(?:\.\d+)?)",
  "message", "%{SYSLOGBASE} %{DATA:message}",
  "message", "(?m)%{WORD}"
]
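Grok's (?<name>...) groups are ordinary regex named captures, so a pattern can be sanity-checked outside Logstash before deploying it. A sketch using Python's re, which spells named groups (?P<name>...); the sample log line is illustrative:

```python
import re

# Same pattern as the request_time grok example above,
# rewritten in Python named-group syntax.
pattern = re.compile(r"\s+(?P<request_time>\d+(?:\.\d+)?)\s+")

line = "upstream response took 0.073 sec"
match = pattern.search(line)
print(match.group("request_time"))  # -> 0.073
```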
Date parsing (date)
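A minimal date filter sketch: parse a source field (the name datetime is illustrative) into @timestamp, trying several candidate formats in order:

```
filter {
  date {
    match => ["datetime", "UNIX", "ISO8601"]
    target => "@timestamp"
  }
}
```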
GeoIP lookup (geoip)

filter {
  geoip {
    source => "message"
  }
}
-----
filter {
  geoip {
    source => "message"
    fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
  }
}
Splitting events (split)
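A minimal split sketch: turn one event whose message field contains several newline-separated lines into one event per line (both options shown here are the plugin defaults):

```
filter {
  split {
    field => "message"
    terminator => "\n"
  }
}
```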
UserAgent parsing (useragent)

filter {
  useragent {
    target => "ua"
    source => "useragent"
  }
}
Key-value parsing (kv)
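A minimal kv sketch, e.g. for URL query strings like a=1&b=2 (the separators shown are the usual choice for query strings):

```
filter {
  kv {
    source => "message"
    field_split => "&"
    value_split => "="
  }
}
```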
Ruby code (ruby)
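A minimal ruby filter sketch, using the event['field'] API of the Logstash 1.x/2.x era this page targets (newer versions use event.get/event.set); the duration field is illustrative:

```
filter {
  ruby {
    code => "event['duration_ms'] = event['duration'].to_f * 1000"
  }
}
```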
Numeric metrics (metrics)
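A minimal metrics sketch: meter event throughput and periodically emit a tagged rate event that can be routed separately in the output section (the tag name is illustrative):

```
filter {
  metrics {
    meter => "events"
    add_tag => "metric"
  }
}
output {
  if "metric" in [tags] {
    stdout {
      codec => line { format => "1m rate: %{[events][rate_1m]}" }
    }
  }
}
```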

Codec plugins (codec)

JSON codec (json)

input {
  file {
    path => "/var/log/nginx/access.log_json"
    codec => "json"
  }
}
Merging multiple lines (multiline)

input {
  stdin {
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}
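The pattern/negate/what combination above means: any line that does not start with [ is appended to the previous event, which is how multi-line stack traces get merged into the log line that produced them. That grouping rule can be sketched outside Logstash (a hypothetical helper, not Logstash code):

```python
import re

def multiline_merge(lines, pattern=r"^\[", negate=True):
    """Group raw lines into events the way the multiline codec does
    for negate => true, what => "previous"."""
    events = []
    for line in lines:
        matched = re.match(pattern, line) is not None
        # With negate => true the codec rule fires when the regex does
        # NOT match; such lines are merged into the previous event.
        if (matched != negate) and events:
            events[-1] += "\n" + line
        else:
            events.append(line)
    return events

logs = ["[2016-01-01] error", "  at foo()", "  at bar()", "[2016-01-01] ok"]
print(multiline_merge(logs))  # two events: the trace lines join the first
```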