纳尼?自建K8s集群日志收集还能通过JMQ保存到JES

京东云开发者
• 阅读 248

作者:京东科技 刘恩浩

一、背景

基于K8s集群的私有化交付方案中,日志收集采用了ilogtail+logstash+kafka+es方案,其中ilogtail负责日志收集,logstash负责对数据转换,kafka负责对日志传递中的消峰进而减少es的写入压力,es用来保存日志数据。在私有化交付中本方案中涉及的中间件一般需要单独部署,但是在京东内网环境的部署考虑到kafka和es的高可用,则不推荐采用单独部署的方案。

二、新方案实践

1.新方案简介

在京东内网环境部署K8S收集日志, kafka+es的替代方案考虑使用JMQ+JES,由于JMQ的底层是基于kafaka、JES的底层基于ES,所以该替换方案理论上是可行的

2.主要架构

数据流向大致如下
应用日志 -> ilogtail -> JMQ -> logstash -> JES

3.如何使用

核心改造点汇总

  1. ilogtail nameservers配置
    增加解析JMQ域名的nameserver(京东云主机上无法直接解析.local域名)
spec:
    spec:
      dnsPolicy: "None"
      dnsConfig:
        nameservers:
          - x.x.x.x # 可以解析jmq域名的nameserver
  1. ilogtail flushers配置
    调整发送到JMQ到配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: ilogtail-user-cm
  namespace: elastic-system
data:
  app_stdout.yaml: |
    flushers:
      - Type: flusher_stdout
        OnlyStdout: true
      - Type: flusher_kafka_v2
        Brokers:
          - nameserver.jmq.jd.local:80 # jmq元数据地址
        Topic: ai-middle-k8s-log-prod # jmq topic 
        ClientID: ai4middle4log # Kafka的用户ID(识别客户端并设置其唯一性),对应jmq的Group名称,重要‼️ (https://ilogtail.gitbook.io/ilogtail-docs/plugins/input/service-kafka#cai-ji-pei-zhi-v2)   
  1. logstash kafka&es配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
  namespace: elastic-system
  labels:
    elastic-app: logstash
data:
  logstash.conf: |-
    input {
        kafka {
                bootstrap_servers => ["nameserver.jmq.jd.local:80"] #jmq的元数据地址
                group_id => "ai4middle4log" # jmq的Group的名称
                client_id => "ai4middle4log" # jmq的Group的名称,即jmq的省略了kafka中的client_id概念,用Group名称代替
                consumer_threads => 2
                decorate_events => true
                topics => ["ai-middle-k8s-log-prod"] # jmp的topic
                auto_offset_reset => "latest"
                codec => json { charset => "UTF-8" }
        }
    }
    output {
        elasticsearch {
                hosts => ["http://x.x.x.x:40000","http://x.x.x.x:40000","http://x.x.x.x:40000"] # es地址
                index =>  "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}" # 索引规则
                user => "XXXXXX" #jes的用户名
                password => "xxxxx" #jes的密码
                ssl => "false"
                ssl_certificate_verification => "false"
        }
    }

ilogtail 的配置如下

# ilogtail-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: ilogtail-ds
  namespace: elastic-system
  labels:
    k8s-app: logtail-ds
spec:
  selector:
    matchLabels:
      k8s-app: logtail-ds
  template:
    metadata:
      labels:
        k8s-app: logtail-ds
    spec:
      dnsPolicy: "None"
      dnsConfig:
        nameservers:
          - x.x.x.x # (京东云主机上)可以解析jmq域名的nameserver
      tolerations:
        - operator: Exists                    # deploy on all nodes
      containers:
        - name: logtail
          env:
            - name: ALIYUN_LOG_ENV_TAGS       # add log tags from env
              value: _node_name_|_node_ip_
            - name: _node_name_
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: spec.nodeName
            - name: _node_ip_
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: cpu_usage_limit           # iLogtail's self monitor cpu limit
              value: "1"
            - name: mem_usage_limit           # iLogtail's self monitor mem limit
              value: "512"
          image: dockerhub.ai.jd.local/ai-middleware/ilogtail-community-edition/ilogtail:1.3.1
          imagePullPolicy: IfNotPresent
          resources:
            limits:
              cpu: 1000m
              memory: 1Gi
            requests:
              cpu: 400m
              memory: 384Mi
          volumeMounts:
            - mountPath: /var/run                       # for container runtime socket
              name: run
            - mountPath: /logtail_host                  # for log access on the node
              mountPropagation: HostToContainer
              name: root
              readOnly: true
            - mountPath: /usr/local/ilogtail/checkpoint # for checkpoint between container restart
              name: checkpoint
            - mountPath: /usr/local/ilogtail/user_yaml_config.d # mount config dir
              name: user-config
              readOnly: true
            - mountPath: /usr/local/ilogtail/apsara_log_conf.json
              name: apsara-log-config
              readOnly: true
              subPath: apsara_log_conf.json
      dnsPolicy: ClusterFirst
      hostNetwork: true
      volumes:
        - hostPath:
            path: /var/run
            type: Directory
          name: run
        - hostPath:
            path: /
            type: Directory
          name: root
        - hostPath:
            path: /etc/ilogtail-ilogtail-ds/checkpoint
            type: DirectoryOrCreate
          name: checkpoint
        - configMap:
            defaultMode: 420
            name: ilogtail-user-cm
          name: user-config
        - configMap:
            defaultMode: 420
            name: ilogtail-apsara-log-config-cm
          name: apsara-log-config
