Redis-Stream


  • Stream:支持多播的可持久化的消息队列
    • 常用操作命令
  • Stream 消息队列的问题

Stream

Stream

  • 每一个Stream都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容
  • 消息是持久化的,Redis 重启后,内容还在
  • 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建
  • 每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了
    • 每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个ID 用来初始化last_delivered_id变量
    • 每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响
      • 同一份 Stream 内部的消息会被每个消费组都消费到
    • 同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer)
      • 这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动
      • 每个消费者有一个组内唯一名称
  • 消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息
    • 如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少
    • 这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构
      • 它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理
  • 消息 ID 的形式是 timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5 条消息
    • 消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID
  • 消息内容就是键值对,形如 hash 结构的键值对

常用操作命令

  • 生产端
    • 命令
      • xadd 追加消息
      • xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息
      • xrange 获取消息列表,会自动过滤已经删除的消息
      • xlen 消息长度
      • del 删除 Stream
    • xadd streamtest * name mark age 18
      • streamtest 表示当前这个队列的名字,也就是我们一般意义上Redis 中的key
      • * 号表示服务器自动生成 ID
      • name mark age 18 是 key/value 的存储形式
      • 返回值 1626705954593-0 则是生成的消息 ID,由两部分组成:时间戳-序号
        • 时间戳时毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型
        • 序号是在这个毫秒时间点内的消息序号。它也是个 64 位整型
      • 为了保证消息是有序的,因此 Redis 生成的ID 是单调递增有序的
        • 由于 ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id 属性,用于记录最后一个消息的 ID
        • 若发现当前时间戳退后(小于 latest_generated_id 所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质
      • 如果不是非常特别的需求,强烈建议使用 Redis 的方案生成消息ID,因为这种时间戳+序号的单调递增的 ID 方案,几乎可以满足全部的需求,但 ID 是支持自定义的
    • xrange streamtest - +
      • - 表示最小值 , + 表示最大值
    • xrange streamtest - 1626705954593-0 指定消息 ID
    • xdel streamtest 1626706380924-0
    • xlen streamtest
    • del streamtest 删除整个 Stream
  • 单消费者
    • 虽然 Stream 中有消费者组的概念,但是可以在不定义消费组的情况下进行Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待
    • Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列 (list) 来使用
      • 使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)
    • xread count 1 streams stream2 0-0
      • “count 1” 表示从 Stream 读取1 条消息,缺省当然是头部
      • “streams” 可以理解为Redis 关键字
      • “stream2” 指明了要读取的队列名称
      • “0-0” 指从头开始
    • xread count 2 streams stream2 1626710882927-0
      • 指定从 streams 的消息Id 开始(不包括命令中的消息 id)
    • xread count 1 streams stream2 $
      • $ 代表从尾部读取,上面的意思就是从尾部读取最新的一条消息
      • 此时默认不返回任何消息,所以最好以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来
        • xread block 0 count 1 streams stream2 $
          • block 后面的数字代表阻塞时间,单位毫秒
    • 一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID
      • 下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息
  • 消费组
    • xgroup create 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量
      • xgroup create stream2 cg1 0-0
        • “stream2” 指明了要读取的队列名称
        • “cg1” 表示消费组的名称
        • “0-0” 表示从头开始消费
      • xgroup create stream2 cg2 $
        • $ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
      • xinfo stream stream2
      • xinfo groups stream2
    • xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID;同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除
      • xreadgroup GROUP cg1 c1 count 1 streams stream2 >
        • “GROUP”属于关键字
        • “cg1”是消费组名称
        • “c1”是消费者名称
        • “count 1”指明了消费数量
        • > 号表示从当前消费组的 last_delivered_id 后面开始读
          • 每当消费者读取一条消息,last_delivered_id 变量就会前进
      • xreadgroup GROUP cg1 c1 block 0 count 1 streams stream2 > 阻塞等待
    • 如果同一个消费组有多个消费者,我们还可以通过 xinfo consumers 指令观察每个消费者的状态
      • xinfo consumers stream2 cg1
      • xack stream2 cg1 1626751586744-0 确认一条消息
        • xack 允许带多个消息 id
    • XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息
      • 每个 Pending 的消息有 4 个属性:消息 ID;所属消费者;IDLE 已读取时长;delivery counter 消息被读取次数
    • XCLAIM 用以进行消息转移的操作,将某个消息转移到自己的 Pending列表中。需要设置组、转移的目标消费者和消息 ID,同时需要提供 IDLE(已被读取时长),只有超过这个时长,才能被转移

Stream 消息队列的问题

  • 消息太多
    • 定长 Stream :在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度
  • 消费者忘记 ACK
    • 导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大
  • PEL 如何避免消息丢失
    • 在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了
    • 但是 PEL 里已经保存了发出去的消息 ID
    • 客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表
      • 不过此时 xreadgroup 的起始消息 ID 不能为参数 >,而必须是任意有效的消息ID,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自last_delivered_id之后的新消息
  • 死信问题
    • 如果某个消息,不能被消费者处理,也就是不能被 XACK,这是要长时间处于 Pending 列表中,即使被反复的转移给各个消费者也是如此
    • 此时该消息的delivery counter(通过XPENDING 可以查询到)就会累加,当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息)
      • 将坏消息处理掉即可,删除即可,使用XDEL 语法
        • 注意,这个命令并没有删除 Pending 中的消息,因此查看 Pending,消息还会在,可以在执行执行 XDEL 之后,XACK 这个消息标识其处理完毕
  • Stream 的高可用
    • Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别
    • 也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的
    • 不过鉴于 Redis 的指令复制是异步的,在 failover 发生时,Redis 可能会丢失极小部分数据
      • 这点 Redis 的其它数据结构也是一样的
  • 分区 Partition
    • Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream
      • 提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞
    • 然后在客户端使用一定的策略来生产消息到不同的 Stream
  • 使用场景
    • 如果是中小项目和企业,在工作中已经使用了 Redis,在业务量不是很大,而又需要消息中间件功能的情况下,可以考虑使用 Redis 的Stream 功能
    • 如果并发量很高,资源足够支持下,还是以专业的消息中间件,比如RocketMQ、Kafka 等来支持业务更好

文章作者: 钱不寒
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 钱不寒 !
  目录