Lua将Nginx请求数据写入Kafka——埋点日志解决方案

那年烟雨落申城
• 阅读 499

缘起

有一个埋点收集系统,架构是Nginx+Flume。web,小程序,App等客户端将数据报送至Nginx,Nginx将请求写入本地文件,然后Flume读取日志文件的数据,将日志写入Kafka。这个架构本来没什么问题,但是部署在K8s容器就有问题了,当前一个Nginx后面是3个Flume,Nginx根据渠道将日志写入web.log,mp.log,app.log,3个log文件各对应一个Flume将数据写入Kafka,遇到的问题首先是health check问题,K8s一个容器只能提供1个health check地址(最佳实战),因为Nginx后面有3个Flume,所以无法感知这3个健康状态,最多感知一个。第二个动态扩容问题,比如某一段时间web报送数据较多,但其它两个渠道较少,扩容成两个pod会浪费资源,第三个是优雅退出问题,由于Nginx速度较快,写入文件也比较快,Flume处理的较慢,导致如果我想把这个容器关闭的话,不知道Flume有没有把所有的日志都写入Kafka。 优化思路:Flume其实是有多个进口和多个出口的,对于客户的业务来讲,入口只有Nginx,出口只有Kafka,于是决定将Flume去掉,使用Nginx+Lua脚本形式直接将Nginx的日志写入Kafka,同时写一份文件以备出现问题补数据使用,也作为Bug定位追踪使用。

环境准备

  1. openresty-1.15.8.2 下载地址:http://openresty.org/cn/download.html
    1. lua-resty-kafka 是lua版本的kafka驱动,内置producer 下载地址:https://github.com/doujiang24/lua-resty-kafka
    2. zlib的lua库 解gzip使用的,如果你不用gzip 可以不安装到脚本 下载地址:https://github.com/madler/zlib
    3. lua-zlib lua调用gzip使用的库 下载地址:https://github.com/brimworks/lua-zlib
    4. Nginx和上面组件编译用的依赖:(我用的debian10.11)
      apt-get update -y && apt-get install --fix-missing zlib1g zlib1g-dev libpcre3-dev libssl-dev perl make build-essential curl cmake -y

      步骤

  2. 解压openresty并编译安装
    cp openresty-1.15.8.2.tar.gz /opt/app/source/
    tar -zxvf openresty-1.15.8.2.tar.gz
    

cd openresty-1.15.8.2 && ./configure --prefix=/opt/app/openresty && make && make install

2. 将kafka驱动放入lualib  就是将lua-resty-kafka-0.10.zip解压开,把lua-resty-kafka-0.10/lib下的resty文件夹直接拷贝进/opt/app/openresty/site/lualib/下
```shell
unzip lua-resty-kafka-0.10.zip
cd lua-resty-kafka-0.10/lib 
cp -r resty  /opt/app/openresty/site/lualib/
  1. 安装lua写的zlib库和lua-zlib库
    tar -zxvf zlib-master.tar.gz
    cp -a zlib-master/* /opt/app/openresty/site/lualib/
    

tar -zxvf lua-zlib-master.tgz cd lua-zlib-master
&& cmake -DLUA_INCLUDE_DIR=/opt/app/openresty/luajit/include/luajit-2.1 -DLUA_LIBRARIES=/opt/app/openresty/luajit/lib -DUSE_LUAJIT=ON -DUSE_LUA=OFF
&& make
&& cp zlib.so /opt/app/openresty/lualib/zlib.so

4. 编写kafkaconfig.lua并放入/opt/app/openresty/site/lualib/  这一步是为了配置kafka的ip,端口和topic。
内容是:
```lua
kafka_broker_list={
    {host="192.168.1.1",port=9092},
    {host="192.168.1.2",port=9092},
    {host="192.168.1.3",port=9092}
}

kafka_topic_mp="mp_topic"
kafka_topic_app="app_topic"
kafka_topic_web="web_topic"
  1. 编写nginx配置文件nginx.conf.内容如下:
    worker_processes  auto;
    error_log /opt/app/logs/error.log;
    events {
     worker_connections 10240;
    }
    

http { gzip on; gzip_types application/javascript text/plain text/xml text/css application/x-javascript application/xml text/javascript application/x-httpd-php image/jpeg image/gif image/png; gzip_vary on; gzip_comp_level 9;

server {
    listen 80;
    include /opt/app/openresty/nginx/conf/web.conf;
    }

}

}

