广告位联系
返回顶部
分享到

Redis数据类型Streams的介绍

Redis 来源:互联网 作者:佚名 发布时间:2024-10-23 21:35:30 人浏览
摘要

Redis Streams 是 Redis 5.0 引入的一种新的数据类型,它提供了一种强大的日志结构化数据存储方式。Streams 类型非常适合用于构建消息队列、事件日志以及其他需要持久化和高效处理时间序列数据

Redis Streams 是 Redis 5.0 引入的一种新的数据类型,它提供了一种强大的日志结构化数据存储方式。Streams 类型非常适合用于构建消息队列、事件日志以及其他需要持久化和高效处理时间序列数据的应用场景。

1 基本特性

  • 持久性:与传统的发布/订阅不同,Streams 中的消息是持久化的,即使客户端断开连接后重新连接,仍然可以访问到之前的消息。
  • 多消费者支持:支持多个消费者组(consumer groups),每个组可以独立地消费流中的消息。消费者组允许不同的消费者处理相同的消息,但每个消息在一个组内只能被一个消费者处理一次。
  • 消息 ID 和范围查询:每条消息都有一个唯一的 ID,由时间戳和序列号组成。可以通过指定消息 ID 范围来获取特定时间段内的消息。
  • 阻塞读取:支持阻塞读取(XREAD 和 XREADGROUP 命令的 BLOCK 选项),使得客户端可以在没有新消息时等待一段时间。
  • 自动删除:可以设置最大长度(MAXLEN 选项)来限制流的大小,超过长度的消息会自动被删除。
  • 灵活的消息格式:每条消息可以包含多个字段-值对,类似于哈希表,这使得消息可以携带丰富的信息。

2 主要操作命令 

2.1 XADD key ID field value [field value ...]

向指定的流中添加一条新消息,ID 可以是 *(表示自动生成)或指定的时间戳和序列号。

1

2

127.0.0.1:6379> xadd mystream * sensor_id 123 temmperature 22.5

"1729306027171-0"

返回的结构可以分为两部分:

  • 时间戳:1729306027171 (表示条目被添加的时间,单位是毫秒。你可以将这个时间戳转换为可读的日期和时间格式。)
  • 序列号:0 (表示在同一毫秒内这是第一个条目。如果在同一毫秒内添加了多个条目,序列号将会递增,例如 1729306027171-1、1729306027171-2 等。)

2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  • 从一个或多个 Stream 中读取数据。
  • COUNT 指定返回的最大条目数。
  • BLOCK 指定在没有新消息时阻塞的时间(毫秒)。
  • STREAMS 指定要读取的 Stream 和起始 ID。

COUNT 指定返回的最大条目数。BLOCK 指定在没有新消息时阻塞的时间(毫秒)。STREAMS 指定要读取的 Stream 和起始 ID。

1

2

3

4

5

6

7

127.0.0.1:6379> xread count 2 streams mystream 0-0

1) 1) "mystream"

   2) 1) 1) "1729306027171-0"

         2) 1) "sensor_id"

            2) "123"

            3) "temmperature"

            4) "22.5"

2.3 XRANGE key start end [COUNT count]

  • 返回指定 ID 范围内的条目。
  • start 和 end 是 ID,可以使用 - 表示最小 ID,+ 表示最大 ID。

1

2

3

4

5

6

127.0.0.1:6379> xrange mystream - +

1) 1) "1729306027171-0"

   2) 1) "sensor_id"

      2) "123"

      3) "temmperature"

      4) "22.5"

2.4 XREVRANGE key end start [COUNT count]

返回指定 ID 范围内的条目,但按逆序排列。

1

2

3

4

5

6

7

8

9

10

11

12

13

127.0.0.1:6379> xadd mystream * sensor_id 234 temmperature 23.5

"1729329067777-0"

127.0.0.1:6379> xadd mystream * sensor_id 345

"1729329079135-0"

127.0.0.1:6379> xrevrange mystream + - count 2

1) 1) "1729329079135-0"

   2) 1) "sensor_id"

      2) "345"

2) 1) "1729329067777-0"

   2) 1) "sensor_id"

      2) "234"

      3) "temmperature"

      4) "23.5"

2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]

  • 创建一个新的消费者组。
  • id-or-$ 是起始位置,可以是具体的 ID 或 $ 表示只消费新的条目。
  • MKSTREAM 如果 Stream 不存在则创建它。

1

2

127.0.0.1:6379> xgroup create mystream mygroup 0

OK

2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

  • 从消费者组中读取数据。
  • GROUP:指定消费者组的名称。
  • consumer:指定消费者的名称。
  • COUNT count:可选参数,指定一次最多读取的消息数量。
  • BLOCK milliseconds:可选参数,如果当前没有可用的消息,命令将阻塞指定的时间(以毫秒为单位),等待新消息的到来。
  • NOACK: 表示不确认消息,通常用于快速消费。
  • STREAMS:指定要读取的流及其对应的 ID。
  • ID 通常是一个特殊值 >,表示只读取新的消息;也可以是具体的 ID,表示从该 ID 开始读取。

