EFK实战二

Wesley13
• 阅读 705

前言

在EFK基础架构中,我们需要在客户端部署Filebeat,通过Filebeat将日志收集并传到LogStash中。在LogStash中对日志进行解析后再将日志传输到ElasticSearch中,最后通过Kibana查看日志。

上文已经搭建好了EFK的基础环境,本文我们通过真实案例打通三者之间的数据传输以及解决EFK在使用过程中的一些常见问题。

首先看一下实际的业务日志

2020-01-09 10:03:26,719 INFO ========GetCostCenter Start===============
2020-01-09 10:03:44,267 WARN 成本中心编码少于10位!{"deptId":"D000004345","companyCode":"01"}
2020-01-09 10:22:37,193 ERROR java.lang.IllegalStateException: SessionImpl[abcpI7fK-WYnW4nzXrv7w,]: can't call getAttribute() when session is no longer valid.
 at com.caucho.server.session.SessionImpl.getAttribute(SessionImpl.java:283)
 at weaver.filter.PFixFilter.doFilter(PFixFilter.java:73)
 at com.caucho.server.dispatch.FilterFilterChain.doFilter(FilterFilterChain.java:87)
 at weaver.filter.MonitorXFixIPFilter.doFilter(MonitorXFixIPFilter.java:30)
 at weaver.filter.MonitorForbiddenUrlFilter.doFilter(MonitorForbiddenUrlFilter.java:133)

「日志组成格式为:」
时间 日志级别 日志详情
那么我们的主要任务就是将这段日志正常写入EFK中。

filebeat安装配置

  • 下载filebeat7.5.1

  • 将下载后的文件上传至服务器并解压tar -zxvf filebeat-7.5.1-linux-x86_64.tar.gz

  • 修改filebeat.yml,

    filebeat.inputs: - type: log   enabled: true   paths:     - /app/weaver/Resin/log/xxx.log

此段配置日志输入,指定日志存储路径

output.logstash:
  # The Logstash hosts
  hosts: ["172.31.0.207:5044"]

此段配置日志输出,指定Logstash存储路径

  • 启动filebeat
    ./filebeat -e -c filebeat.yml
    如果需要静默启动,则使用 nohup ./filebeat -e -c filebeat.yml & 命令启动即可。

logstash配置

logstash的配置主要分为三段 inputfilteroutputinput用于指定输入,主要是开放端口给Filebeat用于接收日志filter用于指定过滤,对日志内容进行解析过滤。output用于指定输出,直接配置ES的地址即可

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["http://172.31.0.127:9200"]
    index => "myindex-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "xxxxxx"
  }
}

我们配置好logstash后通过命令重启logstash
docker-compose -f elk.yml restart logstash

经过上述两步配置后应用程序往日志文件写入日志,filebeat会将日志写入logstash。在kibana查看写入的日志结果如下:EFK实战二

日志显示有2个问题:

  • 由于错误日志堆栈信息有多行,在kibana中展示成了多行,数据查看很乱。需要将堆栈异常整理成一行显示。

  • 需要对日志进行解析,拆成“时间 日志级别 日志详情”的显示格式。

优化升级

  • 在filebeat中设置合并行
    filebeat默认是行传输的,但是我们的日志肯定是多行一个日志,我们要把多行合并到一起就要找到日志的规律。比如我们的日志格式全都是以时间格式开头,所以我们在filebeat中 filebeat.inputs区域添加如下几行配置

      # 以日期作为前缀   multiline.pattern: ^\d{4}-\d{1,2}-\d{1,2}   # 开启多行合并   multiline.negate: true   # 合并到上一行之后   multiline.match: after

  • 在logstash中设置对日志的解析
    将日志解析成“时间 日志级别 日志详情”的展示格式,所以我们需要在logstash配置文件中添加filter段

    filter { grok{ match => { "message" => "(?\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}),\d{3} %{LOGLEVEL:loglevel} (?.*)" } } }

这里主要是使用grok语法对日志进行解析,通过正则表达式对日志进行过滤。大家可以通过kibana里的grok调试工具进行调试EFK实战二

配置完成后我们重新打开kibana Discover界面查看日志,符合预期,完美!EFK实战二

常见问题

kibana 乱码

这个主要原因还是客户端日志文件格式有问题,大家可以通过 file xxx.log查看日志文件的编码格式,如果是ISO8859的编码基本都会乱码,我们可以在filebeat配置文件中通过encoding指定日志编码进行传输。

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /app/weaver/Resin/log/xxx.log
  encoding: GB2312

kibana 提取字段出错

EFK实战二

如上所示,打开kibana Discover面板时出现此异常,大家只要删除ES中的 .kibana_1索引然后重新访问Kibana即可。EFK实战二

查看周围文件

我们在终端查看日志某关键字时一般会查上下文信息便于排查问题,如经常用到的指令 cat xxx.log | grep -C50 keyword,那么在Kibana中如何实现这功能呢。EFK实战二

在Kibana中搜索关键字,然后找到具体日志记录,点击左边向下箭头,然后再点击“查看周围文档”即可实现。

动态索引

我们日志平台可能需要对接多个业务系统,需要根据业务系统建立不同的索引。

  • 在filebeat中给日志打上标记

    - type: log   ......   fields:     logType: oabusiness

  • 在logstash中根据标记生成索引

    input { beats { port => 5044 } } filter { if [fields][logType] == "oabusiness" { grok{ match => { "message" => "(?\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}),\d{3} %{LOGLEVEL:loglevel} (?.*)" } } } } output { elasticsearch { hosts => ["http://172.31.0.207:9200"] index => "%{[fields][logType]}-%{+YYYY.MM.dd}" user => "elastic" password => "elastic" } }

如果本文对你有帮助,

别忘记来个三连:

点赞,转发,评论

EFK实战二

本文分享自微信公众号 - JAVA日知录(javadaily)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
ELK学习笔记之配置logstash消费kafka多个topic并分别生成索引
0x00 filebeat配置多个topicfilebeat.prospectors:input_type:logencoding:GB2312fields_under_root:truefields:添加字段
Stella981 Stella981
3年前
Android蓝牙连接汽车OBD设备
//设备连接public class BluetoothConnect implements Runnable {    private static final UUID CONNECT_UUID  UUID.fromString("0000110100001000800000805F9B34FB");
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这