Elasticsearch reindex API入門:データ移行やインデックス再構築を超高速化


Elasticsearch reindex API入門:データ移行やインデックス再構築を超高速化

Elasticsearchは、大量のデータを高速に検索・分析するための強力な分散型検索エンジンです。データの性質や要件の変化に伴い、既存のインデックスの構造を変更したり、データを別のインデックスに移行したりする必要が生じることがあります。このような場面で絶大な威力を発揮するのが、Elasticsearchのreindex APIです。

この記事では、reindex APIの基本的な使い方から、詳細なパラメータ、非同期実行、パフォーマンスチューニング、そして実践的な移行戦略まで、徹底的に解説します。Elasticsearchでデータの再配置や構造変更を超高速に行いたいと考えている方にとって、必読の内容となるでしょう。

はじめに:なぜreindex APIが必要なのか?

Elasticsearchで運用しているデータに対して、以下のような変更が必要になることは珍しくありません。

  • インデックス構造の変更:
    • マッピング(フィールドのデータ型や解析方法)を変更したい。
    • アナライザーやトークナイザーの設定を変更したい。
    • プライマリーシャード数やレプリカシャード数を変更したい。
    • 新しいインデックス設定を適用したい。
  • データの再配置:
    • 古いインデックスから新しいインデックスへデータを移行したい。
    • 複数のインデックスのデータを1つのインデックスに統合したい。
    • 特定の条件に一致するドキュメントだけを別のインデックスに移したい。
    • 別のElasticsearchクラスターへデータを移行したい。
  • データの加工・整形:
    • ドキュメントの一部フィールドの値を変更したい。
    • 特定のフィールドを削除したり、新しいフィールドを追加したりしたい。
    • データの正規化やクレンジングを行いたい。

これらの操作を行う最もシンプルで確実な方法は、新しい(あるいは移行先の)インデックスを作成し、そこに元のインデックスからデータをコピーすることです。しかし、データ量が膨大になると、この「コピー」という作業が大きな課題となります。

従来の方法とその課題

reindex APIが登場する以前や、reindex APIを知らない場合に考えられるデータ移行・再構築の方法としては、以下のようなものがあります。

  1. 外部スクリプトによるdump & load:
    • scroll APIなどを使って元のインデックスから全データを取得し、ローカルファイルなどに保存(dump)。
    • 保存したデータを加工(必要であれば)。
    • bulk APIなどを使って新しいインデックスにデータを投入(load)。
    • 課題: クライアント側のリソース(CPU, メモリ, ネットワーク帯域)を消費する、処理速度が遅い、実装が複雑になりがち、元のクラスターと移行先のクラスター間でネットワーク通信がボトルネックになりやすい。
  2. Logstashなどのデータパイプラインツール:
    • Logstash Input plugin(Elasticsearch)で元のインデックスからデータを読み込み、Output plugin(Elasticsearch)で新しいインデックスに書き込む。
    • Filter pluginでデータの加工も可能。
    • 課題: 別途Logstash環境の構築・運用が必要、処理能力はLogstashインスタンスのリソースに依存する。
  3. snapshot/restore:
    • インデックス全体のバックアップを作成し、それを復元する。
    • 課題: インデックス設定(マッピング、シャード数など)の変更には向かない(基本的にバックアップ時の設定が復元される)、データ全体を復元するため一部のデータ移行や結合には使えない。

これらの方法には、一般的に以下のような共通の課題があります。

  • 処理速度: 大量のデータ処理には時間がかかる。
  • 複雑性: スクリプトの実装や外部ツールの設定が必要。
  • 運用負荷: 処理中の監視やエラーハンドリングが必要。
  • ダウンタイム: アプリケーションが古いインデックスを参照している間に移行を行う場合、データの整合性やアプリケーションの停止が必要になることがある。

reindex APIの登場と利点

Elasticsearch 2.3で導入されたreindex APIは、これらの課題を解決するために設計されました。reindex APIは、Elasticsearchクラスター内部でデータの読み込みと書き込みを実行します。つまり、外部のクライアントやツールを介さずに、ノード間通信を利用して効率的にデータがコピーされます。

reindex APIの主な利点は以下の通りです。

  • 高速: クラスター内部で並列処理されるため、外部ツールを使うよりもはるかに高速です。ディスクI/Oとネットワーク帯域の制約を受けにくくなります。
  • シンプル: REST APIエンドポイントを叩くだけで実行できます。複雑なスクリプトや設定は不要です。
  • サーバーサイド処理: クライアント側はリクエストを発行するだけでよく、処理中のクライアントリソースを消費しません(非同期実行の場合)。
  • 柔軟性: 特定の条件でフィルタリングしたり、データを変換したりしながらreindexできます。
  • インデックス構造変更への対応: 新しいインデックスの設定やマッピングを事前に定義しておけば、その構造に合わせてデータが書き込まれます。

これらの利点から、reindex APIはElasticsearchにおけるデータ移行やインデックス再構築のデファクトスタンダードとなっています。

reindex APIの基本

reindex APIは、指定された一つ以上のソースインデックスからドキュメントを読み込み、指定されたデスティネーションインデックスに書き込むためのAPIです。