1

2

3

4

5

6

7

8

9

10

11

12

127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >

1) 1) "mystream"

   2) 1) 1) "1729306027171-0"

         2) 1) "sensor_id"

            2) "123"

            3) "temmperature"

            4) "22.5"

      2) 1) "1729329067777-0"

         2) 1) "sensor_id"

            2) "234"

            3) "temmperature"

            4) "23.5"

2.7 XACK key group ID [ID ...]

确认已处理的消息。XACK 命令用于确认消费者组中的消息已经被成功处理。当你使用 XACK 命令时,Redis 会将指定的消息从“待处理”状态转换为“已确认”状态,并从消费者的待处理列表中移除。

1

2

3

4

127.0.0.1:6379> xack mystream mygroup 1729329067777-0

(integer) 1

127.0.0.1:6379> xack mystream mygroup 1729329079135-0

(integer) 0

当 XACK 命令成功确认一条消息时,返回值为 1,表示该消息已经被确认并且从待处理列表中移除。例如,如果消息 1729329067777-0 是由 consumer1 处理的,并且现在调用 XACK 确认它,那么这条消息将不再出现在 consumer1 的待处理列表中。

2.8 XPENDING key group [start end count] [IDLE idle]

查看待处理的消息。

1

2

3

4

5

6

7

8

9

10

11

12

13

127.0.0.1:6379> xpending mystream mygroup

1) (integer) 1

2) "1729306027171-0"

3) "1729306027171-0"

4) 1) 1) "consumer1"

      2) "1"

127.0.0.1:6379> xack mystream mygroup 1729306027171-0

(integer) 1

127.0.0.1:6379> xpending mystream mygroup

1) (integer) 0

2) (nil)

3) (nil)

4) (nil)

2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE idle] [TIME time] [RETries count] [FORCE]

用于将一个或多个消息从一个消费者转移到另一个消费者。这个命令通常用于处理消息超时或重新分配消息的情况。XCLAIM 允许你手动将消息从一个消费者的待处理列表移动到另一个消费者的待处理列表。

1

2

3

4

5

6

7

8

9

127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream >

1) 1) "mystream"

   2) 1) 1) "1729329079135-0"

         2) 1) "sensor_id"

            2) "345"

127.0.0.1:6379> xclaim mystream mygroup consumer2 10000 1729329079135-0

1) 1) "1729329079135-0"

   2) 1) "sensor_id"

      2) "345"

  • mystream:流的名称。mygroup:消费者组的名称。
  • consumer2:目标消费者的名称,即消息将被转移给这个消费者。
  • 10000:消息的空闲时间(以毫秒为单位)。只有那些空闲时间超过这个值的消息才会被转移。
  • 1729329079135-0:要转移的消息 ID。

2.10 XINFO 

获取 Stream 或消费者组的信息。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

127.0.0.1:6379> xinfo stream mystream

 1) "length"

 2) (integer) 3

 3) "radix-tree-keys"

 4) (integer) 1

 5) "radix-tree-nodes"

 6) (integer) 2

 7) "groups"

 8) (integer) 1

 9) "last-generated-id"

10) "1729329079135-0"

11) "first-entry"

12) 1) "1729306027171-0"

    2) 1) "sensor_id"

       2) "123"

       3) "temmperature"

       4) "22.5"

13) "last-entry"

14) 1) "1729329079135-0"

    2) 1) "sensor_id"

       2) "345"

127.0.0.1:6379> xinfo groups mystream

1) 1) "name"

   2) "mygroup"

   3) "consumers"

   4) (integer) 2

   5) "pending"

   6) (integer) 1

   7) "last-delivered-id"

   8) "1729329079135-0"

127.0.0.1:6379> xinfo consumers mystream mygroup

1) 1) "name"

   2) "consumer1"

   3) "pending"

   4) (integer) 0

   5) "idle"

   6) (integer) 255317

2) 1) "name"

   2) "consumer2"

   3) "pending"

   4) (integer) 1

   5) "idle"

   6) (integer) 191940

XINFO STREAM mystream

length:

流中的消息总数:3 条。

radix-tree-keys:

用于存储流数据的 radix tree 中的键的数量:1 个。

radix-tree-nodes:

用于存储流数据的 radix tree 中的节点数量:2 个。

groups:

与该流关联的消费者组数量:1 个。

last-generated-id:

流中最后生成的消息 ID:1729329079135-0。

first-entry:

流中的第一条消息: 消息 ID: 1729306027171-0消息内容: sensor_id: 123temmperature: 22.5

last-entry:

流中的最后一条消息: 消息 ID: 1729329079135-0消息内容: sensor_id: 345

XINFO GROUPS mystream

name:

消费者组的名称:mygroup。

consumers:

该组中的消费者数量:2 个。

pending:

该组中待处理的消息数量:1 条。

last-delivered-id:

该组中最后一个被交付的消息 ID:1729329079135-0。

XINFO CONSUMERS mystream mygroup

第一个消费者:

第一个消费者:

  • name: consumer1
  • pending: 待处理的消息数量:0 条
  • idle: 空闲时间(以毫秒为单位):255,317 毫秒(约 4 分钟 15 秒)

第二个消费者:

  • name: consumer2
  • pending: 待处理的消息数量:1 条
  • idle: 空闲时间(以毫秒为单位):191,940 毫秒(约 3 分钟 12 秒)

2.11  XDEL key ID [ID ...]

从 Stream 中删除一个或多个条目。

1

2

3

4

5

6

7

8

9

10

11

12

13

127.0.0.1:6379> xdel mystream 1729306027171-0

(integer) 1

127.0.0.1:6379> xrange mystrea - +

(empty list or set)

127.0.0.1:6379> xrange mystream - +

1) 1) "1729329067777-0"

   2) 1) "sensor_id"

      2) "234"

      3) "temmperature"

      4) "23.5"

2) 1) "1729329079135-0"

   2) 1) "sensor_id"

      2) "345"

2.12 XTRIM key MAXLEN [~] len

修剪 Stream,保留最多 len 个条目,~ 表示近似长度。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

127.0.0.1:6379> xrange mystream - +

1) 1) "1729329067777-0"

   2) 1) "sensor_id"

      2) "234"

      3) "temmperature"

      4) "23.5"

2) 1) "1729329079135-0"

   2) 1) "sensor_id"

      2) "345"

127.0.0.1:6379> xtrim mystream maxlen 1

(integer) 1

127.0.0.1:6379> xrange mystream - +

1) 1) "1729329079135-0"

   2) 1) "sensor_id"

      2) "345"

3 使用场景

  • 日志记录:可以用来存储系统的日志信息,方便后续分析和处理。
  • 事件流:处理实时事件,如传感器数据、用户行为等。
  • 消息队列:实现可靠的消息传递系统,支持多个消费者组。
  • 任务队列:管理后台任务,确保任务被正确处理。

更多命令请参考:Commands | Docs 


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • Redis数据类型Streams的介绍
    Redis Streams 是 Redis 5.0 引入的一种新的数据类型,它提供了一种强大的日志结构化数据存储方式。Streams 类型非常适合用于构建消息队列、事
  • Redis内存碎片率调优处理方式

    Redis内存碎片率调优处理方式
    1.背景概述 在生产环境中Redis Cluster集群触发了内存碎片化的告警(碎片率1.5),集群节点分布三台宿主机六个节点三主三从架构,Redis版本
  • Redis怎么处理Hash冲突
    在 Redis 中,哈希表是一种常见的数据结构,通常用于存储对象的属性,对于哈希表,最常遇到的是哈希冲突,那么,当 Redis遇到Hash冲突会如
  • Redis实现分布式锁时需要考虑的问题解决方案
    分布式系统中的多个节点经常需要对共享资源进行并发访问,若没有有效的协调机制,可能会导致数据竞争、资源冲突等问题。分布式锁应
  • Redis连接池监控(连接池是否已满)与优化方法
    Redis作为一个高性能的内存数据库,广泛应用于各类高并发场景中。然而,在使用Redis时,连接池的管理至关重要,特别是在高并发应用中,
  • redis搭建哨兵模式实现一主两从三哨兵

    redis搭建哨兵模式实现一主两从三哨兵
    一、Redis 哨兵模式: 哨兵的核心功能:在主从复制的基础上,哨兵引入了主节点的自动故障转移 1、哨兵模式原理: 哨兵:是一个分布式系
  • Redis在Ubuntu系统上的安装步骤

    Redis在Ubuntu系统上的安装步骤
    1. 先切换到 root 用户 在 Ubuntu 20.04 中,可以通过以下步骤切换到 root 用户: 输入以下命令,以 root 用户身份登录: 1 sudo su - 按回车键,并输
  • redis生成全局id的实现步骤
    使用redis生成全局id 在现代软件开发中,生成全局唯一的标识符是非常常见的需求。这些全局唯一ID在分布式系统中尤其重要,用于标识各种
  • Redis锁的过期时间小于业务的执行时间如何续期

    Redis锁的过期时间小于业务的执行时间如何续期
    假设我们给锁设置的过期时间太短,业务还没执行完成,锁就过期了,这块应该如何处理呢?是否可以给分布式锁续期? 解决方案:先设置
  • Redis分布式锁及4种常见实现方法
    线程锁 主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计