缘起
有一个埋点收集系统,架构是Nginx+Flume。web,小程序,App等客户端将数据报送至Nginx,Nginx将请求写入本地文件,然后Flume读取日志文件的数据,将日志写入Kafka。这个架构本来没什么问题,但是部署在K8s容器就有问题了,当前一个Nginx后面是3个Flume,Nginx根据渠道将日志写入web.log,mp.log,app.log,3个log文件各对应一个Flume将数据写入Kafka,遇到的问题首先是health check问题,K8s一个容器只能提供1个health check地址(最佳实战),因为Nginx后面有3个Flume,所以无法感知这3个健康状态,最多感知一个。第二个动态扩容问题,比如某一段时间web报送数据较多,但其它两个渠道较少,扩容成两个pod会浪费资源,第三个是优雅退出问题,由于Nginx速度较快,写入文件也比较快,Flume处理的较慢,导致如果我想把这个容器关闭的话,不知道Flume有没有把所有的日志都写入Kafka。 优化思路:Flume其实是有多个进口和多个出口的,对于客户的业务来讲,入口只有Nginx,出口只有Kafka,于是决定将Flume去掉,使用Nginx+Lua脚本形式直接将Nginx的日志写入Kafka,同时写一份文件以备出现问题补数据使用,也作为Bug定位追踪使用。
环境准备
- openresty-1.15.8.2
下载地址:http://openresty.org/cn/download.html
- lua-resty-kafka 是lua版本的kafka驱动,内置producer 下载地址:https://github.com/doujiang24/lua-resty-kafka
- zlib的lua库 解gzip使用的,如果你不用gzip 可以不安装到脚本 下载地址:https://github.com/madler/zlib
- lua-zlib lua调用gzip使用的库 下载地址:https://github.com/brimworks/lua-zlib
- Nginx和上面组件编译用的依赖:(我用的debian10.11)
apt-get update -y && apt-get install --fix-missing zlib1g zlib1g-dev libpcre3-dev libssl-dev perl make build-essential curl cmake -y
步骤
- 解压openresty并编译安装
cp openresty-1.15.8.2.tar.gz /opt/app/source/ tar -zxvf openresty-1.15.8.2.tar.gz
cd openresty-1.15.8.2 && ./configure --prefix=/opt/app/openresty && make && make install
2. 将kafka驱动放入lualib 就是将lua-resty-kafka-0.10.zip解压开,把lua-resty-kafka-0.10/lib下的resty文件夹直接拷贝进/opt/app/openresty/site/lualib/下
```shell
unzip lua-resty-kafka-0.10.zip
cd lua-resty-kafka-0.10/lib
cp -r resty /opt/app/openresty/site/lualib/
- 安装lua写的zlib库和lua-zlib库
tar -zxvf zlib-master.tar.gz cp -a zlib-master/* /opt/app/openresty/site/lualib/
tar -zxvf lua-zlib-master.tgz
cd lua-zlib-master
&& cmake -DLUA_INCLUDE_DIR=/opt/app/openresty/luajit/include/luajit-2.1 -DLUA_LIBRARIES=/opt/app/openresty/luajit/lib -DUSE_LUAJIT=ON -DUSE_LUA=OFF
&& make
&& cp zlib.so /opt/app/openresty/lualib/zlib.so
4. 编写kafkaconfig.lua并放入/opt/app/openresty/site/lualib/ 这一步是为了配置kafka的ip,端口和topic。
内容是:
```lua
kafka_broker_list={
{host="192.168.1.1",port=9092},
{host="192.168.1.2",port=9092},
{host="192.168.1.3",port=9092}
}
kafka_topic_mp="mp_topic"
kafka_topic_app="app_topic"
kafka_topic_web="web_topic"
- 编写nginx配置文件nginx.conf.内容如下:
worker_processes auto; error_log /opt/app/logs/error.log; events { worker_connections 10240; }
http { gzip on; gzip_types application/javascript text/plain text/xml text/css application/x-javascript application/xml text/javascript application/x-httpd-php image/jpeg image/gif image/png; gzip_vary on; gzip_comp_level 9;
server {
listen 80;
include /opt/app/openresty/nginx/conf/web.conf;
}
}
}
web.conf
```conf
location ^~ /health {
default_type text/html;
return 200 'ok';
}
# app 批量
location ^~ /app {
if ($request_method != POST) {
return 405;
}
default_type 'text/plain';
expires off;
add_header Last-Modified '';
add_header Cache-Control 'no-cache';
add_header Pragma "no-cache";
empty_gif;
echo_read_request_body;
access_by_lua_file /opt/app/openresty/nginx/conf/app.lua;
access_log /opt/app/logs/app.log;
}
app.lua:
ngx.req.read_body()
local body = ngx.req.get_body_data()
#引入kafka的生产者
local producer = require "resty.kafka.producer"
#引入上面写的kafka配置
local kafka_config = require "kafkaconfig"
# 因为引入了kafkaconfig 里面的变量直接访问就好
# local p = producer:new(kafka_broker_list)
# --------> 2023-03-16 分界线 start<------------
# 请使用以下方式初始化producer 相比于上面的初始化,下面添加了broker刷新时间
# 可以在网络抖动获取不到boker list,防止后续无法继续使用的问题
# 详细见我另一篇文章 https://blog.csdn.net/codeblf2/article/details/129505283
local p = producer:new(kafka_broker_list, {producer_type = "async",refresh_interval=10000})
# --------> 2023-03-16 分界线 end <------------
# 中间为nil的参数是kafka将本条消息写入哪个分区所使用的key,为nil代表轮询写入
local offset, err = p:send(kafka_topic_app, nil, body)
if not offset then
ngx.say("send err:", err)
return
end
以上示例只展示了app的,web和mp的类似即可。