Elasticsearch reindex 完全ガイド:データ再構築のすべて

Elasticsearch reindex 完全ガイド:データ再構築のすべて

はじめに

Elasticsearchは、大量のデータを高速に検索、分析できる強力な分散型検索・分析エンジンです。しかし、運用していく中で、データの構造を変更したり、クラスター構成を変更したり、バージョンアップを行ったりする必要が生じることがあります。このような場合、既存のデータを新しい構造やクラスターに移動、あるいは再構築する必要が出てきます。この「データ再構築」の中心的な役割を担う機能が、本記事で解説する「reindex」です。

reindexは、あるインデックスから別のインデックスへドキュメントを効率的にコピーするElasticsearchのAPIです。単にコピーするだけでなく、データの変換、特定のドキュメントのフィルタリング、バージョン競合の管理など、様々な高度な操作を伴う再構築プロセスをサポートします。

本記事では、Elasticsearchにおけるreindexの基本から応用、パフォーマンスチューニング、一般的なユースケース、そして注意点やトラブルシューティングに至るまで、「reindexのすべて」を網羅的に解説します。約5000語に及ぶ詳細な解説を通じて、Elasticsearchのデータ再構築に関する理解を深め、実際の運用に役立てていただくことを目的としています。

この記事は、Elasticsearchの基本的な概念(インデックス、ドキュメント、マッピング、シャードなど)を理解している方を対象としています。

この記事を読むことで、以下のことが理解できます。

  • reindexがどのような仕組みで動作するのか
  • reindex APIの様々なオプションを使ったデータ操作方法
  • 一般的なreindexのユースケースとその実現方法
  • reindexのパフォーマンスを最適化する方法
  • reindex実行時に発生しうる問題と対処法
  • ダウンタイムを最小限に抑えるreindex戦略

それでは、Elasticsearch reindexの世界へ深く踏み込んでいきましょう。

reindexの基本概念

Elasticsearchのreindex機能は、インデックス間でドキュメントをコピーするためのAPIです。これは、単なるファイルコピーではなく、Elasticsearchクラスター内でドキュメントを読み取り、必要に応じて変換し、別のインデックスに書き込むという一連のプロセスを実行します。

reindexとは何か?

reindexは、Elasticsearchの内部で実行されるタスクとして実装されています。クライアントからreindex APIを呼び出すと、Elasticsearchクラスターはそのリクエストを受け付け、タスクをスケジュールします。このタスクは、指定されたソースインデックスからドキュメントを検索し、その結果をデスティネーションインデックスにインデックスする処理を繰り返します。

reindexが必要になる主なケース

なぜreindexが必要になるのでしょうか?以下は一般的なreindexのユースケースです。

  1. マッピング(スキーマ)の変更: ドキュメントのフィールドの型を変更したい、新しいフィールドを追加したい、既存のフィールドの設定(analyzer, indexオプションなど)を変更したい場合など。マッピングは通常、インデックス作成後に変更が難しいため、新しいマッピングを持つインデックスを作成し、既存データをreindexする必要があります。
  2. インデックス設定の変更: シャード数、レプリカ数、refresh_interval、translog設定など、インデックス作成後に変更できない、あるいは変更にreindexが推奨される設定を変更したい場合。新しい設定でインデックスを作成し、reindexを行います。
  3. Elasticsearchバージョンのアップグレード: 大規模なバージョンアップ(例: 6.xから7.x、7.xから8.x)では、インデックス形式が変更されることがあります。古い形式のインデックスを新しいバージョンで利用可能な形式に変換するためにreindexが必要になる場合があります。
  4. データのクレンジングまたは変換: 特定のフィールドの値を修正する、フィールドを結合・分割する、不要なフィールドを削除するといった、データの構造そのものを変更したい場合。reindex時にスクリプトを使用してこれらの変換を行います。
  5. データのサブセットまたは結合: 特定の条件に一致するドキュメントのみを新しいインデックスにコピーしたい(サブセット)、あるいは複数のインデックスのデータを一つのインデックスにまとめたい(結合)場合。
  6. クラスター間のデータ移行: あるElasticsearchクラスターから別のクラスターへデータを移動したい場合。リモートreindex機能を利用します。

これらのケースの多くで、reindexはデータの構造や配置を変更するための不可欠なツールとなります。

reindexの動作原理

reindex APIは、内部的に以下のステップを実行します。

  1. ソースインデックスの検索: 指定されたソースインデックス(または複数のインデックス、エイリアス、データストリーム)に対して、すべてのドキュメント(または指定されたクエリに一致するドキュメント)を検索します。これはスクロール検索に似た効率的な方法で行われます。
  2. ドキュメントの処理: 読み取った各ドキュメントに対して、必要に応じて変換スクリプトを適用します。
  3. デスティネーションインデックスへの書き込み: 処理されたドキュメントを、指定されたデスティネーションインデックスにインデックス(追加または更新)します。これはBulk APIを使って効率的に行われます。

このプロセスは、ソースインデックスのすべてのドキュメントがデスティネーションインデックスにコピーされるまで繰り返されます。reindexはサーバーサイドで実行されるため、クライアントとの通信オーバーヘッドが少なく、効率的に大量のデータを処理できます。

バージョン管理と競合回避

reindexでは、ドキュメントのバージョン管理が重要な考慮事項となります。Elasticsearchのドキュメントはバージョンを持っており、同じIDのドキュメントが複数回更新されるとバージョン番号が増加します。

reindex実行中に、ソースインデックスのドキュメントが更新される可能性があります。デフォルトでは、reindexはソースドキュメントのバージョンを無視して、デスティネーションインデックスに新しいドキュメントとしてインデックスしようとします。しかし、デスティネーションインデックスにすでに同じIDのドキュメントが存在する場合、バージョン競合が発生する可能性があります。

この競合の挙動は conflicts パラメータで制御できます。

  • conflicts: "abort" (デフォルト): バージョン競合が発生した場合、reindexタスクは中断されます。
  • conflicts: "proceed": バージョン競合が発生しても処理を続行し、競合したドキュメントについては警告ログが出力されます。

