RxJS的另外四种实现方式(六)——使用Stream类实现

Stella981
• 阅读 573

接上一篇 RxJS的另外四种实现方式(五)——使用生成器实现

该实现方式与之前几种不同的,该实现方式仅针对Nodejs环境。在Nodejs环境中,提供了Stream类,包括Readable、Transform、Writeable等子类都是可扩展的。从字面上看,正好对应Rx中的生产者、传递者、消费者。

实现该库的起因是,一次在Nodejs中需要在koa框架里面提供event-stream功能,目前除了IE浏览器外其他浏览器都支持了服务端事件推送,这个功能可以很好的代替轮询。webpack用的热更新就是通过这个功能实现的。

言归正传,首先得实现生产者,我们先来看interval

class Interval extends Readable {
    constructor(period) {
        super({ objectMode: true })
        this.period = period
        this.i = 0
    }
    _read(size) {
        setTimeout(() => this.push(this.i++), this.period)
    }
}
exports.interval = period => new Interval(period)

说明一下,构造函数传入objectMode:true的对象是让stream处于对象模式,而不是二进制流模式。_read函数必须覆盖父类,否则出错,当有订阅者连接上来后,就会调用_read方法。我们在这个方法里面发送数据,即调用push方法,将数据发送给流的接收者。

当调用过push方法后,后面的接收者如果调用了callback回调,则表示数据消费完毕,会再次调用_read方法,直到push(null)表示生产者已经complete

FromArray也十分简单易读

class FromArray extends Readable {
    constructor(array) {
        super({ objectMode: true })
        this.array = array
        this.pos = 0
        this.size = array.length
    }
    _read(size) {
        if (this.pos < this.size) {
            this.push(this.array[this.pos++])
        } else
            this.push(null)
    }
}
exports.fromArray = array => new FromArray(array)

下面要实现一个转换器(操作符)Filter

class Filter extends Transform {
    constructor(f) {
        super({ readableObjectMode: true, writableObjectMode: true })
        this.f = f
    }
    _transform(data, encoding, callback) {
        const f = this.f
        if (f(data)) {
            this.push(data);
        }
        callback();
    }
    _flush(callback) {
        callback()
    }
}
exports.filter = f => new Filter(f)

这时候我们需要覆盖_transform、_flush函数,同样的,push方法会让数据流到下面的流中,而callback回调会使得上一个流继续发送数据。

最后我们来实现Subscriber

class Subscriber extends Writable {
    constructor(n, e, c) {
        super({ objectMode: true })
        this.n = n
        this.e = e
        this.c = c
    }
    _write(chunk, encoding, callback) {
        this.n(chunk)
        callback(null)
    }
    _final(callback) {
        this.c()
        callback()
    }
}

exports.subscribe = (n, e = noop, c = noop) => new Subscriber(n, e, c)

Subscriber是一个可写流,我们必须覆盖_write方法用于消费数据,_final方法用于complete事件处理。这里没有实现error事件。有兴趣的同学可以思考如何实现。

最后我们需要把各种stream串起来,变成一个长长的水管

exports.pipe = pipeline || ((first, ...cbs) => cbs.reduce((aac, c) => aac.pipe(c), first));

高版本的Nodejs已经提供了pipeline方法,可以直接使用,低版本的话,可以用上面的方法进行连接。

至此,我们已经使用Nodejs提供的Stream类实现了Rx的基本逻辑。(完)

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
RxJS的另外四种实现方式(五)——使用生成器实现
接上一篇RxJS的另外四种实现方式(四)——性能最高的库(续)(https://my.oschina.net/langhuihui/blog/2071675)js的生成器一般情况下使用场景很少,开发者接触的不是很多。不了解的可以先行查看js语法了解。这里把其中的执行顺序图解一下调用方
Wesley13 Wesley13
3年前
RxJS的另外四种实现方式(后记)—— 同时实现管道和链式编程
目录RxJS的另外四种实现方式(序)(https://my.oschina.net/langhuihui/blog/2051754)RxJS的另外四种实现方式(一)——代码最小的库(https://my.oschina.net/langhuihui/blog/2051770)RxJS的另外四种实现方式(二)——代码最小的库(续)
Wesley13 Wesley13
3年前
RxJS的另外四种实现方式(三)——性能最高的库
接上篇RxJS的另外四种实现方式(二)——代码最小的库(续)(https://my.oschina.net/langhuihui/blog/2052019)代码最小的库rx4rxlite虽然在性能测试中超过了callbag,但和most库较量的时候却落败了,于是我下载了most库,要解开most库性能高的原因。我们先上一组测试数据,这是
Wesley13 Wesley13
3年前
RxJS的另外四种实现方式(四)——性能最高的库(续)
接上一篇RxJS的另外四种实现方式(三)——性能最高的库(https://my.oschina.net/langhuihui/blog/2054887)上一篇文章我展示了这个最高性能库的实现方法。下面我介绍一下这个性能提升的秘密。首先,为了弄清楚Most库究竟为何如此快,我必须借助其他工具。比如chrome的devtools性能分析,刚开始
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
SpringBoot开发案例之整合Dubbo提供者(二)
!00.jpg(https://blog.52itstyle.com/usr/uploads/2017/07/1329278006.jpg)大家有没有注意到,上一篇中提供者,暴露接口的方式?混搭。springboot本身接口实现使用了注解的方式,而Dubbo暴露接口使用的是配置文件的实现方式,即如下:代码importorg.s
Easter79 Easter79
3年前
SpringBoot开发案例之整合Dubbo提供者(二)
!00.jpg(https://blog.52itstyle.com/usr/uploads/2017/07/1329278006.jpg)大家有没有注意到,上一篇中提供者,暴露接口的方式?混搭。springboot本身接口实现使用了注解的方式,而Dubbo暴露接口使用的是配置文件的实现方式,即如下:代码importorg.s