web.conf
```conf
location ^~ /health {
    default_type text/html;
    return 200 'ok';
}
# app 批量
location ^~ /app {
    if ($request_method != POST) {
        return 405;
    }
    default_type 'text/plain';
    expires off;
    add_header Last-Modified '';
    add_header Cache-Control 'no-cache';
    add_header Pragma "no-cache";
    empty_gif;
    echo_read_request_body;
    access_by_lua_file /opt/app/openresty/nginx/conf/app.lua;
    access_log   /opt/app/logs/app.log;
}

app.lua:

ngx.req.read_body()
local body = ngx.req.get_body_data()

#引入kafka的生产者
local producer = require "resty.kafka.producer"
#引入上面写的kafka配置
local kafka_config = require "kafkaconfig"

# 因为引入了kafkaconfig 里面的变量直接访问就好
# local p = producer:new(kafka_broker_list)
# --------> 2023-03-16 分界线 start<------------
# 请使用以下方式初始化producer 相比于上面的初始化,下面添加了broker刷新时间
# 可以在网络抖动获取不到boker list,防止后续无法继续使用的问题
# 详细见我另一篇文章 https://blog.csdn.net/codeblf2/article/details/129505283
local p = producer:new(kafka_broker_list, {producer_type = "async",refresh_interval=10000})
# --------> 2023-03-16 分界线 end <------------

# 中间为nil的参数是kafka将本条消息写入哪个分区所使用的key,为nil代表轮询写入
local offset, err = p:send(kafka_topic_app, nil, body)
if not offset then
    ngx.say("send err:", err)
    return
end

以上示例只展示了app的,web和mp的类似即可。

点赞
收藏
评论区
推荐文章
Stella981 Stella981
3年前
Nginx 502 Bad Gateway 的错误的解决方案
我用的是nginx反向代理Apache,直接用Apache不会有任何问题,加上nginx就会有部分ajax请求502的错误,下面是我收集到的解决方案。一、fastcgi缓冲区设置过小 出现错误,首先要查找nginx的日志文件,目录为/var/log/nginx,在日志中发现了如下错误 2013/01/1713:33:47\err
Wesley13 Wesley13
3年前
ELK最佳实践
1.ELK最佳实践解析!(https://oscimg.oschina.net/oscnet/50d3ea4fa3946e374b0a03fb0e5795f4cb2.png)a.用户通过nginx或haproxy访问ELK日志统计平台,IP地址为keepalived的vip地址;b.nginx将请求转发到kibana;c.kibana到e
Wesley13 Wesley13
3年前
ELK之八
一、logstash结合kafka收集系统日志和nginx日志架构图:!(https://oscimg.oschina.net/oscnet/2d28dece38ea896fdb974165c799ff8130a.png)环境准备:A主机:kibana、e
Stella981 Stella981
3年前
OpenWRT上进行EXT4格式化和内容写入
在系统自身上进行格式化和内容写入需要停掉所有占用存储卡的进程,目前只有nginx和vsftpd,然后umount卸载掉,然后进行文件系统创建格式化重新挂载后启用vsftpd将文件传入/etc/init.d/nginxstop/etc/init.d/vsftpdstopblockumountmkfs.ext4/dev/mmcblk0p1
Stella981 Stella981
3年前
Flume使用Kafka Sink导致CPU过高的问题
在日志收集服务器上使用Flume(1.6)的KafkaSink将日志数据发送至Kafka,在FlumeAgent启动之后,发现每个Agent的CPU使用率都非常高,而我们需要在每台机器上启动多个FlumeAgent来收集不同类型的日志,如果每个Agent都这样,那肯定会把机器的CPU吃满了,刚开始使用jstack定位到是org.apache.flume
Stella981 Stella981
3年前
JVM 字节码指令表
字节码助记符指令含义0x00nop什么都不做0x01aconst\_null将null推送至栈顶0x02iconst\_m1将int型1推送至栈顶0x03iconst\_0将int型0推送至栈顶0x04iconst\_1将int型1推送至栈顶0x05ic
埋点日志最终解决方案——Golang+Gin+Sarama VS Java+SpringWebFlux+ReactorKafka
埋点日志最终解决方案——GolangGinSaramaVSJavaSpringWebFluxReactorKafka之前我就写过几篇OpenRestyluakafkaclient将埋点数据写入Kafka的文章,如下:以上一步一个坑,有些是自己能力
python定时任务执行shell脚本切割Nginx日志-慎用
Python定时任务执行shell脚本切割Nginx日志(慎用)缘起我们有一个Nginx服务用来接收埋点上报数据,输出的日志文件比较大,Nginx没有自带日志分割组件,这样输出的日志文件就比较大,抽取日志就比较麻烦。网上切割日志的方式一般就两种:logra
京东云开发者 京东云开发者
3个月前
纳尼?自建K8s集群日志收集还能通过JMQ保存到JES
作者:京东科技刘恩浩一、背景基于K8s集群的私有化交付方案中,日志收集采用了logstashkafkaes方案,其中ilogtail负责日志收集,logstash负责对数据转换,kafka负责对日志传递中的消峰进而减少es的写入压力,es用来保存日志
那年烟雨落申城
那年烟雨落申城
Lv1
男 · 众安科技 · 高级Java开发工程师
是你吧,我能从很远很远的地方一眼认出你来
文章
26
粉丝
0
获赞
1