English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Redis Stream

RedisストリームはRedis 5バージョンで新しいデータ構造が追加されました。

Redisストリームは主にメッセージキュー(MQ、Message Queue)に使用されており、Redis自体にはRedisパブリッシャー・サブスクライバー(pub/sub)ですが、欠点としてメッセージの永続化ができず、ネットワーク切断やRedisのクラッシュなどが発生すると、メッセージが消失します。

簡単に言えば、メッセージキューの機能を実現するために使用されるパブリッシャー・サブスクライバー(pub/sub)はメッセージを配信できますが、歴史的なメッセージを記録することはできません。

Redisストリームはメッセージの永続化と主副複製機能を提供しており、どんなクライアントもどんな瞬間のデータにアクセスできるようにし、各クライアントのアクセス位置を覚え、メッセージの損失を防ぎます。

Redisストリームの構造は以下の通りで、メッセージリストがあり、すべての追加されたメッセージが並んでいます。各メッセージにはユニークなIDと対応する内容があります:

各ストリームにはユニークな名前があり、それはRedisのキーであり、xaddコマンドを使用して最初にメッセージを追加したときに自動的に作成されます。

上図の解説:

  • Consumer Group :消費グループ、XGROUP CREATEコマンドを使用して作成されます。消費グループには複数の消費者(Consumer)があります。

  • last_delivered_id :カーソル、各消費グループにはlast_delivered_idというカーソルがあり、どの消費者もメッセージを読み取ると、カーソルlast_delivered_idが前に進みます。

  • pending_ids :消費者(Consumer)の状態変数で、消費者の未確認のIDを維持するために使用されます。pending_idsは、クライアントが読み取ったがまだACK(確認文字:ACKnowledge character)を受け取っていないメッセージを記録します。

メッセージキューに関連するコマンド:

  • XADD - メッセージを末尾に追加する

  • XTRIM - 对流进行修剪,限制长度

  • XDEL - 删除消息

  • XLEN - 获取流包含的元素数量,即消息长度

  • XRANGE - 获取消息列表,会自动过滤已经删除的消息

  • XREVRANGE -  反向获取消息列表,ID 从大到小

  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组

  • XREADGROUP GROUP - 读取消费者组中的消息

  • XACK - 将消息标记为"已处理"

  • XGROUP SETID - 为消费者组设置新的最后递送消息ID

  • XGROUP DELCONSUMER - 删除消费者

  • XGROUP DESTROY - 删除消费者组

  • XPENDING - 显示待处理消息的相关信息

  • XCLAIM - 转移消息的归属权

  • XINFO - 查看流和消费者组的相关信息;

  • XINFO GROUPS - 打印消费者组的信息;

  • XINFO STREAM - 打印流信息

XADD

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD 语法格式:

XADD key ID field value [field value ...]
  • key : キュー名、存在しない場合に作成されます

  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。

  • field value :記録。

redis> XADD mystream * name Sara surname O'Connor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) ""1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "O'Connor"
2) 1) ""1601372323627-1"
   2) 1) "field"1"
      2) "value"1"
      3) "field"2"
      4) "value"2"
      5) "field"3"
      6) "value"3"
redis>

XTRIM

使用 XTRIM 对流进行修剪,限制长度, 语法格式:

XTRIM key MAXLEN [~] count
  • key :キュー名

  • MAXLEN :長さ

  • count :数量

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) ""1601372434568-0"
   2) 1) "field"1"
      2) "A"
      3) "field"2"
      4) "B"
      5) "field"3"
      6) "C"
      7) "field"4"
      8) "D"
127.0.0.1:6379> 
redis>

XDEL

使用 XDEL 删除消息,语法格式:

XDEL key ID [ID ...]
  • key:キュー名

  • ID :消息 ID

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) ""1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) ""3"

XLEN

使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:

XLEN key
  • key:キュー名

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key :队列名

  • start :开始值, - 表示最小值

  • end :结束值, + 表示最大值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) ""1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) ""1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key :队列名

  • end :结束值, + 表示最大值

  • start :开始值, - 表示最小值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) ""1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量

  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式

  • key :队列名

  • id :消息 ID

# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) ""1532"
            3) "event"-id"
            4) ""5"
            5) "user"-id"
            6) ""7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) ""812"
            3) "event"-id"
            4) ""9"
            5) "user"-id"
            6) ""388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-または-$] [SETID key groupname id]-または-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key : キュー名、存在しない場合に作成されます

  • groupname : グループ名。

  • $ : 末尾から消費することを示し、新しいメッセージのみを受け入れ、現在のStreamメッセージはすべて無視されます。

先頭から消費開始:

XGROUP CREATE mystream consumer-group-name 0-0

末尾から消費開始:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

XREADGROUP GROUPを使用して消費グループ内のメッセージを読み取る場合、構文は以下の通りです:

XREADGROUP GROUP consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group : 消費グループ名

  • consumer : 消費者名。

  • count : 読み取り数量。

  • milliseconds : ブロッキングミリ秒数。

  • key : キュー名。

  • ID : メッセージ ID。

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >