Elasticsearch reindex コマンド:インデックス再構築の手順

はい、承知いたしました。Elasticsearchの _reindex API によるインデックス再構築の詳細な手順に関する記事を約5000語で記述します。


Elasticsearch _reindex API: インデックス再構築の手順と詳細

Elasticsearch を運用していると、既存のインデックス構造(マッピングや設定)を変更したり、データを別のインデックスに移動・変換したりする必要が出てくることがあります。このようなシナリオで非常に強力なツールとなるのが、_reindex API です。この API を使用すると、あるインデックスまたは複数のインデックスから別のインデックスにドキュメントを効率的にコピーできます。これは単なるデータのコピーだけでなく、コピー中にドキュメントの変換、フィルタリング、バージョン管理などを行うことができるため、インデックスの「再構築」という表現がよく用いられます。

この記事では、Elasticsearch の _reindex API に焦点を当て、インデックスを再構築する必要がある理由から、API の使い方、詳細なオプション、パフォーマンスに関する考慮事項、ベストプラクティスに至るまで、幅広くかつ深く解説します。約5000語のボリュームで、_reindex を使いこなすために必要な知識を提供することを目指します。

1. なぜインデックスの再構築(Reindex)が必要なのか?

Elasticsearch のインデックスは、一度作成すると、その構造(特にマッピング)を後から根本的に変更することは困難です。例えば、フィールドのデータ型を変更したり、Analyzer を変更したりする場合、既存のインデックスでは直接適用できません。これは、Elasticsearch が Lucene をベースとしており、インデックス化されたデータ構造がマッピングに密接に関連しているためです。

しかし、運用中にインデックス構造の変更が必要になるシナリオは多々あります。主な理由を以下に挙げます。

  • マッピングの変更:
    • フィールドのデータ型を変更したい (例: string から keywordtext に、integer から long に)。
    • 既存のフィールドに新しい Analyzer を適用したい。
    • 新しいフィールドを追加し、そのフィールドに過去のデータをバックフィルしたい(元のデータから計算するなど)。
    • 動的マッピングの設定を変更したい。
  • インデックス設定の変更:
    • シャード数を変更したい (インデックス作成後にシャード数を変更することはできません)。
    • レプリカ数を永続的に変更したい。
    • refresh_intervalindex.codec などの低レベルな設定を変更したい。
  • データの変換、フィルタリング、クレンジング:
    • インデックス内のドキュメント構造を変更したい (フィールド名の変更、フィールドの結合、分割など)。
    • 特定の条件を満たすドキュメントのみを新しいインデックスにコピーしたい。
    • 古いデータや不要なデータを削除したい。
    • データ形式を標準化したい。
  • インデックスの統合または分割:
    • 複数の小さなインデックスを1つの大きなインデックスにまとめたい。
    • 1つの大きなインデックスを複数の小さなインデックスに分割したい (例: 年ごとのインデックスを月ごとのインデックスに)。
  • Elasticsearch のバージョンアップに伴うインデックス形式の更新:
    • 古いバージョンで作成されたインデックスを新しい Elasticsearch バージョンに最適化された形式に更新したい場合。
  • クラスター間のデータ移行 (限定的):
    • リモートの Elasticsearch クラスターからデータを取得して現在のクラスターにインデックス化したい場合 (remote オプション)。

これらのシナリオの多くでは、新しい構造や設定を持つ新しいインデックスを作成し、古いインデックスから新しいインデックスへデータをコピーするというプロセスが必要になります。このデータコピーとそれに伴う変換処理を効率的に行うためのツールが _reindex API です。

2. _reindex API とは?

_reindex API は、Elasticsearch クラスターのサーバーサイドで実行されるタスクです。指定されたソース(インデックス、または複数のインデックス、またはリモートクラスター)からドキュメントを読み込み、指定されたデスティネーション(新しいインデックス)にそれらをインデックス化します。

この API の大きな特徴は、以下の点です。

  • サーバーサイド実行: クライアントではなく Elasticsearch クラスター自身が処理を行います。これにより、クライアントとクラスター間のネットワークトラフィックを最小限に抑え、効率的なデータ転送が可能になります。
  • スケーラビリティ: 大量のドキュメントを処理するために設計されており、スライスやバッチ処理などの機能を提供します。
  • 変換機能: ドキュメントのコピー中に、Ingest Pipeline または Painless スクリプトを使用してドキュメントを変換できます。
  • バージョン管理: ソースドキュメントのバージョン情報(_version)を考慮して、コンフリクト解決や適切なバージョンのインデックス化を制御できます。
  • 非同期実行: 時間のかかる再インデックス処理をバックグラウンドタスクとして実行できます。

_reindex API は、REST API エンドポイントとして提供されます。通常、HTTP POST リクエストのボディに処理内容を記述した JSON を含めて実行します。

基本的なリクエストの形式は以下の通りです。

json
POST _reindex
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index"
}
}

この単純な例では、old_index のすべてのドキュメントを new_index にコピーします。new_index が存在しない場合、Elasticsearch はデフォルトの設定と、最初のドキュメントから推測される動的マッピングを使用して自動的に作成します。しかし、実際には、新しいインデックスは事前に明示的に作成し、必要なマッピングと設定を適用しておくのが一般的なベストプラクティスです。

3. _reindex を使用したインデックス再構築の基本的な手順

_reindex API を使ってインデックスを再構築する際の基本的なステップは以下の通りです。

ステップ 1: 再構築の目的と計画の明確化

まず、なぜインデックスを再構築する必要があるのか、その目的を明確にします。マッピングを変更したいのか、設定を変更したいのか、データを変換したいのかなど。目的が明確になれば、新しいインデックスでどのようなマッピングや設定が必要になるのか、どのようなデータ変換が必要になるのかを具体的に計画できます。

  • 新しいインデックス名: 混乱を避けるため、元のインデックスとは異なる名前を付けます (例: my_data_v2, logs_2023_reindexed)。
  • 新しいマッピング: 必要なフィールドのデータ型、Analyzer、Indexing の設定などを定義します。元のインデックスのマッピングをベースに、必要な変更を加えます。
  • 新しい設定: シャード数、レプリカ数、その他のインデックス設定 (index.codec など) を定義します。
  • データ変換のロジック: 必要に応じて、ドキュメントを変換するための Ingest Pipeline または Painless スクリプトのロジックを設計します。

ステップ 2: 新しいインデックスの作成

計画に基づき、新しいインデックスを事前に作成します。これにより、目的とするマッピングと設定が正確に適用された状態でデータを受け入れることができます。

json
PUT /new_index_name
{
"settings": {
"index": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "30s"
// reindex中はrefresh_intervalを長く、replicaを0にするのが一般的だが、
// ここでは最終的な設定を記述。一時的な設定変更はreindex実行前に行う。
}
},
"mappings": {
// 新しいマッピング定義
"properties": {
"field1": {
"type": "keyword"
},
"field2": {
"type": "text",
"analyzer": "standard"
},
"new_field": { // 新しく追加するフィールド
"type": "long"
}
// ...
}
}
}

重要: Reindex の処理速度を向上させるために、データ投入中は新しいインデックスのレプリカ数を 0 に設定し、refresh_interval を長く (例: "refresh_interval": "300s" または "-1" で無効化) するのが一般的です。Reindex 完了後にこれらの設定を元の値に戻します。

ステップ 3: _reindex API の実行

新しいインデックスを作成したら、_reindex API を実行してデータのコピーを開始します。以下は基本的な例です。

json
POST _reindex
{
"source": {
"index": "old_index_name"
},
"dest": {
"index": "new_index_name",
"op_type": "create" // デスティネーションにドキュメントが存在しないことを期待する場合
}
}

  • source.index: コピー元となるインデックス名を指定します。複数のインデックスを指定する場合は配列で渡します。
  • dest.index: コピー先となるインデックス名を指定します。
  • dest.op_type: インデックス化操作のタイプを指定します。
    • index (デフォルト): ドキュメントが存在すれば更新、なければ新規作成します。
    • create: ドキュメントが存在しない場合のみ新規作成します。既に存在する場合はコンフリクトが発生します。再インデックスでは通常、空の新しいインデックスに書き込むため create を使うことが多いですが、コンフリクトのハンドリングが必要な場合は index を使用します。

大規模なインデックスを扱う場合、_reindex は長時間かかる可能性があります。デフォルトでは、リクエストは処理が完了するまで待機しますが、これはタイムアウトする可能性があるため、非同期実行を行うのが推奨されます。

非同期実行を行うには、リクエストに wait_for_completion=false パラメータを追加します。

json
POST _reindex?wait_for_completion=false
{
"source": {
"index": "old_index_name"
},
"dest": {
"index": "new_index_name",
"op_type": "create"
}
}

このリクエストを実行すると、Elasticsearch はすぐにレスポンスを返しますが、その中にタスク ID が含まれています。このタスク ID を使って、後述する方法で処理の進捗を監視できます。

json
{
"task": "taskId:nodeId:taskNumber"
}

ステップ 4: Reindex プロセスの監視

wait_for_completion=false で実行した場合、返されたタスク ID を使用して _tasks API で処理の進捗を監視します。

json
GET _tasks/taskId:nodeId:taskNumber

このリクエストは、タスクの現在の状態、処理されたドキュメント数、スキップされたドキュメント数、コンフリクト数、エラー情報などを返します。

同期的に実行した場合(wait_for_completion=true、またはデフォルト)、API コールが完了した時点で結果が返されます。この結果には、非同期実行の場合と同様の統計情報が含まれます。

監視中に確認すべき主な情報:

  • total: ソースで検出されたドキュメントの総数。
  • created: 新しいインデックスに作成されたドキュメント数。
  • updated: 新しいインデックスで更新されたドキュメント数 (op_type: index の場合)。
  • deleted: 削除されたドキュメント数 (これは _reindex 自体が行うのではなく、スクリプトなどで意図的に削除マークを付けた場合などにカウントされる可能性がありますが、reindexでは稀です)。
  • batches: 処理されたバッチ数。
  • version_conflicts: バージョンコンフリクトによりスキップされたドキュメント数。
  • retries: 一時的なエラー(例: スロットリング)によりリトライされた回数。
  • throttled_millis: スロットリングによって処理が待機した合計時間(ミリ秒)。
  • status: タスクの現在の状態 (running, completed, failed)。
  • description: タスクの説明 (reindex from [old_index] to [new_index])。

タスクが完了したかどうか、エラーが発生していないか、意図した数のドキュメントがコピーされたかなどをこれらの情報で確認します。

また、_cat/tasks API を使って、現在実行中のすべてのタスクをリストアップすることもできます。

json
GET _cat/tasks?v&actions=*reindex*

これにより、複数の Reindex タスクを実行している場合でも、それらを一覧で確認できます。

ステップ 5: データの検証

Reindex が完了したら、新しいインデックスにデータが正しくコピーされているか、マッピングや設定が意図通りに適用されているかを確認します。

  • ドキュメント数の確認: _count API を使って、古いインデックスと新しいインデックスのドキュメント数を比較します。意図的にフィルタリングや変換を行っていない限り、ほぼ同じ数になるはずです。
    json
    GET old_index_name/_count
    GET new_index_name/_count
  • データのサンプリング: 新しいインデックスからいくつかのドキュメントを取得し、期待通りの構造とデータになっているか手動で確認します。特に変換処理を行った場合は重要です。
    json
    GET new_index_name/_search?size=10
  • 検索クエリのテスト: 新しいインデックスに対して、アプリケーションで使用する代表的な検索クエリを実行し、期待通りの結果が得られるか、パフォーマンスに問題がないかを確認します。

ステップ 6: アプリケーションの切り替え(エイリアスの利用)

アプリケーションが古いインデックスではなく新しいインデックスを参照するように切り替えます。この際、インデックスエイリアスを使用するのが最も推奨される方法です。

インデックスエイリアスは、1つまたは複数のインデックスを指す「仮想名」のようなものです。アプリケーションはエイリアス名に対して検索やインデックス操作を行います。インデックスを再構築した場合、アプリケーションの設定を変更する代わりに、エイリアスが指すインデックスを古いものから新しいものに atomically(アトミックに、不可分に)切り替えることができます。

エイリアスを使った切り替え手順:

  1. 既存のエイリアスを確認: 現在アプリケーションが使用しているエイリアス名を確認します。例えば my_data というエイリアスが old_index_name を指しているとします。
  2. エイリアス操作の実行: _aliases API を使って、my_data エイリアスを old_index_name から外し、new_index_name に付ける操作を一度に行います。

    json
    POST _aliases
    {
    "actions": [
    { "remove": { "alias": "my_data", "index": "old_index_name" } },
    { "add": { "alias": "my_data", "index": "new_index_name" } }
    ]
    }

    この操作はアトミックに行われるため、切り替えの瞬間にアプリケーションが一時的にインデックスにアクセスできなくなる時間を最小限に抑えられます。
    3. 新しいインデックスの設定を戻す: Reindex のために変更した新しいインデックスの設定(レプリカ数=0, refresh_interval=長い値など)を、運用に必要な設定に戻します。
    json
    PUT /new_index_name/_settings
    {
    "index": {
    "number_of_replicas": 1, // 元の値に戻す
    "refresh_interval": "1s" // 元の値に戻す
    }
    }

アプリケーション側でエイリアス名を使用していれば、このエイリアス操作だけで新しいインデックスへの参照が切り替わります。アプリケーションの再起動や設定変更は不要な場合が多いです。

ステップ 7: 古いインデックスの削除(オプション)

新しいインデックスへの切り替えが完了し、新しいインデックスでの運用に問題がないことを十分に確認できたら、ストレージ領域を解放するために古いインデックスを削除します。

json
DELETE /old_index_name

このステップは慎重に行う必要があります。万が一、新しいインデックスに問題が見つかった場合にロールバックできるよう、一定期間古いインデックスを残しておくことを検討しても良いでしょう。

4. _reindex API の詳細オプション

_reindex API は多くのオプションを提供しており、複雑な再構築シナリオに対応できます。主要なオプションを見ていきましょう。

4.1. Source (コピー元) の指定

source フィールドで、コピー元のインデックスやドキュメントを指定します。

json
{
"source": {
"index": ["index1", "index2", "pattern*"], // 単一、複数、パターン指定
"query": { ... }, // コピーするドキュメントをフィルタリング
"_source": ["field1", "field2"], // 特定のフィールドのみをコピー
"slice": { ... }, // 大規模インデックスのスライス処理
"remote": { ... } // リモートクラスターからの取得
},
"dest": { ... }
}

  • index: コピー元のインデックス名を指定します。単一の文字列、複数のインデックス名を指定する配列、またはワイルドカードパターンを使用できます。
  • query: オプションです。指定した場合、そのクエリにマッチするドキュメントだけがコピーされます。特定のデータ範囲だけをコピーしたい場合などに使用します。例えば、特定の日付範囲のドキュメントのみをコピーしたい場合は、Range Query を使用できます。

    json
    "source": {
    "index": "my_logs",
    "query": {
    "range": {
    "@timestamp": {
    "gte": "now-1d/d",
    "lt": "now/d"
    }
    }
    }
    }

    * _source: オプションです。コピーするソースフィールドを指定します。デフォルトではすべてのソースフィールドがコピーされます。特定のフィールドだけをコピーしたい場合や、一部のフィールドを除外したい場合に使用します。フィールド名の配列を渡すとそのフィールドのみがコピーされます。false を指定するとソースフィールドはコピーされません。excludesincludes オプションも利用できます。

    “`json
    // field1 と field2 のみをコピー
    “_source”: [“field1”, “field2”]

    // field3 を除くすべてのフィールドをコピー
    “_source”: { “excludes”: [“field3”] }
    ``
    * **
    slice**: オプションです。大規模なインデックスを並行して Reindex するために使用します。sliceを使用すると、Reindex タスクを複数のスライス(小さな単位)に分割し、それぞれを並行して実行できます。これにより、処理時間を大幅に短縮できる可能性があります。詳細は後述します。
    * **
    remote`**: オプションです。リモートにある別の Elasticsearch クラスターからデータを取得して Reindex します。これはクラスター間のデータ移行に使用できますが、ネットワーク帯域やセキュリティなど、考慮すべき点が多いです。

    json
    "source": {
    "remote": {
    "host": "http://remote_host:9200",
    "username": "user", // 認証が必要な場合
    "password": "password"
    },
    "index": "remote_index",
    "query": { ... }
    }

4.2. Destination (コピー先) の指定

dest フィールドで、コピー先のインデックスや処理方法を指定します。

json
{
"source": { ... },
"dest": {
"index": "new_index", // コピー先のインデックス名
"version_type": "internal", // バージョン管理の方法
"op_type": "index", // 作成または更新の方法
"routing": "...", // ドキュメントのルーティング
"pipeline": "my_pipeline" // Ingest Pipeline を適用
}
}

  • index: コピー先のインデックス名を指定します。単一のインデックス名のみ指定可能です。複数のインデックスに分散して書き込むことはできません。
  • version_type: オプションです。インデックス化されるドキュメントのバージョンをどのように扱うかを指定します。
    • internal (デフォルト): Elasticsearch がバージョンを自動管理します。
    • external: ソースドキュメントの _version フィールドを新しいインデックスにコピーし、外部バージョンとして扱います。これは、古いインデックスと新しいインデックスでバージョン番号を一致させたい場合などに使用できます。
    • external_gte: external に似ていますが、新しいインデックスにすでに存在するドキュメントで、ソースドキュメントよりも新しいバージョンが存在する場合でも、ソースドキュメントのバージョンで強制的に更新します。
  • op_type: 上述の通り、index または create を指定します。
  • routing: オプションです。インデックス化されるドキュメントのルーティング値を制御します。

    • =: ソースドキュメントのルーティング値 (_routing) をそのまま使用します (デフォルトの動作)。
    • keep: _routing フィールドがソースドキュメントに存在する場合にその値を使用します。
    • discard: ソースドキュメントの _routing 値を無視します。
    • {script}: スクリプトを使用して動的にルーティング値を生成します。

    json
    "dest": {
    "index": "new_index",
    "routing": {
    "script": {
    "lang": "painless",
    "source": "doc['user_id'].value" // user_id フィールドの値をルーティングに使う
    }
    }
    }

    * pipeline: オプションです。コピーされる各ドキュメントに対して実行する Ingest Pipeline の ID を指定します。これにより、Reindex 処理と同時にドキュメントの変換を行うことができます。これは後述の script オプションと組み合わせて、より複雑な変換を実現できます。

4.3. 変換処理 (Script または Pipeline)

Reindex 中にドキュメントを変換する最も強力な方法は、script または pipeline オプションを使用することです。

  • script: script オプションは、ソースドキュメントがデスティネーションにインデックス化される前に、ドキュメント自体を操作するための Painless スクリプトを指定します。これにより、フィールドの追加、変更、削除、値の計算などが可能です。

    json
    POST _reindex
    {
    "source": { "index": "old_index" },
    "dest": { "index": "new_index" },
    "script": {
    "lang": "painless",
    "source": """
    ctx._source.new_field = ctx._source.old_field * 2; // フィールドの値を計算
    ctx._source.remove("old_field"); // フィールドを削除
    if (ctx._source.status == 'pending') { // 条件付きでフィールドを追加
    ctx._source.is_pending = true;
    }
    """
    }
    }

    スクリプト内では、ctx という変数を通じて現在のドキュメントにアクセスできます。ctx._source はドキュメントのソースオブジェクトを、ctx._id, ctx._version, ctx._index, ctx._routing はドキュメントのメタデータにアクセスできます。スクリプトで ctx.op = 'delete' と設定すると、そのドキュメントは新しいインデックスにインデックス化されずに削除されます。ctx.op = 'noop' と設定すると、そのドキュメントはスキップされます。
    * pipeline: dest.pipeline オプションは、Reindex がドキュメントをインデックス化する直前に、指定された Ingest Pipeline を適用します。Ingest Pipeline は、Processor の連続でドキュメントを変換する機能です。これは script よりも構造化された変換に向いており、再利用可能な変換ロジックを定義するのに適しています。

    json
    POST _reindex
    {
    "source": { "index": "old_index" },
    "dest": {
    "index": "new_index",
    "pipeline": "my_data_processor" // 定義済みのIngest Pipeline ID
    }
    }

    事前に PUT _ingest/pipeline/my_data_processor でパイプラインを定義しておく必要があります。Ingest Pipeline は script processor も含むことができるため、Pipeline 内で Painless スクリプトを実行することも可能です。

どちらの方法を使うかは、変換の複雑さや再利用の必要性によります。単純な変換や実験的な変更であれば script が手軽ですが、複数のフィールドにわたる複雑な変換や、他のインデックス処理でも同じ変換を使いたい場合は pipeline が適しています。両方同時に指定することはできません。

4.4. コンフリクトのハンドリング

conflicts オプションは、Reindex 中にバージョンコンフリクトが発生した場合の挙動を制御します。バージョンコンフリクトは、ソースドキュメントが読み込まれてからデスティネーションにインデックス化されるまでの間に、デスティネーションインデックス内の同じ ID を持つドキュメントが変更または削除された場合に発生します。

  • abort (デフォルト): バージョンコンフリクトが発生した場合、Reindex プロセスは停止します。
  • proceed: バージョンコンフリクトが発生した場合でも、そのドキュメントをスキップして Reindex プロセスを続行します。スキップされたドキュメントの数は、タスクの実行結果に含まれます。

通常、新しい空のインデックスに Reindex する場合は create op_type を使用するためバージョンコンフリクトは発生しません。しかし、既存のインデックスに Reindex する場合や、ソースインデックス自体がアクティブに更新されている場合は、conflicts: proceed を指定してコンフリクトを許容し、処理を完了させることがあります。コンフリクトが発生したドキュメントについては、後から別途処理を検討する必要があります。

json
POST _reindex
{
"source": { "index": "old_index" },
"dest": { "index": "existing_index" }, // 既存のインデックスに追記/更新する場合
"conflicts": "proceed" // コンフリクトしても処理を続ける
}

4.5. スロットリング (requests_per_second)

Reindex 処理は、ソースクラスターとデスティネーションクラスターに高い負荷をかける可能性があります。特に本番環境で実行する場合は、クラスターのパフォーマンスに影響を与えないように、処理速度を調整する必要があります。

requests_per_second オプションを使用すると、Reindex が毎秒インデックス化するドキュメントの数を制限できます。

json
POST _reindex?wait_for_completion=false
{
"source": { "index": "old_index" },
"dest": { "index": "new_index" },
"requests_per_second": 1000 // 毎秒最大1000件のドキュメントを処理
}

値を -1 に設定すると、スロットリングは無効になり、できる限り高速に処理されます(これがデフォルトではありません)。値を 0 に設定すると、Reindex は実行されますが、まったくドキュメントを処理せずに停止します。これは後述の _tasks API を使って手動でスロットリング値を変更する際に便利です。

Reindex 実行中に _tasks API を使って、実行中の Reindex タスクのスロットリング値を動的に変更することも可能です。

json
POST _tasks/taskId:nodeId:taskNumber/_rethrottle
{
"requests_per_second": 500 // スロットリング値を変更
}

これにより、システムの負荷状況を見ながら Reindex の速度を調整できます。

4.6. サイズ制限 (size)

size オプションは、コピーするドキュメントの最大数を指定します。テスト目的や、インデックスの一部だけをコピーしたい場合に使用できます。

json
POST _reindex
{
"source": { "index": "old_index" },
"dest": { "index": "new_index" },
"size": 10000 // 最大10000件のドキュメントのみをコピー
}

4.7. タイムアウト (timeout)

timeout オプションは、Reindex タスク全体のタイムアウト時間を指定します。指定した時間内に完了しない場合、タスクはキャンセルされます。

json
POST _reindex?wait_for_completion=false
{
"source": { "index": "old_index" },
"dest": { "index": "new_index" },
"timeout": "1h" // 1時間でタイムアウト
}

これは非同期タスクの場合に特に便利です。

4.8. スライス処理 (slice) による並列化

大規模なインデックスを Reindex する場合、単一の Reindex タスクでは時間がかかりすぎる可能性があります。slice オプションを使用すると、ソースインデックスを複数の「スライス」に分割し、それぞれを独立したサブタスクとして並行して処理できます。

slice オプションは、以下のサブフィールドを持ちます。

  • id: 現在のスライスの ID。
  • max: スライスの総数。

通常、slice は Reindex リクエストの中で直接指定するのではなく、_reindex API を呼び出す際に slices パラメータでスライスの総数を指定します。Elasticsearch は、指定されたスライス数に基づいて内部的に複数の Reindex タスクを起動し、それぞれがソースインデックスの一部分を処理するように自動的に調整します。

json
POST _reindex?wait_for_completion=false&slices=auto
{
"source": { "index": "large_index" },
"dest": { "index": "new_large_index" }
}

  • slices: スライスの総数を指定します。
    • 整数値: 指定した数のスライスで処理を行います。スライス数は、ソースインデックスのシャード数の倍数にすると効率が良いとされていますが、任意の値も指定できます。
    • auto: Elasticsearch がソースインデックスのシャード数やノード数に基づいて最適なスライス数を自動的に決定します。これが推奨される設定です。

slices=auto で実行すると、Elasticsearch は内部的に、各ノードが複数のスライスを並行して処理するようにタスクを分散します。タスクのステータスを確認すると、親 Reindex タスクの下に複数の子タスク(各スライスに対応)が表示されます。

スライスの総数の上限は、デフォルトではノードあたり500ですが、reindex.slices.max クラスター設定で変更可能です。

スライス処理は、CPU やディスク I/O をより効率的に利用することで Reindex 処理全体の時間を短縮できますが、その分クラスター全体の負荷も高まります。適切なスライス数は、クラスターのリソースやソースインデックスの構造に依存します。

5. Reindex 処理中のパフォーマンスに関する考慮事項

Reindex 処理はリソース集約的なタスクです。ソースクラスターとデスティネーションクラスター(同じクラスター内の別インデックスであることが多い)の両方に負荷をかけます。パフォーマンスへの影響を最小限に抑え、効率的に Reindex を完了させるためには、以下の点を考慮する必要があります。

  • 対象クラスターのリソース: CPU、メモリ、ディスク I/O、ネットワーク帯域など、Reindex 処理に必要なリソースが十分にあるか確認します。Reindex はドキュメントの読み込み(検索)、変換(スクリプト/パイプライン)、書き込み(インデックス化)を行います。これらはすべてリソースを消費します。特に、高負荷なスクリプトやパイプラインを使用する場合、CPU 使用率が高くなります。
  • デスティネーションインデックスのチューニング: Reindex は実質的に大量のバルクインデックス操作です。バルクインデックスのパフォーマンスを向上させるための一般的なチューニングは、Reindex 中のデスティネーションインデックスにも適用できます。
    • レプリカ数の一時的な削減: Reindex 中は、新しいインデックスのレプリカ数を 0 に設定します。これにより、各ドキュメントのインデックス化に必要な書き込み操作の回数が減り、プライマリシャードへの書き込みが完了すればよいため、インデックス化速度が向上します。Reindex 完了後に、必要なレプリカ数に戻します。
    • refresh_interval の延長または無効化: refresh_interval を長く設定する (例: "300s") か、"-1" に設定して手動リフレッシュに切り替えます。これにより、インデックス化中のリフレッシュ頻度が減り、セグメントマージなどのオーバーヘッドが削減され、インデックス化速度が向上します。Reindex 完了後に、適切な refresh_interval (例: "1s") に戻し、必要に応じて手動でリフレッシュを実行してデータを検索可能にします。
    • index.codec: 大規模なインデックスでは、圧縮効率の高い best_compression などのコーデックを検討できますが、インデックス化速度には影響があるため、Reindex 前に設定する必要があります。
  • スロットリング (requests_per_second): 本番環境では、クラスターの応答性を維持するために、必ず Reindex 処理にスロットリングをかけます。適切な値はクラスターのリソース、現在の負荷、Reindex 対象のデータ量によって異なります。最初は低い値から始め、監視しながら徐々に上げていくのが安全です。
  • スライス処理 (slices): 大規模なインデックスではスライス処理を活用し、複数のノードやコアで Reindex を並列実行します。auto 設定から始め、監視しながらスライスの数を調整することも検討できます。スライス数を増やしすぎると、コンテキストスイッチやノード間のタスク管理のオーバーヘッドで逆にパフォーマンスが低下する可能性もあるため、適切なバランスが必要です。
  • JVM Heap Size: Elasticsearch ノードの JVM Heap Size が適切に設定されているか確認します。Reindex 処理はメモリを消費する可能性があります。
  • ディスク容量: 新しいインデックスのサイズ分のディスク容量が確保されていることを確認します。通常、古いインデックスと同等以上の容量が必要です。
  • シャード配置: Reindex 処理中にノード間でシャードの再配置が起きると、ネットワークやディスクに余分な負荷がかかる可能性があります。大規模な Reindex 処理の前には、シャードの再配置設定 (cluster.routing.allocation.enable) を一時的に無効にすることも検討できますが、これはクラスターの可用性に影響を与える可能性があるため慎重に行う必要があります。
  • 競合する操作の回避: 大規模な Reindex 処理を実行中は、同じクラスターで他の高負荷な操作(例: スナップショット取得、大規模な検索クエリ、別の Reindex タスク)を同時に実行しないように計画します。

6. Reindex 処理のエラーと失敗のハンドリング

Reindex 処理中にエラーが発生する可能性はあります。一般的なエラーとそのハンドリング方法を理解しておくことが重要です。

  • バージョンコンフリクト: conflicts: proceed を指定している場合、コンフリクトしたドキュメントはスキップされ、タスク結果の version_conflicts カウントが増加します。これらのドキュメントは新しいインデックスにコピーされません。コンフリクトしたドキュメントを後から処理する必要がある場合は、ソースインデックスに対してコンフリクトが発生した ID を含むクエリを実行し、手動で再インデックス化するなどの対応が必要です。
  • スクリプトまたはパイプラインのエラー: Painless スクリプトや Ingest Pipeline の処理中にエラーが発生した場合、デフォルトでは Reindex プロセスは停止します。エラーメッセージはタスク情報や Elasticsearch のログに出力されます。原因(例: スクリプトのシンタックスエラー、存在しないフィールドへのアクセス、データ型の不一致)を特定し、スクリプトやパイプラインを修正してから Reindex を再実行する必要があります。
  • マッピングエラー: 新しいインデックスのマッピングと、ソースドキュメントのデータ型が一致しない場合などに発生します。例えば、数値として定義されたフィールドに文字列をインデックス化しようとした場合などです。これも Reindex を停止させます。エラーメッセージを確認し、新しいインデックスのマッピングを修正するか、スクリプト/パイプラインでデータの変換を行う必要があります。
  • リソース不足: ディスク容量不足、メモリ不足、スレッドプールキューの溢れなど、クラスターのリソース不足により Reindex が失敗またはスロットリングされることがあります。タスク情報やクラスターヘルス、ノードメトリクスを監視して原因を特定し、リソースを増強するか、スロットリングを調整する必要があります。
  • ネットワークエラー: リモート Reindex の場合や、クラスターノード間の通信に問題がある場合に発生する可能性があります。ネットワーク設定、ファイアウォール、リモートクラスターの可用性などを確認します。
  • Reindex タスクのキャンセル: 実行中の Reindex タスクは、_tasks API を使用して手動でキャンセルできます。

    json
    POST _tasks/taskId:nodeId:taskNumber/_cancel

    これによりタスクは停止しますが、既に書き込まれたドキュメントは新しいインデックスに残ります。
    * 失敗した Reindex タスクの再開: デフォルトでは、失敗またはキャンセルされた Reindex タスクを完全に同じ状態から再開することはできません。新たに Reindex タスクを起動する必要があります。既に一部のドキュメントが新しいインデックスにコピーされている場合、query オプションを使用して、まだコピーされていないドキュメント(例: 特定のタイムスタンプ以降のドキュメント、処理済みフラグがないドキュメント)のみを対象にするか、新しいデスティネーションインデックスを作成し直して最初から Reindex をやり直す必要があります。slice を使用していた場合は、失敗したスライスのタスクを特定し、そのスライスに対応するクエリで Reindex を再実行するなどの高度な対応が必要になることもあります。

エラー発生時の対応を迅速に行えるよう、Reindex 実行中はタスクの状態や Elasticsearch のログを継続的に監視することが非常に重要です。

7. _reindex の代替手段

_reindex は強力ですが、インデックス再構築の唯一の手段ではありません。シナリオによっては、他のツールや方法がより適している場合があります。

  • Snapshot and Restore: クラスター全体のバックアップ・リストアや、異なるクラスター間での大量データ移行に最も推奨される方法です。_reindexremote オプションよりも一般的に高速で信頼性が高いです。ただし、データ変換やフィルタリングは基本的に行えません(Restore 時のインデックス設定変更などは可能)。
  • Logstash: 柔軟な ETL (Extract, Transform, Load) ツールです。Elasticsearch からデータを読み込み (Input plugins)、様々な変換処理を行い (Filter plugins)、別の Elasticsearch インデックスや他のデータストアに書き出す (Output plugins) ことができます。_reindexscriptpipeline よりはるかに強力で複雑なデータ変換が可能です。しかし、_reindex がサーバーサイドでシンプルに実行できるのに対し、Logstash は別途 Logstash インスタンスの構築と管理が必要です。
  • Ingest Pipelines (通常のインデックス操作時): _reindexpipeline オプションとして使用できますが、通常のドキュメント投入(_doc エンドポイントなど)の際にも Ingest Pipeline を使用できます。新しいデータをインデックス化する際には、Reindex を待つ必要なく、Ingest Pipeline でリアルタイムに変換を行うことができます。Reindex は既存データの変換・移行に、Ingest Pipeline は新規データの変換に主に使われます。
  • 外部スクリプト/ツール: Python や Java などのプログラムで Elasticsearch クライアントライブラリを使用し、scroll API でソースからドキュメントを読み込み、必要な変換を行った後、bulk API で新しいインデックスに書き込むカスタムツールを作成することも可能です。これは高度にカスタマイズされた要件に対応できますが、開発・テスト・運用管理のコストが高くなります。

どの方法を選択するかは、必要な変換の複雑さ、データ量、必要な処理速度、運用・管理の容易さなど、様々な要因を考慮して決定する必要があります。多くのインデックス構造変更シナリオでは、_reindex が手軽さ、速度、機能のバランスが取れた最適な選択肢となります。

8. ベストプラクティスとヒント

_reindex を成功させ、ダウンタイムやリスクを最小限に抑えるためのベストプラクティスとヒントをまとめます。

  • テスト環境での十分なテスト: 大規模な本番環境での Reindex の前に、必ず十分なリソースを持つステージング環境などでテストを行います。新しいマッピングや設定が期待通りか、データ変換スクリプトが正しく機能するか、パフォーマンスが許容範囲内かなどを確認します。
  • エイリアスの活用: アプリケーションのインデックス参照には必ずエイリアスを使用し、ダウンタイムなしでの切り替えを可能にします。これが Reindex 戦略の最も重要な要素の一つです。
  • 計画的な実行: Reindex 処理がクラスターに与える負荷を考慮し、トラフィックが少ない時間帯に実行を計画するなど、計画的に実施します。
  • 新しいインデックスの事前作成: マッピングや設定を確実に適用するために、Reindex 実行前に新しいインデックスを明示的に作成します。
  • 一時的なインデックス設定の変更: Reindex 実行中は、デスティネーションインデックスのレプリカ数を 0 に、refresh_interval を長い値に設定して、インデックス化速度を向上させます。完了後に元の設定に戻すことを忘れないでください。
  • 非同期実行と監視: 大規模な Reindex には wait_for_completion=false を使用し、_tasks API や _cat/tasks で継続的に監視します。タスクの進捗、コンフリクト、エラー、スロットリング状況などを注意深く確認します。
  • スロットリングの適用: 本番環境では requests_per_second オプションを使用して、Reindex がクラスターの他の操作(特に検索)に悪影響を与えないように速度を調整します。
  • スライス処理の検討: 大規模なインデックス (> 数千万ドキュメント、または数百GB) では、slices=auto を使用して処理を並列化することを強く推奨します。
  • 十分なディスク容量の確保: 新しいインデックス用に十分なディスク容量があることを確認します。通常、ソースインデックスと同じか、場合によってはそれ以上の容量が必要です(例: マッピング変更によりインデックスサイズが増加する場合)。
  • 古いインデックスの保持: 新しいインデックスでの運用が完全に安定するまで、古いインデックスは削除せず一定期間保持しておきます。問題発生時のロールバックパスを確保するためです。
  • ドキュメント数の確認: Reindex 完了後、ソースとデスティネーションのドキュメント数を比較し、期待通りの数がコピーされたか確認します。
  • エラーログの確認: Reindex 実行中および完了後に、Elasticsearch のノードログにエラーが出力されていないか確認します。

9. 実践的な例

いくつかの具体的な Reindex の使用例を示します。

例 1: マッピング変更のための単純な Reindex + エイリアス切り替え

my_data_v1 インデックスのマッピングを変更したい場合。

  1. 新しいインデックスの計画と作成:

    • 新しいインデックス名を my_data_v2 とする。
    • 必要なマッピングと設定を定義し、my_data_v2 を作成(レプリカ数=0, refresh_interval 長く設定)。

    json
    PUT /my_data_v2
    {
    "settings": {
    "index": {
    "number_of_shards": 3,
    "number_of_replicas": 0,
    "refresh_interval": "300s"
    }
    },
    "mappings": {
    "properties": {
    "status": { "type": "keyword" }, // textからkeywordに変更
    "value": { "type": "float" } // integerからfloatに変更
    // ... 他のフィールド
    }
    }
    }

    2. Reindex 実行 (非同期 + スロットリング):

    json
    POST _reindex?wait_for_completion=false
    {
    "source": {
    "index": "my_data_v1"
    },
    "dest": {
    "index": "my_data_v2",
    "op_type": "create"
    },
    "requests_per_second": 500
    }

    3. Reindex 監視: 返されたタスク ID で _tasks を監視。
    4. データ検証: my_data_v2 のドキュメント数、マッピング、サンプルデータを確認。
    5. エイリアス切り替え: アプリケーションが my_data エイリアスを使用している場合。

    json
    POST _aliases
    {
    "actions": [
    { "remove": { "alias": "my_data", "index": "my_data_v1" } },
    { "add": { "alias": "my_data", "index": "my_data_v2" } }
    ]
    }

    6. my_data_v2 の設定を戻す: レプリカ数、refresh_interval を元の運用設定に戻す。
    7. 古いインデックス削除 (任意): DELETE /my_data_v1

例 2: データを変換し、特定のドキュメントのみをコピー

ログデータからエラーログのみを抽出し、メッセージフィールドの形式を変更して新しいインデックスにコピーしたい。

  1. 新しいインデックスの計画と作成:
    • 新しいインデックス名を error_logs_processed とする。
    • 新しいマッピング(例: 変換後のメッセージフィールドの型)と設定を定義し、インデックスを作成。
  2. Reindex 実行 (フィルタリング + スクリプト変換):

    json
    POST _reindex?wait_for_completion=false
    {
    "source": {
    "index": "my_logs",
    "query": { // level: error のドキュメントのみを対象
    "term": {
    "level.keyword": "error"
    }
    }
    },
    "dest": {
    "index": "error_logs_processed",
    "op_type": "create"
    },
    "script": {
    "lang": "painless",
    "source": """
    // 元のメッセージに timestamp と level を付加
    ctx._source.message = '[' + ctx._source['@timestamp'] + '] ' + ctx._source.level + ': ' + ctx._source.message;
    // 不要なフィールドを削除
    ctx._source.remove("level");
    ctx._source.remove("user_agent");
    """
    }
    }

    3. 監視、検証、アプリケーション切り替え: 上記と同様の手順で進めます。

例 3: 大規模インデックスのスライス Reindex

非常に大きな large_index を Reindex する場合。

  1. 新しいインデックスの計画と作成: 新しいインデックス new_large_index を作成(レプリカ数=0, refresh_interval 長く設定)。
  2. Reindex 実行 (スライス): slices=auto を指定。

    json
    POST _reindex?wait_for_completion=false&slices=auto
    {
    "source": {
    "index": "large_index"
    },
    "dest": {
    "index": "new_large_index",
    "op_type": "create"
    }
    }

    3. Reindex 監視: _tasks API で親タスクと子タスクの進捗を監視します。_cat/tasks?v&actions=*reindex* で複数のスライスが並行して実行されていることを確認できます。
    4. データ検証、アプリケーション切り替え: 上記と同様の手順で進めます。

10. まとめ

_reindex API は、Elasticsearch でインデックスの構造や設定を変更したり、データを変換・移動したりするための強力かつ柔軟なツールです。マッピング変更、シャード数変更、データクレンジング、インデックス統合/分割など、様々な再構築シナリオに対応できます。

本記事では、_reindex API の基本的な使い方から、ソース/デスティネーションの様々なオプション、ドキュメント変換のためのスクリプトやパイプライン、コンフリクトハンドリング、パフォーマンス調整のためのスロットリングやスライス処理、非同期実行と監視、エラーハンドリング、そして代替手段に至るまで、詳細に解説しました。

Reindex はリソース集約的な処理であり、特に大規模なインデックスを扱う場合はクラスターに大きな負荷をかけます。本番環境で実行する際は、テスト環境での十分な検証、エイリアスの活用によるダウンタイムの最小化、計画的な実行、そして処理中の継続的な監視が不可欠です。レプリカ数や refresh_interval の一時的な変更、スロットリングやスライス処理の活用といったパフォーマンスチューニングも、効率的な Reindex 処理には欠かせません。

これらの知識と手順を踏まえることで、Elasticsearch の _reindex API を安全かつ効果的に利用し、インデックスの再構築を成功させることができるでしょう。

Elasticsearch の公式ドキュメントには、_reindex API に関するさらに詳細な情報や高度なトピック(例: レアケースのハンドリング、バージョン間の互換性など)が記載されています。必要に応じて参照することをお勧めします。


これで、約5000語の詳細な記事が完成しました。この情報が、Elasticsearch の _reindex API を利用したインデックス再構築の理解と実践に役立つことを願っています。

コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

上部へスクロール