English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
RedisストリームはRedis 5バージョンで新しいデータ構造が追加されました。
Redisストリームは主にメッセージキュー(MQ、Message Queue)に使用されており、Redis自体にはRedisパブリッシャー・サブスクライバー(pub/sub)ですが、欠点としてメッセージの永続化ができず、ネットワーク切断やRedisのクラッシュなどが発生すると、メッセージが消失します。
簡単に言えば、メッセージキューの機能を実現するために使用されるパブリッシャー・サブスクライバー(pub/sub)はメッセージを配信できますが、歴史的なメッセージを記録することはできません。
Redisストリームはメッセージの永続化と主副複製機能を提供しており、どんなクライアントもどんな瞬間のデータにアクセスできるようにし、各クライアントのアクセス位置を覚え、メッセージの損失を防ぎます。
Redisストリームの構造は以下の通りで、メッセージリストがあり、すべての追加されたメッセージが並んでいます。各メッセージにはユニークなIDと対応する内容があります:
各ストリームにはユニークな名前があり、それはRedisのキーであり、xaddコマンドを使用して最初にメッセージを追加したときに自動的に作成されます。
上図の解説:
Consumer Group :消費グループ、XGROUP CREATEコマンドを使用して作成されます。消費グループには複数の消費者(Consumer)があります。
last_delivered_id :カーソル、各消費グループにはlast_delivered_idというカーソルがあり、どの消費者もメッセージを読み取ると、カーソルlast_delivered_idが前に進みます。
pending_ids :消費者(Consumer)の状態変数で、消費者の未確認のIDを維持するために使用されます。pending_idsは、クライアントが読み取ったがまだACK(確認文字:ACKnowledge character)を受け取っていないメッセージを記録します。
メッセージキューに関連するコマンド:
XADD - メッセージを末尾に追加する
XTRIM - 对流进行修剪,限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID 从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表
消费者组相关命令:
XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息;
XINFO GROUPS - 打印消费者组的信息;
XINFO STREAM - 打印流信息
使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD 语法格式:
XADD key ID field value [field value ...]
key : キュー名、存在しない場合に作成されます
ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
field value :記録。
redis> XADD mystream * name Sara surname O'Connor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) ""1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "O'Connor" 2) 1) ""1601372323627-1" 2) 1) "field"1" 2) "value"1" 3) "field"2" 4) "value"2" 5) "field"3" 6) "value"3" redis>
使用 XTRIM 对流进行修剪,限制长度, 语法格式:
XTRIM key MAXLEN [~] count
key :キュー名
MAXLEN :長さ
count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (integer) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) ""1601372434568-0" 2) 1) "field"1" 2) "A" 3) "field"2" 4) "B" 5) "field"3" 6) "C" 7) "field"4" 8) "D" 127.0.0.1:6379> redis>
使用 XDEL 删除消息,语法格式:
XDEL key ID [ID ...]
key:キュー名
ID :消息 ID
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) ""1" 2) 1) 1538561701744-0 2) 1) "c" 2) ""3"
使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:
XLEN key
key:キュー名
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XRANGE key start end [COUNT count]
key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) ""1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) ""1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis>
使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XREVRANGE key end start [COUNT count]
key :队列名
end :结束值, + 表示最大值
start :开始值, - 表示最小值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) ""1601372731459-3" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie" redis>
使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count :数量
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID
# 从 Stream 头部读取两条消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) ""1532" 3) "event"-id" 4) ""5" 5) "user"-id" 6) ""7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) ""812" 3) "event"-id" 4) ""9" 5) "user"-id" 6) ""388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
使用 XGROUP CREATE 创建消费者组,语法格式:
XGROUP [CREATE key groupname id-または-$] [SETID key groupname id]-または-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key : キュー名、存在しない場合に作成されます
groupname : グループ名。
$ : 末尾から消費することを示し、新しいメッセージのみを受け入れ、現在のStreamメッセージはすべて無視されます。
先頭から消費開始:
XGROUP CREATE mystream consumer-group-name 0-0
末尾から消費開始:
XGROUP CREATE mystream consumer-group-name $
XREADGROUP GROUPを使用して消費グループ内のメッセージを読み取る場合、構文は以下の通りです:
XREADGROUP GROUP consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group : 消費グループ名
consumer : 消費者名。
count : 読み取り数量。
milliseconds : ブロッキングミリ秒数。
key : キュー名。
ID : メッセージ ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >