Core workflow
- Query Elasticsearch with the Logstash elasticsearch input.
- Extract the required fields with grok match and mutate.
- Run local shell commands with the ruby filter and capture their output.
- Merge the resulting events into a single event with aggregate.
- Finally send the report by email or write it to an output.
Note that when the Elasticsearch query returns multiple documents, each document becomes a separate event in Logstash. Without aggregate, three matching documents produce three outputs, and n documents produce n outputs. aggregate merges them, but it must be combined with event.cancel(), which drops the original events so that only the final aggregated event survives. In other words, a query returning 3 documents yields 3 events, and aggregate pushes 1 more, so 4 events exist in total: the first 3 are cancelled, and only the aggregated one, which nothing cancels, reaches the output.
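To see the cancel behaviour in isolation, here is a minimal, self-contained sketch separate from the report pipeline below; the generator input, the task_id choice and the timeout value are illustrative assumptions. Three generated events go in, all three are cancelled, and only the single aggregated event reaches stdout:

input {
  # three events with the same message, hence the same task_id
  generator { count => 3 message => "boom" }
}
filter {
  aggregate {
    task_id => "%{message}"
    code => "
      map['n'] ||= 0
      map['n'] += 1    # count how many events were folded into the map
      event.cancel()   # drop this per-document event
    "
    push_previous_map_as_event => true
    timeout => 3       # flush the map as a new event if no further event arrives
  }
}
output { stdout { codec => rubydebug } }

Note that aggregate only behaves deterministically with a single pipeline worker, so run these pipelines with -w 1 (bin/logstash -w 1 -f pipeline.conf).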
The configuration is as follows:
input {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test_indices"
    query => '{
      "query": {
        "bool": {
          "filter": [
            {
              "range": {
                "date_search": {
                  "gte": "now-5d/d",
                  "lte": "now+1d/d"
                }
              }
            },
            {
              "term": {
                "tags": "errors"
              }
            }
          ]
        }
      }
    }'
    # docinfo => true
    # schedule => "* * * * *"
  }
}
filter {
  # Pull class, method and line number out of a Java stack-trace line,
  # e.g. "at net.ray.Foo.bar(Foo.java:42)"
  grok {
    match => {
      "message" => "at (?<class>net\.ray\.[A-Za-z.]+)\.(?<method>[A-Za-z.]+)\([A-Za-z.]+:(?<line_num>[0-9]+)\)"
    }
  }
  # Turn the package-qualified class name into a source file path
  mutate {
    gsub => [ "class", "\.", "/" ]
  }
  mutate {
    update => { "class" => "%{class}.java" }
  }
  # Shell out to git for the file's commit history and the blame of the offending line
  ruby {
    code => "
      cls  = event.get('class')
      line = event.get('line_num')
      commit_log = `git -C e:/workspace log e:/workspace/src/main/java/#{cls}`
      blame_log  = `git -C e:/workspace blame -L #{line},#{line} e:/workspace/src/main/java/#{cls}`
      event.set('commit_log', commit_log)
      event.set('blame_log', blame_log)
    "
  }
  # Collect every per-document result into a shared map and cancel the originals;
  # push_previous_map_as_event later emits that map as a single new event
  aggregate {
    task_id => "%{fields}%{log_source}"
    code => "
      map['result'] ||= []
      map['result'] << 'Log Date:' + event.get('datetime_search') + ' \ncommit_log: \n' + event.get('commit_log') + '\n\nblame_log: \n' + event.get('blame_log') + ' \n\nJava Stack Trace:\n' + event.get('message')
      event.cancel()
    "
    push_previous_map_as_event => true
  }
  mutate {
    remove_field => [
      "tags", "host", "sequence", "@version", "@timestamp"
    ]
    join => { "result" => "\n\n--------------------------------\n\n" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
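The bullet list at the top also mentions sending the report by email instead of printing it. Below is a hedged sketch of such an output block, assuming the logstash-output-email plugin is installed; the recipient, sender, SMTP host and credentials are placeholders:

output {
  email {
    to       => "dev-team@example.com"      # placeholder recipient
    from     => "logstash@example.com"      # placeholder sender
    subject  => "Error report from test_indices"
    body     => "%{result}"                 # the joined report built by aggregate + mutate
    address  => "smtp.example.com"          # placeholder SMTP server
    port     => 587
    username => "logstash@example.com"
    password => "changeme"
    use_tls  => true
  }
}

Because body is a sprintf-style string, %{result} expands to the aggregated, joined report assembled in the filter stage.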