# ilogtail-user-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: ilogtail-user-cm
  namespace: elastic-system
data:
  app_stdout.yaml: |
    enable: true
    inputs:
      - Type: service_docker_stdout
        Stderr: true
        Stdout: true
        K8sNamespaceRegex: ai-train
        ExternalK8sLabelTag:
          platform/resource-name: k8s_label_resource-name
          platform/task-identify: k8s_label_task-identify
          task-id: k8s_label_task-id
          run-id: k8s_label_run-id
          request-id: k8s_label_request-id
    processors:
      - Type: processor_rename
        SourceKeys:
          - k8s_label_resource-name
          - k8s_label_task-identify
          - k8s_label_task-id
          - k8s_label_run-id
          - k8s_label_request-id
          - _namespace_
          - _image_name_
          - _pod_uid_
          - _pod_name_
          - _container_name_
          - _container_ip_
          - __path__
          - _source_
        DestKeys:
          - resource_name
          - task_identify
          - task_id
          - run_id
          - request_id
          - namespace
          - image_name
          - pod_uid
          - pod_name
          - container_name
          - container_ip
          - path
          - source
    flushers:
      - Type: flusher_stdout
        OnlyStdout: true
      - Type: flusher_kafka_v2
        Brokers:
          - nameserver.jmq.jd.local:80 # jmq元数据地址
        Topic: ai-middle-k8s-log-prod # jmq topic 
        ClientID: ai4middle4log # Kafka的用户ID(识别客户端并设置其唯一性),对应jmq的Group名称,重要‼️ (https://ilogtail.gitbook.io/ilogtail-docs/plugins/input/service-kafka#cai-ji-pei-zhi-v2)

  app_file_log.yaml: |
    enable: true
    inputs:
      - Type: file_log
        LogPath: /export/Logs/ai-dt-algorithm-tools
        FilePattern: "*.log"
        ContainerInfo:
          K8sNamespaceRegex: ai-train
          ExternalK8sLabelTag:
            platform/resource-name: k8s_label_resource-name
            platform/task-identify: k8s_label_task-identify
            task-id: k8s_label_task-id
            run-id: k8s_label_run-id
            request-id: k8s_label_request-id

    processors:
      - Type: processor_add_fields
        Fields:
          source: file
      - Type: processor_rename
        SourceKeys:
          - __tag__:k8s_label_resource-name
          - __tag__:k8s_label_task-identify
          - __tag__:k8s_label_task-id
          - __tag__:k8s_label_run-id
          - __tag__:k8s_label_request-id
          - __tag__:_namespace_
          - __tag__:_image_name_
          - __tag__:_pod_uid_
          - __tag__:_pod_name_
          - __tag__:_container_name_
          - __tag__:_container_ip_
          - __tag__:__path__
        DestKeys:
          - resource_name
          - task_identify
          - task_id
          - run_id
          - request_id
          - namespace
          - image_name
          - pod_uid
          - pod_name
          - container_name
          - container_ip
          - path

    flushers:
      - Type: flusher_stdout
        OnlyStdout: true
      - Type: flusher_kafka_v2
        Brokers:
          - nameserver.jmq.jd.local:80
        Topic: ai-middle-k8s-log-prod
        ClientID: ai4middle4log

logstash 的配置如下

# logstash-configmap.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
  namespace: elastic-system
  labels:
    elastic-app: logstash
