Streams:深入理解Redis5.0新特性

Easter79
• 阅读 718

概述

相较于Redis4.0,Redis5.0增加了很多新的特性,而streams是其中最重要的特性之一。streams是redis 的一种基本数据结构,它是一个新的强大的支持多播的可持久化的消息队列,在设计上借鉴了kafaka。streams的数据类型本身非常简单,有点类似于hash结构,但是它的额外特性异常强大且复杂:

  • 支持持久化。streams能持久化存储数据,不同于pub/sub机制和list 消息被消费后就会被删除,streams消费过的数据会被持久化的保存在历史中。
  • 支持多播。 这一点跟 pub/sub有些类似。
  • 支持消费者组。streams 允许同一消费组内的消费者竞争消息,并提供了一系列机制允许消费者查看自己的历史消费消息。并允许监控streams的消费者组信息,消费者组内消费者信息,也可以监控streams内消息的状态。

基础内容

数据 ID

streams 提供了默认的id模式用来唯一标识streams中的每一条数据,由两部分组成: <millisecondsTime>-<sequenceNumber>

millisecondsTime是redis服务所在机器的时间,sequenceNumber用于同一毫秒创建的数据。需要注意的一点是streams的id总是单调增长的,即使redis服务所在的服务器时间异常。如果当前的毫秒数小于以前的毫秒数,就会使用历史记录中最大的毫秒数,然后序列号递增。而这样做的原因是因为streams的机制允许根据时间区间或者某一个时间节点或者某一id查找数据。

向streams插入数据

streams 的基础写命令为XADD,其语法为XADD key ID field value [field value ...]

127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"
127.0.0.1:6379>

上面的例子使用XADD向名为mystream的streams中添加了一条数据,ID使用*表示使用streams使用默认的ID,在本例中redis返回的1574925508730-0就是redis为我们插入的数据生成的ID。

另外streams 查看streams长度的命令为XLEN

127.0.0.1:6379> XLEN mystream
(integer) 3
127.0.0.1:6379>

从streams中读取数据

从streams中读取数据会比写数据复杂很多,用日志文件进行对比,我们可以查看历史日志,可以根据范围查询日志,我们可以通过unix的命令tail -f来监听日志,可以多个用户查看到同一份日志,也可以多个用户只能查看到自己有权限查看的那一部分日志。

按范围查询: XRANGE 和 XREVRANGE

首先来介绍一下 根据范围查询,这两种操作都比较简单,以XRANGE为例,它的语法格式为XRANGE key start end [COUNT count], 我们只需要提供两个id,startend,返回的将是一个包含startend的闭区间。两个特殊的ID-+分别表示可能的最小ID和最大ID。

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
2) 1) "1574925508730-0"
   2) 1) "name"
      2) "dwj"
      3) "age"
      4) "18"
127.0.0.1:6379>

我们前边提到过数据id中包含了创建数据的时间信息,这意味着我们可以根据时间范围查询数据,为了根据时间范围查询,我们省略掉ID的序列号部分,如果省略,对于start ID会使用0作为默认的序列号,对于end ID会使用最大序列号作为默认值,这样的话我们使用两个unix时间戳去查询数据就可以得到那个时间区间内所有的数据。

1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
127.0.0.1:6379>

可能还会有同学注意到语法的最后边还有count参数,这个参数允许我们一次只返回固定数量的数据,然后根据返回数据的last_id,作为下一次查询的start,这样就允许我们在一个量非常大的streams里批量返回数据。 XREVRANGE命令与XRANGE相同,但是以相反的顺序返回元素,就不重复介绍了。

通过XREAD读取数据

XREAD允许我们从某一结点开始从streams中读取数据,它的语法为XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],我们在这里主要将的是通过XREAD来订阅到达streams新的数据。这种操作可能跟REDIS中原有的pub/sub机制或者阻塞队列的概念有些类似,都是等待一个key然后获取到新的数据,但是跟这两种有着本质的差别:

  • streams跟pub/sub阻塞队列允许多个客户端一起等待数据,默认情况下,streams会把消息推送给所有等待streams数据的客户端,这个能力跟pub/sub有点类似,但是streams也允许把消息通过竞争机制推送给其中的一个客户端(这种模式需要用到消费者组的概念,会在后边讲到)。

  • pub/sub的消息是fire and forget并且从不存储,你只可以订阅到在你订阅时间之后产生的消息,并且消息只会推送给客户端一次,不能查看历史记录。以及使用阻塞队列时,当客户端收到消息时,这个元素会从队列中弹出,换句话说,不能查看某个消费者消费消息的历史。而在streams中所有的消息会被无限期的加入到streams中(消息可以被显式的删除并且存在淘汰机制),客户端需要记住收到的最后一条消息,用于获取到节点之后的新消息。

  • Streams 消费者组提供了一种Pub/Sub或者阻塞列表都不能实现的控制级别,同一个Stream不同的群组,显式地确认已经处理的项目,检查待处理的项目的能力,申明未处理的消息,以及每个消费者拥有连贯历史可见性,单个客户端只能查看自己过去的消息历史记录。 从streams中读取数据

    127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0

      1. "mystream"
          1. "1574835253335-0"
            1. "name"
            2. "bob"
            3. "age"
            4. "23"
          1. "1574925508730-0"
            1. "name"
            2. "dwj"
            3. "age"
            4. "18"

    127.0.0.1:6379>

