边缘计算不仅仅是将应用部署在边缘,并对其进行自动化的监控和运维。在许多应用场景里,边缘和云上应用需要进行特定的消息传输、数据交换等,以完成边云协同的业务处理。例如,用户需要从云端发送命令至边缘的应用来触发特定的业务,或者边缘设备需要将采集的业务信息上传至云端处理。
KubeEdge v1.6 版本增加了自定义边云消息传输的支持,用户可以根据场景,借助 Rule 和 RuleEndpoint 两个新增API来自定义的边云消息传输设置,为需要边云通信的业务组件或第三方插件屏蔽底层网络环境差异。
Router Manager
KubeEdge 借助 Kubernetes CRD 和路由器模块支持路由管理。用户可以通过mqtt代理在云和边缘之间传递其自定义消息。
使用场景: 用于用户控制数据的传递; 不适合大数据传送; 一次传送的数据大小限制为12MB。
kubeedge 升级到 1.6 版本貌似默认没有开启 router ,需要手动创建 router crd 和开启 router 功能。
修改 cloudcore 配置,开启 router 功能。如果是新环境,直接开启即可。
# /etc/kubeedge/config/cloudcore.yaml
router:
enable: true
address: 0.0.0.0
port: 9443
restTimeout: 60
syncController:
enable: true
# kubectl get crds | grep kubeedge
clusterobjectsyncs.reliablesyncs.kubeedge.io 2021-03-08T07:53:39Z
devicemodels.devices.kubeedge.io 2021-03-08T07:53:39Z
devices.devices.kubeedge.io 2021-03-08T07:53:39Z
objectsyncs.reliablesyncs.kubeedge.io 2021-03-08T07:53:39Z
ruleendpoints.rules.kubeedge.io 2021-03-08T08:04:59Z
rules.rules.kubeedge.io 2021-03-08T08:04:59Z
云端到边缘端
通过调用 cloudcore api 将控制边缘端应用的消息从云传递到边缘,边缘端的应用接收消息后,开始或停止收集树莓派的系统日志,收集到的日志发布到 mq ,日志可以通过 kuiper 的规则处理,把需要的日志上传到云端 emqx 。 环境
# kubectl get nodes -o wide
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
k8s-test-master01 Ready master 21d v1.20.2 172.31.250.220 <none> CentOS Linux 8 (Core) 4.18.0-193.28.1.el8_2.x86_64 cri-o://1.20.0
k8s-test-node01 Ready node 21d v1.20.2 172.31.250.221 <none> Ubuntu 20.04.1 LTS 5.4.0-58-generic containerd://1.3.3-0ubuntu2.2
k8s-test-node02 Ready node 21d v1.20.2 172.31.250.222 <none> openSUSE Leap 15.2 5.3.18-lp152.57-default cri-o://1.17.3
kubeedge-raspberrypi01 Ready agent,edge 2d2h v1.19.3-kubeedge-v1.6.0 192.168.13.102 <none> Raspbian GNU/Linux 10 (buster) 5.4.51-v7l+ docker://19.3.13
1.创建云端到边缘端的路由
# cat create-ruleEndpoint-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
name: my-rest
labels:
description: test
spec:
ruleEndpointType: "rest"
properties: {}
---
# cat create-ruleEndpoint-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
name: my-eventbus
labels:
description: test
spec:
ruleEndpointType: "eventbus"
properties: {}
---
# cat create-rule-rest-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: Rule
metadata:
name: my-rule
labels:
description: test
spec:
source: "my-rest"
sourceResource: {"path":"/a"}
target: "my-eventbus"
targetResource: {"topic":"test"}
2.写一个简单的程序,用于收集 Linux 日志
// messagePubHandler 订阅 mq 消息,启动或停止日志采集
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
log.Printf("Received message: %v messageid: %d from topic: %s\n", msg, msg.MessageID(), msg.Topic())
controlMessage := string(msg.Payload())
log.Println(controlMessage)
if controlMessage == "start" {
fileName := "/var/log/syslog"
config := tail.Config{
ReOpen: true, // 重新打开
Follow: true, // 是否跟随
Location: &tail.SeekInfo{Offset: S.offset, Whence: 0}, // 从文件的哪个地方开始读
MustExist: true, // 文件不存在报错
Poll: true,
}
tails, err := tail.TailFile(fileName, config)
if err != nil {
log.Println("tail file failed, err:", err)
return
}
go logsStream(tails, S.mqttClient)
} else if controlMessage == "stop" {
S.StopCh <- string(msg.Payload())
} else {
log.Printf("Unknown message : %s", controlMessage)
}
}
3.在 Kuiper 创建日志流
/kuiper # bin/kuiper create stream logs '(month string, days string,times string ,hostname string,kinds string,logs string,) WITH (FORMAT="JSON", DATASOURCE="demo")';
Connecting to 127.0.0.1:20498...
Stream logs is created.
/kuiper # bin/kuiper show streams
Connecting to 127.0.0.1:20498...
logs
4.创建 Kuiper 规则,把日志转发到云端 emqx
cat > /tmp/rule.yaml <<EOF
{
"id": "rule1",
"sql": "select * from logs ",
"actions": [
{
"log": {}
},
{
"mqtt": {
"server": "tcp://${云端emqx IP}:1883",
"topic": "demoSink"
}
}
]
}
EOF
/kuiper # bin/kuiper create rule rule1 -f /tmp/rule.yaml
5.调用云端的 cloudcore rest api 将消息发送到边缘端
# URL: http://{rest_endpoint}/{node_name}/{namespace}/{path}
[root@k8s-test-master01 ~]# curl -X POST --data 'start' http://127.0.0.1:9443/kubeedge-raspberrypi01/default/a
message delivered
6.边缘端的程序接收到消息,开始收集日志
7.订阅云端的 emqx 消息,验证是否有日志发送过来
边缘端到云端
1.创建边缘端到云端的路由
# create-ruleEndpoint-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
name: my-rest
labels:
description: test
spec:
ruleEndpointType: "rest"
properties: {}
---
# create-ruleEndpoint-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
name: my-eventbus
labels:
description: test
spec:
ruleEndpointType: "eventbus"
properties: {}
---
#create-rule-eventbus-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: Rule
metadata:
name: my-rule-eventbus-rest
labels:
description: test
spec:
source: "my-eventbus"
sourceResource: {"topic": "test","node_name": "kubeedge-raspberrypi01"}
target: "my-rest"
targetResource: {"resource":"http://172.31.250.220:8088/api/v1/msg"}
resource 是云端应用的 api 地址
2.写一个简单的 API 接口
package main
import(
"github.com/kataras/iris/v12"
"github.com/kataras/iris/v12/middleware/logger"
"github.com/kataras/iris/v12/middleware/recover"
)
type Msg struct{
EdgeMsg string `json:"edgemsg"`
}
func main(){
app := iris.New()
c := &Msg{}
app.Logger().SetLevel("debug")
app.Use(recover.New())
app.Use(logger.New())
resAPI := app.Party("/api/v1")
resAPI.Post("/msg", func(ctx iris.Context){
if err := ctx.ReadJSON(c); err != nil{
panic(err.Error())
}else{
ctx.JSON(c)
}
})
resAPI.Get("/msg", func(ctx iris.Context){
ctx.Writef("Received: %v\n", c.EdgeMsg)
})
app.Run(iris.Addr(":8088"), iris.WithoutServerError(iris.ErrServerClosed))
}
目前接口没有数据
3.在边缘端的将自定义的消息发布到边缘节点的 MQTT 代理
mosquitto_pub -t 'default/test' -d -m '{"edgemsg":"msgtocloud"}'
4.再访问云端 API 接口,这时已经获取到了从边缘端发送的消息。
结束
因为目前边云自定义消息传输的不适合大数据传送,一次传送的数据大小限制为12MB 和单向 REST 的局限性,目前使用场景还是相对简单,可能更多是用于用户控制数据的传递,比如控制边缘终端设备的启停、边缘端向云端汇总边缘终端设备的在线或离线状态等。
社区也计划在下一个版本中优化和扩展该功能特性。 参考文档 https://kubeedge.io/en/docs/developer/custom_message_deliver/\
感兴趣的读者可以关注下微信号