基本となるAPIエンドポイントは POST /_reindex です。リクエストボディに、ソース (source) とデスティネーション (dest) の情報を記述します。

最小限の例

最もシンプルな例は、sourceに元のインデックス名を、destに移行先のインデックス名を指定するケースです。

例えば、my_old_indexというインデックスの全データを、新しく作成したmy_new_indexというインデックスにコピーする場合、以下のようになります。

bash
POST /_reindex
{
"source": {
"index": "my_old_index"
},
"dest": {
"index": "my_new_index"
}
}

このリクエストを送信すると、Elasticsearchはmy_old_indexからドキュメントをバッチで読み込み、my_new_indexに書き込みます。ドキュメントはデフォルトで元のIDが保持されます。もしmy_new_indexに同じIDのドキュメントが既に存在する場合、デフォルトでは上書きされます。

同期実行と非同期実行

デフォルトでは、reindex APIは同期実行されます。つまり、APIリクエストを送信したクライアントは、reindex処理が完了するまで待機します。処理が完了(または失敗)すると、結果がJSON形式で返されます。

しかし、大量のデータをreindexする場合、処理には長時間かかる可能性があります。その間クライアントを待機させるのは現実的ではありません。そこで、reindex APIは非同期実行をサポートしています。非同期実行については後述しますが、リクエストに特定のパラメータを追加するだけで簡単に利用できます。

詳細なパラメータとオプション

reindex APIは非常に多くのパラメータをサポートしており、様々なユースケースに対応できます。ここでは主要なパラメータを詳しく見ていきます。

source オブジェクト

source オブジェクトは、reindex元となるインデックスやドキュメントを定義します。

  • index:

    • 必須パラメータです。reindex元のインデックス名を指定します。
    • 単一のインデックス名 ("my_old_index")
    • 複数のインデックス名を配列で指定 (["index1", "index2"]) – これらのインデックスのデータがまとめてdestインデックスにreindexされます。
    • ワイルドカードを使ったインデックス名 ("logstash-*")
    • エイリアス名

    例:複数のインデックスからreindex

    bash
    POST /_reindex
    {
    "source": {
    "index": ["index_2023_01", "index_2023_02", "index_2023_03"]
    },
    "dest": {
    "index": "quarter_1_data"
    }
    }

  • type:

    • Elasticsearch 6.xまではドキュメントタイプを指定できましたが、7.x以降は非推奨となり、8.xでは完全に削除されました。特別な理由がない限り、使用すべきではありません。デフォルトの_docタイプに対してreindexされます。
  • query:

    • 特定の条件に一致するドキュメントのみをreindexしたい場合に指定します。ElasticsearchのQuery DSLを使用します。
    • これにより、全件ではなく、必要なデータだけを効率的に移行できます。

    例:特定のユーザーのドキュメントのみをreindex

    bash
    POST /_reindex
    {
    "source": {
    "index": "my_index",
    "query": {
    "term": {
    "user_id": "user123"
    }
    }
    },
    "dest": {
    "index": "user_specific_index"
    }
    }

  • size:

    • reindexするドキュメントの最大数を指定します。テスト目的などで一部のデータだけを試したい場合に便利です。

    例:先頭100件のドキュメントのみをreindex

    bash
    POST /_reindex
    {
    "source": {
    "index": "my_index",
    "size": 100
    },
    "dest": {
    "index": "sample_index"
    }
    }

  • slice:

    • reindex処理を並列実行するための設定です。reindex処理は内部的にscrollとbulk APIを使用しますが、sliceを使用すると、元のインデックスを複数の「スライス」に分割し、各スライスを独立したタスクとして並列に処理できます。これにより、特に大規模なインデックスのreindex速度が劇的に向上します。
    • sliceオブジェクトには、id(このスライスのID)とmax(全スライス数)を指定します。ただし、通常はこの情報はAPIリクエスト時に手動で指定するのではなく、slicesパラメータ(後述)を使って全スライス数を指定するだけで、Elasticsearchが内部的に各スライスタスクを生成・管理します。source.sliceは、APIを内部的に実行する際のパラメータとして理解しておけば十分です。
  • remote:

    • 別のElasticsearchクラスターからデータをreindexしたい場合に指定します。
    • 別のクラスターのURL、認証情報などを指定します。
    • これは、クラスター間のデータ移行に非常に便利な機能です。

    例:リモートクラスターからreindex

    bash
    POST /_reindex
    {
    "source": {
    "remote": {
    "host": "http://remote-cluster-host:9200",
    "username": "elastic",
    "password": "changeme"
    },
    "index": "remote_index"
    },
    "dest": {
    "index": "local_index"
    }
    }

    remoteには他にも、接続タイムアウトやソケットタイムアウトなどの設定が可能です。認証情報は、username/passwordの他に、APIキーやBearerトークンも指定できます。

dest オブジェクト