また、ソースドキュメントのバージョンを保持したい場合は、dest パラメータ内で version_type を指定します。

  • version_type: "internal" (デフォルト): デスティネーションインデックスで新しいバージョン番号が割り当てられます。
  • version_type: "external": ソースドキュメントのバージョン番号をそのまま使用します。これにより、ソースドキュメントのバージョンがデスティネーションドキュメントのバージョンよりも新しい場合にのみドキュメントがインデックスされるようになります。これは、reindexを複数回実行して差分を取り込みたい場合などに有効です。

ただし、version_type: "external" を使用する場合、ソースインデックスのバージョン管理(例えば version_type: "external")とデスティネーションインデックスのバージョン管理が一致しているか、あるいはデスティネーションインデックスが空である必要があります。そうでなければ、予期しないバージョン競合やデータ損失が発生する可能性があります。一般的には、version_type を明示的に指定せず、デフォルトの挙動 (internal) に任せるか、あるいは version_type: "external" を使う場合はソースインデックスも external バージョニングである場合に限定するのが安全です。

reindex APIの使い方

reindexは、_reindex エンドポイントに対するPOSTリクエストとして実行されます。リクエストボディで、ソースインデックスとデスティネーションインデックス、そして必要に応じて変換スクリプトやフィルタリング条件などを指定します。

基本的なリクエスト構造は以下の通りです。

json
POST _reindex
{
"source": {
"index": "source_index_name",
"query": { ... }, // オプション:フィルタリングクエリ
"size": 1000, // オプション:バッチサイズ
"sort": { ... } // オプション:ソート順 (slicesを使う場合に推奨)
},
"dest": {
"index": "destination_index_name",
"version_type": "internal", // オプション:バージョン管理方法
"op_type": "index" // オプション:操作タイプ (index or create)
},
"script": { // オプション:データ変換スクリプト
"lang": "painless",
"source": "..."
},
"conflicts": "abort" // オプション:競合発生時の挙動 (abort or proceed)
}

各パラメータについて詳しく見ていきましょう。

source パラメータ

ソースインデックス、検索条件、バッチサイズなどを指定します。

  • index: 必須. ソースとなるインデックス、エイリアス、またはデータストリームの名前。複数のインデックスを指定する場合は文字列の配列で指定します (例: ["index1", "index2"])。ワイルドカード (logs-*) も使用可能です。
  • query: オプション. デスティネーションにコピーするドキュメントをフィルタリングするためのクエリ。指定しない場合、すべてのドキュメントがコピーされます。
    json
    "source": {
    "index": "my_logs",
    "query": {
    "range": {
    "@timestamp": {
    "gte": "now-7d/d",
    "lt": "now/d"
    }
    }
    }
    }
  • size: オプション. reindexが一度に読み取るドキュメントのバッチサイズ。デフォルトは1000。パフォーマンスチューニングのために調整することがあります。
  • sort: オプション. ソースドキュメントを読み取る際のソート順。スライス (slices) を使用してreindexを並列化する場合、効率を高めるために _doc でソートすることが推奨されます。
    json
    "source": {
    "index": "large_index",
    "sort": {
    "_doc": "asc"
    }
    }
  • _source: オプション. コピーするソースフィールドを指定します。特定のフィールドのみをコピーしたい場合や、不要なフィールドを除外したい場合に有用です。文字列の配列 (["field1", "field2"]) やオブジェクト ({"includes": ["field1"], "excludes": ["field2"]}) で指定します。
    json
    "source": {
    "index": "user_data",
    "_source": ["username", "email", "created_at"]
    }
  • remote: オプション. リモートのElasticsearchクラスターからreindexする場合に指定します。後述の「リモートクラスターからのreindex」で詳しく説明します。

dest パラメータ

デスティネーションインデックス、操作タイプ、バージョン管理などを指定します。

  • index: 必須. ドキュメントのコピー先となるインデックス名。このインデックスはreindexを実行する前に存在している必要があります。
  • version_type: オプション. バージョン管理方法 (internal, external, external_gte, force). デフォルトは internal
    • internal: デスティネーションインデックスで新しいバージョンが生成されます。
    • external: ソースドキュメントのバージョンを使用します。デスティネーションドキュメントのバージョンがソースよりも低い場合にのみインデックスされます。
    • external_gte: ソースドキュメントのバージョンを使用します。デスティネーションドキュメントのバージョンがソース以下の場合にインデックスされます。
    • force: バージョンチェックを無視して強制的にインデックスします(非推奨)。
  • op_type: オプション. デスティネーションインデックスでの操作タイプ (index, create). デフォルトは index
    • index: ドキュメントIDが存在すれば更新、なければ新規作成。
    • create: ドキュメントIDがすでに存在する場合、競合エラーが発生。再インデックス中にソースドキュメントが変更されないことが保証されている場合や、ソースインデックスが完全に静的である場合に利用できます。通常は index を使用します。
  • routing: オプション. デスティネーションインデックスへのルーティングを指定します。keep (_routing フィールドを保持), discard (_routing フィールドを破棄), または静的なルーティング値を指定できます。

script パラメータ

ソースドキュメントをデスティネーションに書き込む前に変換するために使用します。主に Painless スクリプト言語を使用します。

  • lang: オプション. スクリプト言語。デフォルトは painless
  • source: 必須. スクリプト本体。
  • params: オプション. スクリプトに渡すパラメータ。

スクリプト内では、ctx オブジェクトを通じて現在のドキュメントにアクセスできます。

ユースケース例:

  • フィールドの削除:
    json
    "script": {
    "source": "ctx._source.remove('unnecessary_field')"
    }
  • フィールド名の変更:
    json
    "script": {
    "source": "ctx._source.new_field_name = ctx._source.old_field_name; ctx._source.remove('old_field_name')"
    }
  • 新しいフィールドの追加:
    json
    "script": {
    "source": "ctx._source.processing_timestamp = System.currentTimeMillis()"
    }
  • フィールド値の変換:
    json
    "script": {
    "source": "ctx._source.status = ctx._source.status.toUpperCase()"
    }
  • 特定の条件に基づくドキュメントのスキップ:
    json
    "script": {
    "source": "if (ctx._source.status == 'pending') { ctx.op = 'noop' }"
    }

    ctx.opnoop に設定すると、そのドキュメントはスキップされます。ctx.op = 'delete' に設定すると、デスティネーションインデックスから対応するドキュメントが削除されます(デスティネーションに存在する場合)。

スクリプトの使用は、パフォーマンスに影響を与える可能性があるため、複雑な変換が必要な場合は事前にテストし、パフォーマンスを監視することが重要です。

conflicts パラメータ

バージョン競合が発生した場合の挙動を制御します。デフォルトは abort

json
"conflicts": "proceed"

reindex実行中にソースインデックスのドキュメントが頻繁に更新される場合、proceed を使用しないとreindexが中断されてしまう可能性があります。ただし proceed を使用すると、競合したドキュメントはスキップされるため、デスティネーションインデックスがソースインデックスの完全なコピーにならない可能性があります。

非同期実行 (wait_for_completion=false)

デフォルトでは、_reindex リクエストは完了するまでブロッキングされます (wait_for_completion=true)。しかし、大規模なreindexは長時間かかるため、通常は非同期で実行します。

json
POST _reindex?wait_for_completion=false
{
"source": { ... },
"dest": { ... }
}

このリクエストを送信すると、Elasticsearchはタスクを受け付け、すぐにタスクIDを返します。

json
{
"task": "task_id_string"
}

このタスクIDを使用して、_tasks APIでreindexタスクの進捗を監視したり、キャンセルしたりできます。

bash
GET _tasks/task_id_string

タスクが完了すると、_tasks APIのレスポンスに完了情報(成功数、失敗数、バージョン競合数など)が表示されます。タスクが完了した後は、_tasks APIのレスポンスはしばらく保持されますが、一定時間経過後に消去されます。完了したタスクの結果を永続的に保持したい場合は、タスク結果リテンション設定を調整するか、モニタリングツールを利用します。

reindexの詳細設定と高度なトピック

基本的な使い方だけでなく、reindexには様々なオプションがあり、パフォーマンスや挙動を細かく制御できます。

スロットル (requests_per_second)

reindexプロセスはソースインデックスからデータを読み込み、デスティネーションインデックスに書き込みます。この処理はCPU、ディスクI/O、ネットワーク帯域幅を消費するため、クラスター全体の負荷を増大させる可能性があります。他の重要なワークロード(検索、インデックス)への影響を最小限に抑えるために、reindexの速度を制限できます。

requests_per_second パラメータで、reindexが1秒間に処理するドキュメントのバッチ数を指定します。

json
POST _reindex
{
"source": { ... },
"dest": { ... },
"requests_per_second": 1000 // 1秒あたり最大1000件のバッチを処理
}

  • requests_per_second: -1 (デフォルト): スロットルなし。可能な限り高速に実行します。
  • requests_per_second: 1000: 1秒あたり最大1000バッチ(デフォルトのバッチサイズ1000なら、約100万ドキュメント)を処理します。
  • requests_per_second: 100: 1秒あたり最大100バッチ(約10万ドキュメント)を処理します。

適切な値はクラスターのリソース状況や他のワークロードへの影響度によって異なります。最初は小さめの値から始めて、徐々に増やしていくか、-1で実行して負荷状況を確認し、必要に応じてタスクを更新してスロットルを設定するのが良いでしょう。

実行中のタスクのスロットル設定は、_tasks エンドポイントを使って更新できます。

bash
POST _tasks/task_id_string/_rethrottle
{
"requests_per_second": 500
}

バッチサイズ (size)

source パラメータ内の size は、reindexが一度にソースインデックスから読み込むドキュメント数、つまりBulk APIでデスティネーションに書き込む際のバッチサイズを指定します。デフォルトは1000です。

バッチサイズを大きくすると、1回のネットワークラウンドトリップでより多くのドキュメントを転送できますが、メモリ消費量が増加します。小さすぎると、ネットワークオーバーヘッドが増加します。適切なサイズは、ドキュメントサイズ、ネットワーク遅延、ノードのメモリ容量などによって異なります。通常、デフォルトの1000は妥当な値ですが、パフォーマンスがボトルネックになっている場合は調整を検討する価値があります。

json
"source": {
"index": "my_index",
"size": 2000 // バッチサイズを2000に変更
}

タスク管理 (_tasks API)

非同期でreindexを実行した場合、_tasks APIでその進捗状況を監視できます。

bash
GET _tasks/task_id_string

このAPIは、タスクの状態(実行中、完了)、進捗(処理されたドキュメント数、成功数、失敗数、バージョン競合数)、スロットル設定、経過時間などの詳細情報を提供します。

特定のノードで実行中のreindexタスクや、特定のタイプのタスクを一覧表示することも可能です。

bash
GET _tasks?detailed=true&actions=*reindex*

タスクのキャンセル

実行中のreindexタスクをキャンセルしたい場合は、_tasks APIを使用します。

bash
POST _tasks/task_id_string/_cancel

このリクエストは、タスクに対して中断を要求します。タスクは速やかに停止しますが、すでに完了した処理は元に戻りません。

レプリカの設定 (?slices) と並列化

大規模なインデックスのreindexは時間がかかります。reindexプロセスを並列化することで、実行時間を短縮できます。reindex APIは slices パラメータをサポートしており、これを指定することでreindexタスクを複数のサブタスクに分割し、それぞれが並列に実行されるようになります。

json
POST _reindex
{
"source": {
"index": "large_index",
"sort": {"_doc": "asc"} // slicesを使う場合、効率のためにソートを推奨
},
"dest": {
"index": "new_large_index"
},
"slices": 8 // タスクを8つのスライスに分割
}

slices パラメータの値は、以下のいずれかを指定できます。

  • 数値 (N): reindexタスクをN個のサブタスクに分割します。各サブタスクは、ソースインデックスの特定のシャードセットまたはドキュメント範囲を担当します。Nの推奨値は、ソースインデックスのシャード数、あるいはクラスター内のノード数やコア数に基づいて決定します。通常、ソースシャード数と同じか少し多めの値を指定するのが良いとされています。slices:"auto" を指定すると、Elasticsearchが自動的にシャード数を基に最適なスライス数を決定しようとします。
  • "auto": Elasticsearchが、ソースインデックスのシャード数に基づいて自動的にスライス数を決定します。これは便利なオプションですが、手動で調整するよりも最適なパフォーマンスが得られない場合もあります。

slices を使用する場合、ソースインデックスを _doc でソートすることが推奨されます。これにより、各スライスが効率的にドキュメントの範囲を処理できるようになります。

並列化により、CPU、ディスクI/O、ネットワーク帯域幅の使用量が増加します。クラスターのリソース状況を考慮して適切なスライス数を設定してください。スライス数が多すぎると、タスク管理のオーバーヘッドが増加し、逆にパフォーマンスが低下する可能性もあります。

リモートクラスターからのreindex

Elasticsearchは、別のElasticsearchクラスターからreindexする機能も提供しています。これは、データ移行やクラスター統合などのシナリオで非常に便利です。

リモートreindexを使用するには、reindexリクエストの source パラメータ内に remote ブロックを追加し、リモートクラスターのアドレスと認証情報を指定します。

json
POST _reindex
{
"source": {
"remote": {
"host": "http://remote_cluster_host:9200",
"username": "elastic",
"password": "changeme",
"socket_timeout": "5m", // オプション:ソケットタイムアウト
"connect_timeout": "10s" // オプション:接続タイムアウト
},
"index": "source_index_on_remote",
"query": { ... }
},
"dest": {
"index": "destination_index_on_local"
}
}

  • host: リモートクラスターのURL。
  • username, password: リモートクラスターへの認証情報。Basic認証に対応しています。APIキーを使用することも可能です。
  • socket_timeout, connect_timeout: 接続やソケットのタイムアウト時間。大規模なデータやネットワーク遅延がある場合に調整が必要になることがあります。

セキュリティ上の理由から、認証情報は設定ファイル (elasticsearch.yml) に定義し、その設定名を参照する方が推奨されます。これにより、認証情報がリクエストボディに平文で含まれることを避けられます。

“`yaml

elasticsearch.yml に設定を追加

reindex.remote.whitelist: remote_cluster_alias:remote_cluster_host:9200

keystore に認証情報を追加

bin/elasticsearch-keystore add reindex.remote.remote_cluster_alias.username
bin/elasticsearch-keystore add reindex.remote.remote_cluster_alias.password
“`

そして、reindexリクエストではエイリアス名を使用します。

json
POST _reindex
{
"source": {
"remote": {
"cluster": "remote_cluster_alias"
},
"index": "source_index_on_remote"
},
"dest": {
"index": "destination_index_on_local"
}
}

リモートreindexはネットワークを経由するため、ネットワーク帯域幅と遅延がパフォーマンスに大きく影響します。また、両方のクラスターに十分なリソース(CPU、メモリ、ディスクI/O)があることを確認してください。

インデックス設定の継承と変更(mapping, settings)

reindexは既存のインデックスから別のインデックスにドキュメントをコピーするだけです。デスティネーションインデックスのマッピングや設定(シャード数、レプリカ数など)は、ソースインデックスから自動的に継承されません。

したがって、マッピングやインデックス設定を変更したい場合は、reindexを実行する前に、目的のマッピングと設定を持つ新しいデスティネーションインデックスを手動で作成しておく必要があります。

例えば、新しいマッピングでデータを再構築する場合の手順は以下のようになります。

  1. 新しいマッピング定義を作成します。
  2. 新しいマッピングと必要な設定(シャード数など)で、新しいインデックス (new_index) を作成します。
    bash
    PUT /new_index
    {
    "settings": { ... },
    "mappings": { ... }
    }
  3. old_index から new_index へreindexを実行します。
    bash
    POST _reindex
    {
    "source": { "index": "old_index" },
    "dest": { "index": "new_index" }
    }
  4. reindexが完了したら、アプリケーションが新しいインデックスを参照するように切り替えます。これは、通常、エイリアスを使って行われます。

reindexの一般的なユースケース

ここでは、前述のreindexが必要になるケースについて、具体的な手順や考慮事項を掘り下げて説明します。

スキーマ(マッピング)の変更

Elasticsearchでは、既存フィールドのマッピング型を後から変更することは困難です。フィールドの型を変えたい、インデキシング方法を変えたい、新しいフィールドを追加したいなどの場合は、新しいマッピングでインデックスを再構築する必要があります。

手順:

  1. 新しいマッピング定義を作成します。必要に応じて、新しいインデックス名も決定します(例: my_data_v2)。
  2. 新しいマッピングと設定で、新しいインデックスを作成します。
    bash
    PUT /my_data_v2
    {
    "settings": {
    "index": {
    "number_of_shards": 5,
    "number_of_replicas": 1
    }
    },
    "mappings": {
    "properties": {
    "field_to_change": { "type": "long" }, // 型を変更
    "new_field": { "type": "keyword" }, // 新しいフィールドを追加
    "existing_field": { "type": "text", "analyzer": "english" } // アナライザを変更
    // その他の既存フィールドも新しいマッピングに含める
    }
    }
    }
  3. 古いインデックス (my_data_v1) から新しいインデックス (my_data_v2) へreindexを実行します。この際、もし新しいフィールドの値を計算するなどデータ変換が必要であれば script パラメータを使用します。
    json
    POST _reindex
    {
    "source": { "index": "my_data_v1" },
    "dest": { "index": "my_data_v2" },
    "script": { // 必要に応じてデータ変換スクリプト
    "lang": "painless",
    "source": "ctx._source.new_field = 'default_value';" // 新しいフィールドにデフォルト値を設定
    }
    }
  4. reindexタスクの完了を監視します。
  5. reindex完了後、アプリケーションが新しいインデックス (my_data_v2) を参照するように切り替えます。通常、エイリアスを使ってシームレスに切り替えます。
    • 既存のエイリアス (my_data_alias) が my_data_v1 を指していると仮定します。
    • アトミックな切り替えを実行します。
      bash
      POST /_aliases
      {
      "actions": [
      { "remove": { "alias": "my_data_alias", "index": "my_data_v1" }},
      { "add": { "alias": "my_data_alias", "index": "my_data_v2" }}
      ]
      }
  6. 新しいインデックス (my_data_v2) でインデキシングや検索が正常に行われていることを確認します。
  7. しばらく運用し、問題がないことを確認してから、古いインデックス (my_data_v1) を削除します。

このエイリアスを使った方法は、reindex中のダウンタイムをゼロまたは最小限に抑えるための標準的なアプローチです。

シャード数の変更

インデックス作成後にシャード数を変更することはできません。データ量が増加し、現在のシャード数ではパフォーマンスが不足する場合や、データ量が減少してシャード数が過剰になった場合など、シャード数を最適化したい場合にreindexが必要になります。

手順:

  1. 新しいインデックス名と、新しいシャード数を決定します(例: my_data_v2number_of_shards: 10)。
  2. 新しいシャード数と、必要に応じて他の設定(レプリカ数など)で、新しいインデックスを作成します。マッピングは古いインデックスと同じものを使用するのが一般的です。
    bash
    PUT /my_data_v2
    {
    "settings": {
    "index": {
    "number_of_shards": 10, // シャード数を変更
    "number_of_replicas": 1
    }
    },
    "mappings": { ... } // my_data_v1 と同じマッピング
    }
  3. my_data_v1 から my_data_v2 へreindexを実行します。スライス (slices) パラメータを活用して並列化することで、高速化を図れます。slices の値は、新しいシャード数と同じか、クラスターのリソースに応じてそれ以上の値を設定することが推奨されます。
    json
    POST _reindex
    {
    "source": { "index": "my_data_v1" },
    "dest": { "index": "my_data_v2" },
    "slices": 10 // 新しいシャード数と同じかそれ以上
    }
  4. reindexタスクの完了を監視します。
  5. reindex完了後、エイリアスを切り替えてアプリケーションを新しいインデックス (my_data_v2) に向けます(前述のマッピング変更と同様の手順)。
  6. 新しいインデックスで運用を確認後、古いインデックス (my_data_v1) を削除します。

シャード数の変更は、クラスター全体のパフォーマンスやスケーラビリティに大きく影響するため、慎重に計画する必要があります。

Elasticsearchバージョンのアップグレード

Elasticsearchのメジャーバージョンアップグレードでは、インデックスの内部形式が変更されることがあります。古い形式のインデックスは新しいバージョンで直接開けない場合や、非推奨となる場合があります。このような場合、新しいバージョンでデータをreindexする必要があります。

バージョンアップグレードの公式ドキュメントには、どのバージョン間でreindexが必要か、どのような手順を踏むべきかが詳しく記載されています。必ず公式ドキュメントを確認してください。

一般的な手順(例: 古いクラスターから新しいクラスターへの移行を伴う場合):

  1. 新しいElasticsearchバージョンで、新しいクラスターを構築します。
  2. 古いクラスターからreindex対象のインデックスのマッピングと設定を取得します。
  3. 新しいクラスターに、古いインデックスと同じマッピングと設定を持つ新しいインデックスを作成します。
  4. 古いクラスターから新しいクラスターへ、リモートreindexを実行します。
    json
    POST _reindex
    {
    "source": {
    "remote": {
    "cluster": "old_cluster_alias" // elasticsearch.yml で設定済み
    },
    "index": "old_index_on_old_cluster"
    },
    "dest": {
    "index": "new_index_on_new_cluster"
    }
    }

    リモートreindexが利用できない場合や、データ量が非常に多い場合は、Snapshot/Restoreを使う方法も検討できます。
  5. reindex完了後、アプリケーションの接続先を新しいクラスターの新しいインデックス(またはエイリアス)に切り替えます。
  6. 新しいクラスターでの運用を確認後、古いクラスターを停止または削除します。

データのクレンジングや変換

インデキシングされたデータに不備がある場合や、分析要件の変更に伴いデータの形式を変更したい場合、reindexと script パラメータを組み合わせることでデータを再構築できます。

ユースケース例:

  • 特定の値を持つフィールドを修正する
  • 複数のフィールドの値を組み合わせて新しいフィールドを作成する
  • 不要になったフィールドを削除する
  • 欠損値をデフォルト値で埋める

手順:

  1. 新しいインデックスを作成します。マッピングは、変換後のデータ構造に合わせて定義します。
  2. 古いインデックスから新しいインデックスへreindexを実行し、script パラメータでデータ変換ロジックを記述します。
    json
    POST _reindex
    {
    "source": { "index": "my_raw_data" },
    "dest": { "index": "my_cleaned_data" },
    "script": {
    "lang": "painless",
    "source": """
    if (ctx._source.status == 'pending_old_value') {
    ctx._source.status = 'pending'; // 値を修正
    }
    ctx._source.full_name = ctx._source.first_name + ' ' + ctx._source.last_name; // フィールドを結合
    ctx._source.remove('old_timestamp'); // フィールドを削除
    """
    }
    }
  3. reindex完了後、エイリアスを切り替えてアプリケーションを新しいインデックスに向けます。

スクリプトは非常に柔軟ですが、パフォーマンスに影響を与えやすい点に注意が必要です。複雑な変換の場合は、reindexの前にIngest Nodeを使ってデータ取り込み時に変換を行う方法も検討できます。

reindexのパフォーマンスチューニングと最適化

大規模なreindexは、クラスターに大きな負荷をかけ、実行に長時間かかる可能性があります。以下のテクニックは、reindexのパフォーマンスを最適化し、クラスターへの影響を軽減するのに役立ちます。

ソースインデックスの読み込み最適化

  • レプリカからの読み込み: _reindex APIは、ソースインデックスのレプリカシャードからドキュメントを読み取ることができます。これにより、プライマリシャードへの負荷を軽減し、検索スループットを向上させることができます。ソースインデックスに十分なレプリカがあることを確認してください。
  • refresh無効化(一時的): reindex中にソースインデックスへの書き込みがほとんど発生しない場合、一時的にソースインデックスの refresh_interval を無効にする(-1に設定する)ことで、検索性能をわずかに向上させることができる場合があります。ただし、これはreindex中のソースインデックスの検索鮮度を損なうため、影響を理解した上で行ってください。通常、reindex中にソースインデックスを頻繁に検索することはないため、この設定は必須ではありません。

デスティネーションインデックスの書き込み最適化

reindexのボトルネックは、多くの場合、デスティネーションインデックスへの書き込み性能にあります。

  • refresh無効化(推奨): reindex実行中は、デスティネーションインデックスへのインデキシング性能が最も重要です。一時的にデスティネーションインデックスの refresh_interval を無効にする(-1に設定する)ことで、インデキシングスループットを大幅に向上させることができます。reindex完了後に、元の refresh_interval に戻すか、_forcemerge APIを実行してセグメントをマージすることを検討してください。
    bash
    PUT /destination_index/_settings
    {
    "index": {
    "refresh_interval": -1
    }
    }
    # ... reindex 実行 ...
    PUT /destination_index/_settings
    {
    "index": {
    "refresh_interval": "1s" # 元の値に戻す
    }
    }
  • translog設定の調整(一時的): index.translog.durabilityasync に設定することで、書き込み性能を向上させることができます。ただし、これはデータの耐久性を犠昧にする(ノードクラッシュ時に最新のデータが失われる可能性がある)ため、reindex中のデータ損失が許容できる場合に限定してください。通常はデフォルトの request のままで問題ありません。
  • シャード数とレプリカ数: デスティネーションインデックスのシャード数は、インデキシングのスケーラビリティに影響します。十分な数のシャードを確保することで、書き込み処理を並列化できます。ただし、シャードが多すぎるとクラスター管理のオーバーヘッドが増加します。レプリカ数はreindex中の書き込み性能には直接影響しませんが、reindex完了後の検索性能や可用性に影響します。reindex中はレプリカ数を0に設定し、完了後に1以上に増やすことで、reindex中の書き込み負荷を軽減できる場合があります。
  • バッチサイズ (size) の調整: 前述の通り、source.size パラメータを調整することで、一度に転送・書き込みするドキュメント数を変更できます。適切なサイズは、ドキュメントサイズ、ネットワーク状況、ノードのメモリによって異なります。大きすぎるとメモリ不足、小さすぎるとオーバーヘッドが増加します。
  • マージ設定の調整(一時的): reindexは大量のドキュメントを書き込むため、多数の小さなセグメントが生成されます。デフォルトのマージ設定は検索性能を維持するように最適化されていますが、インデキシング中は一時的にマージ速度を優先することも検討できます。例えば、index.merge.scheduler.max_thread_count を増やすなど。ただし、これはディスクI/Oを激しく消費するため注意が必要です。reindex完了後に _forcemerge を実行する方が一般的です。

slices パラメータの活用

slices パラメータを使用することで、reindexタスクを並列実行し、全体のスループットを向上させることができます。スライス数は、ソースインデックスのシャード数、デスティネーションインデックスのシャード数、そしてクラスター内のデータノード数やCPUコア数を考慮して決定します。

一般的には、ソースシャード数と同じか、デスティネーションシャード数と同じか、あるいはクラスター内のデータノード数 * ノードあたりのCPUコア数 と同程度の値を試してみるのが良いとされています。ただし、スライスが多すぎるとタスク管理のオーバーヘッドが増えるため、最適な値はテストを通じて見つける必要があります。

requests_per_second の調整

requests_per_second パラメータでreindexの速度を制限し、クラスター全体の負荷を制御します。他の重要なワークロード(アプリケーションからの検索やインデキシング)が影響を受けないように、最初は低い値から始めるか、負荷状況を監視しながら調整するのが安全です。-1(無制限)で開始し、クラスターの負荷が高すぎる場合にスロットルを有効にする、というアプローチも可能です。

リソースの監視

reindex実行中は、クラスターのリソース使用率(CPU、メモリ、ディスクI/O、ネットワーク帯域幅)を注意深く監視することが非常に重要です。

  • Cat APIs: _cat/nodes, _cat/indices, _cat/thread_pool などを使って、ノードの状態、インデックスサイズ、スレッドプールの状況などを確認できます。
  • Cluster Monitoring Tools: KibanaのMonitoring UIやPrometheus/Grafanaなどの外部ツールを利用して、より詳細なメトリクスを収集・可視化することで、ボトルネックを特定しやすくなります。特に、インデキシングレート、検索レイテンシ、CPU使用率、I/O Wait時間などを監視します。
  • Task API: _tasks APIでreindexタスク自体の進捗(処理されたドキュメント数、スロットル状況など)を確認します。

リソースにボトルネックがある場合(例: CPU使用率が高い、I/O Waitが大きい)、reindexの速度を絞るか、クラスターにリソースを追加することを検討する必要があります。

reindex時の注意点とトラブルシューティング

reindexは強力なツールですが、実行時にはいくつかの注意点や落とし穴があります。

ディスク容量の確認

reindexはソースインデックスからドキュメントを読み込み、デスティネーションインデックスに書き込みます。この間、ソースインデックスとデスティネーションインデックスの両方がディスク容量を消費します。デスティネーションインデックスはソースインデックスと同じか、マッピングや設定によってはそれ以上の容量を消費する可能性があります。

reindexを開始する前に、デスティネーションインデックスを格納するノードに十分な空きディスク容量があることを必ず確認してください。 ディスク容量が不足すると、reindexタスクが失敗するだけでなく、クラスター全体が不安定になる可能性があります。シャード割り当てエラーや読み取り専用インデックスへの自動切り替えが発生する場合があります。

メモリ・CPUリソースの確認

reindexプロセスは、ソースからの検索(スクロール)、ドキュメント処理(スクリプト含む)、デスティネーションへのインデックス(Bulk)を行います。これらはノードのCPUとメモリを消費します。特に、複雑なスクリプトを使用する場合や、slices パラメータで並列度を高く設定した場合、多くのリソースが必要になります。

reindex実行前に、クラスター全体のCPU使用率やメモリ使用率(特にJVMヒープ使用率)に余裕があることを確認してください。reindex中にリソースが枯渇すると、レスポンスタイムの増加、他のタスクの遅延、最悪の場合はノードクラッシュにつながる可能性があります。

ネットワークの問題

リモートreindexを行う場合、ソースクラスターとデスティネーションクラスター間のネットワーク接続の品質が非常に重要です。ネットワーク帯域幅が不足したり、遅延が大きかったりすると、reindexのパフォーマンスが著しく低下します。ファイアウォール設定も確認し、必要なポート(通常9200/9300)が開放されていることを確認してください。

クラスター内のreindexであっても、ノード間の通信は発生するため、ネットワークの状態はパフォーマンスに影響します。

バージョン競合

前述の通り、conflicts パラメータの扱いに注意が必要です。デフォルトの abort は安全ですが、reindex中にソースインデックスが更新される可能性がある場合はreindexが中断されてしまいます。proceed を使用すると中断は回避できますが、競合したドキュメントはスキップされるため、データの欠落が発生します。

reindex中にソースインデックスへの書き込みが発生する場合は、以下のいずれかを検討します。

  • reindex中はソースインデックスへの書き込みを一時停止する(ダウンタイムが発生します)。
  • conflicts: "proceed" を使用し、reindex後にスキップされたドキュメントを特定して手動で処理する。
  • ダウンタイムを最小限に抑えるためのエイリアス切り替え戦略中に、書き込みを新しいインデックスと古いインデックスの両方に対して行う(デュアルライティング)期間を設ける(アプリケーション側の変更が必要です)。
  • reindexを複数回実行し、version_type: "external" を使用して差分を取り込む(複雑なケースでは難しい場合があります)。

最も一般的なベストプラクティスは、後述するエイリアスを使ったダウンタイム最小化戦略です。

スクリプトのエラー

script パラメータでデータ変換を行う場合、スクリプトの記述ミスやデータ形式の不一致などによりエラーが発生する可能性があります。スクリプトのエラーが発生すると、そのドキュメントのreindexは失敗します。conflicts: "abort" の場合はタスク全体が中断されます。

スクリプトを使用する場合は、少量のデータで事前にテストし、エラーが発生しないことを確認することが重要です。reindexタスクの失敗が発生した場合は、タスク情報やElasticsearchのログを確認してエラー原因を特定します。

タスクの失敗と原因特定

reindexタスクが失敗した場合、_tasks APIでタスクの詳細情報を取得し、エラーメッセージやスタックトレースを確認します。

bash
GET _tasks/task_id_string

また、Elasticsearchのログファイル (elasticsearch.log) にも、より詳細なエラー情報が出力されていることが多いです。ログを分析することで、ディスク容量不足、メモリ不足、ネットワークエラー、スクリプトエラー、パーミッションエラーなど、具体的な失敗原因を特定できます。

失敗したタスクを再開する直接的な方法はありません。原因を修正した後、必要であれば再度reindexタスクを実行する必要があります。その際、query パラメータを使って前回の実行で処理されなかったドキュメントのみを対象とするなど、工夫が必要になる場合があります。

ロングランニングタスクの管理

大規模なreindexは数時間、あるいは数日かかることがあります。非同期実行 (wait_for_completion=false) で開始し、タスクIDを使って進捗を定期的に監視することが重要です。

_tasks APIは、タスクの進捗(processed数、total数)やスロットル状況を表示します。これらの情報を見て、reindexが順調に進んでいるか、ボトルネックが発生していないかを判断します。

ダウンタイム最小化戦略(エイリアス利用)

ほとんどの運用環境では、reindexによるダウンタイムは許容できません。前述の「マッピング(スキーマ)の変更」のセクションで触れたエイリアスを使った方法は、ダウンタイムを最小限に抑えるための標準的な戦略です。

手順の再掲:

  1. 新しいインデックスを作成: 新しいマッピングや設定で、新しい空のインデックス (new_index) を作成します。この時点では、アプリケーションはまだ古いインデックス (old_index) を使用しています。
  2. reindexの実行: old_index から new_index へreindexを実行します。この間もアプリケーションからの書き込みは old_index に対して行われます。
  3. 追いつき期間(オプション): reindex中に old_index に書き込まれた新しいドキュメントや更新されたドキュメントを new_index に反映させる必要があります。これは、reindexを複数回実行して差分を取り込むか、アプリケーション側で二重書き込みを行うなどの方法で実現します。最も簡単なのは、最初のreindex完了後、短い期間だけアプリケーションからの書き込みを停止し、その間に発生した差分を手動または別のreindexタスクでコピーする方法です。または、reindex完了直前のわずかな時間で発生した更新は許容する、というポリシーもあり得ます。ダウンタイムゼロを目指す場合は、継続的な差分同期メカニズム(例えば、Kafkaなどのメッセージキュー経由で新しいドキュメントを両方のインデックスに書き込む)を導入する必要があるかもしれません。
  4. エイリアスの切り替え: reindexが完了し、new_indexold_index と同期された状態になったら、アプリケーションが参照しているエイリアスを old_index から new_index にアトミックに切り替えます。
    bash
    POST /_aliases
    {
    "actions": [
    { "remove": { "alias": "my_data_alias", "index": "old_index" }},
    { "add": { "alias": "my_data_alias", "index": "new_index" }}
    ]
    }

    この操作は非常に高速で、アプリケーションからの検索・インデックスリクエストはほぼ瞬時に新しいインデックスに向けられるようになります。アプリケーションは常にエイリアス名を使用している必要があります。
  5. 古いインデックスの削除: 新しいインデックスでの運用が安定したことを確認してから、古いインデックス (old_index) を削除します。

この戦略は「ブルー/グリーンデプロイメント」に類似しており、非常に効果的です。

reindex以外のデータ再構築方法との比較

Elasticsearchにはreindex以外にもデータを操作するAPIがいくつかあります。それぞれ目的や得意なことが異なるため、状況に応じて適切な方法を選択する必要があります。

Update by Query API

指定したクエリに一致するドキュメントをインプレースで更新するAPIです。

  • 目的: 既存のインデックス内のドキュメントを、元のインデックス形式を維持したまま更新したい場合。マッピングの変更はできません。
  • reindexとの違い: reindexがインデックス間でデータをコピーするのに対し、Update by Queryは同じインデックス内でデータを更新します。マッピング変更を伴う再構築には使えません。
  • ユースケース: フィールドの値の一括変更、スクリプトを使ったデータの部分的修正など。

Delete by Query API

指定したクエリに一致するドキュメントをインプレースで削除するAPIです。

  • 目的: 既存のインデックスから不要なドキュメントを一括で削除したい場合。
  • reindexとの違い: reindexはデータをコピーし、Delete by Queryはデータを削除します。組み合わせて使うことで、特定のドキュメントを削除しながら別のインデックスにコピーする、といった操作も可能です(reindex時に scriptctx.op = 'delete' を使うのと似ています)。

Bulk API

複数のインデックス/削除/更新操作をまとめて一度のリクエストで行うためのAPIです。

  • 目的: クライアントからElasticsearchに大量のドキュメントを効率的に投入したい場合。
  • reindexとの違い: reindexはElasticsearchクラスター内でサーバーサイドで実行されるタスクとしてドキュメントをコピーするのに対し、Bulk APIはクライアントアプリケーションがデータソースからドキュメントを読み込み、加工し、BulkリクエストとしてElasticsearchに送信します。reindexの内部でもBulk APIが利用されています。
  • ユースケース: アプリケーションからの定常的なデータ投入、ETLツールからのデータロードなど。reindexと同様の処理をクライアント側で行うことも可能ですが、大規模なデータや長時間の処理にはreindexの方が適しています。

Ingest Node

ドキュメントがインデックスされる前に、インジェストパイプラインを使ってデータを変換する機能です。

  • 目的: データ投入時にリアルタイムでデータの形式変換、エンリッチメント、フィルタリングなどを行いたい場合。
  • reindexとの違い: reindexは既存のインデックスのデータを事後的に再構築するのに対し、Ingest NodeはデータがElasticsearchに取り込まれる「前」に変換処理を行います。
  • ユースケース: ログデータのパース、IPアドレスからの位置情報追加、不要なフィールドの削除など、データ取り込み時の前処理。reindexの script パラメータで行うデータ変換と似たことができますが、Ingest Nodeはより専用のプロセッサーが用意されており、パフォーマンスが高い場合があります。reindexとIngest Nodeを組み合わせて、reindex時にIngest Pipelineを指定することも可能です。
    json
    POST _reindex
    {
    "source": { "index": "my_raw_data" },
    "dest": { "index": "my_processed_data", "pipeline": "my_ingest_pipeline" }
    }

Snapshot/Restore API

クラスターやインデックスのスナップショット(バックアップ)を取得し、それを別のクラスターにリストアする機能です。

  • 目的: クラスター全体のバックアップとリカバリ、大規模なクラスター間データ移行。
  • reindexとの違い: Snapshot/Restoreはインデックス形式をそのままコピー・移動するため、メジャーバージョンが異なるクラスター間での直接的なデータ移行には使えない場合があります(互換性がある場合を除く)。また、データ変換を行うことはできません。reindexはドキュメントレベルで読み書きするため、バージョン互換性の問題が少なく、データ変換も可能です。
  • ユースケース: 定期的なバックアップ、ディザスタリカバリ、同じバージョン間のクラスター移行。大規模なデータ移行では、reindexよりも高速な場合があります。

それぞれの機能の特性を理解し、目的と要件に合った最適な方法を選択することが重要です。マッピングや設定変更を伴うデータ再構築のほとんどのシナリオでは、reindexが最も適したツールとなります。

まとめ

Elasticsearchのreindex機能は、インデックスのマッピングや設定変更、バージョンアップグレード、データ変換、クラスター間移行など、様々なデータ再構築タスクにおいて中心的な役割を担います。ソースインデックスからドキュメントを効率的に読み取り、必要に応じてスクリプトで変換し、デスティネーションインデックスに書き込むことで、柔軟かつ強力なデータ操作を実現します。

本ガイドでは、reindexの基本概念から始め、APIの各パラメータの詳細な使い方、script によるデータ変換、slices による並列化、リモートreindex、そしてパフォーマンス最適化やトラブルシューティングについて網羅的に解説しました。

成功のためのポイント:

  • 計画: reindexの目的(マッピング変更、シャード数変更など)を明確にし、新しいインデックスのマッピングと設定を事前に適切に定義する。
  • テスト: 小規模なデータセットや開発環境でreindexプロセスを事前にテストし、特にスクリプトの動作やパフォーマンスを確認する。
  • リソースの確認: reindex実行前に、クラスターのディスク容量、メモリ、CPUリソースに十分な余裕があることを確認する。
  • 監視: 非同期実行 (wait_for_completion=false) を使用し、_tasks APIやモニタリングツールで進捗とクラスターリソースを監視する。
  • ダウンタイム最小化: エイリアスを使ったアトミックな切り替え戦略を採用し、アプリケーションへの影響を最小限に抑える。
  • スクリプトの注意: スクリプトは強力だが、パフォーマンスへの影響やエラーの可能性を理解し、慎重に使用する。

reindexは大規模なデータ操作であり、クラスターに負荷をかける可能性があります。計画、テスト、監視を怠らず、本ガイドで解説したパフォーマンスチューニングやトラブルシューティングの知識を活用することで、安全かつ効率的にデータ再構築を実行できます。

Elasticsearchを運用する上で、reindexは避けて通れない重要な機能の一つです。この完全ガイドが、皆様のElasticsearchにおけるデータ再構築タスクの成功の一助となれば幸いです。

コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

上部へスクロール