はい、承知いたしました。FastAPIとSQLAlchemyを組み合わせたAPI開発とデータベース連携の基礎について、約5000語の詳細な記事を作成します。
FastAPI + SQLAlchemy: 高速API開発とDB連携の基礎
はじめに
現代のWeb開発において、高速かつスケーラブルなAPIの構築は不可欠です。Pythonの世界では、様々なフレームワークやライブラリがAPI開発のために提供されていますが、近年特に注目を集めているのが「FastAPI」と「SQLAlchemy」の組み合わせです。
FastAPIは、非同期処理に対応し、高いパフォーマンスと生産性を両立させたモダンなWebフレームワークです。その核となる機能は、Pydanticによるデータ検証とシリアライゼーション、そしてStarletteによる高性能なASGI Webフレームワークです。型ヒントを積極的に活用することで、エディタの補完機能やコードの可読性が向上し、さらに自動生成されるAPIドキュメント(Swagger UI, ReDoc)は開発効率を飛躍的に高めます。
一方、SQLAlchemyは、Pythonにおける最も強力で柔軟なデータベースツールキットの一つです。SQLAlchemyはORM(Object-Relational Mapper)機能を提供し、Pythonオブジェクトを通じてデータベースを操作できるようにします。これにより、生SQLを書く手間を省き、よりPythonらしいコードでデータベースとのやり取りが可能になります。また、ORMだけでなく、SQL Expression Languageという低レベルな抽象化レイヤーも提供しており、複雑なクエリも柔軟に構築できます。
FastAPIの高速な非同期処理能力と、SQLAlchemyの堅牢かつ柔軟なデータベース操作能力を組み合わせることで、高性能でメンテナンスしやすいAPIを効率的に開発できます。特に、FastAPIの依存性注入システムは、SQLAlchemyのセッション管理と非常に相性が良く、データベース接続の管理をシンプルに行えます。
この記事では、FastAPIとSQLAlchemyを初めて使う方、あるいはこれらのライブラリを使ったAPI開発の基礎を学びたい方を対象に、それぞれの基本的な使い方から、両者を組み合わせて実践的なAPIを構築する方法までを詳細に解説します。約5000語のボリュームで、概念説明から具体的なコード例までを網羅し、読者がご自身のプロジェクトでFastAPI+SQLAlchemyを活用できるようになることを目指します。
FastAPIの基礎
まずは、FastAPIの基本的な概念と使い方を学びましょう。
FastAPIとは?
FastAPIは、Python 3.7以降で利用できる、モダンで高速な(高いパフォーマンスを発揮する)Webフレームワークです。主な特徴は以下の通りです。
- 高速: Starlette(ASGIフレームワーク)とPydantic(データ検証ライブラリ)を基盤としており、非常に高いスループットを実現します。NodeJSやGoに匹敵する性能を持つとされています。
- 高い開発効率: Pythonの型ヒントをフル活用しており、エディタの補完機能が強力に効きます。これにより、typoや引数の誤りを減らし、開発速度が向上します。
- 自動ドキュメント: コードに型ヒントを書くだけで、インタラクティブなAPIドキュメント(Swagger UIとReDoc)が自動生成されます。これはAPIの仕様確認やテストに非常に役立ちます。
- データ検証: Pydanticモデルを使用することで、リクエストデータの構造と型を定義し、自動的に検証・変換を行います。無効なリクエストは自動的にエラーレスポンスとして返されます。
- 依存性注入: 依存性注入システムが組み込まれており、共通のロジック(データベース接続、認証など)を簡単に再利用できます。
- 非同期処理:
async
/await
構文をサポートしており、非同期I/Oを効率的に扱えます。データベース操作のようなI/Oバウンドな処理でパフォーマンスを発揮します。
インストール
FastAPIを使うには、まずインストールが必要です。ASGIサーバーとして、一般的にはUvicornを使います。
bash
pip install fastapi uvicorn[standard]
基本的な使い方:Hello World API
FastAPIの基本的な構造を見てみましょう。
“`python
main.py
from fastapi import FastAPI
app = FastAPI()
@app.get(“/”)
async def read_root():
return {“Hello”: “World”}
@app.get(“/items/{item_id}”)
async def read_item(item_id: int, q: str = None):
return {“item_id”: item_id, “q”: q}
“`
このコードを main.py
という名前で保存します。
次に、Uvicornを使ってこのアプリケーションを実行します。
bash
uvicorn main:app --reload
--reload
オプションは、コードの変更を検知してサーバーを自動的に再起動します。
ブラウザで http://127.0.0.1:8000/
にアクセスすると {"Hello": "World"}
が表示されます。
http://127.0.0.1:8000/items/5?q=somequery
にアクセスすると {"item_id": 5, "q": "somequery"}
が表示されます。
http://127.0.0.1:8000/docs
にアクセスすると自動生成されたSwagger UI、http://127.0.0.1:8000/redoc
にアクセスするとReDocが表示されます。
ルーティング、パスパラメータ、クエリパラメータ
- ルーティング:
app.get()
,app.post()
,app.put()
,app.delete()
などのデコレータを使って、特定のHTTPメソッドとURLパスに関数を紐付けます。 - パスパラメータ: URLパスの一部をパラメータとして受け取るには、パス内で
{parameter_name}
のように指定し、関数の引数として同名のパラメータを受け取ります。型ヒントを付けることで、FastAPIが自動的に型変換と検証を行います(例:item_id: int
)。 - クエリパラメータ: URLの
?key1=value1&key2=value2
の形式で渡されるパラメータです。関数の引数として、デフォルト値付きで定義することで受け取れます(例:q: str = None
)。デフォルト値がない場合は必須パラメータとなります。
リクエストボディ(Pydanticモデルの利用)
POST, PUTなどのメソッドで送信されるリクエストボディのデータは、Pydanticモデルを使って定義・検証するのが一般的です。
まず、Pydanticモデルを定義します。
“`python
schemas.py (ファイル名を分けるのは一般的な慣習)
from pydantic import BaseModel
class Item(BaseModel):
name: str
price: float
is_offer: bool = None
“`
次に、FastAPIのエンドポイントでこのモデルを引数として受け取ります。
“`python
main.py (schemas.py と同じディレクトリにあると仮定)
from fastapi import FastAPI
from .schemas import Item # schemas.py からインポート
app = FastAPI()
… read_root, read_item エンドポイント …
@app.post(“/items/”)
async def create_item(item: Item):
return item
“`
このエンドポイントにPOSTリクエストを送信する際、リクエストボディが Item
モデルの構造(name: str
, price: float
, is_offer: bool
optional)に合致しているかFastAPIが自動的に検証します。合致しない場合は、適切なエラーレスポンスが返されます。
レスポンスモデル
FastAPIはレスポンスの形式もPydanticモデルで定義することを推奨しています。これにより、APIの出力構造が明確になり、自動ドキュメントにも反映されます。
“`python
schemas.py
from pydantic import BaseModel
class Item(BaseModel):
name: str
price: float
is_offer: bool = None
class ItemResponse(BaseModel): # レスポンス用のモデル
id: int
name: str
price: float
is_offer: bool = None
# その他のレスポンスに含めたいフィールド…
class Config: # ORMモードを有効にする
orm_mode = True
from_attributes = True # Pydantic v2以降はこちらを使用
“`
ItemResponse
の Config
クラスにある orm_mode = True
(Pydantic v2以降は from_attributes = True
) は重要です。これは、このPydanticモデルがORMオブジェクト(SQLAlchemyモデルのインスタンスなど)からデータを読み取れるようにします。SQLAlchemyモデルは属性アクセス(例: item.name
)でデータにアクセスしますが、Pydanticモデルは辞書のようにキーアクセス(例: item_dict['name']
)でアクセスするのが基本です。orm_mode = True
を設定することで、PydanticはORMオブジェクトの属性も読み取れるようになります。
エンドポイント関数に response_model
引数を指定します。
“`python
main.py
from fastapi import FastAPI
from .schemas import Item, ItemResponse # ItemResponse もインポート
app = FastAPI()
… create_item エンドポイント …
Dummy data for demonstration
items_db = {
1: {“id”: 1, “name”: “Foo”, “price”: 50.5, “is_offer”: False},
2: {“id”: 2, “name”: “Bar”, “price”: 120.0, “is_offer”: True},
}
@app.get(“/items/{item_id}”, response_model=ItemResponse)
async def read_item_response(item_id: int):
# 実際にはDBから取得する
item_data = items_db.get(item_id)
if item_data:
return item_data # Pydanticは辞書からItemResponseモデルに変換しようとする
# 存在しない場合は適切なエラーを返す(後述)
return None # または HTTPException を raise する
“`
このように response_model
を指定すると、FastAPIはエンドポイント関数の戻り値を自動的に指定されたPydanticモデルに変換し、JSONレスポンスとして返します。これにより、レスポンスの形式が保証されます。
非同期処理 (async def
, await
)
FastAPIはASGI(Asynchronous Server Gateway Interface)標準に基づいています。これにより、非同期I/O(データベースアクセス、外部API呼び出しなど)を効率的に扱うことができます。Pythonでは async def
と await
構文を使って非同期処理を記述します。
エンドポイント関数を async def
で定義すると、その関数内で await
を使って非同期処理を実行できます。非同期処理中にCPUがブロックされることなく、他のリクエストを処理できるため、特にI/Oバウンドなアプリケーションで高いパフォーマンスを発揮します。
データベース操作は典型的なI/Oバウンドな処理です。SQLAlchemyもバージョン1.4以降で非同期対応(AsyncIO)が強化されました。しかし、SQLAlchemyの標準的な使い方(同期API)はブロッキングI/Oです。FastAPIの async def
エンドポイント内で同期SQLAlchemyの操作を行うと、データベース操作中にイベントループがブロックされてしまい、非同期処理のメリットが損なわれる可能性があります。
注意点: 非同期FastAPIエンドポイント内でブロッキングI/O(例: 同期版SQLAlchemy、requestsライブラリなど)を実行すると、その操作が完了するまでイベントループがブロックされ、他の非同期タスクやリクエストの処理が停止してしまいます。FastAPIはこれを検知すると、そのブロッキング処理を別のスレッドで実行するように試みますが、基本的には非同期エンドポイントでは非同期ライブラリを使うのが最も効率的です。
この記事の後半では、同期版SQLAlchemyをFastAPIと連携させる方法を主に解説しますが、非同期版SQLAlchemyについても触れます。
依存性注入 (Dependency Injection)
FastAPIの最も強力な機能の一つが依存性注入システムです。これは、エンドポイント関数や他の依存関数(dependency function)の引数として、様々なリソースやオブジェクト(データベースセッション、設定値、認証されたユーザーなど)を自動的に提供する仕組みです。
依存性を定義するには、単なる関数(またはクラスなど、呼び出し可能なオブジェクト)を作成し、それをFastAPIの Depends
関数で指定します。
“`python
from fastapi import FastAPI, Depends, Header
app = FastAPI()
async def get_token_header(x_token: str = Header(…)):
if x_token != “fake-super-secret-token”:
raise HTTPException(status_code=400, detail=”X-Token header invalid”)
return x_token
@app.get(“/items/”, dependencies=[Depends(get_token_header)]) # 依存性をリストで指定
async def read_items():
return [{“item”: “Foo”}, {“item”: “Bar”}]
あるいは、引数として直接依存性を定義
async def get_current_user(token: str = Depends(get_token_header)):
# token を使ってユーザーを特定するロジック
user = {“username”: “testuser”, “token”: token}
return user
@app.get(“/users/me”)
async def read_users_me(current_user: dict = Depends(get_current_user)):
return current_user
“`
依存性注入は、データベースセッションの管理に特に役立ちます。リクエストごとに新しいセッションを作成し、処理終了時に適切に閉じたりロールバックしたりするロジックを、各エンドポイント関数から切り離して、依存関数として一元管理できます。
自動ドキュメント (Swagger UI, ReDoc)
FastAPIは、コード中の型ヒントやPydanticモデルの情報から、OpenAPI標準に基づいたドキュメントを自動生成します。デフォルトで /docs
にSwagger UI、/redoc
にReDocが提供されます。
これは開発チーム内でのAPI仕様共有や、クライアント開発者への情報提供に非常に便利です。Swagger UIからは、各エンドポイントの詳細を確認したり、実際にリクエストを送信してAPIを試したりすることも可能です。
SQLAlchemyの基礎
次に、SQLAlchemyの基本的な概念と使い方を学びましょう。この記事では、ORM(Object-Relational Mapper)機能を中心に解説します。
SQLAlchemyとは?
SQLAlchemyは、PythonのSQLツールキットおよびORMです。主な特徴は以下の通りです。
- 柔軟性: ORMとSQL Expression Languageの二つの主要なモードを提供します。ORMはPythonオブジェクトとしてデータベース操作を抽象化し、Expression Languageはより低レベルでSQLに近い柔軟なクエリ構築を可能にします。
- 強力なORM: Pythonクラスをデータベーステーブルにマッピングし、オブジェクト指向的にデータのCRUD(Create, Read, Update, Delete)操作を行えます。リレーションシップ(一対多、多対多など)の定義と操作も強力にサポートします。
- データベース非依存: 様々なデータベース(PostgreSQL, MySQL, SQLite, Oracle, MS-SQLなど)に対応しており、データベース固有の構文を意識することなく操作できます。
- コネクションプール: データベース接続の管理を効率的に行い、パフォーマンスを向上させます。
- トランザクション: データベース操作をトランザクションとして扱い、原子性、一貫性、独立性、永続性(ACID特性)を保証できます。
インストール
SQLAlchemyを使うには、まずインストールが必要です。対象のデータベースに応じたDBドライバーもインストールする必要があります。ここではSQLiteを例に進めます。
bash
pip install sqlalchemy
pip install databases # 後述の非同期対応ライブラリ(これはSQLAlchemy Core向けだが、ORMと組み合わせることも)
データベース接続 (Engine)
データベースへの接続は、create_engine
関数を使って行います。これはデータベースとの主要な通信ポイントである Engine
オブジェクトを作成します。
“`python
from sqlalchemy import create_engine
SQLiteの場合
DATABASE_URL = “sqlite:///./test.db” # .envなどで管理するのが一般的
Engineの作成
connect_args={“check_same_thread”: False} はSQLiteで必要な設定(FastAPIで複数リクエストが同時に発生する場合)
engine = create_engine(DATABASE_URL, connect_args={“check_same_thread”: False})
“`
create_engine
はデータベースURL(通常は dialect+driver://user:password@host:port/database
の形式)を受け取ります。
メタデータとテーブル定義 (Declarative Base)
SQLAlchemy ORMでは、Pythonクラスとしてデータベーステーブルのスキーマを定義します。これを「Declarative Base」と呼びます。
“`python
from sqlalchemy.ext.declarative import declarative_base # SQLAlchemy 1.x 形式
SQLAlchemy 2.0 形式: from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import Column, Integer, String, Boolean
Declarative Base クラスを作成
SQLAlchemy 1.x
Base = declarative_base()
SQLAlchemy 2.0
class Base(DeclarativeBase):
pass
データベースモデル(テーブル定義)
class Item(Base):
tablename = “items” # テーブル名
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
price = Column(Integer) # デモンストレーションのためIntegerとする
is_offer = Column(Boolean, default=None)
“`
declarative_base()
またはDeclarativeBase
を継承したクラスBase
を作成します。これは、後続で定義する全てのORMモデルの基盤となります。Item
クラスはBase
を継承し、__tablename__
属性で対応するデータベーステーブル名を指定します。- クラス属性として
Column
オブジェクトを定義し、カラム名、データ型、その他の制約(主キーprimary_key=True
、インデックスindex=True
、デフォルト値default
など)を指定します。
このモデル定義は、データベーステーブルをPythonオブジェクトとして扱うためのマッピング情報となります。
データベースに実際にテーブルを作成するには、Base.metadata.create_all(engine)
を実行します。これは、Base
に紐付けられた全てのモデル定義に対応するテーブルをデータベース上に作成します(既に存在する場合は何もしません)。
“`python
テーブル作成スクリプト(例: create_db.py)
from sqlalchemy import create_engine
from .models import Base # 上記で定義したBaseクラス
DATABASE_URL = “sqlite:///./test.db”
engine = create_engine(DATABASE_URL, connect_args={“check_same_thread”: False})
def create_database_tables():
print(“Creating database tables…”)
Base.metadata.create_all(bind=engine)
print(“Database tables created.”)
if name == “main“:
create_database_tables()
“`
このスクリプトを一度実行すれば、データベースファイル (./test.db
) が作成され、items
テーブルが生成されます。
セッション (SessionLocal)
SQLAlchemy ORMを使ってデータベース操作を行うには、「セッション」が必要です。セッションは、データベースとの対話の全ての段階(クエリの発行、オブジェクトのロード、変更の永続化など)を管理する、作業領域のようなものです。
セッションは通常、リクエストごと、または一連の関連するデータベース操作ごとに作成され、操作が完了したら閉じられます。セッションを使い終わったら必ず閉じることが重要です。さもないと、リソースリークや予期しない振る舞いを引き起こす可能性があります。
セッションを作成するには、sessionmaker
関数を使ってセッションファクトリを作成し、それを使ってセッションインスタンスを生成します。
“`python
from sqlalchemy.orm import sessionmaker
sessionmaker を使ってセッションファクトリを作成
autocommit=False: commit() を呼び出すまで変更を確定しない
autoflush=False: query() などの前に自動的に flush() を実行しない
bind=engine: このセッションファクトリが使うEngineを指定
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
セッションを取得する(例)
db = SessionLocal()
try:
# データベース操作
pass
finally:
db.close()
“`
SessionLocal
は、新しいセッションオブジェクトを作成するためのクラス(またはファクトリ)として定義されます。これをFastAPIの依存性注入システムと組み合わせて使うことで、リクエストごとにセッションを簡単に取得・管理できます。
CRUD操作の基本
セッションを使って、データベースモデルのインスタンスを操作することで、CRUD操作を行います。
データの挿入 (Create)
ORMモデルのインスタンスを作成し、セッションに追加 (add
) し、コミット (commit
) します。
“`python
from .models import Item # 前述のItemモデル
セッションを取得していると仮定 (db は SessionLocal() で取得したセッションオブジェクト)
db_item = Item(name=”New Item”, price=100)
db.add(db_item) # セッションに追加
db.commit() # データベースに変更を確定
db.refresh(db_item) # データベースに保存された最新の状態をオブジェクトに反映(特にidなどが自動生成される場合)
print(f”Created item with ID: {db_item.id}”)
“`
db.add()
はオブジェクトをセッションの「保留中 (pending)」状態にします。db.commit()
が呼び出されると、保留中の変更がデータベースに書き込まれ、トランザクションが完了します。db.refresh(db_item)
は、データベースからの情報をオブジェクトに再度読み込みます。特に主キーが自動生成される場合などに、オブジェクトのIDを知るために使われます。
データの取得 (Read)
セッションの query()
メソッドを使ってクエリを構築します。
“`python
全て取得
items = db.query(Item).all()
print(f”All items: {items}”) # オブジェクトのリスト
IDで一つ取得
item_id = 1
item = db.query(Item).filter(Item.id == item_id).first()
if item:
print(f”Item with ID {item_id}: {item.name}”)
条件に一致する複数件を取得
items_over_50 = db.query(Item).filter(Item.price > 50).all()
print(f”Items over 50: {[i.name for i in items_over_50]}”)
特定のカラムだけを取得(タプルのリストになる)
item_names = db.query(Item.name).all()
print(f”Item names: {item_names}”)
“`
db.query(Item)
はItem
モデルを対象としたクエリを開始します。filter()
は WHERE 句に相当する条件を指定します。条件式は SQLAlchemy の式言語を使って記述します(例:Item.id == item_id
)。all()
は条件に一致する全ての行を、ORMオブジェクトのリストとして取得します。first()
は条件に一致する最初の行を、ORMオブジェクトとして取得します。一致する行がない場合はNone
を返します。
データの更新 (Update)
取得したORMオブジェクトの属性を変更し、セッションをコミットします。
“`python
IDでアイテムを取得
item_id = 1
item = db.query(Item).filter(Item.id == item_id).first()
if item:
item.price = 150 # 属性値を変更
db.commit() # 変更をデータベースに確定
db.refresh(item) # オブジェクトの状態を最新に更新
print(f”Updated item {item.id}: new price is {item.price}”)
“`
ORMオブジェクトの属性を変更すると、その変更はセッションによって追跡されます。db.commit()
が呼び出されたときに、変更された属性に対応する UPDATE 文が自動的に生成され、データベースに実行されます。
データの削除 (Delete)
取得したORMオブジェクトをセッションから削除 (delete
) し、コミットします。
“`python
IDでアイテムを取得
item_id = 1
item = db.query(Item).filter(Item.id == item_id).first()
if item:
db.delete(item) # セッションから削除対象としてマーク
db.commit() # データベースから削除を確定
print(f”Deleted item with ID: {item_id}”)
“`
db.delete(item)
はオブジェクトをセッションの「削除待ち (deleted)」状態にします。db.commit()
が呼び出されたときに、対応する DELETE 文が自動的に生成され、データベースに実行されます。
トランザクション
SQLAlchemyのセッションはデフォルトでトランザクションを管理します。SessionLocal
を作成する際に autocommit=False
と設定している場合、db.commit()
を呼び出すまで、そのセッション上で行われた全ての変更は一つのトランザクションとして扱われます。
途中でエラーが発生した場合など、変更をデータベースに永続化させたくない場合は、db.rollback()
を呼び出すことで、そのセッション開始以降に行われた全ての操作を取り消すことができます。
通常、FastAPIのエンドポイントでは、リクエスト処理の開始時にセッションを取得し、処理中にエラーが発生した場合はロールバック、成功した場合はコミット、そして最後にセッションを閉じる、という流れになります。これはFastAPIの依存性注入システムで効率的に実現できます。
FastAPIとSQLAlchemyの連携
いよいよ、FastAPIとSQLAlchemyを組み合わせてAPIを構築する方法を解説します。ここでは、典型的なプロジェクト構成を採用し、各コンポーネントの役割を明確にします。
プロジェクト構造の設計案
大規模なアプリケーション開発では、コードを複数のファイルに分割し、役割ごとに整理することが重要です。以下のような構造が一般的です。
.
├── main.py # FastAPIアプリケーションのエントリポイント、ルーティング
├── database.py # DB接続設定、SessionLocal、Base定義
├── models.py # SQLAlchemy ORMモデル定義
├── schemas.py # Pydanticモデル定義(リクエスト/レスポンスのスキーマ)
├── crud.py # データベース操作(CRUD)関数群
└── requirements.txt # プロジェクトの依存関係リスト
main.py
: FastAPIアプリケーションインスタンスを作成し、ルーター(APIエンドポイント)を登録します。依存性注入の設定などもここで行うことがあります。database.py
: データベースURLの設定、create_engine
、sessionmaker
(SessionLocal
)、declarative_base
(Base
) などのデータベース関連の初期設定を行います。models.py
: SQLAlchemyのORMモデル(テーブル定義)を記述します。schemas.py
: Pydanticモデルを記述します。これらはAPIのリクエストボディの検証やレスポンスのシリアライズに使われます。crud.py
: データベースに対する具体的なCRUD操作を行う関数群を記述します。これらの関数はセッションを引数として受け取り、models.py
で定義されたORMモデルを使って操作を行います。この層は、APIのエンドポイントロジックからデータベース操作の詳細を隠蔽します。
データベース接続の設定 (database.py
)
まずはデータベース接続のための基本設定を行います。
“`python
database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base # SQLAlchemy 1.x
from sqlalchemy.orm import DeclarativeBase # SQLAlchemy 2.0
from sqlalchemy.orm import sessionmaker
データベースURL。SQLiteを使用。環境変数などで管理するのが望ましい
相対パスの場合、FastAPI実行時のカレントディレクトリからのパスになる点に注意
DATABASE_URL = “sqlite:///./sql_app.db”
Engineの作成
SQLiteの場合、check_same_thread=False を指定しないと複数リクエスト時にエラーになる
connect_args={“check_same_thread”: False} はSQLiteに特有の設定であり、
PostgreSQLなどの他のDBでは不要。
engine = create_engine(
DATABASE_URL, connect_args={“check_same_thread”: False}
)
SessionLocal クラスを作成
autocommit=False: commit() を呼び出すまで変更を確定しない
autoflush=False: query() などの前に自動的に flush() を実行しない
bind=engine: 作成したEngineにバインド
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Declarative Base クラスを作成 (SQLAlchemy 1.x)
Base = declarative_base()
SQLAlchemy 2.0の場合:
class Base(DeclarativeBase):
pass
このBaseクラスをmodels.pyで使用してテーブルを定義する
“`
データベースモデルの定義 (models.py
)
次に、データベースのテーブル構造をSQLAlchemy ORMモデルとして定義します。ここではユーザーとアイテムという単純なモデルを例にします。
“`python
models.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship # リレーションシップ用
from .database import Base # database.py で定義したBaseクラスをインポート
User モデル
class User(Base):
tablename = “users”
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
# リレーションシップ: このユーザーが所有するItemのリスト
items = relationship("Item", back_populates="owner")
Item モデル
class Item(Base):
tablename = “items”
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id")) # 外部キー
# リレーションシップ: このアイテムを所有するUser
owner = relationship("User", back_populates="items")
“`
ForeignKey("users.id")
:items
テーブルのowner_id
カラムがusers
テーブルのid
カラムを参照する外部キーであることを示します。relationship("Item", back_populates="owner")
:User
モデルから関連するItem
オブジェクトへのアクセスを定義します。back_populates="owner"
は、Item
モデルのowner
リレーションシップに対応することを示します。双方向のリレーションシップを定義することで、user.items
でユーザーのアイテムリストを取得したり、item.owner
でアイテムの所有者ユーザーを取得したりできます。
Pydanticモデルの定義 (schemas.py
)
APIのリクエストとレスポンスのデータ形式をPydanticモデルで定義します。
“`python
schemas.py
from pydantic import BaseModel
————- Item 関連 ————-
Item作成時のリクエストボディ用スキーマ
class ItemCreate(BaseModel):
title: str
description: str | None = None # Python 3.10 以降の Union shorthand
Itemの基本スキーマ(レスポンスや他のモデルで継承して使う)
class ItemBase(BaseModel):
title: str
description: str | None = None
Itemレスポンス用スキーマ
class Item(ItemBase): # ItemBase を継承
id: int
owner_id: int
class Config:
orm_mode = True # SQLAlchemy ORMモデルからのデータ読み込みを許可
from_attributes = True # Pydantic v2以降はこちらを使用
————- User 関連 ————-
User作成時のリクエストボディ用スキーマ
class UserCreate(BaseModel):
email: str
password: str
Userの基本スキーマ
class UserBase(BaseModel):
email: str
Userレスポンス用スキーマ
class User(UserBase): # UserBase を継承
id: int
is_active: bool
items: list[Item] = [] # このユーザーが所有するItemのリスト
class Config:
orm_mode = True # SQLAlchemy ORMモデルからのデータ読み込みを許可
from_attributes = True # Pydantic v2以降はこちらを使用
“`
ItemCreate
やUserCreate
は、クライアントがPOSTやPUTリクエストで送信するデータの構造を定義します。Item
やUser
は、APIからのレスポンスとして返されるデータの構造を定義します。これらのモデルは、SQLAlchemy ORMモデルからデータを読み込むためにConfig
クラスでorm_mode = True
(またはfrom_attributes = True
) を設定しています。User
スキーマのitems: list[Item] = []
は、ユーザーレスポンスにそのユーザーが所有するアイテムのリストを含めることを示しています。PydanticはSQLAlchemyのUserモデルから関連するitems
リレーションシップを通じてデータを取得し、それぞれのItemオブジェクトをItemスキーマに変換します。
CRUD操作の実装 (crud.py
)
データベースに対する具体的な操作(CRUD関数)を実装します。これらの関数は、FastAPIのエンドポイントから呼び出されます。
“`python
crud.py
from sqlalchemy.orm import Session # SQLAlchemyのSession型ヒント用
from . import models, schemas # models.py, schemas.py をインポート
————- User 関連 CRUD ————-
IDでユーザーを取得
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
メールアドレスでユーザーを取得
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
全ユーザーを取得
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
ユーザーを作成
def create_user(db: Session, user: schemas.UserCreate):
# パスワードはハッシュ化するのが一般的だが、ここでは簡略化
fake_hashed_password = user.password + “notreallyhashed”
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user) # データベースに保存された状態(IDなど)をオブジェクトに反映
return db_user
————- Item 関連 CRUD ————-
特定のユーザーのアイテムを取得
def get_user_items(db: Session, user_id: int, skip: int = 0, limit: int = 100):
return db.query(models.Item).filter(models.Item.owner_id == user_id).offset(skip).limit(limit).all()
アイテムを作成
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(item.model_dump(), owner_id=user_id) # Pydantic v2: item.model_dump()
# Pydantic v1: db_item = models.Item(item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
IDでアイテムを取得
def get_item(db: Session, item_id: int):
return db.query(models.Item).filter(models.Item.id == item_id).first()
全アイテムを取得
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
… update, delete 関数などもここに追加する …
“`
- 各関数は最初の引数として
db: Session
を受け取ります。これはFastAPIの依存性注入によって提供されるデータベースセッションです。 - これらの関数は、
models.py
のORMモデルとschemas.py
のPydanticモデルを引数や戻り値の型として使用します。 - データベース操作(
query()
,add()
,commit()
,refresh()
,delete()
など)は全てdb
セッションオブジェクトを通じて行われます。 create_user_item
関数では、PydanticモデルのデータをORMモデルのインスタンス作成に渡しています。item.model_dump()
(Pydantic v2) またはitem.dict()
(Pydantic v1) は、Pydanticモデルのフィールドを辞書として返します。
FastAPIエンドポイントの実装 (main.py
)
最後に、FastAPIのエンドポイントを定義し、依存性注入を使ってデータベースセッションを取得し、crud.py
の関数を呼び出します。
“`python
main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas # crud.py, models.py, schemas.py をインポート
from .database import SessionLocal, engine # database.py からSessionLocalとengineをインポート
データベーステーブルを全て作成
本番環境ではマイグレーションツール(Alembicなど)を使うのが一般的
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
DBセッションを取得するための依存関数
def get_db():
db = SessionLocal() # 新しいセッションを作成
try:
yield db # セッションを提供
finally:
db.close() # リクエスト処理が完了したらセッションを閉じる
————- User エンドポイント ————-
@app.post(“/users/”, response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): # 依存性注入でDBセッションを取得
# メールアドレスが既に使われていないかチェック
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail=”Email already registered”)
# ユーザーを作成し、作成したユーザーオブジェクトを返す
return crud.create_user(db=db, user=user)
@app.get(“/users/”, response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
# 全ユーザーを取得して返す
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get(“/users/{user_id}”, response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
# IDでユーザーを取得して返す
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail=”User not found”)
return db_user
————- Item エンドポイント ————-
@app.post(“/users/{user_id}/items/”, response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
# 特定ユーザー向けのアイテムを作成
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get(“/items/”, response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
# 全アイテムを取得して返す
items = crud.get_items(db, skip=skip, limit=limit)
return items
@app.get(“/items/{item_id}”, response_model=schemas.Item)
def read_item(item_id: int, db: Session = Depends(get_db)):
# IDでアイテムを取得して返す
db_item = crud.get_item(db, item_id=item_id)
if db_item is None:
raise HTTPException(status_code=404, detail=”Item not found”)
return db_item
… update, delete エンドポイントなども追加 …
“`
models.Base.metadata.create_all(bind=engine)
: アプリケーション起動時にデータベーステーブルを全て作成します。注意: これは開発時のみに使い、本番環境では Alembic のようなマイグレーションツールを使うべきです。get_db()
関数は、データベースセッションを取得するための依存関数です。SessionLocal()
で新しいセッションを作成し、yield db
でFastAPIにセッションを提供します。try...finally
ブロックを使うことで、リクエスト処理が完了した後(成功しても失敗しても)必ずdb.close()
が呼び出され、セッションが適切に閉じられます。これはFastAPIの依存性注入がジェネレーターやasynccontextmanager
をサポートしているおかげです。- 各エンドポイント関数の引数に
db: Session = Depends(get_db)
を追加することで、リクエストごとに新しいデータベースセッションが自動的に取得され、そのセッションがdb
引数として関数に渡されます。 - リクエストボディやレスポンスの型ヒントに
schemas.py
で定義したPydanticモデルを使用しています。これにより、FastAPIによる自動的なデータ検証、シリアライズ、ドキュメント生成が行われます。 - データベース操作は、直接SQLAlchemyのクエリを書くのではなく、
crud.py
で定義した関数を呼び出す形にしています。これにより、エンドポイントのロジックとデータベース操作のロジックが分離され、コードの見通しが良くなります。 response_model
にPydanticモデルを指定することで、返されるデータが自動的に指定されたスキーマに変換されます。SQLAlchemyのORMオブジェクトを返す場合、response_model
が設定されたPydanticモデルのorm_mode = True
(またはfrom_attributes = True
) により、オブジェクトの属性からデータを読み取ってスキーマにマッピングしてくれます。- データが存在しない場合など、エラーが発生する可能性のある箇所では
HTTPException
を raise しています。FastAPIはこれを捕捉し、適切なHTTPステータスコードとJSONレスポンスを自動的に返します。
プロジェクトを実行する
プロジェクトを実行するには、以下の手順を行います。
- 必要なライブラリをインストールします(
requirements.txt
を作成し、pip install -r requirements.txt
を実行するのが良いでしょう)。
fastapi
uvicorn[standard]
sqlalchemy -
create_db.py
のようなスクリプト(models.Base.metadata.create_all(bind=engine)
を実行するもの)を作成して実行し、データベースファイルとテーブルを作成します。“`python
create_db.py (main.pyと同じディレクトリに配置)
from .database import Base, engine
from . import models # models.py をインポートしてテーブル定義を読み込むdef create_db_tables():
print(“Creating database tables…”)
# Baseに登録されている全てのモデル定義に対応するテーブルを作成
Base.metadata.create_all(bind=engine)
print(“Database tables created.”)if name == “main“:
create_db_tables()
``
python -m src.create_db
コマンドラインで実行:(もしプロジェクトを src ディレクトリ配下に置いている場合) または
python create_db.py` (トップディレクトリに置いている場合)。 -
FastAPIアプリケーションをUvicornで起動します。
bash
uvicorn main:app --reload
これで、http://127.0.0.1:8000/docs
にアクセスして自動生成されたAPIドキュメントを確認し、APIをテストできるようになります。
実践的な連携例:簡単なTODOアプリ
上記の構成を使って、より具体的なTODOアプリを構築する手順を見ていきましょう。ユーザー機能は前述のものを利用し、各ユーザーがTODOアイテムを持つ形にします。
1. テーブル定義 (models.py
)
Userモデルはそのまま使い、Todoモデルを追加します。
“`python
models.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base # database.py からBaseをインポート
User モデル (変更なし)
class User(Base):
tablename = “users”
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship(“Item”, back_populates=”owner”) # 仮にItemと名付けていたが、Todoに名前を変える方が自然かも
Todo モデル
class Todo(Base): # Itemの名前をTodoに変えても良い
tablename = “todos” # テーブル名を todos とする
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True) # descriptionを追加
completed = Column(Boolean, default=False) # 完了フラグを追加
owner_id = Column(Integer, ForeignKey("users.id")) # 外部キー
# リレーションシップ
owner = relationship("User", back_populates="todos") # Userモデル側のリレーション名と一致させる
“`
User
モデルのリレーションシップ名を todos
に変更し、Todo
モデルに description
と completed
カラムを追加しました。
2. スキーマ定義 (schemas.py
)
Todo関連のPydanticスキーマを定義します。
“`python
schemas.py
from pydantic import BaseModel
————- Todo 関連 ————-
Todo作成時のリクエストボディ用スキーマ
class TodoCreate(BaseModel):
title: str
description: str | None = None # descriptionを追加
Todo更新時のリクエストボディ用スキーマ
class TodoUpdate(BaseModel):
title: str | None = None
description: str | None = None
completed: bool | None = None
Todoの基本スキーマ(レスポンス用)
class TodoBase(BaseModel):
title: str
description: str | None = None
completed: bool = False # completedを追加
Todoレスポンス用スキーマ
class Todo(TodoBase): # TodoBase を継承
id: int
owner_id: int
class Config:
orm_mode = True
from_attributes = True
————- User 関連 ————-
UserCreate, UserBase, User はそのまま (User モデルのリレーション名itemsをtodosに変更した場合はこちらも修正)
例:Userレスポンス用スキーマ
class User(BaseModel):
id: int
email: str
is_active: bool
todos: list[Todo] = [] # リレーションシップ名を todos に変更
class Config:
orm_mode = True
from_attributes = True
“`
TodoUpdate
スキーマを追加しました。これはPUTやPATCHリクエストで、一部のフィールドのみを更新する場合に使えます。フィールドをOptional (| None
) にすることで、リクエストボディに含まれないフィールドは更新しないようにできます。
User
スキーマでは、関連するアイテムのリスト名を todos
に変更しました。
3. CRUD関数 (crud.py
)
Todo関連のCRUD関数を実装します。
“`python
crud.py
from sqlalchemy.orm import Session
from . import models, schemas
————- User 関連 CRUD (省略、必要に応じて修正) ————-
def get_user…
def get_user_by_email…
def get_users…
def create_user…
————- Todo 関連 CRUD ————-
特定のユーザーのTODOリストを取得
def get_user_todos(db: Session, user_id: int, skip: int = 0, limit: int = 100):
return db.query(models.Todo).filter(models.Todo.owner_id == user_id).offset(skip).limit(limit).all()
TODOを作成
def create_user_todo(db: Session, todo: schemas.TodoCreate, user_id: int):
# TodoCreate スキーマから ORM モデルインスタンスを作成
db_todo = models.Todo(todo.model_dump(), owner_id=user_id) # Pydantic v2
# Pydantic v1: db_todo = models.Todo(todo.dict(), owner_id=user_id)
db.add(db_todo)
db.commit()
db.refresh(db_todo)
return db_todo
IDでTODOを取得
def get_todo(db: Session, todo_id: int):
return db.query(models.Todo).filter(models.Todo.id == todo_id).first()
全TODOを取得 (任意、通常はユーザーごとで十分だがデバッグなどに)
def get_todos(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Todo).offset(skip).limit(limit).all()
TODOを更新
def update_todo(db: Session, db_todo: models.Todo, todo_update: schemas.TodoUpdate):
# Pydanticモデルのデータから、ORMモデルの属性を更新
update_data = todo_update.model_dump(exclude_unset=True) # Pydantic v2: exclude_unset=True でNoneでないフィールドだけ更新
# Pydantic v1: update_data = todo_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_todo, field, value) # ORMオブジェクトの属性を更新
db.add(db_todo) # 変更を追跡させるためaddするか、または自動追跡に任せる
db.commit()
db.refresh(db_todo)
return db_todo
TODOを削除
def delete_todo(db: Session, db_todo: models.Todo):
db.delete(db_todo)
db.commit()
return {“ok”: True} # 削除成功のレスポンス例
“`
update_todo
関数では、todo_update
Pydanticモデルのデータを使って、既存の db_todo
ORMオブジェクトの属性を更新しています。exclude_unset=True
を使うことで、Pydanticモデルで値が設定されていないフィールド(クライアントがリクエストボディに含めなかったフィールド)は更新対象から除外されます。
4. FastAPIエンドポイント (main.py
)
Todo関連のエンドポイントを追加します。
“`python
main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
DBテーブル作成 (開発時のみ)
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
DBセッション依存関数 (変更なし)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
————- User エンドポイント (省略、必要に応じて修正) ————-
@app.post(“/users/”)…
@app.get(“/users/”)…
@app.get(“/users/{user_id}”)…
————- Todo エンドポイント ————-
@app.post(“/users/{user_id}/todos/”, response_model=schemas.Todo)
def create_todo_for_user(
user_id: int, todo: schemas.TodoCreate, db: Session = Depends(get_db)
):
# ユーザーが存在するか確認 (オプション)
user = crud.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)
# 特定ユーザー向けのTODOを作成
return crud.create_user_todo(db=db, todo=todo, user_id=user_id)
@app.get(“/users/{user_id}/todos/”, response_model=list[schemas.Todo])
def read_todos_for_user(
user_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)
):
# 特定ユーザーのTODOリストを取得
# ユーザーが存在するか確認 (オプション)
user = crud.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)
todos = crud.get_user_todos(db, user_id=user_id, skip=skip, limit=limit)
return todos
@app.get(“/todos/{todo_id}”, response_model=schemas.Todo)
def read_todo(todo_id: int, db: Session = Depends(get_db)):
# IDでTODOを取得
db_todo = crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
return db_todo
@app.put(“/todos/{todo_id}”, response_model=schemas.Todo)
def update_todo(todo_id: int, todo: schemas.TodoUpdate, db: Session = Depends(get_db)):
# IDでTODOを取得
db_todo = crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
# TODOを更新
return crud.update_todo(db=db, db_todo=db_todo, todo_update=todo)
@app.delete(“/todos/{todo_id}”)
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
# IDでTODOを取得
db_todo = crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
# TODOを削除
crud.delete_todo(db=db, db_todo=db_todo)
return {"ok": True} # 削除成功を示すJSONレスポンス
“`
これで、簡単なTODOアプリのAPIが完成しました。
/users/{user_id}/todos/
POST: 特定のユーザーに新しいTODOを作成します。/users/{user_id}/todos/
GET: 特定のユーザーのTODOリストを取得します。/todos/{todo_id}
GET: 特定のTODOの詳細を取得します。/todos/{todo_id}
PUT: 特定のTODOを更新します。/todos/{todo_id}
DELETE: 特定のTODOを削除します。
各エンドポイントは Depends(get_db)
を通じてデータベースセッションを取得し、crud.py
の対応する関数を呼び出しています。また、存在しないIDへのアクセスなど、エラーが発生しうる箇所では HTTPException
を発生させています。
非同期処理とDB操作
前述の例では、FastAPIの非同期エンドポイント(async def
)内で、同期版SQLAlchemyのセッション(SessionLocal
)を使用しました。FastAPIはこれを検知すると、そのブロッキング処理を別のスレッドプールで実行するように内部的に処理します。これにより、同期DB操作中でもメインのイベントループが完全にブロックされるのを避けることができます。しかし、このスレッドプールへの切り替えにはオーバーヘッドがあり、真の非同期SQLAlchemyに比べるとパフォーマンスは劣る可能性があります。
SQLAlchemy 1.4以降では、非同期I/O (asyncio
) に完全に対応したAPIが提供されています。これにより、FastAPIの async def
エンドポイント内で、ブロッキングなしでデータベース操作を実行できます。
非同期SQLAlchemyの基本
非同期SQLAlchemyを使うには、以下の要素が必要です。
- 非同期DBドライバー:
asyncpg
(PostgreSQL),aiomysql
(MySQL),aiosqlite
(SQLite) などの非同期DBドライバーをインストールします。 - AsyncEngine:
create_async_engine
を使って非同期Engineを作成します。 - AsyncSession:
async_sessionmaker
を使って非同期Sessionを作成します。 await
を使用した操作: 非同期セッションに対するクエリやコミット操作は、await
を付けて実行する必要があります。
“`python
database_async.py (非同期版のデータベース設定)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base # ORMモデル定義は同期版と同じBaseを使用
from sqlalchemy.orm import sessionmaker
非同期データベースURL (asyncpgドライバの例)
DATABASE_URL = “postgresql+asyncpg://user:password@host:port/database”
aiosqlite ドライバの例 (SQLite)
DATABASE_URL = “sqlite+aiosqlite:///./sql_app_async.db”
AsyncEngineの作成
echo=True で実行されるSQLが表示される
async_engine = create_async_engine(DATABASE_URL, echo=True)
非同期SessionLocal クラスを作成
AsyncSessionLocal = sessionmaker(
async_engine, expire_on_commit=False, class_=AsyncSession
)
ORMモデル定義用のBase (これは同期版のBaseと同じ)
Base = declarative_base()
非同期セッションを取得するための依存関数 (async def になる)
async def get_async_db():
async with AsyncSessionLocal() as session: # async with 文でセッションを管理
yield session
“`
非同期CRUD関数の実装
CRUD関数も非同期 (async def
) に変更し、データベース操作に await
を付けます。
“`python
crud_async.py
from sqlalchemy import select # 非同期ORMではselect()を使うことが多い
from sqlalchemy.orm import Session # 型ヒント用だが、実際はAsyncSession
from sqlalchemy.ext.asyncio import AsyncSession
from . import models, schemas
非同期版 get_user
async def get_user(db: AsyncSession, user_id: int):
# await db.query(models.User)… ではなく、 await db.execute(select(models.User).filter(…)) の形式になる
result = await db.execute(select(models.User).filter(models.User.id == user_id))
return result.scalars().first() # scalars()でORMオブジェクトを取得
非同期版 create_user
async def create_user(db: AsyncSession, user: schemas.UserCreate):
fake_hashed_password = user.password + “notreallyhashed”
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
await db.commit() # await が必要
await db.refresh(db_user) # await が必要
return db_user
非同期版 get_user_todos
async def get_user_todos(db: AsyncSession, user_id: int, skip: int = 0, limit: int = 100):
result = await db.execute(
select(models.Todo)
.filter(models.Todo.owner_id == user_id)
.offset(skip)
.limit(limit)
)
return result.scalars().all()
非同期版 create_user_todo
async def create_user_todo(db: AsyncSession, todo: schemas.TodoCreate, user_id: int):
db_todo = models.Todo(**todo.model_dump(), owner_id=user_id)
db.add(db_todo)
await db.commit()
await db.refresh(db_todo)
return db_todo
… 他のCRUD関数も同様にasync def に変更し、await をつける …
async def get_todo(db: AsyncSession, todo_id: int):
result = await db.execute(select(models.Todo).filter(models.Todo.id == todo_id))
return result.scalars().first()
async def update_todo(db: AsyncSession, db_todo: models.Todo, todo_update: schemas.TodoUpdate):
update_data = todo_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_todo, field, value)
# db.add(db_todo) # updateの場合、セッションが追跡しているので不要なことが多い
await db.commit()
await db.refresh(db_todo)
return db_todo
async def delete_todo(db: AsyncSession, db_todo: models.Todo):
await db.delete(db_todo)
await db.commit()
return {“ok”: True}
“`
非同期SQLAlchemy ORMでは、同期版で使っていた db.query(...).all()
や db.query(...).first()
の代わりに、await db.execute(select(...))
を使ってクエリを実行し、結果から result.scalars().all()
や result.scalars().first()
などを使ってORMオブジェクトを取得する形式が推奨されます(SQLAlchemy 2.0のスタイル)。
非同期FastAPIエンドポイント
FastAPIエンドポイントは元々 async def
なので、変更は少なく済みます。データベースセッションの依存性注入を、非同期版の get_async_db
に変更するだけです。
“`python
main_async.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session # 同期Sessionは不要
from sqlalchemy.ext.asyncio import AsyncSession # 非同期Sessionの型ヒント用
from . import crud_async, models, schemas # 非同期版CRUDをインポート
from .database_async import get_async_db, async_engine # 非同期版DB設定からインポート
データベーステーブル作成 (開発時のみ、非同期で実行する必要がある)
async def create_db_tables_async():
async with async_engine.begin() as conn: # async with engine.begin() で非同期トランザクションを開始
# await conn.run_sync(models.Base.metadata.create_all) # 同期的なcreate_allを非同期で実行
# または、SQLAlchemy 2.0のAsyncDDLを使う (より複雑)
# 簡単のため、同期スクリプトで一度だけ作成する方法も検討
# ここではデモンストレーションのため、起動時に毎回実行する例を示す
# (本番ではAlembicなどを使うべき)
# 非同期版のcreate_allは提供されていないため、run_syncを使う
from sqlalchemy import inspect
async with async_engine.connect() as conn:
# データベースが存在するか確認し、存在しない場合のみ作成
inspector = inspect(conn.engine)
# 注意: SQLiteのasyncioはinspectorをasyncで実行できない可能性がある
# 実際のアプリケーションでは、事前に同期スクリプトで作成するのが最も簡単
# 以下は理論的な非同期でのテーブル存在チェック&作成例 (SQLite aiosqliteでは動作しない可能性高)
# exists = await inspector.has_table(models.User.tablename)
# if not exists:
# await conn.run_sync(models.Base.metadata.create_all)
# 実際には、create_db.py
の同期スクリプトを一度実行する方が簡単で確実です。
# または Alembic の async モードを使用します。
# デモ目的のため、起動時にテーブルが存在することを前提とするか、
# または同期スクリプトを事前に実行してください。
# もし必要であれば、async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) を使って、起動時にテーブル作成を試みることは可能です。
# ただし、これはSQLiteのaiosqliteではうまく動作しないことがあります。
# 簡単な方法として、起動時にチェックせず常に create_all を試みる (冪等なので問題ないことが多い)
# async with async_engine.begin() as conn:
# await conn.run_sync(models.Base.metadata.create_all)
pass # テーブル作成部分は別途同期スクリプトで実行済みとする
app = FastAPI()
アプリケーション起動時にテーブル作成を試みる場合 (非同期)
@app.on_event(“startup”)
async def startup_event():
await create_db_tables_async()
非同期DBセッションをDependsで取得
@app.post(“/users/”, response_model=schemas.User)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_async_db)):
db_user = await crud_async.get_user_by_email(db, email=user.email) # await が必要
if db_user:
raise HTTPException(status_code=400, detail=”Email already registered”)
return await crud_async.create_user(db=db, user=user) # await が必要
他のエンドポイントも同様に async def に変更し、
依存性を Depends(get_async_db) に、CRUD関数の呼び出しに await をつける
例: GET /users/{user_id}
@app.get(“/users/{user_id}”, response_model=schemas.User)
async def read_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
db_user = await crud_async.get_user(db, user_id=user_id) # await が必要
if db_user is None:
raise HTTPException(status_code=404, detail=”User not found”)
return db_user
例: POST /users/{user_id}/todos/
@app.post(“/users/{user_id}/todos/”, response_model=schemas.Todo)
async def create_todo_for_user(
user_id: int, todo: schemas.TodoCreate, db: AsyncSession = Depends(get_async_db)
):
user = await crud_async.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)
return await crud_async.create_user_todo(db=db, todo=todo, user_id=user_id)
例: GET /users/{user_id}/todos/
@app.get(“/users/{user_id}/todos/”, response_model=list[schemas.Todo])
async def read_todos_for_user(
user_id: int, skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)
):
user = await crud_async.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)
todos = await crud_async.get_user_todos(db, user_id=user_id, skip=skip, limit=limit)
return todos
例: GET /todos/{todo_id}
@app.get(“/todos/{todo_id}”, response_model=schemas.Todo)
async def read_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
db_todo = await crud_async.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
return db_todo
例: PUT /todos/{todo_id}
@app.put(“/todos/{todo_id}”, response_model=schemas.Todo)
async def update_todo(todo_id: int, todo: schemas.TodoUpdate, db: AsyncSession = Depends(get_async_db)):
db_todo = await crud_async.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
return await crud_async.update_todo(db=db, db_todo=db_todo, todo_update=todo)
例: DELETE /todos/{todo_id}
@app.delete(“/todos/{todo_id}”)
async def delete_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
db_todo = await crud_async.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
await crud_async.delete_todo(db=db, db_todo=db_todo)
return {“ok”: True}
“`
非同期版に移行することで、I/O処理中にイベントループがブロックされることなく、より高い並行性を実現できます。ただし、非同期SQLAlchemy ORMは同期版に比べてまだ新しい機能であり、書き方が少し異なります(特にクエリ部分)。また、サポートされるDBドライバーの種類も同期版より限られます。
一般的には、簡単なアプリケーションであれば同期版SQLAlchemyでも十分なパフォーマンスが得られることが多いです。しかし、多数の同時接続を捌く必要がある、またはデータベース操作がレスポンスタイムのボトルネックになっているようなケースでは、非同期SQLAlchemyへの移行を検討する価値があります。
発展的なトピック
- マイグレーション (Alembic): データベーススキーマの変更(テーブルやカラムの追加・削除など)を管理するには、Alembicのようなマイグレーションツールを使用するのが一般的です。これにより、安全かつ段階的にデータベースの変更を適用できます。
- リレーションシップの扱い: SQLAlchemyのリレーションシップ機能は非常に強力です。一対一、一対多、多対多などのリレーションシップを定義し、関連オブジェクトを簡単にロードしたり操作したりできます。ロード方法(Lazy Loading, Eager Loading, Joined Loadingなど)を適切に選択することで、パフォーマンスを最適化できます。
- エラーハンドリング: データベース関連のエラー(一意性制約違反、外部キー制約違反など)を捕捉し、FastAPIの
HTTPException
を使って適切なエラーレスポンス(例: 409 Conflict, 400 Bad Request)を返す必要があります。 - 認証・認可: FastAPIには、OAuth2などの認証フローを簡単に実装できる機能が組み込まれています。ユーザー認証を行い、認証されたユーザーに関連するデータのみにアクセスを制限する認可ロジックを実装する必要があります。
- テスト: FastAPIアプリケーションとSQLAlchemy連携部分の単体テスト・結合テストを記述することが重要です。テスト用にインメモリデータベース(SQLite)を使用したり、トランザクションを使ってテスト後にデータベースの状態をロールバックしたりする方法が一般的です。
まとめ
この記事では、Pythonの高速APIフレームワークであるFastAPIと、強力なSQLツールキット・ORMであるSQLAlchemyを組み合わせたアプリケーション開発の基礎を詳細に解説しました。
FastAPIは、そのモダンな設計、非同期処理への対応、Pydanticによる強力なデータ検証、そして自動APIドキュメント生成といった特徴により、API開発の生産性とパフォーマンスを同時に向上させます。SQLAlchemyは、ORMを通じてPythonオブジェクトとしてデータベースを操作できるため、SQLを直接書くよりも直感的でメンテナンスしやすいコードを書くことができます。
両者を組み合わせる際の鍵となるのは、FastAPIの依存性注入システムを活用したデータベースセッションの管理です。リクエストごとにセッションを取得し、処理後に適切に閉じることで、リソースを効率的に利用できます。また、PydanticモデルとSQLAlchemy ORMモデルを連携させることで、APIのリクエスト・レスポンス形式とデータベーススキーマの間のマッピングを明確に定義できます。
CRUD操作を専用のモジュール (crud.py
) に分離し、APIエンドポイント (main.py
) はそのCRUD関数を呼び出すようにすることで、コードの見通しが良くなり、関心の分離が実現されます。
さらに、SQLAlchemyの非同期APIを活用することで、FastAPIの非同期処理能力を最大限に引き出し、高い並行性とスループットを実現できる可能性があることも紹介しました。
FastAPIとSQLAlchemyの組み合わせは、Pythonで高性能かつ保守性の高いAPIを開発するための非常に強力な選択肢です。この記事で紹介した基礎知識を土台に、さらに高度な機能を実装したり、プロジェクトの規模に合わせて設計を洗練させていくことで、より堅牢なアプリケーションを構築できるでしょう。
今後の学習ステップとしては、Alembicによるマイグレーション、SQLAlchemyのリレーションシップのより詳細な使い方、認証・認可の実装、テストの書き方などを学ぶことをお勧めします。FastAPIとSQLAlchemyの公式ドキュメントは非常に充実しており、これらを深く理解するための最良のリソースとなります。
この組み合わせを使いこなし、素晴らしいAPIを開発してください!