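EMQ X Kuiper is an open-source, lightweight IoT edge data analytics and stream processing software from EMQ Technologies. A primary design goal of Kuiper is to bring the real-time stream computing frameworks that run in the cloud (such as Apache Spark, Apache Storm and Apache Flink) to the edge.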
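Kuiper draws on the architecture and implementation of cloud stream-processing projects and, adapting them to the characteristics of streaming data at the edge, implements edge stream processing with a rule engine built from a Source, SQL (the business logic) and a Sink. Kuiper can be used in a wide range of IoT edge scenarios, for example:
Stream processing: real-time stream processing at the edge
Rule engine: flexibly defined rules for alerting and message forwarding
Data format and protocol conversion: flexible conversion between the different data formats and heterogeneous protocols used at the edge and in the cloud, bridging IT and OT
Processing data at the edge with Kuiper can shorten system response times, reduce network bandwidth and storage costs, and improve system security.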
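Besides being highly extensible, Kuiper can also integrate with the KubeEdge framework. Interested readers can explore more on the official documentation site; I won't repeat it here: https://docs.emqx.cn/kuiper/latest/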
Installing Kuiper on the edge
The Kubernetes environment is shown below; the Raspberry Pi is the edge device:
[root@k8s-test-master01 ~]# kubectl get no -o wide
NAME                     STATUS   ROLES        AGE    VERSION                   INTERNAL-IP      EXTERNAL-IP   OS-IMAGE                         KERNEL-VERSION                 CONTAINER-RUNTIME
k8s-test-master01        Ready    master       16h    v1.20.2                   172.31.250.220   <none>        CentOS Linux 8 (Core)            4.18.0-193.28.1.el8_2.x86_64   cri-o://1.20.0
k8s-test-node01          Ready    node         18h    v1.20.2                   172.31.250.221   <none>        Ubuntu 20.04.1 LTS               5.4.0-58-generic               containerd://1.3.3-0ubuntu2.2
k8s-test-node02          Ready    node         17h    v1.20.2                   172.31.250.222   <none>        openSUSE Leap 15.2               5.3.18-lp152.57-default        cri-o://1.17.3
kubeedge-raspberrypi01   Ready    agent,edge   116m   v1.19.3-kubeedge-v1.5.0   192.168.13.102   <none>        Raspbian GNU/Linux 10 (buster)   5.4.51-v7l+                    docker://19.3.13
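Deploy Kuiper with Kubernetes, using node affinity to schedule the Pod onto the edge node. If you have many edge devices, use a DaemonSet instead of a Deployment.
MQTT_SOURCE__DEFAULT__SERVERS points Kuiper at the MQTT broker on the KubeEdge edge node.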
apiVersion: apps/v1
kind: Deployment
metadata:
  name: edge-kuiper
  labels:
    app: edge-kuiper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: edge-kuiper
  template:
    metadata:
      labels:
        app: edge-kuiper
    spec:
      # Only schedule onto nodes labelled as KubeEdge edge nodes
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: node-role.kubernetes.io/edge
                operator: In
                values:
                - ""
      containers:
      - name: edge-kuiper
        image: emqx/kuiper:1.1.1-alpine
        env:
        # Broker used by the default MQTT source (the KubeEdge edge broker)
        - name: MQTT_SOURCE__DEFAULT__SERVERS
          value: "[tcp://127.0.0.1:1883]"
        resources:
          requests:
            cpu: 100m
            memory: 100Mi
          limits:
            cpu: 1000m
            memory: 1024Mi
        ports:
        # Kuiper REST API port
        - containerPort: 9081
          name: rest-api
          hostPort: 9081
        livenessProbe:
          tcpSocket:
            port: 9081
          initialDelaySeconds: 60
          periodSeconds: 60
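As we understand the Kuiper Docker image documentation, the variable name encodes which configuration entry it overrides: double underscores separate the file, the section and the key, so MQTT_SOURCE__DEFAULT__SERVERS should correspond to the servers key of the default section in mqtt_source.yaml (which is why the value uses the YAML list syntax [tcp://...]). The Python sketch below only illustrates that naming rule; parse_override is a hypothetical helper, not part of Kuiper, and it assumes exactly three levels.

```python
def parse_override(var_name: str) -> tuple[str, str, str]:
    """Split a FILE__SECTION__KEY override variable into its three parts.

    Illustrative only: assumes exactly three levels separated by double
    underscores, and that the YAML file, section and key names are the
    lowercase forms of the variable's parts.
    """
    parts = var_name.split("__")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected FILE__SECTION__KEY, got {var_name!r}")
    file_part, section, key = (part.lower() for part in parts)
    return f"{file_part}.yaml", section, key


if __name__ == "__main__":
    print(parse_override("MQTT_SOURCE__DEFAULT__SERVERS"))
    # ('mqtt_source.yaml', 'default', 'servers')
```

Note that the single underscore inside MQTT_SOURCE is kept, because only double underscores act as separators.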
Kuiper-manager is a web console for managing Kuiper nodes, streams, rules, plugins and more.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: edge-kuiper-manager
  labels:
    app: edge-kuiper-manager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: edge-kuiper-manager
  template:
    metadata:
      labels:
        app: edge-kuiper-manager
    spec:
      # Only schedule onto nodes labelled as KubeEdge edge nodes
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: node-role.kubernetes.io/edge
                operator: In
                values:
                - ""
      containers:
      - name: edge-kuiper-manager
        image: emqx/kuiper-manager:1.1.0
        resources:
          requests:
            cpu: 100m
            memory: 100Mi
          limits:
            cpu: 1000m
            memory: 1024Mi
        ports:
        # Web console port
        - containerPort: 9082
          name: http
          hostPort: 9082
        livenessProbe:
          tcpSocket:
            port: 9082
          initialDelaySeconds: 60
          periodSeconds: 60
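Both Deployments use a tcpSocket liveness probe, which only checks that a TCP connection to the port (9081 for Kuiper, 9082 for the manager) can be opened; it sends no request. A rough Python equivalent, handy for checking the ports by hand from the edge node, might look like the sketch below. The function name tcp_probe is our own, and the 1-second timeout simply mirrors the probe's default timeoutSeconds.

```python
import socket


def tcp_probe(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Like a Kubernetes tcpSocket probe, success only means the TCP
    handshake completed; nothing is sent over the connection.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable or timed out
        return False


if __name__ == "__main__":
    # Example usage on the edge node, with the hostPorts from the manifests.
    for name, port in (("kuiper", 9081), ("kuiper-manager", 9082)):
        print(name, "reachable:", tcp_probe("127.0.0.1", port))
```

A passing check does not prove the REST API or console is healthy, only that something is listening, which is exactly the guarantee the tcpSocket probe gives.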
Create both Deployments with kubectl create -f; once installed, the pods look like this:
[root@k8s-test-master01 ~]# kubectl get po -o wide
NAME                                 READY   STATUS    RESTARTS   AGE    IP           NODE                     NOMINATED NODE   READINESS GATES
edge-kuiper-79667bd886-67xtw         1/1     Running   0          104m   172.17.0.3   kubeedge-raspberrypi01   <none>           <none>
edge-kuiper-manager-ffb8bd5b-c9bxz   1/1     Running   0          94m    172.17.0.4   kubeedge-raspberrypi01   <none>           <none>
edge-nginx-7bd689df6d-4rgvn          1/1     Running   0          124m   172.17.0.2   kubeedge-raspberrypi01   <none>           <none>
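Open the Kuiper console at the edge node's IP on port 9082 (default username/password: admin/public) and add the Kuiper node.
Testing Kuiper
Start with a simple test of Kuiper.
1. Enter the Kuiper container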
kubectl exec -it edge-kuiper-79667bd886-67xtw -- /bin/sh
2. Create a temperature and humidity stream
/kuiper # bin/kuiper create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
Connecting to 127.0.0.1:20498...
Stream demo is created.
# List the streams that have been created
/kuiper # bin/kuiper show streams
Connecting to 127.0.0.1:20498...
demo
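The stream is named demo; its DATASOURCE, devices/+/messages, is the MQTT topic it subscribes to. By default the MQTT broker is the address passed in through the environment variable when Kuiper was installed.
3. Open the Kuiper interactive shell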
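In devices/+/messages, + is the MQTT single-level wildcard, so the stream receives devices/device_001/messages and devices/device_002/messages but not, for example, devices/a/b/messages. The matcher below is a small Python sketch of the MQTT 3.1.1 topic-filter rules (+ matches one level, a trailing # matches the parent level and everything below it, and filters that start with a wildcard do not match topics beginning with $). The function name is hypothetical; brokers implement this matching themselves.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Check an MQTT topic name against a topic filter (MQTT 3.1.1 rules)."""
    # Filters starting with a wildcard must not match $-prefixed topics.
    if topic.startswith("$") and topic_filter[:1] in ("+", "#"):
        return False
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the rest.
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


if __name__ == "__main__":
    for t in ("devices/device_001/messages", "devices/a/b/messages"):
        print(t, topic_matches("devices/+/messages", t))
```

The same rules apply to the KubeEdge twin topic $hw/events/device/+/twin/update: its filter starts with a literal $hw level, so it does match $hw/events/device/<id>/twin/update topics.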
/kuiper # bin/kuiper query
Connecting to 127.0.0.1:20498...
kuiper > select * from demo WHERE temperature > 30;
Query was submit successfully.
kuiper >
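This query drops every record whose temperature is not greater than 30, so readings of exactly 30 are filtered out as well.
4. Generate some data on the edge with an MQTT client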
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 50, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 20, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 10, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 50, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 20, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 10, "humidity" : 20}' -t devices/device_002/messages
-h points to the KubeEdge MQTT broker.
5. If everything is working, the Kuiper interactive shell prints the filtered data:
kuiper > [{"humidity":20,"temperature":50}][{"humidity":20,"temperature":40}][{"humidity":20,"temperature":50}][{"humidity":20,"temperature":40}]
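To see why exactly four records are printed, the Python sketch below replays the ten payloads published in step 4 through a toy source -> filter (WHERE temperature > 30) -> sink pipeline. It models the Source/SQL/Sink idea described at the start of this article and is not Kuiper's implementation; the function names are hypothetical, and keys are sorted only so the output matches the line the shell printed.

```python
import json
from typing import Callable, Iterable, Iterator

# The ten (topic, payload) pairs published with mosquitto_pub in step 4.
PUBLISHED = [
    (f"devices/{dev}/messages", json.dumps({"temperature": t, "humidity": 20}))
    for dev in ("device_001", "device_002")
    for t in (50, 40, 30, 20, 10)
]


def source(messages: Iterable[tuple[str, str]]) -> Iterator[dict]:
    """Decode JSON payloads, as a stream declared with FORMAT="JSON" would."""
    for _topic, payload in messages:
        yield json.loads(payload)


def where(rows: Iterable[dict], predicate: Callable[[dict], bool]) -> Iterator[dict]:
    """Keep only the rows that satisfy the predicate (the SQL WHERE clause)."""
    return (row for row in rows if predicate(row))


def sink(rows: Iterable[dict]) -> list[str]:
    """Serialize each row in the compact form the interactive shell shows."""
    return [json.dumps([row], sort_keys=True, separators=(",", ":")) for row in rows]


if __name__ == "__main__":
    out = sink(where(source(PUBLISHED), lambda r: r["temperature"] > 30))
    print("".join(out))
```

Running it prints the same four records as the Kuiper shell above: 50 and 40 from each device, while 30, 20 and 10 fail the strict > 30 comparison.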
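Streams, plugins and so on can also be created from the Kuiper console.
Installing EMQX in the cloud
Cloud-edge collaboration is an important concept in edge computing, and the cloud-edge channel is the hub of that collaboration. Kuiper can subscribe to KubeEdge MQTT topics (such as $hw/events/device/+/twin/update), and it can also push rule-processed data to an MQTT broker in the cloud.
Deploy EMQX with Helm
# Add the emqx repository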
helm repo add emqx https://repos.emqx.io/charts
# Update the repository
helm repo update
# List the available emqx charts and versions
helm search repo emqx
NAME           CHART VERSION   APP VERSION   DESCRIPTION
emqx/emqx      4.2.7           4.2.7         A Helm chart for EMQ X
emqx/emqx-ee   4.2.3           4.2.3         A Helm chart for EMQ X Enterprise
emqx/kuiper    0.9.0           0.9.0         A lightweight IoT edge analytic software
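Because this is a combined cloud-edge cluster, deploy with helm template rather than installing the chart directly, so that the rendered manifests can be edited to keep the cloud EMQX on cloud nodes:
# Export the YAML installation manifests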
helm template test emqx/emqx > emqx.yaml
Edit the YAML file and add node anti-affinity to the EMQX pod spec (a NotIn rule on the edge role label):
affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
        - key: node-role.kubernetes.io/edge
          operator: NotIn
          values:
          - ""
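The two affinity rules in this article rely on Kubernetes label-selector semantics: operator In with values [""] requires the node to carry node-role.kubernetes.io/edge with an empty value, while NotIn is also satisfied by nodes that do not have the label at all, which is why the EMQX pods can land on the master and worker nodes. The Python sketch below models only those two operators. The node label sets are simplified assumptions (real nodes carry many more labels), inferred from the ROLES column shown earlier.

```python
from typing import Mapping, Sequence

EDGE_KEY = "node-role.kubernetes.io/edge"


def requirement_matches(labels: Mapping[str, str], key: str, operator: str,
                        values: Sequence[str]) -> bool:
    """Evaluate one matchExpressions entry against a node's labels.

    Only In and NotIn are modeled. As in Kubernetes label selectors,
    NotIn is also satisfied when the key is absent.
    """
    if operator == "In":
        return key in labels and labels[key] in values
    if operator == "NotIn":
        return key not in labels or labels[key] not in values
    raise ValueError(f"operator {operator!r} is not modeled in this sketch")


# Simplified, assumed label sets for the nodes listed earlier.
NODES = {
    "k8s-test-master01": {"node-role.kubernetes.io/master": ""},
    "k8s-test-node01": {"node-role.kubernetes.io/node": ""},
    "k8s-test-node02": {"node-role.kubernetes.io/node": ""},
    "kubeedge-raspberrypi01": {"node-role.kubernetes.io/agent": "", EDGE_KEY: ""},
}

if __name__ == "__main__":
    for op in ("In", "NotIn"):
        eligible = [name for name, lbl in NODES.items()
                    if requirement_matches(lbl, EDGE_KEY, op, [""])]
        print(op, "->", eligible)
```

With these labels, In selects only kubeedge-raspberrypi01 (where Kuiper runs) and NotIn selects the three cloud nodes (where the EMQX pods run).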
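Expose the EMQX service externally as a NodePort. Note that nodePort values such as 1883 lie outside Kubernetes' default NodePort range (30000-32767), so the kube-apiserver's --service-node-port-range must be widened for this Service to be accepted: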
---
# Source: emqx/templates/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: test-emqx-external
  namespace: default
  labels:
    app.kubernetes.io/name: emqx
    helm.sh/chart: emqx-4.2.7
    app.kubernetes.io/instance: test
    app.kubernetes.io/managed-by: Helm
spec:
  type: NodePort
  ports:
  - name: mqtt
    port: 1883
    protocol: TCP
    targetPort: mqtt
    nodePort: 1883
  - name: mqttssl
    port: 8883
    protocol: TCP
    targetPort: mqttssl
    nodePort: 8883
  - name: mgmt
    port: 8081
    protocol: TCP
    targetPort: mgmt
    nodePort: 8081
  - name: ws
    port: 8083
    protocol: TCP
    targetPort: ws
    nodePort: 8083
  - name: wss
    port: 8084
    protocol: TCP
    targetPort: wss
    nodePort: 8084
  - name: dashboard
    port: 18083
    protocol: TCP
    targetPort: dashboard
    nodePort: 18083
  selector:
    app.kubernetes.io/name: emqx
    app.kubernetes.io/instance: test
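Since every nodePort must fall inside the API server's configured range (30000-32767 by default), a quick pre-flight check before applying the manifest can save a rejected apply. The Python sketch below flags the ports of this Service that fall outside a given range. The port list is copied by hand from the manifest rather than parsed from YAML, and the widened range 1000-32767 in the usage line is only an assumed example, not what this cluster necessarily uses.

```python
# nodePort values copied from the test-emqx-external Service above.
NODE_PORTS = {
    "mqtt": 1883,
    "mqttssl": 8883,
    "mgmt": 8081,
    "ws": 8083,
    "wss": 8084,
    "dashboard": 18083,
}


def out_of_range(ports: dict[str, int], low: int = 30000,
                 high: int = 32767) -> dict[str, int]:
    """Return the named ports that fall outside [low, high] (inclusive)."""
    return {name: p for name, p in ports.items() if not low <= p <= high}


if __name__ == "__main__":
    bad = out_of_range(NODE_PORTS)
    print(f"{len(bad)} of {len(NODE_PORTS)} nodePorts are outside the default range")
    # Assumed example of a widened --service-node-port-range=1000-32767
    print("still outside:", out_of_range(NODE_PORTS, low=1000, high=32767))
```

With the default range all six ports are flagged; with the assumed 1000-32767 range none are.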
Check the EMQX cluster status
[root@k8s-test-master01 ~]# kubectl get po -o wide --selector=app.kubernetes.io/name=emqx
NAME          READY   STATUS    RESTARTS   AGE     IP               NODE                NOMINATED NODE   READINESS GATES
test-emqx-0   1/1     Running   0          8m10s   192.168.57.195   k8s-test-master01   <none>           <none>
test-emqx-1   1/1     Running   0          7m44s   192.168.116.68   k8s-test-node02     <none>           <none>
test-emqx-2   1/1     Running   0          7m21s   192.168.85.196   k8s-test-node01     <none>           <none>
[root@k8s-test-master01 ~]# kubectl exec -it test-emqx-0 -- emqx_ctl cluster status
Cluster status: #{running_nodes =>
['test@test-emqx-0.test-emqx-headless.default.svc.cluster.local',
'test@test-emqx-1.test-emqx-headless.default.svc.cluster.local',
'test@test-emqx-2.test-emqx-headless.default.svc.cluster.local'],
stopped_nodes => []}
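The EMQX Dashboard is enabled by default; open it at a cloud node's IP on port 18083 (username/password: admin/public).
Kuiper rules
Create a Kuiper rule that uploads the rule-processed data to the cloud MQTT broker.
1. Subscribe to the demoSink topic on the cloud MQTT broker, to verify that Kuiper uploads the data to the cloud.
# 47.242.xxx.xxx is the address of the cloud MQTT broker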
# mosquitto_sub -i test_sub -h 47.242.xxx.xxx -p 1883 -d -t demoSink
Client test_sub sending CONNECT
Client test_sub received CONNACK (0)
Client test_sub sending SUBSCRIBE (Mid: 1, Topic: demoSink, QoS: 0, Options: 0x00)
Client test_sub received SUBACK
Subscribed (mid: 1): 0
2. Create the rule in Kuiper on the edge
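# Define the rule (the content is JSON even though the file is named rule.yaml)
# Set CLOUD_IP to the cloud MQTT address first, e.g. CLOUD_IP=47.242.xxx.xxx;
# the unquoted heredoc expands $CLOUD_IP when the file is written
cat > /tmp/rule.yaml <<EOF
{
  "id": "rule1",
  "sql": "select * from demo WHERE temperature > 30",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "tcp://$CLOUD_IP:1883",
        "topic": "demoSink"
      }
    }
  ]
}
EOF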
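Despite the .yaml extension the rule file is JSON, and a stray comma or an unexpanded placeholder is easy to miss in a heredoc. As an alternative sketch, the same rule can be generated and validated with Python's json module. CLOUD_IP is a placeholder you must replace, and build_rule is a hypothetical helper; the rule content itself is the one defined above.

```python
import json

# Placeholder: replace with the address of the cloud MQTT broker.
CLOUD_IP = "47.242.xxx.xxx"


def build_rule(cloud_ip: str) -> dict:
    """Return the rule1 definition: log every match and forward it to the cloud."""
    return {
        "id": "rule1",
        "sql": "select * from demo WHERE temperature > 30",
        "actions": [
            {"log": {}},
            {"mqtt": {"server": f"tcp://{cloud_ip}:1883", "topic": "demoSink"}},
        ],
    }


if __name__ == "__main__":
    text = json.dumps(build_rule(CLOUD_IP), indent=2)
    json.loads(text)  # round-trip check: fails loudly if the text is not valid JSON
    print(text)
```

The Alpine-based Kuiper image may not ship Python, so one option is to run the script elsewhere (for example python3 make_rule.py > rule.yaml, where make_rule.py is a hypothetical file name) and copy the result in with kubectl cp rule.yaml edge-kuiper-79667bd886-67xtw:/tmp/rule.yaml.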
# Create the rule
/kuiper # bin/kuiper create rule rule1 -f /tmp/rule.yaml
Connecting to 127.0.0.1:20498...
Creating a new rule from file /tmp/rule.yaml.
Rule rule1 was created successfully, please use 'bin/kuiper getstatus rule rule1' command to get rule status.
# Check the rule status
/kuiper # bin/kuiper getstatus rule rule1
Connecting to 127.0.0.1:20498...
{
  "source_demo_0_records_in_total": 0,
  "source_demo_0_records_out_total": 0,
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_process_latency_us": 0,
  "source_demo_0_buffer_length": 0,
  "source_demo_0_last_invocation": 0,
  "op_1_preprocessor_demo_0_records_in_total": 0,
  "op_1_preprocessor_demo_0_records_out_total": 0,
  "op_1_preprocessor_demo_0_exceptions_total": 0,
  "op_1_preprocessor_demo_0_process_latency_us": 0,
  "op_1_preprocessor_demo_0_buffer_length": 0,
  "op_1_preprocessor_demo_0_last_invocation": 0,
  "op_2_filter_0_records_in_total": 0,
  "op_2_filter_0_records_out_total": 0,
  "op_2_filter_0_exceptions_total": 0,
  "op_2_filter_0_process_latency_us": 0,
  "op_2_filter_0_buffer_length": 0,
  "op_2_filter_0_last_invocation": 0,
  "op_3_project_0_records_in_total": 0,
  "op_3_project_0_records_out_total": 0,
  "op_3_project_0_exceptions_total": 0,
  "op_3_project_0_process_latency_us": 0,
  "op_3_project_0_buffer_length": 0,
  "op_3_project_0_last_invocation": 0,
  "sink_log_0_0_records_in_total": 0,
  "sink_log_0_0_records_out_total": 0,
  "sink_log_0_0_exceptions_total": 0,
  "sink_log_0_0_process_latency_us": 0,
  "sink_log_0_0_buffer_length": 0,
  "sink_log_0_0_last_invocation": 0,
  "sink_mqtt_1_0_records_in_total": 0,
  "sink_mqtt_1_0_records_out_total": 0,
  "sink_mqtt_1_0_exceptions_total": 0,
  "sink_mqtt_1_0_process_latency_us": 0,
  "sink_mqtt_1_0_buffer_length": 0,
  "sink_mqtt_1_0_last_invocation": 0
}
3. Send some data with the mosquitto_pub client
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 31, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 21, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1 -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_001/messages
4. Check the rule status again
/kuiper # bin/kuiper getstatus rule rule1
Connecting to 127.0.0.1:20498...
{
  "source_demo_0_records_in_total": 13,
  "source_demo_0_records_out_total": 13,
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_process_latency_us": 4,
  "source_demo_0_buffer_length": 0,
  "source_demo_0_last_invocation": "2021-02-17T09:15:25.79156",
  "op_1_preprocessor_demo_0_records_in_total": 13,
  "op_1_preprocessor_demo_0_records_out_total": 13,
  "op_1_preprocessor_demo_0_exceptions_total": 0,
  "op_1_preprocessor_demo_0_process_latency_us": 9,
  "op_1_preprocessor_demo_0_buffer_length": 0,
  "op_1_preprocessor_demo_0_last_invocation": "2021-02-17T09:15:25.791597",
  "op_2_filter_0_records_in_total": 13,
  "op_2_filter_0_records_out_total": 6,
  "op_2_filter_0_exceptions_total": 0,
  "op_2_filter_0_process_latency_us": 13,
  "op_2_filter_0_buffer_length": 0,
  "op_2_filter_0_last_invocation": "2021-02-17T09:15:25.791622",
  "op_3_project_0_records_in_total": 6,
  "op_3_project_0_records_out_total": 6,
  "op_3_project_0_exceptions_total": 0,
  "op_3_project_0_process_latency_us": 43,
  "op_3_project_0_buffer_length": 0,
  "op_3_project_0_last_invocation": "2021-02-17T09:15:25.791651",
  "sink_log_0_0_records_in_total": 6,
  "sink_log_0_0_records_out_total": 6,
  "sink_log_0_0_exceptions_total": 0,
  "sink_log_0_0_process_latency_us": 287,
  "sink_log_0_0_buffer_length": 0,
  "sink_log_0_0_last_invocation": "2021-02-17T09:15:25.791736",
  "sink_mqtt_1_0_records_in_total": 6,
  "sink_mqtt_1_0_records_out_total": 6,
  "sink_mqtt_1_0_exceptions_total": 0,
  "sink_mqtt_1_0_process_latency_us": 212,
  "sink_mqtt_1_0_buffer_length": 0,
  "sink_mqtt_1_0_last_invocation": "2021-02-17T09:15:25.79171"
}
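The getstatus output is a flat JSON object whose keys appear to follow an <operator>_<metric> pattern (inferred from the output above, not from a documented schema). A short Python sketch can group the records_in/records_out counters per operator; here it is fed a subset of the counters printed above and shows that the filter dropped 7 of 13 records while both sinks emitted all 6 survivors. The summarize helper is hypothetical.

```python
import json

# A subset of the counters printed by bin/kuiper getstatus rule rule1 above.
STATUS = json.loads("""
{
  "source_demo_0_records_in_total": 13,
  "source_demo_0_records_out_total": 13,
  "op_2_filter_0_records_in_total": 13,
  "op_2_filter_0_records_out_total": 6,
  "op_3_project_0_records_in_total": 6,
  "op_3_project_0_records_out_total": 6,
  "sink_log_0_0_records_in_total": 6,
  "sink_log_0_0_records_out_total": 6,
  "sink_mqtt_1_0_records_in_total": 6,
  "sink_mqtt_1_0_records_out_total": 6
}
""")

SUFFIXES = ("_records_in_total", "_records_out_total")


def summarize(status: dict) -> dict[str, tuple[int, int]]:
    """Group records_in/records_out counters by operator-name prefix."""
    summary: dict[str, list[int]] = {}
    for key, value in status.items():
        for idx, suffix in enumerate(SUFFIXES):
            if key.endswith(suffix):
                name = key[: -len(suffix)]
                summary.setdefault(name, [0, 0])[idx] = value
    return {name: (counts[0], counts[1]) for name, counts in summary.items()}


if __name__ == "__main__":
    for name, (rec_in, rec_out) in summarize(STATUS).items():
        print(f"{name:16} in={rec_in:3} out={rec_out:3} dropped={rec_in - rec_out}")
```

The same function accepts the full getstatus output unchanged, since keys without the two suffixes are simply ignored.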
5. Check whether the topic subscribed to in step 1 has received the data.
/ # mosquitto_sub -i test_sub -h 47.242.xxx.xxx -p 1883 -d -t demoSink
Client test_sub sending CONNECT
Client test_sub received CONNACK (0)
Client test_sub sending SUBSCRIBE (Mid: 1, Topic: demoSink, QoS: 0, Options: 0x00)
Client test_sub received SUBACK
Subscribed (mid: 1): 0
Client test_sub received PUBLISH (d0, q0, r0, m0, 'demoSink', ... (32 bytes))
{"humidity":20,"temperature":40}
Client test_sub received PUBLISH (d0, q0, r0, m0, 'demoSink', ... (32 bytes))
{"humidity":20,"temperature":31}
Client test_sub received PUBLISH (d0, q0, r0, m0, 'demoSink', ... (32 bytes))
{"humidity":20,"temperature":40}
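The records from step 3 with temperature greater than 30 (40, 31 and 40) were all pushed to the cloud.
Summary
Kuiper brings edge stream processing to KubeEdge's model of pushing computation down to the edge, and Kuiper is also adding KubeEdge-specific features (such as integration with the KubeEdge device model), so the two really do make a powerful combination. Kuiper is highly extensible and offers many interesting plugins; for example, the influx plugin can write data to InfluxDB.
This article is only a brief introduction to Kuiper. We are still evaluating it, including integration with big-data services such as Apache Flink, and will follow up with more on using Kuiper in later posts.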