dest オブジェクトは、reindex先となるインデックスを定義します。

  • index:
    • 必須パラメータです。reindex先のインデックス名を指定します。
    • 指定したインデックスが存在しない場合、reindex処理を開始する前に自動的に作成されます(デフォルト設定)。この際、デフォルトの設定(シャード数、レプリカ数など)が適用されます。通常は、reindexを開始する前に、適切な設定(マッピング、シャード数、レプリカ数など)を持つ移行先インデックスを手動で作成しておくことを強く推奨します。
  • op_type:

    • 書き込み操作のタイプを指定します。
    • index (デフォルト): 同じIDのドキュメントが既に存在する場合、上書きします。
    • create: 同じIDのドキュメントが既に存在する場合、書き込みをスキップし、バージョン競合エラーとみなします。reindex元とreindex先でドキュメントIDが重複しないことが保証されている場合や、既存のドキュメントを絶対に上書きしたくない場合に使用します。

    例:上書きせずに新しいドキュメントのみ追加

    bash
    POST /_reindex
    {
    "source": {
    "index": "updates"
    },
    "dest": {
    "index": "master_data",
    "op_type": "create"
    },
    "conflicts": "proceed" # 競合が発生しても処理を続行
    }

    op_type: "create"を使用する場合、競合(同じIDが存在する場合)が発生するとデフォルトではreindex処理全体が中断します。これを避けるためには、conflicts: "proceed"パラメータを指定して、競合が発生してもエラーを記録しつつ処理を続行するようにします。

  • version_type:

    • ドキュメントのバージョン管理の挙動を指定します。リアルタイムで更新されるデータをreindexする際に重要になります。
    • internal (デフォルト): Elasticsearhが内部で管理するバージョン番号を使用します。reindex元からコピーされたバージョン番号は無視されます。
    • external: reindex元のドキュメントのバージョン番号をreindex先のバージョン番号として使用します。reindex先のドキュメントのバージョンがreindex元のバージョンよりも新しい場合、バージョン競合エラーとなります。
    • external_gte: reindex元のドキュメントのバージョン番号をreindex先のバージョン番号として使用します。reindex先のドキュメントのバージョンがreindex元のバージョンよりも大きいか等しい場合、バージョン競合エラーとなります。
    • force: バージョン競合チェックを行いません(非推奨)。
    • リアルタイムに近いデータをreindexする場合、version_type: "external"external_gteconflicts: "proceed"を組み合わせて、reindex中に発生した更新によってreindex先のドキュメントが新しくなっている場合はそのドキュメントを上書きしない、という挙動を実現することがあります。
  • routing:

    • reindex先のドキュメントのルーティング値を指定します。デフォルトではreindex元のドキュメントのルーティング値(指定されていなければドキュメントIDのハッシュ値)が使用されます。
    • 特定の静的なルーティング値を設定したり、スクリプトを使って動的にルーティング値を計算したりできます。
  • pipeline:

    • reindex先のドキュメントをインデクシングする前に適用するIngest Node PipelineのIDを指定します。
    • Ingest Node Pipelineは、ドキュメントを受け取ってから実際にインデクシングされるまでの間に、データを変換、整形、加工する機能を提供します。これにより、reindex APIのscriptパラメータよりも柔軟かつ高性能なデータ変換が可能です。

    例:Ingest Pipelineを使ってデータ変換しながらreindex

    “`bash

    事前に ingest pipeline を定義しておく (例: user_agent フィールドをパース)

    PUT _ingest/pipeline/user_agent_parser
    {
    “description” : “parse user agent”,
    “processors” : [
    {
    “user_agent” : {
    “field” : “user_agent”
    }
    }
    ]
    }

    reindex リクエストで pipeline を指定

    POST /_reindex
    {
    “source”: {
    “index”: “web_logs_old”
    },
    “dest”: {
    “index”: “web_logs_new”,
    “pipeline”: “user_agent_parser”
    }
    }
    ``
    Ingest Pipelineを使うと、ユーザーエージェントのパース、IPアドレスのGeoIP情報付与、フィールド名の変更、データの型変換など、様々な加工処理を効率的に行うことができます。reindex時に複雑なデータ変換が必要な場合は、
    scriptよりもpipeline`の利用を検討すると良いでしょう。

