
Logstash configuration syntax

Syntax

  • Logstash supports a small set of value types:
debug => true ### bool
host => "hostname" ### string
port => 514 ### number
match => ["datetime", "UNIX", "ISO8601"] ### array
options => { ### hash (entries are separated by whitespace, not commas)
    key1 => "value1"
    key2 => "value2"
}
  • Conditionals supported by Logstash (a short example follows this list)
- Supported operators:
-  == (equal), != (not equal), < (less than), > (greater than), <= (less than or equal), >= (greater than or equal)
- =~ (matches a regex), !~ (does not match a regex)
- in (contained in), not in (not contained in)
- and, or, nand, xor (exclusive or)
- () (compound expression), !() (negate the result of a compound expression)
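A minimal sketch of these operators inside a filter block, assuming a hypothetical type value "apache" and a hypothetical loglevel field:
filter {
    if [type] == "apache" and [loglevel] =~ /ERROR|WARN/ {
        mutate { add_tag => ["needs_attention"] }
    } else if "security" in [tags] {
        drop {}
    }
}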
  • Official Logstash plugins fall into four types: input / filter / output / codec
input {
    stdin {}
    file {}
    syslog {}
}
### browse and search plugins at https://github.com/logstash-plugins

Input plugins

Standard input (stdin)
# vi config/stdin.conf
input {
    stdin {
        add_field => {"key" => "value"}
        codec => "plain"
        tags => ["add"]
        type => "std"
    }
}
output {
    stdout{          
        codec => rubydebug 
    }
}
# logstash -f config/stdin.conf  ### run it, type helloworld, and the parsed event is printed
Reading files (file)
input {
    file {
        path => "/Users/liuhaogui/www/hillinsight/edu_tp/runtime/log/event/*.log" ### 可以数组形式传入多个、也可正则匹配文件 path => ["/var/log/*.log", "/var/log/message"]
        #start_position => beginning
        #ignore_older => 0
        #sincedb_path => "/dev/null"
        type => 'tp_event_150'
        tags => ["tp_event"]
    }
}
### Option reference
discover_interval : how often Logstash scans the watched path for new files. Default: 15 seconds.
exclude : files to exclude from watching; supports glob expansion, just like path.
sincedb_path : write the sincedb file to a different location.
sincedb_write_interval : how often the sincedb file is flushed to disk. Default: 15 seconds.
stat_interval : how often watched files are checked for updates. Default: 1 second.
start_position : where to begin reading a file. Default is the end of the file (equivalent to tail -F).
ignore_older : on each scan, skip files whose last modification time is older than this. Default: 86400 seconds (one day).
close_older : close the file handle of a watched file that has not been updated for this long. Default: 3600 seconds (one hour).
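A sketch of how these options combine, assuming a hypothetical /var/log/app.log that should be re-read from the beginning on every run:
input {
    file {
        path => ["/var/log/app.log"]
        exclude => "*.gz"
        start_position => "beginning"   ### read from the start instead of tailing
        sincedb_path => "/dev/null"     ### discard read offsets so the file is re-read each run
        stat_interval => 1
        type => "app"
    }
}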
Reading data over the network (tcp)
input {
    tcp {
        port => 8888
        mode => "server"
        ssl_enable => false
    }
}
Reading data from Redis (redis)
input {
    redis {
        data_type => "pattern_channel"
        key => "logstash-*"
        host => "192.168.0.2"
        port => 6379
        threads => 5
    }
}
Reading collectd data (udp + collectd codec)
input {
    udp {
        port => 25826
        buffer_size => 1452
        workers => 3          # Default is 2
        queue_size => 30000   # Default is 2000
        codec => collectd { }
        type => "collectd"
    }
}

Output plugins

Standard output (stdout)
output {
    stdout {
        codec => rubydebug
        workers => 2
    }
}
Writing to a file (file)
output {
    file {
        path => "/path/to/%{+yyyy/MM/dd/HH}/%{host}.log.gz"
        message_format => "%{message}"
        gzip => true
    }
}
Writing to Elasticsearch (elasticsearch)
output {
    elasticsearch {
        host => "192.168.0.2"
        protocol => "http"
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        index_type => "%{type}"
        workers => 5
        template_overwrite => true
    }
}
Writing to Redis (redis)
output {
    redis {
        data_type => "channel"
        key => "logstash-chan-%{+yyyy.MM.dd}"
    }
}
Writing over TCP (tcp)
output {
    tcp {
        host  => "192.168.0.2"
        port  => 8888
        codec => json_lines
    }
}
Writing to Nagios (nagios_nsca)
output {
    nagios_nsca {
        nagios_host => "%{host}"
        nagios_service => "logstash_check_%{type}"
        nagios_status => "2"
        message_format => "%{@timestamp}: %{message}"
        host => "nagiosserver.domain.com"
    }
}
Sending email (email)
output {
    email {
        to => "admin@website.com,root@website.com"
        cc => "other@website.com"
        via => "smtp"
        subject => "Warning: %{title}"
        options => {
            smtpIporHost       => "localhost",
            port               => 25,
            domain             => 'localhost.localdomain',
            userName           => nil,
            password           => nil,
            authenticationType => nil, # (plain, login and cram_md5)
            starttls           => true
        }
        htmlbody => ""
        body => ""
        attachments => ["/path/to/filename"]
    }
}
Invoking an external command (exec)
output {
    exec {
        command => "sendsms.pl \"%{message}\" -t %{user}"
    }
}

Filter plugins

Grok regex capture (grok)
input {stdin{}}
filter {
    grok {
        match => {
            "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
        }
    }
}
output {stdout{}}

### multiple match patterns
match => [
    "message", "(?<request_time>\d+(?:\.\d+)?)",
    "message", "%{SYSLOGBASE} %{DATA:message}",
    "message", "(?m)%{WORD}"
]
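For illustration, a sketch that combines predefined grok patterns with type conversion, assuming a hypothetical access-log line such as "1.2.3.4 GET /index.html 200 0.123":
filter {
    grok {
        match => {
            "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:status:int} %{NUMBER:request_time:float}"
        }
        remove_field => ["message"]   ### drop the raw line once it has been parsed
    }
}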
Date parsing (date)
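A minimal sketch of the date filter, assuming the event carries a hypothetical logdate field:
filter {
    date {
        match => ["logdate", "yyyy-MM-dd HH:mm:ss", "UNIX", "ISO8601"]
        target => "@timestamp"      ### the default target, shown here for clarity
        remove_field => ["logdate"]
    }
}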
GeoIP lookup (geoip)
filter {
    geoip {
        source => "message"
    }
}
### restrict which fields geoip adds to the event
filter {
    geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }
}
Splitting events (split)
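A minimal sketch of the split filter, assuming one event holds several records joined by a hypothetical "#" separator:
filter {
    split {
        field => "message"
        terminator => "#"    ### the default terminator is "\n"
    }
}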
UserAgent parsing (useragent)
filter {
    useragent {
        target => "ua"
        source => "useragent"
    }
}
Key-value splitting (kv)
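A minimal sketch of the kv filter, assuming a hypothetical query-string-like message such as "uid=42&action=login":
filter {
    kv {
        source => "message"
        field_split => "&"
        value_split => "="
        prefix => "qs_"      ### optional: prefix every extracted key
    }
}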
Ruby processing (ruby)
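A minimal sketch of the ruby filter; the legacy event['field'] accessor used here matches the Logstash 1.x/2.x era of this article, while Logstash 5.x and later require event.get/event.set:
filter {
    ruby {
        init => "@counter = 0"
        code => "event['lines_seen'] = (@counter += 1)"   ### 5.x+: event.set('lines_seen', ...)
    }
}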
Metrics aggregation (metrics)
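A minimal sketch of the metrics filter, counting event throughput and flushing it periodically as a separate, tagged event that downstream outputs can match on:
filter {
    metrics {
        meter => "events"
        add_tag => "metric"
        flush_interval => 60
    }
}
output {
    if "metric" in [tags] {
        stdout { codec => rubydebug }   ### prints events.count, events.rate_1m, etc.
    }
}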

Codec plugins

JSON codec (json)
input {
    file {
        path => "/var/log/nginx/access.log_json""
        codec => "json"
    }
}
Merging multi-line events (multiline)
input {
    stdin {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}
