PythonでElasticsearchを操作する!入門編


PythonでElasticsearchを操作する!入門編

はじめに:ElasticsearchとPythonの出会い

現代のデータ活用において、「検索」と「分析」は非常に重要な要素です。大量のログデータから特定のイベントを探したり、顧客の行動データを分析して傾向を掴んだり、Webサイトのコンテンツを高速に検索可能にしたりと、様々なシーンでこれらの機能が求められています。

ここでその力を発揮するのがElasticsearchです。Elasticsearchは、RESTful APIを備えた、スケーラブルなオープンソースの分散型検索・分析エンジンです。ペタバイト級のデータであっても、驚異的な速度で検索や集計を行うことができます。その柔軟性とパフォーマンスから、ログ分析(Elastic Stackの中心)、リアルタイムアプリケーション監視、セキュリティ分析、全文検索エンジンなど、幅広い用途で利用されています。

そして、この強力なElasticsearchをプログラミングで操作する際に、最もよく使われる言語の一つがPythonです。Pythonは、その簡潔なシンタックス、豊富なライブラリ、そして強力なデータ処理能力から、多くの開発者やデータサイエンティストに愛されています。Pythonの柔軟性とElasticsearchのパワーを組み合わせることで、高度なデータ検索・分析アプリケーションを効率的に開発することが可能になります。

この記事では、Pythonを使ってElasticsearchを操作するための基本的な知識と実践的な方法を、初心者の方にも分かりやすく詳細に解説していきます。具体的には、以下の内容を学びます。

  • Elasticsearchの基本的な概念
  • Pythonクライアントライブラリ elasticsearch-py の導入と使い方
  • インデックスの作成、ドキュメントのCRUD操作(作成、読み込み、更新、削除)
  • 基本的な検索(全文検索、条件検索)の方法
  • より高度な検索クエリ(Query DSL)の活用
  • データの集計(Aggregations)による分析
  • インデックスのマッピングとアナライザーの役割

この記事を最後まで読めば、Pythonを使ってElasticsearchにデータを投入し、必要な情報を検索・分析するための基礎がしっかりと身につくでしょう。さあ、ElasticsearchとPythonの世界へ踏み出しましょう!

第1部:Elasticsearchの基本概念

Pythonによる操作に入る前に、Elasticsearchがどのような仕組みで動いているのか、その基本的な概念を理解しておくことが重要です。データベースとの比較を交えながら見ていきましょう。

1. クラスター、ノード、インデックス、ドキュメント、フィールド

  • クラスター (Cluster): Elasticsearchの最も基本的な単位で、1つ以上のノードの集合体です。データはクラスター全体に分散され、検索や分析のリクエストはクラスター全体で処理されます。高可用性やスケーラビリティを実現するために、複数のノードでクラスターを構成するのが一般的です。
  • ノード (Node): Elasticsearchを実行している単一のサーバーインスタンスです。クラスター内の各ノードは連携してデータの格納や検索を行います。
  • インデックス (Index): 関連性のあるドキュメントの集合体です。データベースにおける「テーブル」に似ていますが、NoSQLの特性を持ち、スキーマの柔軟性が高いのが特徴です。(Elasticsearch 7.x以降では、タイプ(Type)の概念は非推奨または廃止され、通常はインデックス内に多様なドキュメントを格納します。この記事では、インデックス名だけで操作する方法を前提とします。)
  • ドキュメント (Document): Elasticsearchに格納される最小単位のデータです。JSON形式で表現されます。データベースにおける「行」や、JSONデータベースにおける「ドキュメント」に相当します。各ドキュメントはインデックス内に格納され、一意のIDを持ちます。
  • フィールド (Field): ドキュメント内の個々のデータ項目です。JSONにおけるキーと値のペアの「キー」に相当します。データベースにおける「列」に似ています。Elasticsearchは各フィールドのデータ型(文字列、数値、日付など)を認識し、適切な方法でインデックス化します。

これらの概念の関係を図で表すと、クラスター > ノード > インデックス > ドキュメント > フィールド、のようになります。

2. RESTful API

Elasticsearchは、すべての操作をRESTful API経由で提供しています。これは、HTTPメソッド(GET, POST, PUT, DELETEなど)とURLを使って、Elasticsearchクラスターとやり取りすることを意味します。Pythonクライアントライブラリ elasticsearch-py も、内部的にはこのRESTful APIを呼び出しています。このアーキテクチャのおかげで、どのプログラミング言語からでもElasticsearchを容易に操作できます。

3. シャードとレプリカ

Elasticsearchが高速かつスケーラブル、そして高可用性を持つ秘密は、シャード (Shard)レプリカ (Replica) の概念にあります。

  • シャード (Shard): インデックスは、物理的に複数のシャードに分割されます。各シャードは独立したLuceneインデックスであり、Elasticsearchクラスター内の任意のノードに配置できます。インデックスを複数のシャードに分割することで、大規模なインデックスを複数のノードに分散して格納・処理できるため、スケールアウトや並列処理が可能になり、検索速度が向上します。シャードは「プライマリシャード」と呼ばれます。
  • レプリカ (Replica): プライマリシャードの複製です。レプリカシャードは、プライマリシャードとは異なるノードに配置されます。レプリカを持つことで、ノード障害が発生した場合でもデータの損失を防ぎ(高可用性)、検索リクエストを処理するシャードの数を増やせるため検索スループットも向上します(スケーラビリティ)。レプリカの数はインデックス作成時に設定できます。

例えば、3つのプライマリシャードと各シャードの1つのレプリカを持つインデックスは、合計で 3 (プライマリ) + 3 * 1 (レプリカ) = 6 つのシャードで構成されます。

4. 転置インデックス (Inverted Index)

Elasticsearchが高速な全文検索を実現している核となる技術が「転置インデックス」です。一般的なデータベース(リレーショナルデータベースなど)は「順方向インデックス」を使用しており、これはドキュメント(行)を主キーで引いて、そこに含まれる内容を取得する仕組みです。

一方、転置インデックスは、ドキュメントに含まれるすべての「単語」をキーとして、その単語が出現するドキュメントのリストを値として保持します。

例:
ドキュメント1: “Elasticsearch is powerful”
ドキュメント2: “Python is powerful and flexible”

転置インデックス:
“Elasticsearch” -> [ドキュメント1]
“is” -> [ドキュメント1, ドキュメント2]
“powerful” -> [ドキュメント1, ドキュメント2]
“Python” -> [ドキュメント2]
“and” -> [ドキュメント2]
“flexible” -> [ドキュメント2]

ユーザーが「powerful」という単語を検索すると、Elasticsearchはこの転置インデックスを参照して、即座に「ドキュメント1」と「ドキュメント2」がヒットすることを見つけ出します。これにより、全文検索が非常に高速に行えます。

テキストをどのように「単語」に分割するか、大文字小文字を区別するか、ステミング(単語の語幹への変換)を行うかなどは、「アナライザー」と呼ばれる仕組みが担当します。(これについては後述します。)

第2部:Python環境の準備

Elasticsearchの基本概念を理解したところで、Pythonから操作するための準備を始めましょう。

1. Pythonのインストール

これはこの記事の前提とします。公式ウェブサイトからPython 3.6以上のバージョンをインストールしておいてください。

2. elasticsearch ライブラリのインストール

PythonからElasticsearchを操作するためには、公式クライアントライブラリである elasticsearch-py を使用するのが最も一般的です。pipを使って簡単にインストールできます。

ターミナルまたはコマンドプロンプトを開き、以下のコマンドを実行してください。

bash
pip install elasticsearch

最新バージョンをインストールすることをお勧めします。特定のバージョンを指定したい場合は、例えば pip install elasticsearch==7.17.0 のようにバージョン番号を指定します。

3. Elasticsearchサーバーの準備

Pythonから接続するためには、稼働中のElasticsearchサーバーが必要です。いくつか方法があります。

  • ローカルでの実行: Elasticsearchの公式ウェブサイトからダウンロードして、ローカルマシンで直接実行します。展開後、binディレクトリにある実行ファイルを実行します。
  • Dockerの利用: Dockerを使えば、簡単にElasticsearchコンテナを起動できます。これは開発やテスト環境で非常に便利です。

    bash
    docker pull elasticsearch:7.17.0 # または利用したいバージョン
    docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.17.0

    (注意: 本番環境では discovery.type=single-node は使用しないでください。)

  • Elastic Cloudの利用: Elastic社が提供するマネージドサービスです。手軽に試すためのトライアル期間が用意されています。認証情報などが必要になります。