その他のオプション

  • conflicts:

    • reindex中にバージョン競合などの書き込み競合が発生した場合の挙動を指定します。
    • abort (デフォルト): 競合が発生した場合、reindex処理を中断します。
    • proceed: 競合が発生しても処理を続行し、競合エラーを記録します。通常はこちらを使用します。非同期実行の結果で、どのドキュメントで競合が発生したかを確認できます。

    例:競合を無視して続行

    bash
    POST /_reindex
    {
    "source": {
    "index": "my_index"
    },
    "dest": {
    "index": "my_new_index"
    },
    "conflicts": "proceed"
    }

  • script:

    • 各ドキュメントがreindexされる直前に実行されるPainlessスクリプトを指定します。これにより、ドキュメントの内容を動的に変更(フィールドの追加、削除、値の変更など)しながらreindexすることができます。
    • sourceオブジェクトから読み込まれたドキュメントは、スクリプトのctx._source変数として利用できます。
    • scriptパラメータは非常に強力ですが、Ingest Pipelineに比べてパフォーマンスが劣る場合があるため、可能な限りIngest Pipelineの利用を検討しましょう。簡単な変換や条件分岐にはscriptが適しています。

    例:フィールドの値を変更し、新しいフィールドを追加

    bash
    POST /_reindex
    {
    "source": {
    "index": "articles"
    },
    "dest": {
    "index": "articles_modified"
    },
    "script": {
    "lang": "painless",
    "source": """
    // タイトルフィールドがあれば大文字に変換
    if (ctx._source.containsKey('title')) {
    ctx._source.title = ctx._source.title.toUpperCase(Locale.ROOT);
    }
    // 作成日フィールドをUNIXタイムスタンプとして追加
    if (ctx._source.containsKey('created_at')) {
    def date = new Date(ctx._source.created_at);
    ctx._source.created_timestamp = date.getTime();
    }
    // 古い不要なフィールドを削除
    ctx._source.remove('deprecated_field');
    """
    }
    }

    Painlessスクリプトでは、ctx._sourceに対して様々な操作が可能です。ctx._source.fieldNameでフィールドにアクセスしたり、ctx._source['fieldName']ctx._source.put('new_field', value)ctx._source.remove('field_to_remove')などの操作が行えます。また、ctx._idctx._type(7.xまで)、ctx._versionといったメタ情報にもアクセスできます。

  • slices:

    • reindex処理を並列実行するためのスライス数を指定します。auto (デフォルト) または具体的な数値を指定できます。
    • autoの場合、Elasticsearchはソースインデックスのプライマリーシャード数に基づいて最適なスライス数を自動的に決定します。通常はこの設定で十分ですが、より高い並列度が必要な場合や、ソースインデックスのシャード数が非常に少ない場合は明示的に数値を指定することも有効です。
    • 指定する数値は、reindex元の全シャード数(プライマリ+レプリカ)や、クラスター内のノード数などを考慮して決定します。あまりに多くのスライス数を指定すると、タスク管理のオーバーヘッドが増える場合があります。経験則として、クラスター内のデータノード数の数倍程度が目安となることがありますが、ワークロードによって最適な値は異なります。
    • slicesパラメータはreindex全体の並列度を制御し、内部的にsource.slice情報を持つ複数のタスクを生成します。

    例:スライス数を明示的に指定

    bash
    POST /_reindex
    {
    "source": {
    "index": "large_index"
    },
    "dest": {
    "index": "large_index_reindexed"
    },
    "slices": 8, # 8つのスライスで並列実行
    "conflicts": "proceed"
    }

  • requests_per_second:

    • reindex処理の速度を制限するためのスロットリング設定です。Elasticsearchクラスターへの負荷が高くなりすぎるのを防ぎたい場合に指定します。
    • -1 (デフォルト): スロットリングなし。可能な限り高速に実行します。
    • 任意の正の数値: 1秒あたりに処理するドキュメントのバッチ数を指定します。
    • この値は、Elasticsearchが内部的にドキュメントを読み書きするバッチサイズに依存して速度を制御します。小さい値を指定すると、reindex処理はゆっくり進みますが、クラスター全体の負荷は低減されます。

    例:1秒あたり100バッチに制限

    bash
    POST /_reindex
    {
    "source": {
    "index": "very_large_index"
    },
    "dest": {
    "index": "large_index_throttled"
    },
    "requests_per_second": 100, # 1秒あたり最大100バッチ
    "conflicts": "proceed"
    }

    この設定は、本番稼働中のクラスターでreindexを実行する際に非常に重要です。他の重要なワークロード(検索、インデクシングなど)への影響を最小限に抑えるために、適切なスロットリング値を設定することを推奨します。

  • wait_for_completion:

    • reindex APIを同期実行するか非同期実行するかを指定します。
    • true (デフォルト): 同期実行。処理が完了するまでクライアントは待機します。
    • false: 非同期実行。リクエストを受け付けた後、すぐにタスクIDを返します。処理はバックグラウンドで続行されます。長時間かかるreindexの場合は、必ずfalseを指定します。

    例:非同期実行

    bash
    POST /_reindex?wait_for_completion=false # URLパラメータで指定
    {
    "source": {
    "index": "massive_index"
    },
    "dest": {
    "index": "massive_index_reindexed"
    },
    "conflicts": "proceed"
    }

  • pretty:

    • APIレスポンスを整形して読みやすくするかどうかを指定します。trueまたはfalseを指定します。通常はURLパラメータとして?prettyを使用します。

非同期実行とタスク管理

前述の通り、wait_for_completion=falseパラメータを指定することで、reindex APIは非同期で実行されます。これは、長時間かかるreindex処理をバックグラウンドで実行し、クライアントがすぐに解放されるため、本番運用では必須の実行方法です。

非同期リクエストを送信すると、レスポンスとして以下のようなJSONがすぐに返されます。

json
{
"task" : "o_oU8ktzsyabKx_J21X_Gw:12345"
}

このtaskフィールドに含まれる文字列(例: o_oU8ktzsyabKx_J21X_Gw:12345)が、reindex処理を表すタスクIDです。このタスクIDを使って、reindex処理の進捗状況を確認したり、必要に応じて処理をキャンセルしたりすることができます。

タスクの進捗確認

Task API (GET /_tasks/{task_id}) を使用して、特定のタスクの現在の状態を確認できます。

bash
GET /_tasks/o_oU8ktzsyabKx_J21X_Gw:12345

レスポンスには、タスクの状態、完了率、処理されたドキュメント数、スロットリング情報、エラーなどが含まれます。

json
{
"nodes": {
"o_oU8ktzsyabKx_J21X_Gw": { # ノードID
"tasks": {
"o_oU8ktzsyabKx_J21X_Gw:12345": { # タスクID
"node": "o_oU8ktzsyabKx_J21X_Gw",
"id": 12345,
"type": "indices",
"action": "indices:data/write/reindex",
"status": {
"total": 1000000, # 対象ドキュメント総数 (推定)
"updated": 0,
"created": 500000, # 作成済みドキュメント数
"deleted": 0,
"batches": 500, # 処理済みバッチ数
"version_conflicts": 0, # バージョン競合数
"noops": 0, # スキップされたドキュメント数 (scriptでctx.op='none'など)
"retries": { # リトライ情報
"bulk": 0,
"search": 0
},
"throttled_millis": 0, # スロットリングによる待機時間 (ms)
"requests_per_second": -1.0, # 設定されたスロットリング値
"throttled_until_millis": 0 # スロットリング解除までの残り時間 (ms)
},
"description": "reindex from [source_index] to [dest_index]",
"start_time": 1678886400000,
"running_time_in_nanos": 60000000000, # 実行時間 (ns)
"cancellable": true, # キャンセル可能か
"parent_task": "parent_task_id_if_any" # 親タスク (スライス処理など)
}
}
}
}
}

statusフィールドの中に、createdupdatedなどの項目があり、進捗状況を確認できます。totalは処理対象のドキュメント総数の推定値ですが、正確でない場合もあります。

特定のノードや特定のアクション(例: indices:data/write/reindex)に絞ってタスクをリストすることも可能です。

bash
GET /_tasks?actions=indices:data/write/reindex&detailed

detailedパラメータは、各タスクのstatusフィールドを詳細に表示するために使用します。

タスクのキャンセル

実行中のreindexタスクを途中で中断したい場合は、Task APIのキャンセルエンドポイントを使用します。

bash
POST /_tasks/o_oU8ktzsyabKx_J21X_Gw:12345/_cancel

このリクエストを送信すると、Elasticsearchは該当タスクにキャンセルシグナルを送信し、タスクは可能な限り速やかに停止します。既に書き込まれたドキュメントはそのまま残り、reindex先のインデックスは不完全な状態になります。

reindex APIの応用例

reindex APIは単なるデータコピーにとどまらず、様々なユースケースで活用できます。

  1. インデックススキーマ(マッピング/設定)の変更

    • これはreindex APIの最も一般的なユースケースの一つです。
    • Elasticsearchでは、一度定義されたマッピングやシャード数などの重要な設定は、基本的に後から変更できません。変更するには、新しいインデックスを作成し、そこに既存のデータをreindexする必要があります。
    • 手順:
      1. 変更後のマッピングや設定を持つ新しいインデックスを作成します。
      2. reindex APIを使って、古いインデックスから新しいインデックスへデータをコピーします。
      3. reindex中に発生した新しいデータや更新データは、アプリケーション側で新しいインデックスにも書き込むようにするか、またはreindex完了後に差分を別途処理します。
      4. アプリケーションが参照するインデックスを、エイリアスを使って新しいインデックスに切り替えます。(エイリアスを使ったシームレスな移行戦略は後述)
      5. 古いインデックスを削除します。
  2. シャード数やレプリカ数の変更

    • これもスキーマ変更と同様に、新しいインデックスを作成してreindexする必要があります。
    • シャード数はインデックス作成後に変更できませんが、reindexすることで適切なシャード数を持つ新しいインデックスにデータを移行できます。レプリカ数はインデックス作成後も変更可能ですが、データノードの再配置などの目的でreindexすることもあります。
  3. データのフィルタリングと変換

    • queryパラメータで特定のドキュメントのみを抽出し、新しいインデックスにreindexできます。
    • scriptパラメータやpipelineパラメータを使って、reindex中にドキュメントの構造や内容を変更できます。
      • 例: 不要なフィールドの削除、フィールド名の変更、日付フォーマットの変換、外部データの結合(LogstashやIngest Pipelineで可能)。
  4. 複数インデックスの結合

    • source.indexにインデックス名の配列を指定することで、複数のソースインデックスからデータをまとめて1つのデスティネーションインデックスにreindexできます。
    • 例: 月別に作成していたインデックス(例: logs-2023-01, logs-2023-02, logs-2023-03)を、四半期ごとのインデックス(例: logs-2023-Q1)にまとめる場合。
  5. 別クラスターへのデータ移行

    • source.remoteパラメータを使用することで、異なるネットワーク上にあるElasticsearchクラスター間でも直接データをreindexできます。
    • これは、開発/ステージング環境から本番環境へのデータ移行、データレイクへの転送、クラウド移行などのシナリオで役立ちます。
  6. タイムベースインデックスの再構築

    • ログデータや時系列データなど、日付に基づいて分割されるインデックス(例: logs-YYYY-MM-DD)は、時間の経過と共に保持期間が過ぎた古いインデックスを削除していくのが一般的です。
    • しかし、過去の特定期間のデータを別の構造で再利用したい場合などに、reindex APIを使って古いインデックスを新しい構造にコピーすることができます。

パフォーマンスチューニングと注意点

reindex APIは非常に強力ですが、大量のデータを扱う際にはパフォーマンスを最適化し、クラスターへの影響を最小限に抑えるための考慮が必要です。

  1. スライス処理 (slices) の活用

    • reindex速度に最も大きな影響を与える設定の一つがslicesです。
    • 仕組み: slicesを指定すると、Elasticsearchは内部的に元のインデックスを複数の独立したセグメント(スライス)に分割し、それぞれに対して並列にreindexタスクを割り当てます。各スライスタスクは、指定されたスライスIDに属するドキュメントのみを処理します。この分割は、ドキュメントIDのハッシュ値に基づいて行われるため、特定のシャードに依存しない均等な分散が期待できます。
    • 最適なスライス数:
      • デフォルトのautoは、ソースインデックスのプライマリーシャード数に基づいて決定され、多くの場合良いスタート地点となります。
      • 手動で指定する場合、一般的にはElasticsearchクラスター内のデータノード数や、ソースインデックスのプライマリーシャード数と関連付けて検討します。
      • 目安:
        • ソースインデックスのプライマリーシャード数 <= データノード数: slices = プライマリーシャード数
        • ソースインデックスのプライマリーシャード数 > データノード数: slices = データノード数 * N (Nは2〜4程度が良いことが多い)
      • ただし、最適な値はクラスター構成、ハードウェア、データ特性、他のワークロードなどによって大きく変動するため、実際に様々な値を試して計測することが重要です。スライス数を増やしすぎると、タスク管理のオーバーヘッドが増加し、逆に性能が低下する可能性もあります。
      • スライス数が増えると、同時に多くのシャードが読み書きされることになり、I/O性能がボトルネックになる可能性があります。
    • 注意点: スライス処理は、ソースインデックスに十分なシャード数がないと効果が薄い場合があります。また、source.queryを使用する場合、そのクエリがスライス処理に対応している必要があります(基本的には対応していますが、一部複雑なクエリでは注意が必要です)。
  2. スロットリング (requests_per_second) の設定

    • 本番稼働中のクラスターでreindexを実行する場合、最も重要な設定の一つです。
    • reindex処理はCPU、メモリ、ディスクI/O、ネットワーク帯域など、クラスターリソースを大量に消費する可能性があります。無制限に実行すると、他の重要な処理(検索、インデクシング)に遅延や障害を引き起こす可能性があります。
    • requests_per_secondを適切な値に設定することで、reindex処理の速度を意図的に遅くし、クラスターへの負荷を制御できます。
    • 最適な値の決定:
      • 事前に検証環境でテストを行い、reindex実行中のクラスターリソース使用状況(CPU負荷、I/O待ち、ネットワークトラフィックなど)を監視します。
      • 他のワークロードのパフォーマンスへの影響を評価します。
      • これらの情報に基づいて、他の処理への影響が許容範囲内に収まるような最大のrequests_per_second値を決定します。
      • 最初は低めの値から始め、徐々に上げていくのが安全なアプローチです。
      • reindexの進捗速度とクラスター負荷のバランスを常に意識します。
  3. 移行先インデックスの設定

    • reindexを開始する前に、移行先インデックスを適切な設定で手動で作成しておくことを強く推奨します。
    • 特に、シャード数、レプリカ数、マッピング、アナライザー設定などは事前に定義しておきます。reindex APIにインデックスを自動作成させると、デフォルト設定が適用され、意図しない構造になる可能性があります。
    • 一時的な設定変更: reindex中はインデクシング性能が重要になります。移行先のインデックスに対して、reindex中のみ一時的に以下の設定を変更することで、インデクシングを高速化できる場合があります。
      • index.refresh_interval: リフレッシュ間隔を長くする(例: -1 無効化、または 300s)。リフレッシュ回数を減らすことでI/O負荷が軽減されますが、reindex中のデータが検索できるようになるまでの時間が長くなります。
      • index.number_of_replicas: レプリカ数を0にする。データの複製にかかるオーバーヘッドがなくなります。ただし、reindex中はプライマリシャードのみとなるため、冗長性が失われます。reindex完了後に元のレプリカ数に戻す必要があります。
      • index.number_of_shards: これはreindex前に設定するシャード数であり、reindex中に変更するものではありません。
  4. ディスクI/Oとネットワーク帯域

    • reindex処理は、ソースインデックスからの読み込みと、デスティネーションインデックスへの書き込みの両方でディスクI/Oを大量に消費します。
    • リモートクラスターからのreindexや、シャードがネットワークを介して別のノードに分散している場合、ネットワーク帯域もボトルネックになり得ます。
    • クラスターのI/O性能やネットワーク性能がreindex速度の上限となることを理解しておきましょう。NVMe SSDなどの高速ストレージや、十分な帯域幅を持つネットワーク環境が、reindex速度向上に寄与します。
  5. バージョン競合 (version_type, conflicts)

    • reindex対象のインデックスが、reindex実行中にもアプリケーションによって書き込みが行われている場合、バージョン競合が発生する可能性があります。
    • conflicts: "proceed"を設定することで処理の中断を防ぎ、非同期実行の完了後に返される結果やタスクステータスで競合が発生したドキュメント数を確認できます。
    • version_typeを適切に設定することで、reindex元の古いデータでreindex先の新しいデータを上書きしてしまうことを防ぐことができます(例: externalexternal_gte)。
    • リアルタイム性が重要なシステムでは、reindex中に発生する差分データの取り扱いについて、より高度な移行戦略(後述のエイリアス切り替えと差分吸収など)を検討する必要があります。
  6. メモリとCPUリソース

    • reindex処理は、内部的にドキュメントの読み込み、加工(scriptpipeline使用時)、バルクインデクシングキューへの投入などを行うため、クラスターノードのCPUとメモリを消費します。
    • 特にscriptや複雑なIngest Pipelineを使用する場合、CPU負荷が高くなる傾向があります。
    • reindex実行中は、Elasticsearchのcat API (/_cat/nodes?v&h=ip,heap.percent,cpu,load_1m,load_5m,load_15m,node.role&s=heap.percent:desc) や監視ツール(Metricbeat, APMなど)でノードのリソース使用率を監視し、ボトルネックがないかを確認することが重要です。
  7. 本番環境での実行

    • 必ず事前に検証環境でテストを行う: reindex対象のデータ量に近いデータセットで、同じ設定(特にslicesrequests_per_second)を使ってテストし、所要時間、クラスターへの負荷、他の処理への影響を確認します。
    • トラフィックの少ない時間帯を選ぶ: 可能であれば、アプリケーションからのアクセスが少ない時間帯にreindexを実行します。
    • 監視体制を整える: reindex実行中は、クラスターの状態、ノードリソース、reindexタスクの進捗を継続的に監視します。問題が発生した際に迅速に対応できるよう、アラートを設定しておくことも有効です。

エイリアスを使ったダウンタイム最小限の移行戦略

インデックスの設定やマッピングを変更するためにreindexを行う際、アプリケーションが古いインデックスを参照していると、reindex中は古いデータしか利用できない、あるいは一時的にサービスを停止する必要がある、といった問題が発生しがちです。

Elasticsearchのエイリアス機能とreindex APIを組み合わせることで、アプリケーション側のダウンタイムを最小限に抑えた(あるいはゼロにした)移行を実現できます。

エイリアスは、1つ以上のインデックスに対する別名のようなものです。アプリケーションはインデックス名の代わりにエイリアス名を使ってElasticsearchと通信し、そのエイリアスが現在指しているインデックスに対して操作を行います。エイリアスが指すインデックスをアトミックに切り替えることができるため、アプリケーション側はエイリアスの切り替えを意識する必要がありません。

エイリアスを使ったダウンタイム最小限の移行戦略の一般的な手順は以下の通りです。

  1. 現在の状況:

    • アプリケーションはmy_app_aliasというエイリアスを使ってElasticsearchにアクセスしています。
    • my_app_aliasは現在、古い設定を持つmy_index_v1というインデックスを指しています。
  2. 新しいインデックスの準備:

    • 変更後の設定(マッピング、シャード数など)を持つ新しいインデックスmy_index_v2を作成します。この時点では、my_app_aliasはこの新しいインデックスを指していません。 アプリケーションは引き続きmy_index_v1にアクセスします。

    bash
    PUT /my_index_v2
    {
    "settings": {
    // New settings (e.g., number_of_shards, analyzers)
    },
    "mappings": {
    // New mappings
    }
    }

  3. データのreindex:

    • reindex APIを使って、my_index_v1からmy_index_v2へデータをコピーします。非同期実行 (wait_for_completion=false) とconflicts: "proceed"を使用します。必要に応じてslicesrequests_per_secondも設定します。

    “`bash
    POST /_reindex?wait_for_completion=false&pretty
    {
    “source”: {
    “index”: “my_index_v1”
    },
    “dest”: {
    “index”: “my_index_v2”,
    “op_type”: “create” # 既にv2に存在するドキュメントは上書きしない (もしreindex中にv2に直接書き込む場合)
    },
    “conflicts”: “proceed”
    }

    レスポンスでタスクID (例: “task” : “…”) を確認

    “`

  4. reindex中の差分データの取り扱い(重要):

    • 手順3のreindexは、reindex開始時点のmy_index_v1のスナップショットのようなデータをコピーします。reindex処理が完了するまでの間に、アプリケーションがmy_index_v1に対して行った新しい書き込み(追加、更新、削除)は、自動的にはmy_index_v2に反映されません。
    • この差分データをどう扱うかが、ダウンタイム最小化の鍵となります。いくつかの方法があります。
      • 方法 A: アプリケーション側で両方に書き込む (Dual Write): reindex実行中、アプリケーションは古いインデックス(my_index_v1)だけでなく、新しいインデックス(my_index_v2)にも同じデータを書き込むように一時的に変更します。reindex完了後、両方に書き込む処理を停止します。読み込みは引き続きエイリアス(my_app_alias -> my_index_v1)から行います。
      • 方法 B: バージョンベースreindex + 差分reindex: version_type: "external_gte"conflicts: "proceed" を使ってreindexを実行します。これにより、reindex中にmy_index_v1で更新されたドキュメントが、reindex先のmy_index_v2ですでに新しいバージョンで存在する場合、reindex元の古いバージョンで上書きされるのを防ぎます。reindex完了後、差分データを再度reindexします。ただし、削除されたドキュメントは自動的に移行されない点に注意が必要です。
      • 方法 C: Logstash/Kafkaなどの中間層を利用: アプリケーションの書き込み先を一時的に中間層(例: Kafka Topic)に変更し、Logstashを使って中間層からmy_index_v1my_index_v2の両方にリアルタイムで書き込みます。reindex完了後、Logstashの出力をmy_index_v2のみに変更し、古いインデックスへの書き込みを停止します。

    最も確実なのは方法A(Dual Write)ですが、アプリケーション側の改修が必要です。システム構成や要件に応じて最適な方法を選択します。

  5. reindex完了の確認:

    • Task API (GET /_tasks/{task_id}) でreindexタスクのステータスを確認し、完了したことを確認します。特に、version_conflictsfailuresがないかを確認します。差分データの取り扱い方法に応じて、差分がすべてreindex先のインデックスに反映されているか確認します。
  6. エイリアスの切り替え:

    • reindexが完了し、差分データもすべて新しいインデックス(my_index_v2)に反映されたことを確認したら、エイリアスmy_app_aliasをアトミックにmy_index_v1からmy_index_v2に切り替えます。これはAliases APIの原子性保証されたリクエストで行います。

    bash
    POST /_aliases
    {
    "actions": [
    { "remove": { "alias": "my_app_alias", "index": "my_index_v1" } }, # 古いインデックスからエイリアスを解除
    { "add": { "alias": "my_app_alias", "index": "my_index_v2" } } # 新しいインデックスにエイリアスを追加
    ]
    }

    このAPIコールはアトミックに実行されるため、アプリケーションは一瞬たりともエイリアスからインデックスが見えなくなる時間がありません。切り替え後、アプリケーションからの検索・書き込みリクエストはすべて自動的にmy_index_v2に向けられるようになります。

  7. 古いインデックスの削除:

    • エイリアスが新しいインデックスに切り替わり、アプリケーションが正常に動作していることを確認したら、不要になった古いインデックスmy_index_v1を削除します。

    bash
    DELETE /my_index_v1

