Netty+WebSocket 获取火币交易所数据项目

Stella981
• 阅读 1553
Netty+WebSocket 获取火币交易所时时数据项目
==============================================

先附上项目项目GitHub地址 spring-boot-netty-websocket-huobi

项目简介

本项目使用 SpringBoot+Netty来开发WebSocket服务器,与火币交易所Websocket建立连接,时时获取火币网交易所推送过来的交易对最新数据

该项目可以直接运用于实际开发中,做为获取各大交易所最新交易对相关数据的项目。

项目本身也是我在之前公司为了获取各大交易所数据所开发的项目,现在只是重新整理了下代码,现在它更像一个脚手架项目,可以在此基础上很方便的添加其它交易所。

技术架构

SpringBoot2.1.5 +Netty4.1.25 + Maven3.5.4 + lombok(插件)

项目测试

直接启动Springboot启动类Application.java,就可以时时获取火币网推送过来交易对的数据了。

如图

Netty+WebSocket 获取火币交易所数据项目


一、项目概述

1、项目启动入口

在项目启动的时候就开始去连接火币交易所Websocket订阅数据。

   /**
     * 首次启动并订阅火币websocket数据
     */
    @PostConstruct
    public void firstSub() {
        try {
            huobiProMainService.start();
        } catch (Exception e) {
            log.error("huobi 首次启动订阅异常", e);
        }
    }

2、获取交易对数据

我们是先要获取火币交易所所有的交易对数据,然后告诉火币交易所我需要订阅哪些交易对数据。

是订阅所有交易对数据还是订阅部分交易对数据。

    @Override
    public synchronized List<String> getChannelCache() {
        // 假设这里是从远处拉取交易对数据
        List<String> list = Lists.newArrayList("btcusdt");
        return list;
    }

3、连接火币交易所Websocket,并订阅指定的交易对。

先与火币网WebSocket建立连接,连接成功后再告诉它我要订阅哪些交易对,哪种主题,成功后,火币交易所就会根据我们所订阅的主题和交易对,给我们时时推送消息。

 /**
     * 首次订阅交易对数据
     *
     * @param channelList 交易对列表
     * @param topicFormat 交易对订阅主题格式
     */
    private void firstSub(List<String> channelList, String topicFormat) {
        //封装huoBiProWebSocketService对象
        klineClient = new HuoBiProWebSocketClient(huoBiProWebSocketService);
        //启动连接火币网websocket
        klineClient.start();
        for (String channel : channelList) {
            //订阅具体交易对
            klineClient.addSub(formatChannel(topicFormat, channel));
        }
    }

启动连接火币网websocket核心代码

很明显我们我们是作为客户端去获取服务端的数据,所以这里的Bootstrap来与服务端进行数据交互,而不是用ServerBootstrap

还有一点就是作为客户端,我们是要获取服务端所推送来的消息,所以我们自定义的handler是入站Handler,所以这里选择的是SimpleChannelInboundHandler

   /**
         * 连接WebSocket,
         *
         * @param uri url构造出URI
         * @param handler 处理消息
         */
        protected void connectWebSocket(final URI uri, SimpleChannelInboundHandler handler) {
            try {
                String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
                final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
                final int port;

                if (uri.getPort() == -1) {
                    if ("http".equalsIgnoreCase(scheme) || "ws".equalsIgnoreCase(scheme)) {
                        port = 80;
                    } else if ("wss".equalsIgnoreCase(scheme)) {
                        port = 443;
                    } else {
                        port = -1;
                    }
                } else {
                    port = uri.getPort();
                }

                if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
                    System.out.println("Only WS(S) is supported");
                    throw new UnsupportedAddressTypeException();
                }
                final boolean ssl = "wss".equalsIgnoreCase(scheme);
                final SslContext sslCtx;
                if (ssl) {
                    sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                } else {
                    sslCtx = null;
                }

                group = new NioEventLoopGroup(2);
                //构建客户端Bootstrap
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        if (sslCtx != null) {
                            pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                        }
                        //pipeline可以同时放入多个handler,最后一个为自定义hanler
                        pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
                    }
                });
                channel = bootstrap.connect(host, port).sync().channel();
            } catch (Exception e) {
                log.error(" webSocketClient start error.", e);
                if (group != null) {
                    group.shutdownGracefully();
                }
            }
    }

4、自定义handler

自定义Handler才是核心,作为数据的入站这里选择继承SimpleChannelInboundHandler,继承它必须要实现一个方法就是channelRead0,通过该方法的msg,就可以获取火币交易所时时推送过来的消息了。

/**
 * @Description: 火币网WebSocket 消息处理类
 * 自定义入站的handler 这个也是核心类
 */
@Slf4j
public class HuoBiProWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketClientHandshaker handshaker;
    private HuoBiProWebSocketClient client;
    
    /**
     * 该handel获取消息的方法
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof BinaryWebSocketFrame) {
            //火币网的数据是压缩过的,所以需要我们进行解压
            BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
            //获取数据、保存数据
            client.onReceive(decodeByteBuf(binaryFrame.content()));
        } else if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
            client.onReceive(textWebSocketFrame.text());
        } 
     }
  }

二、项目注意点

1、服务器问题

一般交易所的服务器都在国外,所以我们本地是无法建立Websocket连接的,除非本地翻墙。

同样项目也不能部署到阿里云等国内服务器,你只能选择香港或者国外服务器部署项目。

这里是火币网专门为我们提供的国内测试地址,所以本地可以获取数据。

2、获取交易所最新交易对数据问题

我们在向交易所Websocket订阅交易对的时候,首先就是要知道该交易所有哪些交易对,这份数据是需要我们单独去获取的,而且不是一次获取就好了。

因为该交易所可能新增或者删除交易对。所以需要我们通过定时任务去获取更新最新的交易对数据。

我这边只是模拟了一个交易对btcusdt,并没有提供获取最新交易对数据的服务。

3、数据存储问题

这也是最值得思考的一个问题,数据我们是获取了,但如果保存!

正常合理的开发应该获取数据是一个微服务,处理获取的数据是一个微服务。那么只需要获取数据后去调处理数据微服务就可以保存数据了。

但在这里,如果只是这样是行不通的。

因为火币网向我们推送的消息的速度会比我们调其它服务保存的数据要快,这就会存在数据丢失的情况发生

这里仅仅是输出一个btcusdt交易对,并且只是订阅一个k线主题,而实际上交易所会有上百个交易对和几种订阅主题,

这样的消息推送速度是上面的几百倍。所以你会发现如果你不做任何改动,对于一些大的交易所而言,你的数据是来不及存储的。
补充

这边之前也写过有关 Netty 和 Websocket 相关的博客文章,可以做个参考

1、Netty专题(共9篇)

2、Websocket专题(共5篇)



 我相信,无论今后的道路多么坎坷,只要抓住今天,迟早会在奋斗中尝到人生的甘甜。抓住人生中的一分一秒,胜过虚度中的一月一年!(1)
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这