この記事では、Elasticsearchがデフォルトのポート(9200番)で、認証なしでローカルホスト (localhost:9200) で稼働していることを前提に進めます。必要に応じて接続情報を適宜読み替えてください。

第3部:Pythonクライアント elasticsearch-py の使い方(基本操作)

elasticsearch-py ライブラリを使って、Elasticsearchに対する基本的な操作(インデックスの作成、ドキュメントのCRUDなど)を行ってみましょう。

1. クライアントの初期化と接続

まず、elasticsearch ライブラリをインポートし、Elasticsearchクラスのインスタンスを作成します。これがElasticsearchクラスターとの接続を管理するクライアントオブジェクトになります。

“`python
from elasticsearch import Elasticsearch

デフォルト設定 (localhost:9200) で接続

host=’localhost’, port=9200 はデフォルトなので省略可能

client = Elasticsearch(
cloud_id=”YOUR_CLOUD_ID”, # クラウドを使用する場合
api_key=(“id”, “api_key”), # APIキー認証の場合
# その他の接続設定…
# hosts=[“localhost:9200”, “other_host:9201”], # 複数のホストを指定する場合
# http_auth=(“user”, “password”), # Basic認証の場合
# use_ssl=True, # SSL/TLSを使用する場合
# verify_certs=True,
# ca_certs=”/path/to/ca.crt”,
)

接続確認

try:
client.ping()
print(“Connected to Elasticsearch”)
except Exception as e:
print(f”Connection failed: {e}”)

“`

基本的には Elasticsearch() だけでローカルホストへの接続を試みます。Elastic Cloudや認証が必要な場合は、対応する引数を指定します。ping() メソッドで接続を確認できます。

クライアントオブジェクトはスレッドセーフなので、通常はアプリケーション全体で一つのインスタンスを共有して使用します。

2. インデックスの作成と削除

データを格納するためには、まずインデックスを作成する必要があります。client.indices オブジェクトを使ってインデックス関連の操作を行います。

インデックスの作成:

“`python

インデックス名

index_name = “my_first_index”

インデックスが存在するか確認し、存在しなければ作成

if not client.indices.exists(index=index_name):
try:
# インデックスを作成
response = client.indices.create(index=index_name)
print(f”Index ‘{index_name}’ created.”)
print(response)
except Exception as e:
print(f”Error creating index ‘{index_name}’: {e}”)
else:
print(f”Index ‘{index_name}’ already exists.”)

``client.indices.exists(index=index_name)でインデックスの存在を確認できます。client.indices.create(index=index_name)` でインデックスを作成します。成功するとレスポンスが返ります。

インデックスの削除:

“`python

削除するインデックス名

index_to_delete = “my_first_index”

インデックスが存在するか確認し、存在すれば削除

if client.indices.exists(index=index_to_delete):
try:
# インデックスを削除
response = client.indices.delete(index=index_to_delete)
print(f”Index ‘{index_to_delete}’ deleted.”)
print(response)
except Exception as e:
print(f”Error deleting index ‘{index_to_delete}’: {e}”)
else:
print(f”Index ‘{index_to_delete}’ does not exist.”)

``client.indices.delete(index=index_to_delete)` でインデックスを削除できます。一度削除したインデックス内のデータは復旧できないので注意してください。

3. ドキュメントのCRUD操作

インデックスができたので、いよいよドキュメント(データ)を操作してみましょう。

ドキュメントのインデックス(追加/更新):client.index()

client.index() メソッドは、ドキュメントを指定したインデックスに追加または更新するために使用します。

“`python

追加/更新するインデックス名

index_name = “my_data_index”

ドキュメントデータ(Python辞書)

doc_data = {
“title”: “PythonでElasticsearch入門”,
“author”: “GenaIKun”,
“published_date”: “2023-10-27”,
“content”: “この記事はPythonからElasticsearchを操作する基本的な方法を解説しています。”,
“tags”: [“python”, “elasticsearch”, “入門”, “プログラミング”]
}

ドキュメントをインデックスに追加(IDはElasticsearchが自動生成)

try:
response = client.index(
index=index_name,
# id=None, # IDを省略すると自動生成
document=doc_data # bodyパラメータの代わりにdocumentパラメータを使用 (>=7.14)
)
print(f”Document indexed successfully with ID: {response[‘_id’]}”)
print(response)
except Exception as e:
print(f”Error indexing document: {e}”)

IDを指定してドキュメントを追加または更新

IDが既に存在すれば更新、存在しなければ追加 (PUT相当)

doc_id = “article_001”
doc_data_with_id = {
“title”: “PythonとElasticsearchの連携”,
“author”: “GenaIKun”,
“published_date”: “2023-11-01”,
“content”: “Pythonクライアントを使った高度な連携方法を探求します。”,
“tags”: [“python”, “elasticsearch”, “連携”, “応用”]
}

try:
response_with_id = client.index(
index=index_name,
id=doc_id,
document=doc_data_with_id
)
print(f”Document indexed successfully with ID: {response_with_id[‘_id’]}”)
print(response_with_id)
except Exception as e:
print(f”Error indexing document with ID {doc_id}: {e}”)

``indexパラメータで対象のインデックス名を指定します。documentパラメータに、追加または更新したいドキュメントデータをPython辞書で渡します。idパラメータを指定しない場合、ElasticsearchはドキュメントごとにユニークなIDを自動生成します。id` パラメータを指定した場合、そのIDのドキュメントが存在すれば上書き更新(PUT相当)、存在しなければ新規追加となります。

ドキュメントの取得:client.get()

特定のIDを持つドキュメントを取得するには、client.get() メソッドを使用します。

“`python

取得するドキュメントのIDとインデックス名

doc_id_to_get = “article_001”
index_name = “my_data_index”

try:
response = client.get(index=index_name, id=doc_id_to_get)
print(f”Document found:”)
print(response[‘_source’]) # ‘_source’ に元のドキュメントデータが含まれる
print(f”Version: {response[‘_version’]}”) # ドキュメントのバージョン
except Exception as e:
print(f”Error getting document with ID {doc_id_to_get}: {e}”)
# 例: elasticsearch.exceptions.NotFoundError: ドキュメントが見つからない場合

``indexidを指定して呼び出します。
成功すると、ドキュメントのメタデータ(
_index,_id,_versionなど)と、元のドキュメントデータが格納されている_sourceフィールドを含むレスポンスが返ります。
指定したIDのドキュメントが存在しない場合は、
elasticsearch.exceptions.NotFoundError` 例外が発生します。

ドキュメントの更新:client.update()

既存のドキュメントの一部を更新するには、client.update() メソッドを使用します。client.index() でIDを指定して上書きすることも可能ですが、update() は部分更新に適しています。

“`python

更新するドキュメントのIDとインデックス名

doc_id_to_update = “article_001”
index_name = “my_data_index”

更新データ

doc パラメータに更新したいフィールドのみ含む辞書を渡す

update_data = {
“doc”: {
“published_date”: “2023-11-02”, # 日付を更新
“status”: “published” # 新しいフィールドを追加
}
}

または script パラメータを使ってスクリプトで更新

update_script = {

“script”: {

“source”: “ctx._source.views = ctx._source.views + params.count”,

“lang”: “painless”,

“params”: { “count”: 1 }

}

}

try:
response = client.update(
index=index_name,
id=doc_id_to_update,
body=update_data # または update_script
)
print(f”Document with ID {doc_id_to_update} updated successfully.”)
print(response)
except Exception as e:
print(f”Error updating document with ID {doc_id_to_update}: {e}”)

``update()メソッドでは、docパラメータの中に更新したいフィールドとその値を指定します。既存のフィールドは指定した値で上書きされ、指定されなかったフィールドはそのまま保持されます。docパラメータに存在しないフィールドを指定すると、そのフィールドが新規に追加されます。
より複雑な更新(例: カウンターのインクリメント、配列への要素追加)を行いたい場合は、
script` パラメータに Elasticsearchのスクリプト言語(デフォルトはPainless)を指定します。