この戦略を採用することで、reindex処理自体は時間がかかりますが、アプリケーションがインデックスにアクセスできなくなる時間は、手順6のエイリアス切り替えの一瞬のみとなり、実質的なダウンタイムをゼロに近づけることができます。

reindex APIの制限事項

reindex APIは非常に便利ですが、いくつかの制限事項や考慮すべき点があります。

  • データの一貫性: 前述の通り、reindexは特定の時点のスナップショットのようなデータをコピーします。reindex実行中にソースインデックスに対して行われた変更(追加、更新、削除)は、自動的にはreindex先のインデックスに反映されません。リアルタイムで更新されるデータの場合、差分データの取り扱いについて別途考慮が必要です。
  • 削除済みドキュメント: ソースインデックスで論理的に削除(ただしセグメント上はまだ存在)されているドキュメントは、デフォルトではreindexされません。これは通常望ましい挙動ですが、注意が必要です。
  • Transform APIとの使い分け: データ集計やグループ化、構造変換による新しいインデックス生成など、継続的なデータ加工パイプラインを構築したい場合は、reindex APIよりもTransform APIの方が適している場合があります。reindexはあくまで一度限りのデータコピー/移行を主な目的としています。
  • クラスターバージョン互換性: reindex元とreindex先のElasticsearchクラスターのバージョンに互換性がある必要があります。メジャーバージョンを跨ぐreindex(例: 6.xから8.x)は、remote reindexの場合、間に互換性のあるバージョン(例: 7.x)のクラスターを挟むなどの工夫が必要になる場合があります。公式ドキュメントでバージョン互換性を確認することが重要です。

まとめ

Elasticsearchのreindex APIは、データ移行、インデックススキーマ変更、シャード数変更など、Elasticsearchの運用において頻繁に発生するデータ再配置タスクを超高速かつシンプルに実行するための必須ツールです。クラスター内部での効率的な処理、柔軟なフィルタリング・変換機能、非同期実行とタスク管理、そしてパフォーマンスチューニングの豊富なオプションにより、大規模なデータセットに対しても実用的なソリューションを提供します。

特に、エイリアスと組み合わせたダウンタイム最小限の移行戦略は、本番環境でインデックス構造に変更を加える際の標準的なアプローチです。reindex APIの機能、パラメータ、そしてパフォーマンスへの考慮事項を深く理解することは、Elasticsearchクラスターを効率的かつ安定して運用するために不可欠です。

この記事が、reindex APIの入門として、またその詳細な活用法を学ぶ一助となれば幸いです。実際の運用では、この記事で解説した各種パラメータや移行戦略を、ご自身の環境やデータ特性に合わせて調整し、事前に十分なテストを行うことを忘れないでください。Elasticsearchの世界では、常に新しい機能や改善が取り入れられています。公式ドキュメントを参照しながら、最新の情報をキャッチアップしていくことも重要です。

reindex APIをマスターして、Elasticsearchの運用をもっと高速でスムーズにしましょう!


コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

上部へスクロール