Go实现基于WebSocket的弹幕服务

Stella981
• 阅读 698

拉模式和推模式

拉模式

1、数据更新频率低,则大多数请求是无效的 2、在线用户量多,则服务端的查询负载高 3、定时轮询拉取,实时性低

推模式

1、仅在数据更新时才需要推送 2、需要维护大量的在线长连接 3、数据更新后可以立即推送

基于webSocket推送

1、浏览器支持的socket编程,轻松维持服务端长连接 2、基于TCP可靠传输之上的协议,无需开发者关心通讯细节 3、提供了高度抽象的编程接口,业务开发成本较低

webSocket协议与交互

通讯流程

客户端->upgrade->服务端 客户端<-switching<-服务端 客户端->message->服务端 客户端<-message<-服务端

实现http服务端

1、webSocket是http协议upgrade而来 2、使用http标准库快速实现空接口:/ws

webSocket握手

1、使用webSocket.Upgrader完成协议握手,得到webSocket长连接 2、操作webSocket api,读取客户端消息,然后原样发送回去

封装webSocket

缺乏工程化设计

1、其他代码模块,无法直接操作webSocket连接 2、webSocket连接非线程安全,并发读/写需要同步手段

隐藏细节,封装api

1、封装Connection结构,隐藏webSocket底层连接 2、封装Connection的api,提供Send/Read/Close等线程安全接口

api原理(channel是线程安全的)

1、SendMessage将消息投递到out channel 2、ReadMessage从in channel读取消息

内部原理

1、启动读协程,循环读取webSocket,将消息投递到in channel 2、启动写协程,循环读取out channel,将消息写给webSocket

// server.go
package main

import (
    "net/http"
    "github.com/gorilla/websocket"
    "./impl"
    "time"
)

var (
    upgrader = websocket.Upgrader{
        //允许跨域
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
)

func wsHandler(w http.ResponseWriter, r *http.Request) {
    var (
        wsConn *websocket.Conn
        err error
        conn *impl.Connection
        data []byte
    )

    //Upgrade:websocket
    if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
        return
    }
    if conn, err = impl.InitConnection(wsConn); err != nil {
        goto ERR
    }

    go func() {
        var (
            err error
        )
        for {
            if err =conn.WriteMessage([]byte("heartbeat")); err != nil {
                return
            }
            time.Sleep(1 * time.Second)
        }
    }()

    for {
        if data, err = conn.ReadMessage(); err != nil {
            goto ERR
        }
        if err = conn.WriteMessage(data); err != nil {
            goto ERR
        }

    }

    ERR:
        //关闭连接
        conn.Close()
}

func main() {
    //http:localhost:7777/ws
    http.HandleFunc("/ws", wsHandler)
    http.ListenAndServe("0.0.0.0:7777", nil)
}


// connection.go
package impl

import (
    "github.com/gorilla/websocket"
    "sync"
    "github.com/influxdata/platform/kit/errors"
)

var once sync.Once

type Connection struct {
    wsConn *websocket.Conn
    inChan chan []byte
    outChan chan []byte
    closeChan chan byte
    isClosed bool
    mutex sync.Mutex
}

func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
    conn = &Connection{
        wsConn:wsConn,
        inChan:make(chan []byte, 1000),
        outChan:make(chan []byte, 1000),
        closeChan:make(chan byte, 1),
    }

    //启动读协程
    go conn.readLoop()

    //启动写协程
    go conn.writeLoop()

    return
}

//API
func (conn *Connection) ReadMessage() (data []byte, err error) {
    select {
    case data = <- conn.inChan:
    case <- conn.closeChan:
        err = errors.New("connection is closed")
    }
    return
}

func (conn *Connection) WriteMessage(data []byte) (err error) {
    select {
    case conn.outChan <- data:
    case <- conn.closeChan:
        err = errors.New("connection is closed")
    }
    return
}

func (conn *Connection) Close() {
    // 线程安全的close,可重入
    conn.wsConn.Close()
    conn.mutex.Lock()
    if !conn.isClosed {
        close(conn.closeChan)
        conn.isClosed = true
    }
    conn.mutex.Unlock()
}

//内部实现
func (conn *Connection) readLoop() {
    var (
        data []byte
        err error
    )
    for {
        if _, data, err = conn.wsConn.ReadMessage(); err != nil {
            goto ERR
        }

        //阻塞在这里,等待inChan有空位置
        //但是如果writeLoop连接关闭了,这边无法得知
        //conn.inChan <- data

        select {
        case conn.inChan <- data:
        case <-conn.closeChan:
            //closeChan关闭的时候,会进入此分支
            goto ERR
        }
    }
    ERR:
        conn.Close()
}

func (conn *Connection) writeLoop() {
    var (
        data []byte
        err error
    )
    for {
        select {
        case data = <- conn.outChan:
        case <- conn.closeChan:
            goto ERR

        }

        if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
            goto ERR
        }
        conn.outChan <- data
    }
    ERR:
        conn.Close()
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
SpringBoot2.x服务器端主动推送技术
一.服务端推送常用技术介绍  服务端主流推送技术:websocket、SSE等  1.客户端轮询:ajax定时拉取后台数据    js  setInterval定时函数  ajax异步加载 定时向服务器发送请求    服务器压力会较大  2.服务端主动推送:websocket《推荐使用》    全双工即双向通讯,本质上是一个
Wesley13 Wesley13
3年前
ES6 新增的数组的方法
给定一个数组letlist\//wu:武力zhi:智力{id:1,name:'张飞',wu:97,zhi:10},{id:2,name:'诸葛亮',wu:55,zhi:99},{id:3,name:'赵云',wu:97,zhi:66},{id:4,na
Stella981 Stella981
3年前
SpringBoot2.x服务器端主动推送技术
一.服务端推送常用技术介绍  服务端主流推送技术:websocket、SSE等  1.客户端轮询:ajax定时拉取后台数据    js  setInterval定时函数  ajax异步加载 定时向服务器发送请求    服务器压力会较大  2.服务端主动推送:websocket《推荐使用》    全双工即双向通讯,本质上是一个
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这