ドキュメントの削除:client.delete()

特定のIDを持つドキュメントを削除するには、client.delete() メソッドを使用します。

“`python

削除するドキュメントのIDとインデックス名

doc_id_to_delete = “article_001”
index_name = “my_data_index”

try:
response = client.delete(index=index_name, id=doc_id_to_delete)
print(f”Document with ID {doc_id_to_delete} deleted successfully.”)
print(response)
except Exception as e:
print(f”Error deleting document with ID {doc_id_to_delete}: {e}”)
# 例: elasticsearch.exceptions.NotFoundError: ドキュメントが見つからない場合

``indexidを指定して呼び出します。
成功するとレスポンスが返ります。存在しないドキュメントを削除しようとすると
elasticsearch.exceptions.NotFoundError` 例外が発生します。

4. バルク操作:client.bulk()

多数のドキュメントに対して一括でインデックス、更新、削除などの操作を行いたい場合、個別のリクエストを繰り返すのは非効率です。このような場合にバルク操作が役立ちます。client.bulk() メソッドを使うと、複数の操作をまとめて1つのリクエストとして送信できます。これにより、ネットワークラウンドトリップの回数を劇的に減らし、パフォーマンスを向上させることができます。

バルク操作のデータ形式は少し特殊で、各操作(インデックス、更新、削除など)とその対象ドキュメントのメタデータ(_index, _idなど)、そして操作内容(更新データなど)を、JSON形式の複数行データとして連結して作成します。Pythonクライアントでは、この形式をリスト形式で渡すことができます。リストの各要素は、操作を定義する辞書と、必要に応じてドキュメントデータを定義する辞書です。

“`python

バルク操作用のデータリストを作成

各要素は操作を定義する辞書 + (必要に応じて) ドキュメントデータ

bulk_data = [
# ドキュメント追加 (ID自動生成)
{“index”: {“_index”: index_name}},
{“title”: “記事 その2”, “author”: “Author B”, “tags”: [“elasticsearch”, “bulk”]},

# ドキュメント追加 (ID指定)
{"index": {"_index": index_name, "_id": "article_002"}},
{"title": "記事 その3", "author": "Author C", "tags": ["python"]},

# ドキュメント更新 (ID指定)
{"update": {"_index": index_name, "_id": "article_002"}},
{"doc": {"title": "記事 その3 (更新)", "status": "updated"}}, # 更新データは doc キーの下

# ドキュメント削除 (ID指定)
{"delete": {"_index": index_name, "_id": "some_other_id_to_delete"}}, # 削除するIDが存在しない場合もエラーにならない(結果に失敗として含まれる)

]

バルク操作の実行

try:
response = client.bulk(body=bulk_data)
print(“Bulk operation executed.”)
# レスポンスには各操作の成否が含まれる
# errors: Trueの場合、いずれかの操作が失敗している
if response[“errors”]:
print(“Some bulk operations failed:”)
# 失敗した操作の詳細を確認するにはレスポンスをパースする必要がある
for item in response[“items”]:
# 例えば index 操作の失敗を確認
if “index” in item and item[“index”][“status”] >= 400:
print(f” Index operation failed for ID {item[‘index’][‘_id’]}: {item[‘index’][‘error’][‘type’]} – {item[‘index’][‘error’][‘reason’]}”)
# 例えば update 操作の失敗を確認
elif “update” in item and item[“update”][“status”] >= 400:
print(f” Update operation failed for ID {item[‘update’][‘_id’]}: {item[‘update’][‘error’][‘type’]} – {item[‘update’][‘error’][‘reason’]}”)
# 例えば delete 操作の失敗を確認
elif “delete” in item and item[“delete”][“status”] >= 400:
print(f” Delete operation failed for ID {item[‘delete’][‘_id’]}: {item[‘delete’][‘error’][‘type’]} – {item[‘delete’][‘error’][‘reason’]}”)

else:
    print("All bulk operations succeeded.")

except Exception as e:
print(f”Error during bulk operation: {e}”)

``
バルクデータのリストは、
{“operation”: {…}}{“document_data”: {…}}のペア、または削除のようにデータが不要な場合は{“operation”: {…}}のみで構成されます。operationキーにはindex,create,update,deleteのいずれかを指定します。_index,_idなどは操作を特定するためのメタデータとして操作定義辞書に含めます。update操作の場合、更新データはdocキーの下に指定します。client.bulk()bodyパラメータにこのリストを渡します。
バルク操作のレスポンスには、全体が成功したかどうかの
errorsフラグと、各操作の結果を含むitemsリストが含まれます。個別の操作の失敗はerrorsフラグとitems` リストを確認して判断する必要があります。

5. エラーハンドリング

Elasticsearchとの通信や操作中に様々なエラーが発生する可能性があります。ネットワークエラー、認証エラー、インデックスやドキュメントが存在しない、クエリのシンタックスエラーなどです。elasticsearch-py は、これらのエラーを elasticsearch.exceptions モジュールで定義された例外クラスとして発生させます。適切なエラーハンドリングを行うことで、アプリケーションの堅牢性を高めることができます。

主要な例外クラス:
* elasticsearch.exceptions.ConnectionError: Elasticsearchへの接続に失敗した場合。
* elasticsearch.exceptions.TransportError: HTTPリクエストの送信中にエラーが発生した場合(ネットワーク問題など)。
* elasticsearch.exceptions.ApiError: Elasticsearch APIがエラーを返した場合。
* elasticsearch.exceptions.NotFoundError: 存在しないインデックスやドキュメントにアクセスしようとした場合(HTTPステータスコード 404)。
* elasticsearch.exceptions.RequestError: リクエストボディ(クエリなど)に問題がある場合(HTTPステータスコード 400)。
* elasticsearch.exceptions.AuthenticationException: 認証に失敗した場合(HTTPステータスコード 401)。

“`python
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, NotFoundError, ConnectionError

client = Elasticsearch() # 接続設定は省略

index_name = “non_existent_index”
doc_id = “some_id”

try:
# 存在しないインデックスにドキュメントを追加しようとする
client.index(index=index_name, id=doc_id, document={“key”: “value”})
print(“Document indexed (unexpected).”)
except NotFoundError:
print(f”Caught NotFoundError: Index ‘{index_name}’ does not exist.”)
except ConnectionError:
print(“Caught ConnectionError: Could not connect to Elasticsearch.”)
except ApiError as e:
print(f”Caught ApiError: Status {e.status_code}, Error: {e.error}, Info: {e.info}”)
except Exception as e:
print(f”Caught unexpected error: {type(e).name}: {e}”)

try:
# 存在しないドキュメントを取得しようとする
client.get(index=”my_data_index”, id=”non_existent_id”)
print(“Document retrieved (unexpected).”)
except NotFoundError:
print(“Caught NotFoundError: Document does not exist.”)
except ApiError as e:
print(f”Caught ApiError: Status {e.status_code}, Error: {e.error}, Info: {e.info}”)
except Exception as e:
print(f”Caught unexpected error: {type(e).name}: {e}”)

``
重要な操作を行う際には、関連する例外を捕捉する
try…exceptブロックを使用することが推奨されます。特にNotFoundErrorは、存在しないリソースへのアクセスを示す一般的なエラーなので、適切にハンドリングすることが多いです。ApiError` は包括的なエラークラスであり、特定のHTTPステータスコードやエラータイプに基づいて処理を分岐させたい場合に便利です。

第4部:検索の基本

Elasticsearchの真骨頂は検索です。インデックスに格納されたドキュメントを、様々な条件で検索する方法を学びましょう。

1. 全文検索の仕組みとアナライザー(再訪)

検索に入る前に、テキストデータの検索において重要な「アナライザー」についてもう少し詳しく見てみましょう。ドキュメントがインデックスされる際、テキストフィールドの内容は「アナライザー」によって処理されます。この処理には以下のステップが含まれるのが一般的です。

  1. 文字フィルター (Character Filters): テキストを処理する前に、特定文字の置換(例: HTMLタグの除去、全角/半角の変換)を行います。
  2. トークナイザー (Tokenizer): テキストを個々の「トークン」(単語やフレーズ)に分割します。どの文字を区切りとするか(スペース、句読点など)はトークナイザーの種類によって異なります。
  3. トークンフィルター (Token Filters): 生成されたトークンに対してさらに処理を行います。例として、小文字化(lowercase)、ストップワード(”the”, “is”, “a”など)の除去、シノニム(類義語)への変換、ステミング(単語の語幹への変換、例: “running” -> “run”)などがあります。

このように処理されて生成されたトークンが、転置インデックスに登録されます。

ユーザーが検索クエリを発行した際も、通常は検索文字列が同じアナライザーで処理され、生成されたトークンを使って転置インデックスが検索されます。インデックス時と検索時で同じアナライザーを使用することで、期待通りの検索結果が得られます。

Elasticsearchには様々な組み込みアナライザーが用意されています。例えば、standard アナライザーは、スペースや句読点で区切り、小文字化を行います。日本語の場合は、単語の区切り方が複雑なため、特別なアナライザー(例: Kuromojiプラグイン)が必要です。

2. 簡単な検索クエリ:client.search()

Pythonで検索を行うには、client.search() メソッドを使用します。最も基本的な検索は、インデックス内のすべてのドキュメントを取得する match_all クエリです。

“`python

検索対象のインデックス名

index_name = “my_data_index”

全てのドキュメントを検索 (match_all クエリ)

try:
response = client.search(
index=index_name,
query={“match_all”: {}} # Query DSL をボディとして渡す
)

print(f"Total documents found: {response['hits']['total']['value']}") # 検索結果の総件数
print("Hits:")
for hit in response['hits']['hits']:
    print(f"  ID: {hit['_id']}, Score: {hit['_score']}, Source: {hit['_source']}")

except Exception as e:
print(f”Error during search: {e}”)

``indexパラメータで検索対象のインデックスを指定します。複数のインデックスを指定することも可能です(例:“index1,index2”または[“index1”, “index2”])。queryパラメータに検索クエリをQuery DSLというJSON形式の辞書で渡します。{“match_all”: {}}は、インデックス内の全てのドキュメントにマッチする最も単純なクエリです。
検索結果はレスポンス辞書の
[‘hits’][‘hits’]にリストとして含まれます。各要素はヒットしたドキュメントを表す辞書で、_id_score(関連性スコア)、_source(元のドキュメントデータ)などの情報を含みます。[‘hits’][‘total’][‘value’]` は、検索条件にマッチしたドキュメントの総数を返します。

単語による検索:match クエリ

特定のフィールドに特定の単語が含まれるドキュメントを検索したい場合は、match クエリを使用します。これは全文検索で最もよく使われるクエリの一つです。

“`python

検索対象のインデックス名

index_name = “my_data_index”

‘content’ フィールドに ‘Python’ という単語を含むドキュメントを検索

search_term = “Python”

try:
response = client.search(
index=index_name,
query={
“match”: {
“content”: search_term # フィールド名と検索語を指定
}
}
)

print(f"Documents containing '{search_term}': {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
    print(f"  ID: {hit['_id']}, Score: {hit['_score']}, Title: {hit['_source']['title']}")

except Exception as e:
print(f”Error during search: {e}”)

``{“match”: {“field_name”: “search_text”}}` の形式でクエリを指定します。Elasticsearchは指定されたフィールドに対して、アナライザーを使って検索語を処理し、転置インデックスを使って検索を行います。検索語が複数の単語で構成されている場合(例: “Python Elasticsearch”)、デフォルトではこれらの単語のいずれかがフィールドに含まれていればヒットします(OR結合のような動作)。より厳密な一致(AND結合やフレーズ検索)を指定することも可能ですが、それはQuery DSLの詳細で説明します。

完全一致検索:term クエリ

match クエリがアナライザーを通した全文検索であるのに対し、term クエリは指定された単語と完全に一致するドキュメントを検索します。これは通常、アナライズされないフィールド(例: マッピングで keyword 型として定義されたフィールド)に対して使用されます。例えば、タグ、ID、ステータスなどの正確な値を検索したい場合に適しています。

“`python

検索対象のインデックス名

index_name = “my_data_index”

‘tags’ フィールドに完全に ‘python’ という単語が含まれるドキュメントを検索

注意: ‘tags’ フィールドが keyword 型またはそれに準ずる形でインデックスされている必要がある

search_tag = “python”

try:
response = client.search(
index=index_name,
query={
“term”: {
“tags”: search_tag # フィールド名と完全一致させたい単語を指定
}
}
)

print(f"Documents tagged with '{search_tag}': {response['hits']['total']['value']}")
for hit in response['hits']['hits']:
    print(f"  ID: {hit['_id']}, Score: {hit['_score']}, Title: {hit['_source']['title']}")

except Exception as e:
print(f”Error during search: {e}”)

``{“term”: {“field_name”: “exact_term”}}の形式で指定します。termクエリは、検索語をアナライズせずに転置インデックスを直接参照します。そのため、検索語はインデックスされているトークンと完全に一致している必要があります。例えば、フィールドに "Python" と格納されていても、検索語が "python" (小文字) だと、デフォルトのアナライザー(小文字化する)が適用されたmatchクエリではヒットしますが、termクエリではヒットしません。テキストフィールドに対してtermクエリを使うと、期待通りの結果が得られないことが多いので注意が必要です。通常はkeyword` 型のフィールドに使用します。

3. 検索結果の取得と処理

client.search() メソッドのレスポンスは辞書形式で、以下のような主要なキーを含みます。

  • took: 検索にかかった時間(ミリ秒)。
  • timed_out: 検索がタイムアウトしたかどうか(ブール値)。
  • _shards: 検索に関与したシャードの情報。
  • hits: 検索結果の主要部分。
    • total: 検索条件にマッチしたドキュメントの総数。value (件数) と relation (“eq”または”gte”) を含む辞書です。大規模データでは正確な件数ではなく下限値が返される場合があります。
    • max_score: ヒットした中で最も高い関連性スコア。
    • hits: ヒットしたドキュメントのリスト。各要素は以下の情報を持つ辞書です。
      • _index: ドキュメントが格納されているインデックス名。
      • _id: ドキュメントのID。
      • _score: このドキュメントの関連性スコア。クエリとの関連性の高さを表し、デフォルトではこのスコアでソートされます。
      • _source: ドキュメントの元のデータ(インデックスされた時のJSONデータ)。デフォルトで含まれますが、_source パラメータで取得するフィールドを制限したり、除外したりすることも可能です。

Pythonコードでは、このレスポンス辞書から必要な情報を取り出して処理します。

“`python
response = client.search(index=”my_data_index”, query={“match_all”: {}})

総件数

total_hits = response[‘hits’][‘total’][‘value’]
print(f”Total hits: {total_hits}”)

ヒットしたドキュメントをリストで取得

documents = response[‘hits’][‘hits’]

各ドキュメントの情報を表示

for doc in documents:
doc_id = doc[‘_id’]
score = doc[‘_score’]
source_data = doc[‘_source’] # Python辞書として取得される

print(f"ID: {doc_id}, Score: {score}")
print(f"  Title: {source_data.get('title', 'N/A')}") # 安全にフィールドを取得
print(f"  Tags: {', '.join(source_data.get('tags', []))}")
# その他のフィールドも source_data から取得・表示可能

“`

4. ページネーション (from, size)

デフォルトでは、client.search() は最大10件のドキュメントを返します。より多くのドキュメントを取得したり、検索結果をページ分けして表示したりするには、from パラメータと size パラメータを使用します。

  • size: 1ページあたりのヒット件数。デフォルトは10。
  • from: 結果の取得を開始するオフセット(スキップする件数)。デフォルトは0。

例えば、2ページ目(1ページあたり10件)の結果を取得するには、size=10, from=10 と指定します。

“`python
index_name = “my_data_index”
page_size = 5 # 1ページあたり5件
page_number = 2 # 取得したいページ番号 (1から始まる)

オフセットを計算: (ページ番号 – 1) * ページサイズ

search_from = (page_number – 1) * page_size

try:
response = client.search(
index=index_name,
query={“match_all”: {}}, # 例として全てのドキュメントを対象
size=page_size,
from_=search_from # ‘from’ はPythonのキーワードなので ‘from_’ を使用
)

print(f"Total documents found: {response['hits']['total']['value']}")
print(f"Showing results for page {page_number} (from {search_from} to {search_from + len(response['hits']['hits']) -1})")
for hit in response['hits']['hits']:
     print(f"  ID: {hit['_id']}, Title: {hit['_source']['title']}")

except Exception as e:
print(f”Error during search: {e}”)

``from_パラメータを指定する際は、Pythonの予約語fromと衝突するため、アンダースコアを付けてfrom_` とする必要があります。

注意点: from/size を使った深いページネーション(例: from が数万件を超えるような場合)は、クラスターに大きな負荷をかけるため推奨されません。大量のデータを効率的にスクロールして取得したい場合は、scroll APIや search_after パラメータの使用を検討してください(これらは入門編の範囲を超えるためここでは詳しく扱いません)。

第5部:より高度な検索クエリ (Query DSL)

Elasticsearchの検索能力のほとんどは、JSON形式のクエリ言語であるQuery DSL (Domain Specific Language) を通じて提供されます。これまでに紹介した match_all, match, term もQuery DSLの一部です。ここでは、より複雑な検索条件を組み合わせるための bool クエリや、その他の便利なクエリタイプを見ていきましょう。

Query DSLは、client.search() メソッドの query パラメータに渡す辞書として記述します。基本的な構造は以下のようになります。

json
{
"query": {
"query_type": {
"parameter1": "value1",
"parameter2": "value2",
...
}
}
}

または、複合クエリの場合:
json
{
"query": {
"compound_query_type": {
"clause1": { ... },
"clause2": { ... },
...
}
}
}

1. 複合クエリ:bool クエリ

複数の検索条件を組み合わせたい場合に最もよく使用されるのが bool クエリです。bool クエリは、以下の4つの句(clause)を使って条件をAND、OR、NOTのように組み合わせることができます。

  • must: この句に含まれる全てのクエリにマッチするドキュメントが返されます。AND条件に相当します。スコア計算に関与します。
  • filter: この句に含まれる全てのクエリにマッチするドキュメントが返されます。AND条件に相当しますが、must とは異なり、スコア計算には関与しません。フィルタリングに適しており、キャッシュされるため通常 must よりも高速です。条件にマッチするかどうかだけを判定し、関連性のスコアは不要な場合に最適です。
  • should: この句に含まれるクエリのいずれか1つ以上にマッチするドキュメントが返されます。OR条件に相当します。マッチしたクエリの数が多いほど関連性スコアが高くなります。mustfilter 句がない場合、should 句のいずれかにマッチするドキュメントが全て返されます。mustfilter 句がある場合、should 句はスコア計算に影響を与えたり(ソフトなOR)、あるいは最小マッチ数を指定して必須にしたりする(ハードなOR)ために使用されます。
  • must_not: この句に含まれるクエリのいずれにもマッチしないドキュメントが返されます。NOT条件に相当します。スコア計算には関与しません。

これらの句を組み合わせて、柔軟な検索条件を表現できます。

例:mustfilter の組み合わせ

「タイトルに’Elasticsearch’が含まれ、かつ発行日が2023年以降のドキュメント」を検索する場合。タイトル検索は関連性が重要なので must、日付範囲は厳密な条件なので filter を使うのが適切です。

“`python
index_name = “my_data_index”

try:
response = client.search(
index=index_name,
query={
“bool”: {
“must”: [
# タイトルに ‘Elasticsearch’ が含まれる (match クエリ)
{“match”: {“title”: “Elasticsearch”}}
],
“filter”: [
# 発行日が 2023-01-01 以降 (range クエリ)
{“range”: {“published_date”: {“gte”: “2023-01-01”}}}
]
}
}
)

print(f"Bool query results: {response['hits']['total']['value']} documents found.")
for hit in response['hits']['hits']:
     print(f"  ID: {hit['_id']}, Score: {hit['_score']}, Title: {hit['_source']['title']}, Date: {hit['_source']['published_date']}")

except Exception as e:
print(f”Error during search: {e}”)

``mustfilter` 句の中には、複数のクエリをリストとして指定できます。指定された全てのクエリにドキュメントがマッチする必要があります。

例:should の組み合わせ

「タグに ‘python’ または ‘入門’ のいずれかが含まれるドキュメント」を検索する場合。

“`python
index_name = “my_data_index”

try:
response = client.search(
index=index_name,
query={
“bool”: {
“should”: [
# タグに ‘python’ が含まれる (term クエリ、tags が keyword 型と仮定)
{“term”: {“tags”: “python”}},
# タグに ‘入門’ が含まれる (term クエリ)
{“term”: {“tags”: “入門”}}
],
“minimum_should_match”: 1 # should句の少なくとも1つにマッチする必要がある (必須のOR)
}
}
)

print(f"Should query results: {response['hits']['total']['value']} documents found.")
for hit in response['hits']['hits']:
     print(f"  ID: {hit['_id']}, Score: {hit['_score']}, Title: {hit['_source']['title']}, Tags: {', '.join(hit['_source']['tags'])}")

except Exception as e:
print(f”Error during search: {e}”)

``should句を使用する際は、minimum_should_matchパラメータを指定することが多いです。これは、should句に含まれるクエリのうち、最低いくつにマッチすればドキュメントをヒットとするかを指定します。minimum_should_match: 1は一般的なOR条件として機能します。このパラメータを省略した場合、should句だけではデフォルトでヒットしない(スコアに影響を与えるだけ)か、あるいはmustfilterがない場合に全てのshould句のORとして機能するか、といった複雑な挙動になります。明示的にminimum_should_match` を指定するのが安全です。

例:must_not の利用

「タイトルに ‘Python’ が含まれるが、タグに ‘入門’ が含まれないドキュメント」を検索する場合。

“`python
index_name = “my_data_index”

try:
response = client.search(
index=index_name,
query={
“bool”: {
“must”: [
{“match”: {“title”: “Python”}}
],
“must_not”: [
{“term”: {“tags”: “入門”}}
]
}
}
)

print(f"Must_not query results: {response['hits']['total']['value']} documents found.")
for hit in response['hits']['hits']:
     print(f"  ID: {hit['_id']}, Score: {hit['_score']}, Title: {hit['_source']['title']}, Tags: {', '.join(hit['_source']['tags'])}")

except Exception as e:
print(f”Error during search: {e}”)

``must_not` 句に指定されたクエリにマッチするドキュメントは検索結果から除外されます。

2. その他の便利なクエリタイプ

Query DSLには非常に多くのクエリタイプが用意されています。ここでは代表的なものをいくつか紹介します。

  • range クエリ: 数値や日付の範囲で検索します。

    • gte: 以上 (>=)
    • gt: より大きい (>)
    • lte: 以下 (<=)
    • lt: より小さい (<)

    json
    {"range": {"price": {"gte": 1000, "lte": 5000}}} // 価格が1000以上5000以下
    {"range": {"published_date": {"gt": "2023-10-01", "lt": "now"}}} // 2023年10月1日以降現在まで

  • fuzzy クエリ: 検索語に多少のタイポやスペルミスがあってもヒットさせるファジー検索を行います。編集距離(Levenshtein距離)を指定して許容範囲を調整できます。

    json
    {"fuzzy": {"title": {"value": "Pythn", "fuzziness": "AUTO"}}} // 'Pythn' (typo) に対してファジー検索

    fuzziness には AUTO を指定するのが便利です。フィールドの値の長さに応じて適切な編集距離を自動的に決定します。

  • wildcard クエリ: ワイルドカード文字 (*?) を使ってパターンマッチ検索を行います。* は0文字以上の任意の文字列、? は1文字の任意の文字にマッチします。

    json
    {"wildcard": {"name": "J*n"}} // 'John', 'Joan', 'Jansen' などにマッチ
    {"wildcard": {"filename": "*.log"}} // '.log' で終わるファイル名にマッチ

    注意: wildcard クエリはパフォーマンスコストが高い場合があります。特に検索語の先頭にワイルドカードを使用すると、インデックス全体をスキャンする必要が生じる可能性があり、非常に遅くなることがあります。可能な限り避けるか、検索語の先頭にワイルドカードを使用しないように設計するのが望ましいです。

  • regexp クエリ: 正規表現パターンで検索します。非常に強力ですが、wildcard クエリと同様にパフォーマンスに注意が必要です。

    json
    {"regexp": {"ip_address": "192\\.168\\.\\d{1,3}\\.\\d{1,3}"}} // 特定のIPアドレスパターンにマッチ

  • 複数フィールド検索: match クエリなどで複数のフィールドを対象に検索したい場合、multi_match クエリが便利です。

    json
    {
    "multi_match": {
    "query": "elasticsearch python",
    "fields": ["title", "content", "tags"] # 検索対象となるフィールドのリスト
    }
    }

    このクエリは、「”elasticsearch python” という検索語」を「title」「content」「tags」のいずれかのフィールドから検索します。検索語全体が単一のフィールドに含まれている必要はありません。各フィールドに対して個別に match クエリを実行し、その結果を組み合わせるような動作をします。

これらのクエリタイプを bool クエリと組み合わせることで、非常に複雑で具体的な検索条件を表現することができます。

Query DSLは非常に奥が深く、様々なクエリやパラメータ(スコア計算への影響、ブースト、オペレーターなど)が存在します。詳細については、Elasticsearchの公式ドキュメント(Query DSLの章)を参照することをお勧めします。Pythonクライアントでは、Query DSLをPythonの辞書として構築して body または query パラメータに渡すだけです。

第6部:集計 (Aggregations)

Elasticsearchは単なる検索エンジンではなく、強力な分析エンジンでもあります。これは集計 (Aggregations) と呼ばれる機能によって実現されます。集計は、検索結果のドキュメントに対して、グループ化、カウント、合計、平均、最小値、最大値などの統計情報を計算するために使用されます。

例えば、商品の販売データに対して「カテゴリ別の売上合計」や「月別の販売個数」、「価格帯別の商品数」などを計算したい場合に集計が役立ちます。

集計は client.search() メソッドの aggs パラメータ(または body パラメータ内の aggs キー)としてQuery DSLとは別に定義します。

基本的な集計の構造は以下のようになります。

json
{
"query": {
... (検索条件, 集計対象のドキュメントを絞り込むために任意)
},
"aggs": { # 集計定義
"aggregation_name_1": { # 任意の集計名
"aggregation_type_1": {
... (集計タイプ固有のパラメータ)
},
"aggs": { # ネストされた集計 (バケット集計内で使用)
"aggregation_name_2": {
"aggregation_type_2": {
...
}
}
}
},
"aggregation_name_3": { ... },
...
}
}

集計には大きく分けて以下の2種類があります。

  1. バケット集計 (Bucket Aggregations): ドキュメントを特定の条件に基づいて「バケット」(グループ)に分割します。例: 「国別」、「価格帯別」、「発行年別」など。各バケットには、そのバケットに含まれるドキュメントのリストやドキュメント数などが含まれます。
  2. メトリクス集計 (Metric Aggregations): バケット内のドキュメント、またはクエリ全体にマッチしたドキュメントに対して、数値を計算します。例: 「合計 (sum)」、「平均 (avg)」、「最小値 (min)」、「最大値 (max)」、「件数 (value_count)」、「パーセンタイル (percentiles)」など。

これらの集計タイプを組み合わせることで、複雑な分析を行うことができます。特に、バケット集計の中に別のバケット集計やメトリクス集計をネストすることで、多次元の分析が可能になります(例: 国別 > 都市別 > 売上合計)。

1. 代表的な集計の使い方

例:terms 集計 (バケット集計)

フィールドの値に基づいてドキュメントをグループ化し、各グループの件数をカウントする場合に使用します。カテゴリ別、タグ別、ユーザー別などの集計によく使われます。

“`python
index_name = “my_data_index”

try:
response = client.search(
index=index_name,
size=0, # 集計結果のみが欲しい場合、ヒットドキュメントは不要なので size=0 にする
aggs={
“articles_by_author”: { # 集計名 (任意)
“terms”: { # 集計タイプ
“field”: “author.keyword” # 集計対象フィールド (keyword型を推奨)
# “size”: 10 # 上位10件の著者を表示 (デフォルトは10)
}
}
}
)

print("Aggregations results:")
# 集計結果は response['aggregations'] に格納される
authors_agg = response['aggregations']['articles_by_author']
print(f"  Buckets found: {len(authors_agg['buckets'])}")

for bucket in authors_agg['buckets']:
    author_name = bucket['key'] # バケットのキー (著者の名前)
    doc_count = bucket['doc_count'] # そのバケットに含まれるドキュメント数
    print(f"    Author: {author_name}, Articles: {doc_count}")

except Exception as e:
print(f”Error during aggregation: {e}”)

``
集計リクエストを送信する際、通常は検索結果リスト(
hits)は不要なため、size=0を指定して集計結果のみを取得します。
集計定義は
aggsキーの中にネストされます。articles_by_authorは任意の集計名です。terms集計は、対象となるフィールド(ここではauthor.keyword)を指定します。テキストフィールドを対象とする場合、アナライズされないkeywordサブフィールド(マッピングで定義されている場合)を指定するのが一般的です。そうしないと、単語ごとにバケットが作成されてしまい、期待する結果(著者名ごとのバケット)が得られません。
集計結果は
response[‘aggregations’]に、定義した集計名(articles_by_author)をキーとして格納されます。terms集計の結果はbucketsリストとして返され、各バケットはkey(フィールドの値)とdoc_count`(その値を持つドキュメント数)を含みます。

例:sum 集計 (メトリクス集計) とネストされた集計

「著者別の記事数をカウント」という例に加えて、「各著者の記事のタグ総数」を計算したいとします。これは、「著者別」というバケット集計の中に、「タグ数の合計」というメトリクス集計をネストすることで実現できます。(ただし、これは記事ごとのタグ数ではなく、全記事のタグを合算する例としては不適切かもしれません。より良い例として、「商品カテゴリ別の売上合計」を考えます。データモデルに categoryprice フィールドがあると仮定します。)

“`python

新しいインデックスとダミーデータを作成 (必要に応じて実行)

product_index_name = “products”
if not client.indices.exists(index=product_index_name):
client.indices.create(index=product_index_name)
bulk_products = [
{“index”: {“_index”: product_index_name, “_id”: “prod_001”}}, {“name”: “Laptop”, “category”: “Electronics”, “price”: 1200.00, “stock”: 50},
{“index”: {“_index”: product_index_name, “_id”: “prod_002”}}, {“name”: “Keyboard”, “category”: “Electronics”, “price”: 75.00, “stock”: 200},
{“index”: {“_index”: product_index_name, “_id”: “prod_003”}}, {“name”: “Desk”, “category”: “Furniture”, “price”: 300.00, “stock”: 10},
{“index”: {“_index”: product_index_name, “_id”: “prod_004”}}, {“name”: “Chair”, “category”: “Furniture”, “price”: 150.00, “stock”: 30},
{“index”: {“_index”: product_index_name, “_id”: “prod_005”}}, {“name”: “Monitor”, “category”: “Electronics”, “price”: 250.00, “stock”: 80},
]
client.bulk(body=bulk_products)
print(f”Indexed dummy product data into ‘{product_index_name}’.”)

カテゴリ別の売上合計を計算

try:
response = client.search(
index=product_index_name,
size=0,
aggs={
“products_by_category”: { # バケット集計: カテゴリ別
“terms”: {
“field”: “category.keyword”
},
“aggs”: { # ネストされた集計
“total_price”: { # メトリクス集計: 各カテゴリの合計価格
“sum”: { # sum 集計タイプ
“field”: “price” # 合計するフィールド
}
},
“avg_price”: { # ネストされた別の集計: 平均価格
“avg”: {
“field”: “price”
}
}
}
}
}
)

print("\nCategory wise price aggregation:")
category_agg = response['aggregations']['products_by_category']

for bucket in category_agg['buckets']:
    category_name = bucket['key']
    doc_count = bucket['doc_count']
    total_price = bucket['total_price']['value'] # ネストされた集計結果の取得
    avg_price = bucket['avg_price']['value'] # ネストされた別の集計結果

    print(f"  Category: {category_name}, Count: {doc_count}, Total Price: {total_price:.2f}, Avg Price: {avg_price:.2f}")

except Exception as e:
print(f”Error during aggregation: {e}”)

``aggsキーの中に、まずproducts_by_categoryというterms集計を定義しています。これがドキュメントをcategory.keywordフィールドの値でグループ化します。
この
products_by_category集計の定義の中に、さらにaggsキーを使ってネストされた集計を定義できます。ここではtotal_priceという名前のsum集計と、avg_priceという名前のavg集計を定義しています。これらのメトリクス集計は、それぞれ親であるproducts_by_categoryバケットに含まれるドキュメントに対して実行されます。
集計結果のレスポンスでは、バケット (
buckets) の中に、ネストされた集計の結果がキー(total_price,avg_price)とその結果値(value`)として含まれています。

