はい、承知いたしました。Redis Stream、Pub/Sub、リストの違いについて、約5000語の詳細な解説を含む記事を作成します。
Redis Streamとは?Pub/Subやリストとの違いを徹底解説
はじめに:分散システムにおけるメッセージングの課題
現代のソフトウェアアーキテクチャは、マイクロサービスやイベント駆動型の設計が主流となり、複数の独立したプロセスやサービスが連携して動作することが一般的です。このような分散システムにおいて、サービス間の効率的かつ信頼性の高いメッセージングは、システム全体の安定性やスケーラビリティを確保する上で非常に重要です。
メッセージングシステムには、様々な要件が求められます。例えば、あるサービスが発生させたイベント(メッセージ)を、複数のサービスが受け取りたい場合(Publish/Subscribe モデル)、あるいは、特定のタスクを複数のワーカーの中から一つだけが処理したい場合(Work Queue モデル)。さらに、メッセージが失われることなく、確実に処理されたことを確認したいという信頼性の要求や、過去のメッセージを再生したり、処理途中の状態を管理したいという高度な要求もあります。
Redisは、インメモリデータ構造ストアとして広く利用されていますが、その機能は単なるキャッシュやKey-Valueストアにとどまりません。リスト(List)やPub/Sub(Publish/Subscribe)といったデータ構造や機能を通じて、シンプルなメッセージングのユースケースに対応してきました。
しかし、これらの既存機能だけでは、分散システムにおけるより複雑なメッセージング要件、特に「信頼性のあるコンシューマーグループによる並列処理」や「メッセージの履歴保持と再生」といったニーズに完全に応えることは困難でした。
そこで登場したのが、Redis 5.0で導入された Redis Stream です。Redis Streamは、ログファイルのような「追記専用(Append-Only)」のデータ構造であり、高スループットなメッセージングを、複数のコンシューマーが協調して、信頼性高く処理できるように設計されています。
本記事では、このRedis Streamについて、その基本的な概念から詳細なコマンド、そして中心的な機能であるコンシューマーグループについて深く掘り下げて解説します。さらに、Redisの既存機能であるPub/Subやリストと比較することで、それぞれの違いと、どのようなユースケースでStreamが優れているのかを明確にしていきます。
Redis Streamとは?
Redis Streamは、一言で言えば「永続化可能で、自動生成されるIDを持つ、追記専用の項目(Entry)のリスト」のようなデータ構造です。各項目はKey-Valueペアのセット(ハッシュマップに似ています)で構成され、時系列順に並べられます。最も重要な特徴は、各項目に一意で単調増加するIDが割り当てられることです。
この構造は、KafkaやKinesisといった専用の分散メッセージングシステムが提供するログベースのモデルに似ています。これにより、以下のような特性を実現します。
- 追記専用(Append-Only Log): 新しい項目は常にストリームの末尾に追加されます。既存の項目を変更したり削除したりすることは基本的にありません(ただし、トリミング機能はあります)。
- ユニークなメッセージID: 各項目には、デフォルトでタイムスタンプとシーケンス番号に基づいた一意なIDが割り当てられます。このIDは単調増加するため、項目の順序を保証し、特定の時点からの読み込みや範囲指定による読み込みを可能にします。
- 複数のコンシューマー: 同じストリームから、複数のコンシューマーが独立して、あるいは協調してメッセージを読み取ることができます。
- コンシューマーグループ(Consumer Groups): これがStreamの最も強力な機能の一つです。複数のコンシューマーを一つのグループとして扱うことで、ストリーム内のメッセージをグループ内のコンシューマー間で分散して処理し、負荷分散と信頼性の高い処理を実現します。
- 永続性: Redisの通常の永続化メカニズム(RDBやAOF)によって、ストリームのデータも永続化されます。これにより、Redisサーバーが再起動してもデータは失われません。
これらの特性から、Redis Streamは、メッセージキュー、イベントソーシング、時系列データの収集、ロギングなど、幅広いアプリケーションに利用できます。
Redis Streamの基本概念
Redis Streamを理解する上で重要な概念をいくつか解説します。
項目(Entry)とメッセージID
ストリームに格納される個々のデータを「項目(Entry)」と呼びます。各項目は、以下の2つの要素で構成されます。
- メッセージID (Message ID): その項目を一意に識別するIDです。デフォルトでは
timestampInMillis-sequenceNumber
の形式で自動生成されます。timestampInMillis
: 項目が追加されたRedisサーバーのローカルタイムスタンプ(ミリ秒単位)。sequenceNumber
: 同一ミリ秒内に複数の項目が追加された場合にそれらを区別するためのシーケンス番号。
このIDは単調増加が保証されており、新しい項目ほど大きなIDを持ちます。この特性が、時系列順の処理や範囲指定による読み込みを可能にしています。
IDは自動生成だけでなく、特定の要件に基づいて手動で指定することも可能ですが、通常は自動生成を利用します(IDに*
を指定)。
- フィールドと値のペア (Fields and Values): 項目の実際のデータです。ハッシュマップのように、複数のフィールド名とその値のペアを持つことができます。例えば、
event_type: 'user_created', user_id: '123', timestamp: '...'
のようになります。
ストリームの構造
ストリームは、これらの項目が時系列順に追記されていくログ構造です。内部的には、特定の効率的なデータ構造(Radix TreeとMacro Node)を使用して、高速な追記と範囲クエリを実現しています。
Redis Streamの基本コマンド
Streamを操作するための基本的なコマンドを見ていきましょう。
XADD
:ストリームへの項目の追加
ストリームに新しい項目を追加するために使用します。
redis
XADD key ID field1 value1 [field2 value2 ...]
key
: 操作対象のストリームのキー名。ID
: 追加する項目のIDを指定します。*
: RedisにIDを自動生成させます。これが最も一般的で推奨される方法です。timestampInMillis-sequenceNumber
: 手動でIDを指定します。既に存在するIDより大きく、かつストリーム内の最後の項目IDより大きい必要があります。特定のマイグレーションシナリオなどで使用されることがあります。MAXLEN ~ count
: ストリームを指定した項目数でトリミングしながら追加します。詳細は後述のXTRIM
で説明します。MINID ~ ID
: 指定したIDより小さい項目をトリミングしながら追加します。
例:
“`redis
XADD my_stream * event_type user_created user_id 1001
“1678886400000-0”
XADD my_stream * event_type product_viewed product_id 99 viewer_id 1001
“1678886400001-0”
“`
XADD
は追加された項目のIDを返します。
トリミング付きの追加 (MAXLEN ~ count
)
XADD
の際にMAXLEN ~ count
オプションを指定すると、ストリームのサイズが指定したcount
を超えないように、自動的に古い項目が削除されます。
redis
XADD my_stream MAXLEN ~ 1000 * field value # ストリームの項目数を約1000以下に保つ
~
はおおよそのサイズを保証するためのフラグです。指定しない場合 (MAXLEN 1000
) は厳密なサイズ保証となり、性能コストが高くなる可能性があります。通常は ~
を使用します。
XRANGE
/ XREVRANGE
:IDの範囲指定による読み込み
指定したIDの範囲に含まれる項目を読み込みます。
redis
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
key
: ストリームのキー名。start
: 取得を開始するID。end
: 取得を終了するID。COUNT count
: 取得する項目の最大数(オプション)。
IDには以下の特別な値を使用できます。
-
: ストリームの最小ID(最初の項目)。+
: ストリームの最大ID(最後の項目)。
範囲指定IDには、IDの一部だけを指定することもできます。例えば、1678886400000
はそのミリ秒の最初のIDを、1678886400000-50
はその正確なIDを示します。(
をIDの前に付けると、そのID を含まない 範囲を指定できます(排他的)。
例:
“`redis
XRANGE my_stream – + COUNT 10 # ストリームの最初の10項目を取得
1) 1) “1678886400000-0”
2) 1) “event_type”
2) “user_created”
3) “user_id”
4) “1001”
2) 1) “1678886400001-0”
2) 1) “event_type”
2) “product_viewed”
3) “product_id”
4) “99”
5) “viewer_id”
6) “1001”
…XRANGE my_stream 1678886400000-0 1678886400001-0 # 特定のID範囲を取得
XREVRANGE my_stream + – COUNT 5 # ストリームの最後の5項目を逆順で取得
“`
XRANGE
は開始IDから終了IDに向かって昇順に、XREVRANGE
は終了IDから開始IDに向かって降順に項目を返します。
XREAD
:一つまたは複数のストリームからの読み込み
一つまたは複数のストリームから新しい項目を読み込むために使用します。特定のID以降の項目を読み取る、あるいは新しい項目が追加されるまでブロックすることができます。
redis
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key1 key2 ... ID1 ID2 ...
COUNT count
: 読み込む項目の最大数(オプション)。BLOCK milliseconds
: 新しい項目が到着するまでブロックする時間(ミリ秒)。0
は無限にブロック。省略するとブロックしません。STREAMS key1 key2 ... ID1 ID2 ...
: 読み込み対象のストリームとその開始IDのリスト。key
とID
は1対1で対応します。
STREAMS
の後ろに続くID
は、指定したストリームから「そのIDより新しい項目」を読み込むことを意味します。
ID
に特定のIDを指定: そのIDより新しい項目を読み込む。ID
に$
: そのストリームの最後の項目より新しい項目(つまり、コマンド実行後に新しく追加された項目)を読み込む。
例:非ブロック読み込み
“`redis
XREAD COUNT 2 STREAMS my_stream other_stream 0-0 0-0 # my_stream, other_streamの最初から2項目ずつ読み込む
XREAD COUNT 1 STREAMS my_stream $ # my_streamの最後の項目以降(新しい項目)を最大1つ読み込む
“`
例:ブロック読み込み
“`redis
XREAD BLOCK 5000 COUNT 1 STREAMS my_stream $ # my_streamに新しい項目が追加されるまで最大5秒待つ
“`
XREAD
は、読み込み対象のストリームごとに、見つかった項目のリストを返します。ブロックモードでタイムアウトした場合、nil
を返します。
注意点として、XREAD
はコンシューマーの状態を管理しません。どのメッセージをどのコンシューマーが読んだか、処理中か、処理完了かといった情報を追跡しないため、複数のコンシューマーで負荷分散して処理するようなシナリオには向いていません。そのような場合は、後述のコンシューマーグループを使用する必要があります。
XLEN
:ストリームの長さの取得
ストリームに含まれる項目の数を取得します。
redis
XLEN key
例:
“`redis
XLEN my_stream
(integer) 2
“`
XTRIM
:ストリームのトリミング
ストリームが大きくなりすぎるのを防ぐために、古い項目を削除します。これはXADD
のMAXLEN
/MINID
オプションでも実行できますが、XTRIM
コマンドとして独立して実行することも可能です。
redis
XTRIM key strategy parameter [LIMIT count]
key
: 対象のストリーム。strategy
: トリミング戦略。現在サポートされているのはMAXLEN
とMINID
です。MAXLEN
: ストリームの最大項目数を指定します。MINID
: 指定したIDより小さい項目を削除します。
parameter
:MAXLEN
の場合は項目数、MINID
の場合はID。[LIMIT count]
:MINID
戦略で、削除する項目の最大数を制限します(オプション)。
MAXLEN
戦略では、~
フラグをパラメータの前に付けることができます。MAXLEN ~ count
とすると、厳密な数ではなく、おおよその数を保つようにトリミングが行われ、性能が向上します。
例:
“`redis
XTRIM my_stream MAXLEN ~ 1000 # ストリームの項目数を約1000以下に保つ
(integer) 50 # 削除された項目の数
XTRIM my_stream MINID 1678886400000-0 # 指定したIDより小さい項目を全て削除
“`
ストリームは追記専用のログですが、メモリ使用量を管理するためにトリミングが不可欠です。
Redis Streamのコンシューマーグループ (Consumer Groups)
XREAD
コマンドは複数のクライアントが独立してストリームを読むことができますが、各クライアントはストリーム全体の項目を読み込みます。もし、ストリームを流れるメッセージを複数のワーカープロセスで分担して処理したい場合、単純なXREAD
では各ワーカーが同じメッセージを重複して処理してしまいます。
このような「複数のコンシューマーでメッセージを分担処理し、信頼性を確保したい」というユースケースのために設計されたのが コンシューマーグループ です。
コンシューマーグループを使用すると、ストリームを流れるメッセージをグループ内のコンシューマー間で負荷分散して処理できます。さらに、各コンシューマーがどのメッセージを処理中か、あるいは処理完了したかを追跡し、コンシューマーのクラッシュなどが発生した場合に、処理途中のメッセージを他のコンシューマーが引き継いで処理できるような信頼性メカニズムを提供します。
コンシューマーグループの概念
コンシューマーグループは、以下の要素で構成されます。
- グループ名 (Group Name): ストリームに対して定義されるコンシューマーグループの名前。一つのストリームに対して複数のグループを定義できます。各グループはストリームを独立して読み進めます。
- コンシューマー名 (Consumer Name): グループに参加する個々のコンシューマー(クライアント)を一意に識別する名前。
- 最後の配信ID (Last Delivered ID): そのグループに最後に正常に配信された(読み取られた)項目のID。新しい
XREADGROUP
コマンドは、このIDより新しい項目を読み込みます。 - 保留中のエントリリスト (Pending Entry List – PEL): 各コンシューマーが現在処理中の、しかしまだ処理完了の確認(ACK)が行われていない項目のリスト。このリストは、コンシューマーがクラッシュした場合に、他のコンシューマーが引き継ぐために使用されます。
コンシューマーグループの動作フロー
- グループ作成: ストリームに対してコンシューマーグループを作成します。
- コンシューマー参加: クライアントは特定のグループ名と独自のコンシューマー名を指定して、グループに参加します。
- メッセージ読み込み: コンシューマーは
XREADGROUP
コマンドを使用して、グループの「最後の配信ID」より新しい項目を読み込みます。Redisは、これらの項目をグループ内のコンシューマーにラウンドロビン方式で(またはその他の負荷分散方式で)配信します。読み取られた項目は、そのコンシューマーのPELに追加されます。 - メッセージ処理: コンシューマーは読み取ったメッセージを処理します。
- 処理完了の確認 (Acknowledgement – ACK): 処理が正常に完了したら、コンシューマーは
XACK
コマンドでその項目をRedisに通知します。Redisは通知された項目をコンシューマーのPELから削除します。 - コンシューマーの障害: もしコンシューマーが処理中にクラッシュした場合、そのコンシューマーのPELに残っている項目は、他のコンシューマー(同じグループ内または別のグループ内のコンシューマー)が
XCLAIM
コマンドを使って引き継ぎ、処理を続行できます。 - 保留中のメッセージの確認:
XPENDING
コマンドを使うと、特定のグループ内で保留中のメッセージ(PELにあるメッセージ)を確認できます。
コンシューマーグループ関連コマンド
XGROUP CREATE
:コンシューマーグループの作成
ストリームに対して新しいコンシューマーグループを作成します。
redis
XGROUP CREATE key groupname id [MKSTREAM]
key
: 対象のストリーム。groupname
: 作成するグループの名前。id
: グループの開始ID(最初の配信ID)を指定します。0-0
: ストリームの最初から読み込みを開始します。$
: グループ作成時点のストリームの最後の項目より新しい項目から読み込みを開始します。- 特定のID: そのIDより新しい項目から読み込みを開始します。
MKSTREAM
: 指定したkey
のストリームが存在しない場合に、空のストリームを同時に作成します(オプション)。
例:
“`redis
XGROUP CREATE my_stream my_group 0-0 MKSTREAM # ストリームの最初から読み込むグループを作成 (ストリームがなければ作成)
OK
XGROUP CREATE another_stream another_group $ # グループ作成以降のメッセージのみを読み込むグループを作成
OK
“`
XREADGROUP
:コンシューマーグループからの読み込み
コンシューマーグループに参加しているコンシューマーがメッセージを読み込むために使用します。
redis
XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key1 key2 ... ID1 ID2 ...
GROUP groupname consumername
: 属するグループ名と、自身のコンシューマー名を指定します。コンシューマー名はグループ内で一意である必要はありませんが、通常はワーカーの識別子などユニークな名前を使用します。COUNT count
: 読み込む項目の最大数(オプション)。BLOCK milliseconds
: 新しい項目が到着するまでブロックする時間(ミリ秒)。0
は無限にブロック。省略するとブロックしません。NOACK
: このフラグを指定すると、読み取った項目はPELに追加されず、すぐにACKされたとみなされます。信頼性より低遅延を優先する場合に使用します(非推奨の場合が多い)。STREAMS key1 key2 ... ID1 ID2 ...
: 読み込み対象のストリームとその開始IDのリスト。key
とID
は1対1で対応します。
STREAMS
の後のID
には、特別な意味を持つ値を使用します。
>
: そのグループに「最後に配信された項目」より新しい項目を読み込みます。新しいメッセージを取得する際に最も一般的に使用されるIDです。- 特定のID: そのIDより古い、まだPENDING状態のメッセージ(PELにあるメッセージ)を読み込みます。これは、コンシューマーが再起動した場合や、障害から回復した場合に、処理中のメッセージを引き継ぐために使用されます。
例:新しいメッセージの読み込み (非ブロック)
“`redis
XREADGROUP GROUP my_group consumer_1 COUNT 1 STREAMS my_stream > # my_groupのconsumer_1として、my_streamから新しい項目を最大1つ読み込む
1) 1) “my_stream”
2) 1) 1) “1678886400002-0”
2) 1) “event_type”
2) “order_placed”
3) “order_id”
4) “A123”
“`
例:新しいメッセージの読み込み (ブロック)
“`redis
XREADGROUP GROUP my_group consumer_1 BLOCK 0 COUNT 1 STREAMS my_stream > # my_streamに新しい項目が追加されるまで無限に待つ
“`
例:保留中のメッセージの読み込み
コンシューマーが再起動した場合など、まだ処理が完了していない(PELにある)メッセージを読み込み直すには、開始IDとして 0-0
を指定します。
“`redis
XREADGROUP GROUP my_group consumer_1 STREAMS my_stream 0-0 # consumer_1のPELにあるメッセージを読み込む
“`
これは、特定のコンシューマーがクラッシュした場合に、他のコンシューマーがそのPELを引き継ぐ (XCLAIM
) 前に、そのコンシューマー自身が回復して自分のPELを処理するシナリオで有用です。
XACK
:処理完了の通知
XREADGROUP
で読み込んだ項目を正常に処理した場合に、PELからその項目を削除するために使用します。
redis
XACK key groupname id [id ...]
key
: 対象のストリーム。groupname
: 属するグループの名前。id [id ...]
: 処理完了した項目のIDを一つまたは複数指定します。
例:
“`redis
XACK my_stream my_group 1678886400002-0 # ID “1678886400002-0” の処理完了を通知
(integer) 1 # 正常にACKされたメッセージの数
“`
XACK
されない限り、メッセージはコンシューマーのPELに残り続け、そのコンシューマーが XREADGROUP ... 0-0
で読み込むか、他のコンシューマーが XCLAIM
で引き継ぐまで、保留状態となります。
XPENDING
:保留中のメッセージの確認
コンシューマーグループ内で保留中のメッセージ(PELにあるメッセージ)の情報を確認します。
redis
XPENDING key groupname [[start end count] [consumername]]
key
: 対象のストリーム。groupname
: 対象のグループ。- 引数なし: グループ全体の保留中のメッセージに関する要約情報を返します。
- 保留中のメッセージの総数
- 最小ID、最大ID
- 保留中のメッセージを持つコンシューマーのリストと、それぞれのメッセージ数
start end count
: 特定のID範囲内の保留中のメッセージの詳細リストを取得します。start
,end
: IDの範囲。-
,+
を使用できます。count
: 取得する最大数。
consumername
:start end count
とともに指定すると、特定のコンシューマーが保留中のメッセージのみを取得します。
例:グループ全体の要約
“`redis
XPENDING my_stream my_group
1) (integer) 1 # 保留中のメッセージ総数
2) “1678886400002-0” # 最小ID
3) “1678886400002-0” # 最大ID
4) 1) 1) “consumer_1” # コンシューマー名
2) “1” # そのコンシューマーの保留中のメッセージ数
“`
例:保留中のメッセージの詳細
“`redis
XPENDING my_stream my_group – + 10 # グループ内の保留中のメッセージを最大10件取得
1) 1) “1678886400002-0” # メッセージID
2) “consumer_1” # 所有コンシューマー名
3) (integer) 120000 # 保留時間 (ミリ秒)
4) (integer) 1 # 配信回数 (delivery counter)
“`
XPENDING
は、コンシューマーがクラッシュしていないか、あるいはメッセージが長時間処理されないまま滞留していないかなどを監視するために非常に役立ちます。
XCLAIM
:保留中のメッセージの引き継ぎ
あるコンシューマーが長時間処理を行わない(クラッシュなど)場合、そのコンシューマーのPELに残っているメッセージを、別のコンシューマーが強制的に引き継いで処理するために使用します。
redis
XCLAIM key groupname consumername min-idle-time id [id ...] [IDLE idle-time] [TIME ms-unix-time] [FORCE] [JUSTID]
key
: 対象のストリーム。groupname
: 対象のグループ。consumername
: 引き継ぐメッセージの新しい所有者となるコンシューマーの名前。min-idle-time
: 指定したミリ秒以上、そのメッセージが保留されたままになっている場合にのみ引き継ぎを行います。これにより、アクティブなコンシューマーが処理中のメッセージを誤って引き継ぐのを防ぎます。id [id ...]
: 引き継ぐメッセージのIDを一つまたは複数指定します。通常はXPENDING
コマンドで確認した保留中のメッセージIDを指定します。IDLE idle-time
: 引き継いだメッセージのIDLE時間を指定します。デフォルトは0。TIME ms-unix-time
: 引き継いだメッセージの最後の配信時間をUnixミリ秒タイムスタンプで指定します。FORCE
: メッセージが存在しない、または指定したmin-idle-time
を満たさない場合でも、強制的にメッセージを作成または引き継ぎます。通常は使用しません。JUSTID
: メッセージの内容ではなく、引き継がれたメッセージのIDのみを返します。
例:
“`redis
XPENDINGで長時間保留されているメッセージID (例: 1678886400002-0) を見つける
そのメッセージを consumer_2 が引き継ぐ
XCLAIM my_stream my_group consumer_2 30000 1678886400002-0 # 30秒以上保留されている場合に引き継ぐ
1) 1) “1678886400002-0”
2) 1) “event_type”
2) “order_placed”
3) “order_id”
4) “A123”
“`
XCLAIM
は、指定したIDのメッセージが存在し、かつmin-idle-time
の条件を満たす場合、そのメッセージの所有者を指定したconsumername
に変更し、PELエントリを更新します。そして、引き継いだメッセージの内容を返します(JUSTID
を指定しない場合)。
XAUTOCLAIM
:自動的な保留メッセージの引き継ぎとクリーンアップ (Redis 6.2+)
XCLAIM
は特定のメッセージIDを指定する必要がありますが、XAUTOCLAIM
は指定したID以降の範囲で、一定時間アイドル状態になっているメッセージを自動的に検索し、指定したコンシューマーが引き継ぎます。同時に、存在しないコンシューマーのPENDINGエントリのクリーンアップも行います。
redis
XAUTOCLAIM key groupname consumername min-idle-time start [COUNT count] [JUSTID]
key
,groupname
,consumername
,min-idle-time
,JUSTID
:XCLAIM
と同様。start
: 検索を開始するメッセージID。通常は0-0
を指定してPEL全体を対象とします。COUNT count
: 一度に検索/引き継ぎを行うメッセージの最大数(オプション)。
例:
“`redis
XAUTOCLAIM my_stream my_group consumer_2 3600000 0-0 COUNT 100 # my_streamのmy_groupで、1時間以上保留されているメッセージを0-0から最大100件検索し、consumer_2が引き継ぐ
1) “0-0” # 次の検索開始ID
2) 1) 1) “1678886400002-0” # 引き継がれたメッセージのリスト
2) 1) “event_type”
2) “order_placed”
3) “order_id”
4) “A123”
3) (empty array) # 削除された無効なPELエントリのIDリスト
“`
XAUTOCLAIM
は、障害発生後の自動復旧や、定期的なPELのクリーンアップに非常に便利です。
XGROUP SETID
:グループの最後の配信IDを変更
コンシューマーグループの最後の配信IDを手動で変更します。これは、特定の時点からグループの読み込みを再開させたい場合などに使用します。
redis
XGROUP SETID key groupname id
key
: 対象のストリーム。groupname
: 対象のグループ。id
: 新しい最後の配信ID。$
を指定すると、現在のストリームの最後のIDに設定され、今後新しいメッセージのみを読み込むようになります。
例:
“`redis
XGROUP SETID my_stream my_group $ # グループの読み込み位置を現在のストリームの末尾に設定
OK
“`
XGROUP DELCONSUMER
:コンシューマーの削除
コンシューマーグループから特定のコンシューマーを削除します。そのコンシューマーが所有していたPENDINGメッセージは削除されます。
redis
XGROUP DELCONSUMER key groupname consumername
例:
“`redis
XGROUP DELCONSUMER my_stream my_group consumer_1 # グループからconsumer_1を削除
“`
XGROUP DESTROY
:コンシューマーグループの削除
コンシューマーグループ自体を削除します。グループに属するすべてのコンシューマーと、そのPENDINGメッセージも削除されます。
redis
XGROUP DESTROY key groupname
例:
“`redis
XGROUP DESTROY my_stream my_group # my_groupを削除
“`
XINFO
:ストリームやグループ、コンシューマー情報の取得
ストリームやコンシューマーグループ、個々のコンシューマーの状態を取得します。デバッグや監視に非常に役立ちます。
redis
XINFO STREAM key [FULL [COUNT count]]
XINFO GROUPS key
XINFO CONSUMERS key groupname
XINFO STREAM key
: ストリームに関する基本情報(項目数、最初/最後のID、コンシューマーグループ数など)を取得します。FULL [COUNT count]
: ストリームの項目や、グループ、コンシューマー、PENDINGメッセージの詳細を含む完全な情報を取得します。COUNT
で取得する項目数を制限できます。
XINFO GROUPS key
: ストリームに定義されているすべてのコンシューマーグループのリストとその情報を取得します。XINFO CONSUMERS key groupname
: 特定のコンシューマーグループに属するすべてのコンシューマーのリストとその情報を取得します。
これらのコマンドを組み合わせることで、Redis Streamをメッセージングキューやイベントバスとして、信頼性とスケーラビリティを持たせて利用することができます。特にXREADGROUP
, XACK
, XPENDING
, XCLAIM
, XAUTOCLAIM
のメカニズムは、分散ワーカーによるメッセージの並列処理と障害時の回復において中心的な役割を果たします。
Redis Stream vs Pub/Sub
Redisには、Pub/Sub(Publish/Subscribe)という別のメッセージング機能があります。これは、パブリッシャーがチャンネルにメッセージを送信し、そのチャンネルを購読しているすべてのサブスクライバーがメッセージを受信するモデルです。
StreamとPub/Subは、どちらもメッセージングに利用されますが、設計思想と特性が大きく異なります。
特性 | Redis Stream | Redis Pub/Sub |
---|---|---|
モデル | ログベース、追記専用、時系列データ構造 | チャンネルベース、fire-and-forget |
永続性 | あり (Redisの永続化機能による) | なし (メッセージはメモリに保持されず、送信後すぐに破棄) |
配信保証 | At-least-once (コンシューマーグループ利用時、ACK/PELによる) | Fire-and-forget (保証なし) |
コンシューマー | 独立したコンシューマー、コンシューマーグループによる協調/負荷分散 | チャンネルを購読するすべてのサブスクライバーに配信 |
メッセージ消費 | コンシューマーグループ内でメッセージを分担処理 | 全てのサブスクライバーが同じメッセージを受信 |
履歴/再生 | 可能 (ID指定、XRANGE , XREAD による) |
不可能 (メッセージは即時破棄) |
スケーラビリティ | コンシューマーグループにより、多数のワーカーで負荷分散しやすい | 全てのサブスクライバーが全てのメッセージを受信するため、サブスクライバー数やメッセージ量が増えると負荷が高まる可能性がある |
複雑さ | 高い (ID管理、コンシューマーグループ、ACK/PEL) | 低い (シンプル) |
コマンド | XADD , XREAD , XRANGE , XTRIM , XLEN , XGROUP , XREADGROUP , XACK , XPENDING , XCLAIM など |
PUBLISH , SUBSCRIBE , UNSUBSCRIBE , PSUBSCRIBE , PUNSUBSCRIBE |
主なユースケース | 信頼性の高いメッセージキュー、イベントソーシング、タスクキュー、時系列データ、ログ収集 | シンプルなリアルタイム通知、ブロードキャスト、キャッシュ無効化 |
違いの詳細解説
-
永続性と配信保証:
- Pub/Subは、メッセージがチャンネルに公開されると、その時点でオンラインで購読しているクライアントに即座に配信されます。メッセージはRedis自体には保持されません。したがって、メッセージが送信されたときにサブスクライバーがオフラインだった場合、そのメッセージは永遠に失われます。配信保証はありません(Fire-and-forget)。
- Streamは、メッセージをストリームに追記して永続的に保存します(Redisの永続化設定による)。コンシューマーグループを使用する場合、メッセージは
XREADGROUP
で読み取られてもすぐに削除されず、コンシューマーのPELに残ります。コンシューマーがXACK
コマンドで処理完了を通知して初めてPELから削除されます。これにより、コンシューマーがクラッシュしたりメッセージの処理に失敗したりした場合でも、メッセージはPELに残り、後で再処理されるか、別のコンシューマーに引き継がれることが保証されます(At-least-once 配信)。
-
消費モデル:
- Pub/Subでは、一つのチャンネルに公開されたメッセージは、そのチャンネルを購読している「全ての」サブスクライバーにコピーされて配信されます(Fan-outモデル)。これは、リアルタイム通知やキャッシュ無効化など、同じ情報を複数の異なるコンシューマーがそれぞれ利用するシナリオに適しています。
- Streamでは、コンシューマーグループを使用すると、ストリーム内のメッセージはグループ内のコンシューマー間で分担して配信されます。つまり、一つのメッセージはグループ内の「一つの」コンシューマーによってのみ処理されます(Competing Consumersモデル)。これは、タスクキューのように、多数のワーカーで大量のジョブを並列処理するシナリオに適しています。もちろん、コンシューマーグループを使用せず、複数のクライアントがそれぞれ独立したコンシューマーとして
XREAD
を使用することも可能ですが、この場合はPub/Subのように各コンシューマーが全てのメッセージを読み取ります(ただし、履歴を追える点がPub/Subと異なります)。
-
履歴と再生:
- Pub/Subでは、メッセージは送信後すぐに破棄されるため、過去のメッセージを後から読み返すことはできません。
- StreamはメッセージをID付きでログとして保持するため、
XRANGE
やXREAD
コマンドを使って、過去の任意の時点からのメッセージを読み返したり、特定のID範囲のメッセージを取得したりすることが可能です。これは、イベントソーシングや障害発生時の状態回復などのシナリオで非常に強力な機能です。
-
スケーラビリティ:
- Pub/Subは、サブスクライバーが増えると、Redisサーバーは各サブスクライバーに対してメッセージを送信する必要があるため、負荷が高まります。
- Streamのコンシューマーグループは、メッセージの処理をグループ内のコンシューマーに分散させます。グループ内のコンシューマー数を増やすことで、並列処理能力を容易にスケールさせることができます。Redisサーバー側では、どのメッセージをどのコンシューマーに配信したか、ACKの状態などを管理する必要がありますが、メッセージ自体の配信は分担されるため、Pub/Subよりもコンシューマーが多い場合の負荷分散に優れています。
どちらを選ぶべきか?
- Pub/Subを選ぶ場合:
- メッセージの失われたくないという要件が厳しくない(Fire-and-forgetで十分)。
- メッセージを購読するすべてのクライアントが、すべてのメッセージを受信する必要がある(ブロードキャスト、通知)。
- システムのセットアップと管理を可能な限りシンプルにしたい。
- 過去のメッセージを読み返す必要がない。
- Redis Streamを選ぶ場合:
- メッセージが失われるのを避けたい(At-least-once配信が必要)。
- 複数のワーカープロセスでメッセージを分担処理し、並列処理能力をスケールさせたい(タスクキュー、ワークキュー)。
- コンシューマーの障害発生時に、処理中のメッセージが失われず、引き継がれて処理されるようにしたい。
- 過去のメッセージをID指定で読み返したり、特定の時点から処理を再開したりしたい(イベントソーシング、ログ処理)。
- メッセージ自体をRedis内に永続的に保持したい。
StreamはPub/Subよりも高機能で信頼性が高いですが、その分、概念やコマンドが複雑になります。ユースケースの要件に応じて適切な方を選択することが重要です。
Redis Stream vs リスト (List)
Redisのリスト(List)は、要素が順序付けされたコレクションであり、両端から要素を出し入れする操作(LPOP/RPOP, LPUSH/RPUSH)を高速に行えるため、古くからシンプルなメッセージキューとして利用されてきました。特にブロッキング操作(BLPOP/BRPOP)を使うことで、要素が追加されるまでコンシューマーを待機させることができます。
Streamとリストも、メッセージキューとして利用される点では共通していますが、そのデータ構造、消費モデル、機能において大きな違いがあります。
特性 | Redis Stream | Redis List |
---|---|---|
データ構造 | ログベース、ID付き、追記専用、フィールド/値 | 順序付けられた要素のリスト (文字列) |
要素 | ID + 複数のフィールド/値 (ハッシュマップ風) | 単一の文字列 |
消費モデル | ログから読み進める、コンシューマーグループによる分担処理 | 片側から要素を取り出す (通常は1対1の消費) |
複数のコンシューマー | 独立したコンシューマー、コンシューマーグループによる協調処理が容易 | 実現困難 (複数のLPOP/RPOPで同じ要素が取り出される可能性) |
配信保証 | At-least-once (コンシューマーグループ利用時) | 保証なし (LPOP/RPOP後に処理失敗すると要素は失われる) |
履歴/再生 | 可能 (ID指定、XRANGE , XREAD ) |
困難/不可能 (要素は取り出し時に削除される) |
メッセージID | 自動生成される一意なID | なし (インデックスはあるが、要素自体にIDは付与されない) |
部分読み込み | ID範囲指定で可能 (XRANGE ) |
インデックス範囲指定で可能 (LRANGE ) |
複雑さ | 高い | 低い (シンプル) |
コマンド | XADD , XREAD , XGROUP , etc. |
LPUSH , RPUSH , LPOP , RPOP , BLPOP , BRPOP , LRANGE , etc. |
主なユースケース | 信頼性の高いキュー、イベントログ、時系列データ | シンプルなキュー (1対1消費)、スタック、限定サイズリスト |
違いの詳細解説
-
データ構造と要素:
- リストは、単なる文字列の順序付きコレクションです。各要素は一つの文字列しか保持できません。
- Streamの項目は、ユニークなIDと、複数のフィールド-値ペアを持つことができます。これにより、構造化されたメッセージを直接格納できます。
- Streamの項目には自動生成される単調増加のIDが付与される点がリストと決定的に異なります。リストにはインデックスがありますが、これはリスト内の位置を示すだけであり、項目自体に永続的なIDが付与されるわけではありません。
-
消費モデルと複数コンシューマー:
- リストをキューとして使用する場合、通常はLPUSH/RPUSHで要素を追加し、LPOP/RPOP/BLPOP/BRPOPで要素を取り出します。これらのPOP操作は要素をリストから削除するため、複数のコンシューマーが同じリストに対してPOP操作を行うと、各要素は一つのコンシューマーによってのみ取得されます。これはシンプルなワークキューとしては機能しますが、要素の取り出し(POP)と処理完了の通知(ACK)が分離されていません。 要素を取り出した時点でリストから消えるため、もしコンシューマーがPOP後にクラッシュした場合、その要素(メッセージ)は失われてしまいます。
- Streamのコンシューマーグループは、メッセージの読み取り(
XREADGROUP
)と処理完了の通知(XACK
)を分離しています。メッセージはXACK
されるまでPELに残り、コンシューマーの障害時にも再処理が可能です。また、複数のコンシューマーがグループに参加することで、ストリーム内のメッセージを効率的に分担して処理できます。リストでは、複数のコンシューマーでメッセージを分担処理し、かつ信頼性を確保するためには、別途ロック機構や状態管理をアプリケーション側で実装する必要があり、非常に複雑になります。
-
配信保証と信頼性:
- リストを使用したキューは、POP操作後のコンシューマー障害に対して脆弱です。At-least-onceのような信頼性の高い配信は組み込まれていません。
- Streamのコンシューマーグループは、PELとACKメカニズムにより、At-least-once配信をサポートしています。メッセージがACKされるまで失われることがありません。
-
履歴と再生:
- リストからPOPされた要素は削除されるため、過去にキューに追加された要素を後から読み返すことはできません。
- Streamはログ構造であり、メッセージは
XTRIM
などで明示的に削除されない限りストリームに保持されます。これにより、過去の任意の時点からのメッセージをID指定で読み込むことが可能です。
-
部分読み込みとID:
- リストは
LRANGE
コマンドでインデックス範囲の要素を取得できますが、要素自体に意味のあるIDはありません。 - StreamはユニークなメッセージIDを持ち、
XRANGE
でID範囲の要素を取得できます。これは時系列データのクエリや、特定のイベントからの追跡に非常に強力です。
- リストは
どちらを選ぶべきか?
- リストを選ぶ場合:
- 非常にシンプルなキューとして使いたい(1対1の消費)。
- メッセージは文字列単体で十分。
- メッセージが失われても問題ない、あるいはアプリケーション側で回復ロジックをシンプルに実装できる。
- コンシューマーの障害リカバリの必要がない、あるいはダウンタイムが許容される。
- 過去のメッセージを読み返す必要がない。
- スタック(後入れ先出し)として使いたい(LPUSH/LPOP)。
- 限定された数の要素を保持したい(TRIM)。
- Redis Streamを選ぶ場合:
- メッセージの信頼性が重要(失われたくない)。
- 複数のワーカープロセスでメッセージを並列に、かつ信頼性高く処理したい。
- コンシューマーの障害発生時に処理中のメッセージを失いたくない、引き継ぎたい。
- 過去のメッセージをID指定で読み返したり、特定の時点から処理を再開したりしたい。
- 構造化されたメッセージ(複数のフィールドを持つ)を格納したい。
- 時系列データやイベントログを扱いたい。
リストはシンプルで高速なキューイングには適していますが、信頼性、並列処理の容易さ、履歴保持といった点でStreamには劣ります。Streamはより複雑なメッセージングやイベント処理の要件に応えるための機能を提供します。
Redis Streamのユースケース
Redis Streamの特性を活かせる具体的なユースケースをいくつか紹介します。
-
信頼性の高いメッセージキュー/タスクキュー:
- 複数のワーカープロセスが、共通のタスクストリームからジョブを取り出して処理するシナリオ。コンシューマーグループを使用することで、各ジョブが一度だけ処理されること(At-least-once配信)を保証し、ワーカーの追加/削除によるスケーリングや、ワーカー障害時のジョブ引き継ぎを容易に実現できます。
- 例:バックグラウンド処理、メール送信、画像処理、データ処理パイプライン。
-
イベントソーシング/イベントログ:
- システム内で発生したすべてのイベントを時系列順に記録する immutable なログとして利用。各項目がイベントを表し、IDがその発生順序を保証します。
- アプリケーションの状態をイベントのログから再構築したり、過去のイベントを再生して分析を行ったりすることが可能です。
- 例:ユーザーアクティビティログ、注文履歴、金融取引ログ。
-
時系列データの収集と処理:
- センサーデータ、メトリクス、ログエントリなど、時系列で発生するデータを収集・保存する場所として利用。メッセージIDのタイムスタンプ部分が時系列インデックスとして機能します。
XRANGE
コマンドを使って特定の期間のデータを効率的にクエリできます。- 例:IoTデバイスからのデータ収集、サーバーメトリクスの監視、アプリケーションログの集約。
-
マイクロサービス間の非同期通信:
- マイクロサービス間でイベントやコマンドを非同期に受け渡しするための疎結合なメッセージングバスとして利用。サービスは関連するストリームにイベントを発行し、関心のあるサービスはコンシューマーグループを作成してイベントを購読・処理します。
- 例:ユーザー登録イベントを複数のサービス(メール送信サービス、プロファイリングサービスなど)が購読して処理。
-
変更データキャプチャ (CDC) の配信:
- データベースの変更ログ(Binlogなど)をキャプチャし、その変更イベントをStreamに書き込むことで、他のサービスがその変更を購読してリアルタイムにデータ同期やキャッシュ更新を行うシナリオ。
これらのユースケースにおいて、StreamはPub/Subやリストだけでは実現が難しかった、高信頼性、並列処理、履歴保持といった機能を提供し、分散システムの設計をよりシンプルかつ堅牢にします。
Redis Stream利用上の注意点
Redis Streamは非常に強力ですが、利用にあたってはいくつかの注意点があります。
- メモリ使用量: ストリームはメッセージをRedisに永続的に保存するため、メッセージ量が増えるとメモリ使用量も増加します。
XTRIM
コマンドやXADD
のMAXLEN
オプションを適切に使用して、ストリームのサイズを管理することが非常に重要です。不要になった古いメッセージは積極的に削除しましょう。 - 永続化: StreamデータはRedisの通常の永続化設定(RDB, AOF)に従います。信頼性を確保するためには、適切な永続化設定が必要です。AOFの
appendfsync everysec
やalways
は、メッセージの耐久性を高めますが、性能に影響を与えます。 - コンシューマーグループ管理: 長期間アクティブでないコンシューマーや不要になったグループは、
XGROUP DELCONSUMER
やXGROUP DESTROY
で適切に削除する必要があります。これらの情報はメモリを消費します。また、PENDINGメッセージが滞留していないか、XPENDING
やXAUTOCLAIM
で定期的に監視・処理することが重要です。 - メッセージIDの理解: IDの形式(
timestampInMillis-sequenceNumber
)と、XREADGROUP
のID引数(>
, 特定ID,0-0
)の意味を正しく理解することが、コンシューマーグループの正確な動作を保証するために不可欠です。 - 単一キーのボトルネック: Redisはシングルスレッドの性質を持つため、非常に高スループットな単一のストリームに対して、大量の
XADD
やXREADGROUP
操作が集中すると、ボトルネックになる可能性があります。必要に応じて複数のストリームに負荷を分散することを検討してください。Redis Clusterを使用する場合、ストリームはキーによってシャーディングされます。
これらの注意点を理解し、適切に運用することで、Redis Streamのメリットを最大限に活かすことができます。
まとめ:Redis Stream、Pub/Sub、リストの使い分け
本記事では、Redis Streamについて詳細に解説し、Pub/Subやリストといった既存のメッセージング関連機能との違いを比較しました。
- Redis Stream は、信頼性の高いメッセージキュー、イベントソーシング、時系列データ処理など、複雑なメッセージング要件に対応するために設計された、ID付き追記専用のログ構造です。特にコンシューマーグループによる複数ワーカーでのメッセージ分担処理と、ACK/PELによるAt-least-once配信が大きな特徴です。過去のメッセージ履歴を保持し、再生できる点も強力です。
- Redis Pub/Sub は、シンプルなブロードキャストやリアルタイム通知に適したFire-and-forgetモデルの機能です。メッセージの永続性や配信保証はなく、過去のメッセージを読み返すこともできませんが、そのシンプルさがメリットです。
- Redis List は、シンプルなFIFOまたはLIFOキューとして利用できます。要素を取り出すとリストから削除されるため、信頼性や複数コンシューマーによる分担処理には向きませんが、非常に高速な単一消費者キューや限定サイズリストとして手軽に利用できます。
それぞれの機能は異なる設計思想と特性を持っており、特定のユースケースに対して得意なものがあります。
- シンプルで信頼不要なブロードキャスト/通知: Pub/Sub
- シンプルで高速な1対1キュー: リスト
- 信頼性が高く、複数ワーカーで負荷分散したいキュー/タスクキュー: Redis Stream (コンシューマーグループ)
- イベントログ、時系列データ、履歴の保持が必要な場合: Redis Stream
Redis Streamは、従来のPub/Subやリストでは対応が難しかった分散システムのメッセージング要件に応える、現代的なソリューションと言えます。その豊富なコマンドとコンシューマーグループ機能は、信頼性の高いスケーラブルなアプリケーション構築において強力なツールとなるでしょう。
ただし、StreamはPub/Subやリストに比べて学習コストや運用管理の複雑さが増します。ユースケースの要件を慎重に検討し、機能と複雑さのバランスを考慮して、最適なRedisの機能を選択することが重要です。
Redis Streamを深く理解し、そのパワーを使いこなすことで、あなたのアプリケーションはより堅牢でスケーラブルになるはずです。本記事が、その理解の一助となれば幸いです。