同list结构一样,streams也提供了阻塞读取的命令

XREAD BLOCK 0 STREAMS mystream

在上边的命令中指定了BLOCK选项,超时时间为0毫秒(意味着永不会过期)。此外,这个地方使用了特殊的id $,这个特殊的id代表着当前streams中最大的id,这就意味着你只会读取streams中在你监听时间以后的消息。有点类似于Unix的tail -f。另外XREAD可以同时监听多个流中的数据。

消费者组

如果我们想要的不是多个客户端处理相同的消息,而是多个客户端从streams中获取到不同的消息进行处理。也就是我们常用的生产者-消费者模型。假如想象我们具有两个生产者p1,p2,三个消费者c1,c2,c3以及7个商品。我们想按照下面的效果进行处理

p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1

为了解决这种场景,redis使用了一个名为消费者的概念,有点类似于kafka,但只是表现上。消费者组就像是一个伪消费者,它从流内读取数据,然后分发给组内的消费者,并记录该消费者组消费了哪些数据,处理了那些数据,并提供了一系列功能。

  1. 每条消息都提供给不同的消费者,因此不可能将相同的消息传递给多个消费者。
  2. 消费者在消费者组中通过名称来识别,该名称是实施消费者的客户必须选择的区分大小写的字符串。这意味着即便断开连接过后,消费者组仍然保留了所有的状态,因为客户端会重新申请成为相同的消费者。 然而,这也意味着由客户端提供唯一的标识符。
  3. 每一个消费者组都有一个第一个ID永远不会被消费的概念,这样一来,当消费者请求新消息时,它能提供以前从未传递过的消息。
  4. 消费消息需要使用特定的命令进行显式确认,表示:这条消息已经被正确处理了,所以可以从消费者组中逐出。
  5. 消费者组跟踪所有当前所有待处理的消息,也就是,消息被传递到消费者组的一些消费者,但是还没有被确认为已处理。由于这个特性,当访问一个Stream的历史消息的时候,每个消费者将只能看到传递给它的消息。

它的模型类似于如下

| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |

从上边的模型中我们可以看出消费者组记录处理的最后一条消息,将消息分发给不同的消费者,每个消费者只能看到自己的消息。如果把消费者组看做streams的辅助数据结构,我们可以看出一个streams可以拥有多个消费者组,一个消费者组内可以拥有多个消费者。实际上,一个streams允许客户端使用XREAD读取的同时另一个客户端通过消费者群组读取数据。

创建一个消费者群组

我们首先创建一个包含了一些数据的streams

127.0.0.1:6379> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:6379> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:6379> XADD fruit * message pomelo
"1574935323628-0"

然后创建一个消费者组

127.0.0.1:6379> XGROUP CREATE fruit mygroup $
OK

注意我们需要指定一个id,这里我们使用的是特殊id$,我们也可以使用0或者一个unix时间戳,这样,消费者组只会读取这个节点之后的消息。

现在消费者组创建好了,我们可以使用XREADGROUP命令立即开始尝试通过消费者组读取消息。 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...],与XREAD类似,提供了BLOCK选项。假设指定消费者分别是Alice和Bob,来看看系统会怎样返回不同消息给Alice和Bob。

127.0.0.1:6379> XREADGROUP GROUP  mygroup Alice COUNT 1 STREAMS fruit >
1) 1) "fruit"
   2) 1) 1) "1574936034258-0"
         2) 1) "message"
            2) "apple"
127.0.0.1:6379>

上边命令代表的信息是:我要通过mygroup读取streams fruit中的数据,我在群组中的身份是Alice,请给我一条数据。 >操作符只在消费者组的上线文中有效,代表消息到目前为止没有交给其它消费者处理过。

我们也可以使用一个有效的id,在这种情况下,消费者组会告诉我们的历史待处理消息,而不会告诉我们新的消息。这个特性也是很有用的,当消费者因为某些原因重新启动后,我们可以查看自己的历史待处理消息,处理完待处理消息后再去处理新的消息。

我们可以通过XACK命令告诉消费者组某条消息已经被正确处理,不要显示在我的历史待处理消息列表中。XACK的语法为XACK key group ID [ID ...]

127.0.0.1:6379> XACK fruit mygroup 1574936034258-0
(integer) 1

有几件事需要记住:

  1. 消费者是在他们第一次被提及的时候自动创建的,不需要显式创建。
  2. 即使使用XREADGROUP,你也可以同时从多个key中读取,但是要让其工作,你需要给每一个Stream创建一个名称相同的消费者组。这并不是一个常见的需求,但是需要说明的是,这个功能在技术上是可以实现的。
  3. XREADGROUP命令是一个写命令,因为当它从Stream中读取消息时,消费者组被修改了,所以这个命令只能在master节点调用。

从永久失败中恢复

在一个消费者群组中可能存在多个消费者消费消息,但是也可能会存在某一个消费者永久退出消费者群组的情况,这样我们就需要一种机制,把该消费者的待处理消息分配给消费者群组的另一个消费者。这就需要我们具有查看待处理消息的能力以及把某个消息分配给指定消费者的能力。前者是通过一个叫XPENDING的命令,它的语法为XPENDING key group [start end count] [consumer]

127.0.0.1:6379> XPENDING fruit mygroup
1) (integer) 1
2) "1574936042937-0"
3) "1574936042937-0"
4) 1) 1) "Alice"
      2) "1"

上述返回结果代表的是消费者群组有1条待处理命令,待处理消息的起始id为1574936042937-0,结束id为1574936042937-0,名为Alice的消费者有一个待处理命令,可能有人会好奇我们在前边往fruit放入了3个水果,使用XACK处理了一个水果,消费者待处理列表中应该有两个水果,而事实上消费者群组的待处理列表为该群组下消费者待处理消息的合集,当有消费者通过群组获取消息的时候会改变消费者群组的状态,这也是前边提到的为什么XREADGROUP必须在master节点进行调用。 我们可以使用start end count 参数来查看某个范围内消息的状态

127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice
1) 1) "1574936042937-0"
   2) "Alice"
   3) (integer) 903655
   4) (integer) 1
2) 1) "1574936052018-0"
   2) "Alice"
   3) (integer) 491035
   4) (integer) 1

这样我们就看到了一条消息的详细信息,id为1574936042937-0的消息的消费者为Alice,它的pending时间为903655,这个消息被分配了1次。 我们会发现第一条消息的处理时间有点长,我们怀疑Alice已经不能处理这条消息了,于是我们想把这条消息分配给Bob,这种场景下就需要用到了XCLAIM命令,它的语法为XCLAIM ... ,其中min-idle-time为消息的最小空闲时间,只有消息的空闲时间大于这个值消息才会被分配,因为消息被分配的时候会重置消息的空闲时间,如果有同时把一条消息分配给两个客户端,只会第一条命令生效,因为当消息分配给第一个客户端的时候重置空闲时间,第二条命令则会失效。

我们也可以使用一个独立的进程来不断寻找超时的消息,并把它分配给活跃的消费者,不过需要注意的是,如果消息的分配次数达到某个阙值,不应该把消息再分配出去,而是应该放到别的地方。

streams的可观察性

streams具有不错的可观察性,前边的XPENDING命令允许我们查看streams在某个消费者群组内待处理消息的状态。但是我们想看的更多,比如在这个streams下有多少个group, 在这个group下有多少消费者。这就要用到XINFO命令: 查看streams信息:

127.0.0.1:6379> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1574925508730-0"
11) "first-entry"
12) 1) "1574835253335-0"
    2) 1) "name"
       2) "bob"
       3) "age"
       4) "23"
13) "last-entry"
14) 1) "1574925508730-0"
    2) 1) "name"
       2) "dwj"
       3) "age"
       4) "18"

输出中会告诉我们streams的长度,群组数量,第一条和最后一条信息的详情。下面看一下streams下群组的信息:

127.0.0.1:6379> XINFO GROUPS fruit
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1574936052018-0"
2) 1) "name"
   2) "mygroup-1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

我们可以从输出中看到fruit下有两个群组,群组的名称以及待处理消息的数量,处理的最后一条消息。我们可以在详细的查看下消费者群组内消费者的状态。

127.0.0.1:6379> XINFO CONSUMERS fruit mygroup
1) 1) "name"
   2) "Alice"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 1990242
2) 1) "name"
   2) "Bob"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 9178

从输出中可以看到消费者待处理消息的数量以及消费者的闲置时间。

设置streams上限

如果从streams可以查看到历史记录,我们可能会有疑惑,如果streams无限期的加入内存会不会够用,一旦消息数量达到上限,将消息永久删除或者持久化到数据库都是有必要的,redis也提供了诸如此类场景的支持。

一种方法是我们使用XADD的时候指定streams的最大长度,XADD mystream MAXLEN ~ 1000 其中的数值前可以加上~标识不需要精确的将长度保持在1000,比1000多一些也可以接受。如果不使用该标识,性能会差一些。另一种方法是使用XTRIM,该命令也是使用MAXLEN选项,> XTRIM mystream MAXLEN ~ 10

一些特殊的id

前面提到了在streams API里边存在一些特殊的id。 首先是-+,这两个ID在XRANGE命令中使用,分别代表最小的id和最大的id。-代表0-1+代表18446744073709551615-18446744073709551615,从使用上方便了很多。在XPENDING等范围查询中都可以使用。 $代表streams中当前存在的最大的id,在XREADXGROUP中代表只获取新到的消息。需要注意的是$+的含义并不一致。 还有一个特殊的id是>,这个id只能够在XREADGROUP命令中使用,意味着在这个消费者群组中,从来没有分配给其他的消费者,所以总是使用>作为群组中的last delivered ID

持久化,复制和消息安全性

与redis的其它数据结构一样,streams会异步复制到从节点,并持久化到AOF和RDB文件中,并且消费者群组的状态也会按照此机制进行持久化。 需要注意的几点是:

  • 如果消息的持久化以及状态很重要,则AOF必须使用强fsync配合(AOF记录每一条更改redis数据的命令,有很多种持久化机制,在这个地方要用到的是appendfsync always 这样会严重降低Redis的速度)
  • 默认情况下,异步复制不能保证从节点的数据与主节点保持一致,在故障转移以后可能会丢失一些内容,这跟从节点从主节点接受数据的能力有关。
  • WAIT命令可以用于强制将更改传输到一组从节点上。虽然这使得数据不太可能会丢失,但是redis的Sentinel和cluster在进行故障转移的时候不一定会使用具有最新数据的从节点,在一些特殊故障下,反而会使用缺少一些数据的从节点。 因此在使用redis streams和消费者群组在设计程序的时候,确保了解你的应用程序在故障期间的应对策略,并进行相应地配置,评估它对你的程序是否足够安全。

从streams中删除数据

删除streams中的数据使用XDEL命令,其语法为XDEL key ID [ID ...],需要注意的是在当前的实现中,在宏节点完全为空之前,内存并没有真正回收,所以你不应该滥用这个特性。

streams的性能

streams的不阻塞命令,比如XRANGE或者不使用BLOCK选项的XREADXREADGROUP跟redis普通命令一致,所以没有必要讨论。如果有兴趣的话可以在redis的文档中查看到对应命令的时间复杂度。streams命令的速度在一定范围内跟set是一致的,XADD命令的速度非常快,在一个普通的机器上,一秒钟可以插入50w~100w条数据。

我们感兴趣的是在消费者群组的阻塞场景下,从通过XADD命令向streams中插入一条数据,到消费者通过群组读取到这条消息的性能。

为了测试消息从产生到消费间的延迟,我们使用ruby程序进行测试,将消息的产生时间作为消息的一个字段,然后把消息推送到streams中,客户端收到消息后使用当前时间跟生产时间进行对比,从而计算出消息的延迟时间。这个程序未进行性能优化,运行在一个双核的机器上,同时redis也运行在这台机器上,以此来模拟不是理想条件下的场景。消息每秒钟产生1w条,群组内有10个消费者消费数据。测试结果如下:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

99.9%的请求的延迟小于等于2毫秒,而且异常值非常接近平均值。另外需要注意的两点:

  1. 消费者每次处理1w条消息,这样增加了一些延迟,这样做是为了消费速度较慢的消费者能够保持保持消息流。
  2. 用来做测试的系统相比于现在的系统非常慢。

原文链接: https://redis.io/topics/streams-intro

Worktile 官网:worktile.com

译者:Worktile工程师 杜文杰

文章首发于「Worktile官方博客」,转载请注明出处。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
1小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
6
获赞
1.2k