その他にも、数値の範囲でグループ化する range 集計、日付の範囲でグループ化する date_range 集計、特定の値をフィルターしてグループ化する filter 集計など、様々なバケット集計があります。メトリクス集計には min, max, stats (件数、合計、平均、最小、最大を一度に計算) などがあります。

集計機能は非常に強力で、ビジネスインテリジェンスやログ分析などで幅広く活用されています。Query DSLと同様に、集計の機能も非常に豊富です。

第7部:マッピングとアナライザー(詳細)

Elasticsearchはスキーマレスであると言われることがありますが、これは「事前に厳密なスキーマ定義が必須ではない」という意味であり、内部的にはドキュメントをインデックスする際にフィールドの型を推測して「マッピング」を自動生成しています。しかし、最適な検索・分析パフォーマンスを得るためには、明示的にマッピングを定義することが非常に重要です。特にテキストデータの扱いには、マッピングとアナライザーの理解が不可欠です。

1. マッピングの重要性

マッピングは、インデックス内のドキュメントのフィールドがどのように格納され、インデックスされ、検索可能になるかを定義します。具体的には、以下のことを制御します。

  • フィールドのデータ型: 文字列 (text, keyword)、数値 (long, integer, double, float), 日付 (date), 真偽値 (boolean), オブジェクト (object), 配列 (array), ジオポイント (geo_point) など、様々なデータ型を指定できます。適切な型を指定することで、その型に適した検索や集計が可能になります。
  • 文字列フィールドのインデックス方法:
    • text 型: 全文検索に適しており、デフォルトでアナライザーが適用されます。巨大なテキストフィールドや、部分一致、関連性検索を行いたいフィールドに使用します。
    • keyword 型: 完全一致検索やソート、集計に適しており、アナライザーは適用されません。タグ、ID、カテゴリ、ステータスコード、短い名前など、厳密な値で検索したいフィールドに使用します。デフォルトでは、文字列フィールドをインデックスする際に、text 型としてインデックスすると同時に、.keyword という名前で keyword 型としてもインデックスするダイナミックマッピングが適用されることが多いです。これにより、同じフィールドに対して全文検索と完全一致検索の両方が可能になります。
  • アナライザーの指定: text 型フィールドに対して、どの組み込みまたはカスタムアナライザーを使用するかを指定できます。これにより、テキストの分割方法や正規化ルールを制御し、検索精度を向上させます。
  • その他の設定: フィールドがインデックスされるかどうか (index), フィールド値が実際に _source に格納されるかどうか (store), スコア計算に関与するかどうか (norms) など、様々な低レベルな設定が可能です。

マッピングを適切に定義しないと、以下のような問題が発生する可能性があります。

  • 期待通りの検索結果が得られない(例: text 型でインデックスされたフィールドに対して term クエリを使う)
  • 集計が正しく行えない(例: アナライズされた text フィールドで terms 集計を行うと、単語ごとの集計になってしまう)
  • パフォーマンスの低下

したがって、インデックスにデータを投入する前に、どのようなフィールドがあり、それぞれどのようなデータ型で、どのように検索・分析したいかを設計し、それに沿ったマッピングを明示的に定義することを強く推奨します。

2. 明示的なマッピングの定義

マッピングは、インデックスを作成する際に mappings パラメータとして指定します。

“`python
from elasticsearch import Elasticsearch

client = Elasticsearch() # 接続設定は省略
index_name = “articles_with_mapping”

マッピング定義

properties キーの下に、フィールド名と、そのフィールドの定義を記述

mapping = {
“properties”: {
“title”: {
“type”: “text”, # 全文検索用
“analyzer”: “standard” # standard アナライザーを使用
},
“author”: {
“type”: “text”,
“fields”: { # author フィールドにサブフィールドを定義
“keyword”: { # .keyword という名前で keyword 型としてもインデックス
“type”: “keyword”,
“ignore_above”: 256 # 256文字を超える値はインデックスしない (デフォルト)
}
}
},
“published_date”: {
“type”: “date” # 日付型
},
“content”: {
“type”: “text”
# analyzer を指定しない場合、デフォルトのアナライザーが適用される (通常 standard)
},
“tags”: {
“type”: “keyword” # タグは完全一致検索や集計に使うので keyword 型が適している
},
“views”: {
“type”: “long” # 数値型 (整数)
}
}
}

インデックスが存在するか確認し、存在しなければマッピングを指定して作成

if not client.indices.exists(index=index_name):
try:
response = client.indices.create(
index=index_name,
mappings=mapping # mappings パラメータでマッピングを指定
)
print(f”Index ‘{index_name}’ created with mapping.”)
print(response)
except Exception as e:
print(f”Error creating index ‘{index_name}’ with mapping: {e}”)
else:
print(f”Index ‘{index_name}’ already exists.”)

``mappings辞書は、propertiesキーの下にフィールド定義を格納します。各フィールドは、フィールド名をキーとし、データ型 (“type”) などの設定を含む辞書を値とします。text型のフィールドには、analyzerパラメータで明示的にアナライザーを指定できます。fieldsパラメータを使うと、一つのフィールドを複数の方法でインデックスできます。上記の例では、authorフィールドをデフォルトのtext型としてインデックスすると同時に、author.keywordという名前でkeyword型としてもインデックスしています。これにより、authorフィールドで全文検索 (matchクエリ) も、author.keywordフィールドで完全一致検索や集計 (termクエリ,terms集計) も可能になります。
日付型のフィールド (
date) は、ISO 8601 形式などの特定のフォーマットで指定する必要があります(例: "YYYY-MM-DD", "YYYY-MM-DDTHH:mm:ssZ")。デフォルトではいくつかのフォーマットを自動認識しますが、マッピングでformat` を明示的に指定することも可能です。

一度作成したインデックスのマッピングを後から完全に変更することは、一般的にできません。変更したい場合は、新しいマッピングで新しいインデックスを作成し、古いインデックスから新しいインデックスへデータをReindex(再インデックス)する必要があります。ただし、既存のフィールドに新しいサブフィールドを追加したり、新しいフィールドを追加したりするような変更は、Update Mapping APIを使って行うことが可能です。

3. アナライザーの種類

Elasticsearchには多くの組み込みアナライザーがあります。代表的なものをいくつか紹介します。

  • standard: 大文字小文字を区別せず、句読点などで分割します。多くのラテン語系言語に標準的です。
  • simple: 非アルファベット文字で分割し、小文字化します。
  • whitespace: 空白文字でのみ分割し、小文字化はしません。
  • english, japanese, french など: 各言語に特化したアナライザー。ストップワード除去やステミングなどを行います。日本語アナライザー (kuromoji) は形態素解析を行います。これらは通常、プラグインとしてインストールが必要です。

また、既存の文字フィルター、トークナイザー、トークンフィルターを組み合わせて、独自に「カスタムアナライザー」を定義することも可能です。例えば、日本語と英語が混在するテキストを処理するためのカスタムアナライザーを作成できます。カスタムアナライザーの定義はマッピングの一部として行います。

マッピングとアナライザーの適切な設定は、Elasticsearchの検索精度とパフォーマンスに大きく影響します。データモデルを設計する初期段階で、これらの設定を検討することが重要です。

第8部:実用的なヒントとベストプラクティス

PythonとElasticsearchを使った開発をより効率的かつ堅牢に行うためのいくつかのヒントとベストプラクティスを紹介します。

1. クライアントの再利用

Elasticsearchクライアントオブジェクト (elasticsearch.Elasticsearch のインスタンス) の生成は、内部的に接続プールの初期化などを含むため、ある程度のコストがかかります。リクエストごとにクライアントインスタンスを生成するのではなく、アプリケーションのライフサイクル全体で一つのクライアントインスタンスを生成し、それを共有して使用することが推奨されます。特にWebアプリケーションなどでは、リクエストハンドラ内でクライアントを生成するのではなく、アプリケーション起動時に生成したクライアントを各ハンドラから利用するように設計すべきです。

“`python

悪い例: リクエストごとにクライアントを生成

def handle_request():

client = Elasticsearch(…)

client.search(…)

良い例: アプリケーション起動時にクライアントを生成し、使い回す

client = Elasticsearch(…)

def handle_request():

global client # または DI などで渡す

client.search(…)

“`

2. バルク操作の活用

大量のドキュメントをインデックスしたり、更新したり、削除したりする場合は、必ずバルク操作 (client.bulk()) を使用してください。個別の操作を繰り返すよりもネットワーク効率が圧倒的に向上し、Elasticsearchクラスター側の負荷も軽減されます。

バルク操作で一度に送信するドキュメント数は、メモリやネットワーク帯域、ドキュメントのサイズなどによって最適な値が異なりますが、一般的には1000件〜5000件程度が推奨されることが多いです。client.bulk() に渡すリストが大きすぎると、クライアント側のメモリを圧迫したり、Elasticsearch側でリクエストボディのサイズ制限に引っかかったり、処理に時間がかかりすぎてタイムアウトしたりする可能性があります。適切なバルクサイズを見つけるには、実際の環境でテストを行うことが重要です。

また、elasticsearch.helpers モジュールには、バルク操作をさらに扱いやすくするためのヘルパー関数が用意されています。特に streaming_bulkbulk 関数は、イテレーターからドキュメントを読み込み、自動的にチャンクに分割してバルクAPIに送信してくれるため、大量データ投入スクリプトなどを書く際に非常に便利です。

“`python
from elasticsearch.helpers import streaming_bulk

ダミーデータのジェネレーター関数

def generate_docs(num_docs):
for i in range(num_docs):
yield {
index”: “bulk_test_index”,
“_id”: f”doc
{i}”,
“_source”: {“sequence”: i, “data”: f”This is document {i}”}
}

index_name = “bulk_test_index”

インデックス作成 (存在しない場合)

if not client.indices.exists(index=index_name):
client.indices.create(index=index_name)

streaming_bulk を使用して大量データをインデックス

try:
# 100件ごとにバルク処理を実行
success_count = 0
for ok, result in streaming_bulk(
client=client,
actions=generate_docs(10000), # 10000件のドキュメントを生成
chunk_size=100, # チャンクサイズ
# max_retries=3, # リトライ設定
# initial_backoff=1, # リトライ間隔
):
if ok:
success_count += 1
else:
print(f”Bulk item failed: {result}”)

print(f"Successfully indexed {success_count} documents.")

except Exception as e:
print(f”Error during streaming_bulk: {e}”)

``streaming_bulkはジェネレーターを受け取り、指定したchunk_size` ごとに自動でバルクAPIを呼び出します。これにより、メモリ効率良く大量データを処理できます。

3. 検索パフォーマンスの考慮

高速な検索を実現するためには、以下の点に注意が必要です。

  • 適切なマッピング: 前述のように、フィールドのデータ型やインデックス方法を適切に設定することはパフォーマンスの基本です。
  • filter コンテキストの活用: bool クエリで、スコア計算に関係なく、単にドキュメントを絞り込みたい条件は filter 句に配置してください。filter 句内のクエリ結果はキャッシュされやすく、スコア計算のオーバーヘッドがないため高速です。
  • 不要なフィールドの排除: _source パラメータを使って、検索結果で取得するフィールドを必要なものだけに絞ることで、ネットワーク帯域やパースのコストを削減できます。
  • 深いページネーションを避ける: from/size による深いページネーションは負荷が高いため、scroll APIや search_after の使用を検討してください。
  • スクリプトの利用に注意: Query DSLやAggregationの中で、計算や複雑な条件のために Painless などのスクリプトを使用することがありますが、スクリプトの実行はインデックス化されたデータ構造を直接利用できない場合があるため、遅くなる可能性があります。可能な限り、スクリプトを使わずに Query DSL や Aggregation の組み込み機能で実現できないかを検討してください。

4. 公式ドキュメントの活用

Elasticsearchは非常に機能が豊富で、常に進化しています。elasticsearch-py ライブラリもElasticsearchのアップデートに合わせて更新されます。最も信頼できる情報源は、Elasticsearchおよび elasticsearch-py の公式ドキュメントです。新しい機能の使い方や、詳細なパラメータ、バージョン間の違いなどを確認する際は、積極的に公式ドキュメントを参照してください。

第9部:まとめと次のステップ

この記事では、Pythonを使ってElasticsearchを操作するための基礎を網羅的に解説しました。

  • Elasticsearchの基本概念(クラスター、ノード、インデックス、ドキュメント、シャード、レプリカ、転置インデックス)を理解しました。
  • Pythonクライアントライブラリ elasticsearch-py の導入方法と、クライアントの初期化方法を学びました。
  • インデックスの作成・削除、ドキュメントの追加・取得・更新・削除といった基本的なCRUD操作を実践しました。
  • 効率的な複数操作のためのバルクAPIの使い方を知りました。
  • 例外処理の重要性と基本的なエラーハンドリング方法を学びました。
  • 全文検索の仕組み(アナライザー)と、Query DSLを使った基本的な検索(match_all, match, term)を行いました。
  • bool クエリを使ったAND/OR/NOT条件の組み合わせ方や、range, fuzzy, wildcard などの様々なクエリタイプを紹介しました。
  • データの分析に不可欠な集計(Aggregations)の基本的な使い方(terms, sum, ネストされた集計)を学びました。
  • 検索精度とパフォーマンスに影響するマッピングとアナライザーの役割、そして明示的なマッピングの定義方法を理解しました。
  • 実践的なヒントとして、クライアントの再利用、バルク操作の活用、検索パフォーマンスに関する注意点、そして公式ドキュメントの重要性を説明しました。

これらの知識があれば、Pythonを使って独自のアプリケーションからElasticsearchを効果的に活用し始めることができるでしょう。

ElasticsearchとPythonの世界は非常に奥深く、この記事で紹介できたのはそのほんの一部にすぎません。さらに学習を進めるための次のステップとして、以下のトピックに挑戦することをお勧めします。

  • 高度なQuery DSL: スコア計算の制御(ブースト、ファンクションスコア)、より複雑なクエリ(geo クエリ、more_like_this クエリなど)。
  • 高度なAggregations: パイプライン集計、行列集計、スコープの制御。
  • ソートとスクリプト: _score 以外のフィールドでソートする方法、複雑な処理のためのスクリプト(Painless)。
  • 検索結果のハイライト: 検索語がヒットした箇所を結果に含める方法。
  • Suggest API: 入力補完やスペルチェック機能の実装。
  • Reindex API: インデックス構造の変更やデータの移行。
  • Index Lifecycle Management (ILM): インデックスの作成、ロールオーバー、削除などを自動化する方法。
  • セキュリティ: 認証、認可、SSL/TLS。
  • 監視: クラスターの状態監視、Pythonクライアントのロギング。
  • Elastic Stackの他のコンポーネント: Kibana (データの可視化・分析UI)、Logstash (データ収集・変換)、Beats (軽量データシッパー) と連携することで、ログ分析やメトリクス監視などのソリューションを構築できます。

Elasticsearchは非常に多機能で柔軟なツールです。この記事で得た基礎を元に、ぜひ実際に手を動かしながら様々な機能を試してみてください。Pythonの豊富なライブラリと組み合わせることで、あなたのデータ活用プロジェクトの可能性は大きく広がるはずです。

Happy coding with Python and Elasticsearch!


コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

上部へスクロール