data:
  logstash.conf: |-
    input {
        kafka {
                bootstrap_servers => ["nameserver.jmq.jd.local:80"] #jmq的元数据地址
                #group_id => "services"
                group_id => "ai4middle4log" # jmq的Group的名称
                client_id => "ai4middle4log" # jmq的Group的名称,即jmq的省略了kafka中的client_id概念,用Group名称代替
                consumer_threads => 2
                decorate_events => true
                #topics_pattern => ".*"
                topics => ["ai-middle-k8s-log-prod"] # jmp的topic
                auto_offset_reset => "latest"
                codec => json { charset => "UTF-8" }
        }
    }


    filter {
      ruby {
          code => "event.set('index_date', event.get('@timestamp').time.localtime + 8*60*60)"
      }
      ruby {
          code => "event.set('message',event.get('contents'))"
      }
      #ruby {
      #    code => "event.set('@timestamp',event.get('time').time.localtime)"
      #}

      mutate {
          remove_field => ["contents"]
          convert => ["index_date", "string"]
          #convert => ["@timestamp", "string"]
          gsub => ["index_date", "T.*Z",""]
          #gsub => ["@timestamp", "T.*Z",""]
      }
    }


    output {
        elasticsearch {
                #hosts => ["https://ai-middle-cluster-es-http:9200"]
                hosts => ["http://x.x.x.x:40000","http://x.x.x.x:40000","http://x.x.x.x:40000"] # es地址
                index =>  "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}" # 索引规则
                user => "XXXXXX" #jes的用户名
                password => "xxxxx" #jes的密码
                ssl => "false"
                ssl_certificate_verification => "false"
                #cacert => "/usr/share/logstash/cert/ca_logstash.cer"
        }
        stdout {
            codec => rubydebug
        }
    }

4.核心价值

在私有化部署的基础上通过简单改造实现了与京东内部中间件的完美融合,使得系统在高可用性上适应性更强、可用范围更广。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
ELK之八
一、logstash结合kafka收集系统日志和nginx日志架构图:!(https://oscimg.oschina.net/oscnet/2d28dece38ea896fdb974165c799ff8130a.png)环境准备:A主机:kibana、e
Stella981 Stella981
3年前
Elasticsearch 索引模板
概述记录自己在工作中将生产的数据按月保存在ES中(通过logstash采集kafka数据到ES),由于生产环境数据量比较庞大(一天的日志量大概在2500万条左右),为了后期减轻服务器压力,方便我们维护,所以需要对我们的日志进行处理,按月建立不同的ES索引库,能够查询最近6个月的日志,关闭前6个月不用的日志。创建模板如果用户每次新建
Stella981 Stella981
3年前
Kubernetes 集群日志管理
Kubernetes开发了一个Elasticsearch附加组件来实现集群的日志管理。这是一个Elasticsearch、Fluentd和Kibana的组合。Elasticsearch是一个搜索引擎,负责存储日志并提供查询接口;Fluentd负责从Kubernetes搜集日志并发送给Elasticsearch;Kibana提供了一个
Stella981 Stella981
3年前
Logstash
 Logstash数据处理工具具有实时渠道能力的数据收集引擎,包含输入、过滤、输出模块,一般在过滤模块中做日志格式化的解析工作日志信息logstshjson形式mysql\\hbase\\ESlogstsh(select\fromuser)ESlogstsh架
Stella981 Stella981
3年前
Kafka+Zookeeper+Filebeat+ELK 搭建日志收集系统
ELKELK目前主流的一种日志系统,过多的就不多介绍了Filebeat收集日志,将收集的日志输出到kafka,避免网络问题丢失信息kafka接收到日志消息后直接消费到LogstashLogstash将从kafka中的日志发往elasticsearchKibana对elasticsearch中的日志数
Wesley13 Wesley13
3年前
EFK实战二
前言在EFK基础架构中,我们需要在客户端部署Filebeat,通过Filebeat将日志收集并传到LogStash中。在LogStash中对日志进行解析后再将日志传输到ElasticSearch中,最后通过Kibana查看日志。上文已经搭建好了EFK的基础环境,本文我们通过真实案例打通三者之间的数据传输以及解决EFK在使用过程中的一些常见问题。
Stella981 Stella981
3年前
Linux日志安全分析技巧
0x00前言我正在整理一个项目,收集和汇总了一些应急响应案例(不断更新中)。GitHub地址:https://github.com/Bypass007/EmergencyResponseNotes本文主要介绍Linux日志分析的技巧,更多详细信息请访问Github地址,欢迎Star。0x01日志简介Lin
Lua将Nginx请求数据写入Kafka——埋点日志解决方案
缘起有一个埋点收集系统,架构是NginxFlume。web,小程序,App等客户端将数据报送至Nginx,Nginx将请求写入本地文件,然后Flume读取日志文件的数据,将日志写入Kafka。这个架构本来没什么问题,但是部署在K8s容器就有问题了,当前一
京东云开发者 京东云开发者
10个月前
2024了,我不想再用AOP收集业务操作日志了 | 京东云技术团队
0.背景在近期的项目中,系统涉及到针对系统的业务操作日志统计功能,由于本系统位于业务链路的中心环节,负责接收上游系统的数据,并将基于用户操作产生的数据传递至下游系统,鉴于业务链路的复杂性和操作场景的多样性,我们计划通过对核心业务数据进行全生命周期的日志记录