FastAPI + SQLAlchemy: 高速API開発とDB連携の基礎

はい、承知いたしました。FastAPIとSQLAlchemyを組み合わせたAPI開発とデータベース連携の基礎について、約5000語の詳細な記事を作成します。


FastAPI + SQLAlchemy: 高速API開発とDB連携の基礎

はじめに

現代のWeb開発において、高速かつスケーラブルなAPIの構築は不可欠です。Pythonの世界では、様々なフレームワークやライブラリがAPI開発のために提供されていますが、近年特に注目を集めているのが「FastAPI」と「SQLAlchemy」の組み合わせです。

FastAPIは、非同期処理に対応し、高いパフォーマンスと生産性を両立させたモダンなWebフレームワークです。その核となる機能は、Pydanticによるデータ検証とシリアライゼーション、そしてStarletteによる高性能なASGI Webフレームワークです。型ヒントを積極的に活用することで、エディタの補完機能やコードの可読性が向上し、さらに自動生成されるAPIドキュメント(Swagger UI, ReDoc)は開発効率を飛躍的に高めます。

一方、SQLAlchemyは、Pythonにおける最も強力で柔軟なデータベースツールキットの一つです。SQLAlchemyはORM(Object-Relational Mapper)機能を提供し、Pythonオブジェクトを通じてデータベースを操作できるようにします。これにより、生SQLを書く手間を省き、よりPythonらしいコードでデータベースとのやり取りが可能になります。また、ORMだけでなく、SQL Expression Languageという低レベルな抽象化レイヤーも提供しており、複雑なクエリも柔軟に構築できます。

FastAPIの高速な非同期処理能力と、SQLAlchemyの堅牢かつ柔軟なデータベース操作能力を組み合わせることで、高性能でメンテナンスしやすいAPIを効率的に開発できます。特に、FastAPIの依存性注入システムは、SQLAlchemyのセッション管理と非常に相性が良く、データベース接続の管理をシンプルに行えます。

この記事では、FastAPIとSQLAlchemyを初めて使う方、あるいはこれらのライブラリを使ったAPI開発の基礎を学びたい方を対象に、それぞれの基本的な使い方から、両者を組み合わせて実践的なAPIを構築する方法までを詳細に解説します。約5000語のボリュームで、概念説明から具体的なコード例までを網羅し、読者がご自身のプロジェクトでFastAPI+SQLAlchemyを活用できるようになることを目指します。

FastAPIの基礎

まずは、FastAPIの基本的な概念と使い方を学びましょう。

FastAPIとは?

FastAPIは、Python 3.7以降で利用できる、モダンで高速な(高いパフォーマンスを発揮する)Webフレームワークです。主な特徴は以下の通りです。

  • 高速: Starlette(ASGIフレームワーク)とPydantic(データ検証ライブラリ)を基盤としており、非常に高いスループットを実現します。NodeJSやGoに匹敵する性能を持つとされています。
  • 高い開発効率: Pythonの型ヒントをフル活用しており、エディタの補完機能が強力に効きます。これにより、typoや引数の誤りを減らし、開発速度が向上します。
  • 自動ドキュメント: コードに型ヒントを書くだけで、インタラクティブなAPIドキュメント(Swagger UIとReDoc)が自動生成されます。これはAPIの仕様確認やテストに非常に役立ちます。
  • データ検証: Pydanticモデルを使用することで、リクエストデータの構造と型を定義し、自動的に検証・変換を行います。無効なリクエストは自動的にエラーレスポンスとして返されます。
  • 依存性注入: 依存性注入システムが組み込まれており、共通のロジック(データベース接続、認証など)を簡単に再利用できます。
  • 非同期処理: async / await 構文をサポートしており、非同期I/Oを効率的に扱えます。データベース操作のようなI/Oバウンドな処理でパフォーマンスを発揮します。

インストール

FastAPIを使うには、まずインストールが必要です。ASGIサーバーとして、一般的にはUvicornを使います。

bash
pip install fastapi uvicorn[standard]

基本的な使い方:Hello World API

FastAPIの基本的な構造を見てみましょう。

“`python

main.py

from fastapi import FastAPI

app = FastAPI()

@app.get(“/”)
async def read_root():
return {“Hello”: “World”}

@app.get(“/items/{item_id}”)
async def read_item(item_id: int, q: str = None):
return {“item_id”: item_id, “q”: q}
“`

このコードを main.py という名前で保存します。
次に、Uvicornを使ってこのアプリケーションを実行します。

bash
uvicorn main:app --reload

--reload オプションは、コードの変更を検知してサーバーを自動的に再起動します。

ブラウザで http://127.0.0.1:8000/ にアクセスすると {"Hello": "World"} が表示されます。
http://127.0.0.1:8000/items/5?q=somequery にアクセスすると {"item_id": 5, "q": "somequery"} が表示されます。
http://127.0.0.1:8000/docs にアクセスすると自動生成されたSwagger UI、http://127.0.0.1:8000/redoc にアクセスするとReDocが表示されます。

ルーティング、パスパラメータ、クエリパラメータ

  • ルーティング: app.get(), app.post(), app.put(), app.delete() などのデコレータを使って、特定のHTTPメソッドとURLパスに関数を紐付けます。
  • パスパラメータ: URLパスの一部をパラメータとして受け取るには、パス内で {parameter_name} のように指定し、関数の引数として同名のパラメータを受け取ります。型ヒントを付けることで、FastAPIが自動的に型変換と検証を行います(例: item_id: int)。
  • クエリパラメータ: URLの ?key1=value1&key2=value2 の形式で渡されるパラメータです。関数の引数として、デフォルト値付きで定義することで受け取れます(例: q: str = None)。デフォルト値がない場合は必須パラメータとなります。

リクエストボディ(Pydanticモデルの利用)

POST, PUTなどのメソッドで送信されるリクエストボディのデータは、Pydanticモデルを使って定義・検証するのが一般的です。

まず、Pydanticモデルを定義します。

“`python

schemas.py (ファイル名を分けるのは一般的な慣習)

from pydantic import BaseModel

class Item(BaseModel):
name: str
price: float
is_offer: bool = None
“`

次に、FastAPIのエンドポイントでこのモデルを引数として受け取ります。

“`python

main.py (schemas.py と同じディレクトリにあると仮定)

from fastapi import FastAPI
from .schemas import Item # schemas.py からインポート

app = FastAPI()

… read_root, read_item エンドポイント …

@app.post(“/items/”)
async def create_item(item: Item):
return item
“`

このエンドポイントにPOSTリクエストを送信する際、リクエストボディが Item モデルの構造(name: str, price: float, is_offer: bool optional)に合致しているかFastAPIが自動的に検証します。合致しない場合は、適切なエラーレスポンスが返されます。

レスポンスモデル

FastAPIはレスポンスの形式もPydanticモデルで定義することを推奨しています。これにより、APIの出力構造が明確になり、自動ドキュメントにも反映されます。

“`python

schemas.py

from pydantic import BaseModel

class Item(BaseModel):
name: str
price: float
is_offer: bool = None

class ItemResponse(BaseModel): # レスポンス用のモデル
id: int
name: str
price: float
is_offer: bool = None
# その他のレスポンスに含めたいフィールド…

class Config: # ORMモードを有効にする
    orm_mode = True
    from_attributes = True # Pydantic v2以降はこちらを使用

“`

ItemResponseConfig クラスにある orm_mode = True (Pydantic v2以降は from_attributes = True) は重要です。これは、このPydanticモデルがORMオブジェクト(SQLAlchemyモデルのインスタンスなど)からデータを読み取れるようにします。SQLAlchemyモデルは属性アクセス(例: item.name)でデータにアクセスしますが、Pydanticモデルは辞書のようにキーアクセス(例: item_dict['name'])でアクセスするのが基本です。orm_mode = True を設定することで、PydanticはORMオブジェクトの属性も読み取れるようになります。

エンドポイント関数に response_model 引数を指定します。

“`python

main.py

from fastapi import FastAPI
from .schemas import Item, ItemResponse # ItemResponse もインポート

app = FastAPI()

… create_item エンドポイント …

Dummy data for demonstration

items_db = {
1: {“id”: 1, “name”: “Foo”, “price”: 50.5, “is_offer”: False},
2: {“id”: 2, “name”: “Bar”, “price”: 120.0, “is_offer”: True},
}

@app.get(“/items/{item_id}”, response_model=ItemResponse)
async def read_item_response(item_id: int):
# 実際にはDBから取得する
item_data = items_db.get(item_id)
if item_data:
return item_data # Pydanticは辞書からItemResponseモデルに変換しようとする
# 存在しない場合は適切なエラーを返す(後述)
return None # または HTTPException を raise する
“`

このように response_model を指定すると、FastAPIはエンドポイント関数の戻り値を自動的に指定されたPydanticモデルに変換し、JSONレスポンスとして返します。これにより、レスポンスの形式が保証されます。

非同期処理 (async def, await)

FastAPIはASGI(Asynchronous Server Gateway Interface)標準に基づいています。これにより、非同期I/O(データベースアクセス、外部API呼び出しなど)を効率的に扱うことができます。Pythonでは async defawait 構文を使って非同期処理を記述します。

エンドポイント関数を async def で定義すると、その関数内で await を使って非同期処理を実行できます。非同期処理中にCPUがブロックされることなく、他のリクエストを処理できるため、特にI/Oバウンドなアプリケーションで高いパフォーマンスを発揮します。

データベース操作は典型的なI/Oバウンドな処理です。SQLAlchemyもバージョン1.4以降で非同期対応(AsyncIO)が強化されました。しかし、SQLAlchemyの標準的な使い方(同期API)はブロッキングI/Oです。FastAPIの async def エンドポイント内で同期SQLAlchemyの操作を行うと、データベース操作中にイベントループがブロックされてしまい、非同期処理のメリットが損なわれる可能性があります。

注意点: 非同期FastAPIエンドポイント内でブロッキングI/O(例: 同期版SQLAlchemy、requestsライブラリなど)を実行すると、その操作が完了するまでイベントループがブロックされ、他の非同期タスクやリクエストの処理が停止してしまいます。FastAPIはこれを検知すると、そのブロッキング処理を別のスレッドで実行するように試みますが、基本的には非同期エンドポイントでは非同期ライブラリを使うのが最も効率的です。

この記事の後半では、同期版SQLAlchemyをFastAPIと連携させる方法を主に解説しますが、非同期版SQLAlchemyについても触れます。

依存性注入 (Dependency Injection)

FastAPIの最も強力な機能の一つが依存性注入システムです。これは、エンドポイント関数や他の依存関数(dependency function)の引数として、様々なリソースやオブジェクト(データベースセッション、設定値、認証されたユーザーなど)を自動的に提供する仕組みです。

依存性を定義するには、単なる関数(またはクラスなど、呼び出し可能なオブジェクト)を作成し、それをFastAPIの Depends 関数で指定します。

“`python
from fastapi import FastAPI, Depends, Header

app = FastAPI()

async def get_token_header(x_token: str = Header(…)):
if x_token != “fake-super-secret-token”:
raise HTTPException(status_code=400, detail=”X-Token header invalid”)
return x_token

@app.get(“/items/”, dependencies=[Depends(get_token_header)]) # 依存性をリストで指定
async def read_items():
return [{“item”: “Foo”}, {“item”: “Bar”}]

あるいは、引数として直接依存性を定義

async def get_current_user(token: str = Depends(get_token_header)):
# token を使ってユーザーを特定するロジック
user = {“username”: “testuser”, “token”: token}
return user

@app.get(“/users/me”)
async def read_users_me(current_user: dict = Depends(get_current_user)):
return current_user
“`

依存性注入は、データベースセッションの管理に特に役立ちます。リクエストごとに新しいセッションを作成し、処理終了時に適切に閉じたりロールバックしたりするロジックを、各エンドポイント関数から切り離して、依存関数として一元管理できます。

自動ドキュメント (Swagger UI, ReDoc)

FastAPIは、コード中の型ヒントやPydanticモデルの情報から、OpenAPI標準に基づいたドキュメントを自動生成します。デフォルトで /docs にSwagger UI、/redoc にReDocが提供されます。

これは開発チーム内でのAPI仕様共有や、クライアント開発者への情報提供に非常に便利です。Swagger UIからは、各エンドポイントの詳細を確認したり、実際にリクエストを送信してAPIを試したりすることも可能です。

SQLAlchemyの基礎

次に、SQLAlchemyの基本的な概念と使い方を学びましょう。この記事では、ORM(Object-Relational Mapper)機能を中心に解説します。

SQLAlchemyとは?

SQLAlchemyは、PythonのSQLツールキットおよびORMです。主な特徴は以下の通りです。

  • 柔軟性: ORMとSQL Expression Languageの二つの主要なモードを提供します。ORMはPythonオブジェクトとしてデータベース操作を抽象化し、Expression Languageはより低レベルでSQLに近い柔軟なクエリ構築を可能にします。
  • 強力なORM: Pythonクラスをデータベーステーブルにマッピングし、オブジェクト指向的にデータのCRUD(Create, Read, Update, Delete)操作を行えます。リレーションシップ(一対多、多対多など)の定義と操作も強力にサポートします。
  • データベース非依存: 様々なデータベース(PostgreSQL, MySQL, SQLite, Oracle, MS-SQLなど)に対応しており、データベース固有の構文を意識することなく操作できます。
  • コネクションプール: データベース接続の管理を効率的に行い、パフォーマンスを向上させます。
  • トランザクション: データベース操作をトランザクションとして扱い、原子性、一貫性、独立性、永続性(ACID特性)を保証できます。

インストール

SQLAlchemyを使うには、まずインストールが必要です。対象のデータベースに応じたDBドライバーもインストールする必要があります。ここではSQLiteを例に進めます。

bash
pip install sqlalchemy
pip install databases # 後述の非同期対応ライブラリ(これはSQLAlchemy Core向けだが、ORMと組み合わせることも)

データベース接続 (Engine)

データベースへの接続は、create_engine 関数を使って行います。これはデータベースとの主要な通信ポイントである Engine オブジェクトを作成します。

“`python
from sqlalchemy import create_engine

SQLiteの場合

DATABASE_URL = “sqlite:///./test.db” # .envなどで管理するのが一般的

Engineの作成

connect_args={“check_same_thread”: False} はSQLiteで必要な設定(FastAPIで複数リクエストが同時に発生する場合)

engine = create_engine(DATABASE_URL, connect_args={“check_same_thread”: False})
“`

create_engine はデータベースURL(通常は dialect+driver://user:password@host:port/database の形式)を受け取ります。

メタデータとテーブル定義 (Declarative Base)

SQLAlchemy ORMでは、Pythonクラスとしてデータベーステーブルのスキーマを定義します。これを「Declarative Base」と呼びます。

“`python
from sqlalchemy.ext.declarative import declarative_base # SQLAlchemy 1.x 形式

SQLAlchemy 2.0 形式: from sqlalchemy.orm import DeclarativeBase

from sqlalchemy import Column, Integer, String, Boolean

Declarative Base クラスを作成

SQLAlchemy 1.x

Base = declarative_base()

SQLAlchemy 2.0

class Base(DeclarativeBase):

pass

データベースモデル(テーブル定義)

class Item(Base):
tablename = “items” # テーブル名

id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
price = Column(Integer) # デモンストレーションのためIntegerとする
is_offer = Column(Boolean, default=None)

“`

  • declarative_base() または DeclarativeBase を継承したクラス Base を作成します。これは、後続で定義する全てのORMモデルの基盤となります。
  • Item クラスは Base を継承し、__tablename__ 属性で対応するデータベーステーブル名を指定します。
  • クラス属性として Column オブジェクトを定義し、カラム名、データ型、その他の制約(主キー primary_key=True、インデックス index=True、デフォルト値 default など)を指定します。

このモデル定義は、データベーステーブルをPythonオブジェクトとして扱うためのマッピング情報となります。

データベースに実際にテーブルを作成するには、Base.metadata.create_all(engine) を実行します。これは、Base に紐付けられた全てのモデル定義に対応するテーブルをデータベース上に作成します(既に存在する場合は何もしません)。

“`python

テーブル作成スクリプト(例: create_db.py)

from sqlalchemy import create_engine
from .models import Base # 上記で定義したBaseクラス

DATABASE_URL = “sqlite:///./test.db”
engine = create_engine(DATABASE_URL, connect_args={“check_same_thread”: False})

def create_database_tables():
print(“Creating database tables…”)
Base.metadata.create_all(bind=engine)
print(“Database tables created.”)

if name == “main“:
create_database_tables()
“`

このスクリプトを一度実行すれば、データベースファイル (./test.db) が作成され、items テーブルが生成されます。

セッション (SessionLocal)

SQLAlchemy ORMを使ってデータベース操作を行うには、「セッション」が必要です。セッションは、データベースとの対話の全ての段階(クエリの発行、オブジェクトのロード、変更の永続化など)を管理する、作業領域のようなものです。

セッションは通常、リクエストごと、または一連の関連するデータベース操作ごとに作成され、操作が完了したら閉じられます。セッションを使い終わったら必ず閉じることが重要です。さもないと、リソースリークや予期しない振る舞いを引き起こす可能性があります。

セッションを作成するには、sessionmaker 関数を使ってセッションファクトリを作成し、それを使ってセッションインスタンスを生成します。

“`python
from sqlalchemy.orm import sessionmaker

sessionmaker を使ってセッションファクトリを作成

autocommit=False: commit() を呼び出すまで変更を確定しない

autoflush=False: query() などの前に自動的に flush() を実行しない

bind=engine: このセッションファクトリが使うEngineを指定

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

セッションを取得する(例)

db = SessionLocal()

try:

# データベース操作

pass

finally:

db.close()

“`

SessionLocal は、新しいセッションオブジェクトを作成するためのクラス(またはファクトリ)として定義されます。これをFastAPIの依存性注入システムと組み合わせて使うことで、リクエストごとにセッションを簡単に取得・管理できます。

CRUD操作の基本

セッションを使って、データベースモデルのインスタンスを操作することで、CRUD操作を行います。

データの挿入 (Create)

ORMモデルのインスタンスを作成し、セッションに追加 (add) し、コミット (commit) します。

“`python
from .models import Item # 前述のItemモデル

セッションを取得していると仮定 (db は SessionLocal() で取得したセッションオブジェクト)

db_item = Item(name=”New Item”, price=100)
db.add(db_item) # セッションに追加
db.commit() # データベースに変更を確定
db.refresh(db_item) # データベースに保存された最新の状態をオブジェクトに反映(特にidなどが自動生成される場合)

print(f”Created item with ID: {db_item.id}”)
“`

db.add() はオブジェクトをセッションの「保留中 (pending)」状態にします。db.commit() が呼び出されると、保留中の変更がデータベースに書き込まれ、トランザクションが完了します。db.refresh(db_item) は、データベースからの情報をオブジェクトに再度読み込みます。特に主キーが自動生成される場合などに、オブジェクトのIDを知るために使われます。

データの取得 (Read)

セッションの query() メソッドを使ってクエリを構築します。

“`python

全て取得

items = db.query(Item).all()
print(f”All items: {items}”) # オブジェクトのリスト

IDで一つ取得

item_id = 1
item = db.query(Item).filter(Item.id == item_id).first()
if item:
print(f”Item with ID {item_id}: {item.name}”)

条件に一致する複数件を取得

items_over_50 = db.query(Item).filter(Item.price > 50).all()
print(f”Items over 50: {[i.name for i in items_over_50]}”)

特定のカラムだけを取得(タプルのリストになる)

item_names = db.query(Item.name).all()
print(f”Item names: {item_names}”)
“`

  • db.query(Item)Item モデルを対象としたクエリを開始します。
  • filter() は WHERE 句に相当する条件を指定します。条件式は SQLAlchemy の式言語を使って記述します(例: Item.id == item_id)。
  • all() は条件に一致する全ての行を、ORMオブジェクトのリストとして取得します。
  • first() は条件に一致する最初の行を、ORMオブジェクトとして取得します。一致する行がない場合は None を返します。

データの更新 (Update)

取得したORMオブジェクトの属性を変更し、セッションをコミットします。

“`python

IDでアイテムを取得

item_id = 1
item = db.query(Item).filter(Item.id == item_id).first()

if item:
item.price = 150 # 属性値を変更
db.commit() # 変更をデータベースに確定
db.refresh(item) # オブジェクトの状態を最新に更新
print(f”Updated item {item.id}: new price is {item.price}”)
“`

ORMオブジェクトの属性を変更すると、その変更はセッションによって追跡されます。db.commit() が呼び出されたときに、変更された属性に対応する UPDATE 文が自動的に生成され、データベースに実行されます。

データの削除 (Delete)

取得したORMオブジェクトをセッションから削除 (delete) し、コミットします。

“`python

IDでアイテムを取得

item_id = 1
item = db.query(Item).filter(Item.id == item_id).first()

if item:
db.delete(item) # セッションから削除対象としてマーク
db.commit() # データベースから削除を確定
print(f”Deleted item with ID: {item_id}”)
“`

db.delete(item) はオブジェクトをセッションの「削除待ち (deleted)」状態にします。db.commit() が呼び出されたときに、対応する DELETE 文が自動的に生成され、データベースに実行されます。

トランザクション

SQLAlchemyのセッションはデフォルトでトランザクションを管理します。SessionLocal を作成する際に autocommit=False と設定している場合、db.commit() を呼び出すまで、そのセッション上で行われた全ての変更は一つのトランザクションとして扱われます。

途中でエラーが発生した場合など、変更をデータベースに永続化させたくない場合は、db.rollback() を呼び出すことで、そのセッション開始以降に行われた全ての操作を取り消すことができます。

通常、FastAPIのエンドポイントでは、リクエスト処理の開始時にセッションを取得し、処理中にエラーが発生した場合はロールバック、成功した場合はコミット、そして最後にセッションを閉じる、という流れになります。これはFastAPIの依存性注入システムで効率的に実現できます。

FastAPIとSQLAlchemyの連携

いよいよ、FastAPIとSQLAlchemyを組み合わせてAPIを構築する方法を解説します。ここでは、典型的なプロジェクト構成を採用し、各コンポーネントの役割を明確にします。

プロジェクト構造の設計案

大規模なアプリケーション開発では、コードを複数のファイルに分割し、役割ごとに整理することが重要です。以下のような構造が一般的です。

.
├── main.py # FastAPIアプリケーションのエントリポイント、ルーティング
├── database.py # DB接続設定、SessionLocal、Base定義
├── models.py # SQLAlchemy ORMモデル定義
├── schemas.py # Pydanticモデル定義(リクエスト/レスポンスのスキーマ)
├── crud.py # データベース操作(CRUD)関数群
└── requirements.txt # プロジェクトの依存関係リスト

  • main.py: FastAPIアプリケーションインスタンスを作成し、ルーター(APIエンドポイント)を登録します。依存性注入の設定などもここで行うことがあります。
  • database.py: データベースURLの設定、create_enginesessionmaker (SessionLocal)、declarative_base (Base) などのデータベース関連の初期設定を行います。
  • models.py: SQLAlchemyのORMモデル(テーブル定義)を記述します。
  • schemas.py: Pydanticモデルを記述します。これらはAPIのリクエストボディの検証やレスポンスのシリアライズに使われます。
  • crud.py: データベースに対する具体的なCRUD操作を行う関数群を記述します。これらの関数はセッションを引数として受け取り、models.py で定義されたORMモデルを使って操作を行います。この層は、APIのエンドポイントロジックからデータベース操作の詳細を隠蔽します。

データベース接続の設定 (database.py)

まずはデータベース接続のための基本設定を行います。

“`python

database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base # SQLAlchemy 1.x

from sqlalchemy.orm import DeclarativeBase # SQLAlchemy 2.0

from sqlalchemy.orm import sessionmaker

データベースURL。SQLiteを使用。環境変数などで管理するのが望ましい

相対パスの場合、FastAPI実行時のカレントディレクトリからのパスになる点に注意

DATABASE_URL = “sqlite:///./sql_app.db”

Engineの作成

SQLiteの場合、check_same_thread=False を指定しないと複数リクエスト時にエラーになる

connect_args={“check_same_thread”: False} はSQLiteに特有の設定であり、

PostgreSQLなどの他のDBでは不要。

engine = create_engine(
DATABASE_URL, connect_args={“check_same_thread”: False}
)

SessionLocal クラスを作成

autocommit=False: commit() を呼び出すまで変更を確定しない

autoflush=False: query() などの前に自動的に flush() を実行しない

bind=engine: 作成したEngineにバインド

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Declarative Base クラスを作成 (SQLAlchemy 1.x)

Base = declarative_base()

SQLAlchemy 2.0の場合:

class Base(DeclarativeBase):

pass

このBaseクラスをmodels.pyで使用してテーブルを定義する

“`

データベースモデルの定義 (models.py)

次に、データベースのテーブル構造をSQLAlchemy ORMモデルとして定義します。ここではユーザーとアイテムという単純なモデルを例にします。

“`python

models.py

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship # リレーションシップ用

from .database import Base # database.py で定義したBaseクラスをインポート

User モデル

class User(Base):
tablename = “users”

id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)

# リレーションシップ: このユーザーが所有するItemのリスト
items = relationship("Item", back_populates="owner")

Item モデル

class Item(Base):
tablename = “items”

id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id")) # 外部キー

# リレーションシップ: このアイテムを所有するUser
owner = relationship("User", back_populates="items")

“`

  • ForeignKey("users.id"): items テーブルの owner_id カラムが users テーブルの id カラムを参照する外部キーであることを示します。
  • relationship("Item", back_populates="owner"): User モデルから関連する Item オブジェクトへのアクセスを定義します。back_populates="owner" は、Item モデルの owner リレーションシップに対応することを示します。双方向のリレーションシップを定義することで、user.items でユーザーのアイテムリストを取得したり、item.owner でアイテムの所有者ユーザーを取得したりできます。

Pydanticモデルの定義 (schemas.py)

APIのリクエストとレスポンスのデータ形式をPydanticモデルで定義します。

“`python

schemas.py

from pydantic import BaseModel

————- Item 関連 ————-

Item作成時のリクエストボディ用スキーマ

class ItemCreate(BaseModel):
title: str
description: str | None = None # Python 3.10 以降の Union shorthand

Itemの基本スキーマ(レスポンスや他のモデルで継承して使う)

class ItemBase(BaseModel):
title: str
description: str | None = None

Itemレスポンス用スキーマ

class Item(ItemBase): # ItemBase を継承
id: int
owner_id: int

class Config:
    orm_mode = True # SQLAlchemy ORMモデルからのデータ読み込みを許可
    from_attributes = True # Pydantic v2以降はこちらを使用

————- User 関連 ————-

User作成時のリクエストボディ用スキーマ

class UserCreate(BaseModel):
email: str
password: str

Userの基本スキーマ

class UserBase(BaseModel):
email: str

Userレスポンス用スキーマ

class User(UserBase): # UserBase を継承
id: int
is_active: bool
items: list[Item] = [] # このユーザーが所有するItemのリスト

class Config:
    orm_mode = True # SQLAlchemy ORMモデルからのデータ読み込みを許可
    from_attributes = True # Pydantic v2以降はこちらを使用

“`

  • ItemCreateUserCreate は、クライアントがPOSTやPUTリクエストで送信するデータの構造を定義します。
  • ItemUser は、APIからのレスポンスとして返されるデータの構造を定義します。これらのモデルは、SQLAlchemy ORMモデルからデータを読み込むために Config クラスで orm_mode = True (または from_attributes = True) を設定しています。
  • User スキーマの items: list[Item] = [] は、ユーザーレスポンスにそのユーザーが所有するアイテムのリストを含めることを示しています。PydanticはSQLAlchemyのUserモデルから関連する items リレーションシップを通じてデータを取得し、それぞれのItemオブジェクトをItemスキーマに変換します。

CRUD操作の実装 (crud.py)

データベースに対する具体的な操作(CRUD関数)を実装します。これらの関数は、FastAPIのエンドポイントから呼び出されます。

“`python

crud.py

from sqlalchemy.orm import Session # SQLAlchemyのSession型ヒント用

from . import models, schemas # models.py, schemas.py をインポート

————- User 関連 CRUD ————-

IDでユーザーを取得

def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()

メールアドレスでユーザーを取得

def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()

全ユーザーを取得

def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()

ユーザーを作成

def create_user(db: Session, user: schemas.UserCreate):
# パスワードはハッシュ化するのが一般的だが、ここでは簡略化
fake_hashed_password = user.password + “notreallyhashed”
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user) # データベースに保存された状態(IDなど)をオブジェクトに反映
return db_user

————- Item 関連 CRUD ————-

特定のユーザーのアイテムを取得

def get_user_items(db: Session, user_id: int, skip: int = 0, limit: int = 100):
return db.query(models.Item).filter(models.Item.owner_id == user_id).offset(skip).limit(limit).all()

アイテムを作成

def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(item.model_dump(), owner_id=user_id) # Pydantic v2: item.model_dump()
# Pydantic v1: db_item = models.Item(
item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item

IDでアイテムを取得

def get_item(db: Session, item_id: int):
return db.query(models.Item).filter(models.Item.id == item_id).first()

全アイテムを取得

def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()

… update, delete 関数などもここに追加する …

“`

  • 各関数は最初の引数として db: Session を受け取ります。これはFastAPIの依存性注入によって提供されるデータベースセッションです。
  • これらの関数は、models.py のORMモデルと schemas.py のPydanticモデルを引数や戻り値の型として使用します。
  • データベース操作(query(), add(), commit(), refresh(), delete() など)は全て db セッションオブジェクトを通じて行われます。
  • create_user_item 関数では、PydanticモデルのデータをORMモデルのインスタンス作成に渡しています。item.model_dump() (Pydantic v2) または item.dict() (Pydantic v1) は、Pydanticモデルのフィールドを辞書として返します。

FastAPIエンドポイントの実装 (main.py)

最後に、FastAPIのエンドポイントを定義し、依存性注入を使ってデータベースセッションを取得し、crud.py の関数を呼び出します。

“`python

main.py

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas # crud.py, models.py, schemas.py をインポート
from .database import SessionLocal, engine # database.py からSessionLocalとengineをインポート

データベーステーブルを全て作成

本番環境ではマイグレーションツール(Alembicなど)を使うのが一般的

models.Base.metadata.create_all(bind=engine)

app = FastAPI()

DBセッションを取得するための依存関数

def get_db():
db = SessionLocal() # 新しいセッションを作成
try:
yield db # セッションを提供
finally:
db.close() # リクエスト処理が完了したらセッションを閉じる

————- User エンドポイント ————-

@app.post(“/users/”, response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): # 依存性注入でDBセッションを取得
# メールアドレスが既に使われていないかチェック
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail=”Email already registered”)
# ユーザーを作成し、作成したユーザーオブジェクトを返す
return crud.create_user(db=db, user=user)

@app.get(“/users/”, response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
# 全ユーザーを取得して返す
users = crud.get_users(db, skip=skip, limit=limit)
return users

@app.get(“/users/{user_id}”, response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
# IDでユーザーを取得して返す
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail=”User not found”)
return db_user

————- Item エンドポイント ————-

@app.post(“/users/{user_id}/items/”, response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
# 特定ユーザー向けのアイテムを作成
return crud.create_user_item(db=db, item=item, user_id=user_id)

@app.get(“/items/”, response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
# 全アイテムを取得して返す
items = crud.get_items(db, skip=skip, limit=limit)
return items

@app.get(“/items/{item_id}”, response_model=schemas.Item)
def read_item(item_id: int, db: Session = Depends(get_db)):
# IDでアイテムを取得して返す
db_item = crud.get_item(db, item_id=item_id)
if db_item is None:
raise HTTPException(status_code=404, detail=”Item not found”)
return db_item

… update, delete エンドポイントなども追加 …

“`

  • models.Base.metadata.create_all(bind=engine): アプリケーション起動時にデータベーステーブルを全て作成します。注意: これは開発時のみに使い、本番環境では Alembic のようなマイグレーションツールを使うべきです。
  • get_db() 関数は、データベースセッションを取得するための依存関数です。SessionLocal() で新しいセッションを作成し、yield db でFastAPIにセッションを提供します。try...finally ブロックを使うことで、リクエスト処理が完了した後(成功しても失敗しても)必ず db.close() が呼び出され、セッションが適切に閉じられます。これはFastAPIの依存性注入がジェネレーターや asynccontextmanager をサポートしているおかげです。
  • 各エンドポイント関数の引数に db: Session = Depends(get_db) を追加することで、リクエストごとに新しいデータベースセッションが自動的に取得され、そのセッションが db 引数として関数に渡されます。
  • リクエストボディやレスポンスの型ヒントに schemas.py で定義したPydanticモデルを使用しています。これにより、FastAPIによる自動的なデータ検証、シリアライズ、ドキュメント生成が行われます。
  • データベース操作は、直接SQLAlchemyのクエリを書くのではなく、crud.py で定義した関数を呼び出す形にしています。これにより、エンドポイントのロジックとデータベース操作のロジックが分離され、コードの見通しが良くなります。
  • response_model にPydanticモデルを指定することで、返されるデータが自動的に指定されたスキーマに変換されます。SQLAlchemyのORMオブジェクトを返す場合、response_model が設定されたPydanticモデルの orm_mode = True (または from_attributes = True) により、オブジェクトの属性からデータを読み取ってスキーマにマッピングしてくれます。
  • データが存在しない場合など、エラーが発生する可能性のある箇所では HTTPException を raise しています。FastAPIはこれを捕捉し、適切なHTTPステータスコードとJSONレスポンスを自動的に返します。

プロジェクトを実行する

プロジェクトを実行するには、以下の手順を行います。

  1. 必要なライブラリをインストールします(requirements.txt を作成し、pip install -r requirements.txt を実行するのが良いでしょう)。
    fastapi
    uvicorn[standard]
    sqlalchemy
  2. create_db.py のようなスクリプト(models.Base.metadata.create_all(bind=engine) を実行するもの)を作成して実行し、データベースファイルとテーブルを作成します。

    “`python

    create_db.py (main.pyと同じディレクトリに配置)

    from .database import Base, engine
    from . import models # models.py をインポートしてテーブル定義を読み込む

    def create_db_tables():
    print(“Creating database tables…”)
    # Baseに登録されている全てのモデル定義に対応するテーブルを作成
    Base.metadata.create_all(bind=engine)
    print(“Database tables created.”)

    if name == “main“:
    create_db_tables()
    ``
    コマンドラインで実行:
    python -m src.create_db(もしプロジェクトを src ディレクトリ配下に置いている場合) またはpython create_db.py` (トップディレクトリに置いている場合)。

  3. FastAPIアプリケーションをUvicornで起動します。

    bash
    uvicorn main:app --reload

これで、http://127.0.0.1:8000/docs にアクセスして自動生成されたAPIドキュメントを確認し、APIをテストできるようになります。

実践的な連携例:簡単なTODOアプリ

上記の構成を使って、より具体的なTODOアプリを構築する手順を見ていきましょう。ユーザー機能は前述のものを利用し、各ユーザーがTODOアイテムを持つ形にします。

1. テーブル定義 (models.py)

Userモデルはそのまま使い、Todoモデルを追加します。

“`python

models.py

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base # database.py からBaseをインポート

User モデル (変更なし)

class User(Base):
tablename = “users”
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship(“Item”, back_populates=”owner”) # 仮にItemと名付けていたが、Todoに名前を変える方が自然かも

Todo モデル

class Todo(Base): # Itemの名前をTodoに変えても良い
tablename = “todos” # テーブル名を todos とする

id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True) # descriptionを追加
completed = Column(Boolean, default=False) # 完了フラグを追加
owner_id = Column(Integer, ForeignKey("users.id")) # 外部キー

# リレーションシップ
owner = relationship("User", back_populates="todos") # Userモデル側のリレーション名と一致させる

“`

User モデルのリレーションシップ名を todos に変更し、Todo モデルに descriptioncompleted カラムを追加しました。

2. スキーマ定義 (schemas.py)

Todo関連のPydanticスキーマを定義します。

“`python

schemas.py

from pydantic import BaseModel

————- Todo 関連 ————-

Todo作成時のリクエストボディ用スキーマ

class TodoCreate(BaseModel):
title: str
description: str | None = None # descriptionを追加

Todo更新時のリクエストボディ用スキーマ

class TodoUpdate(BaseModel):
title: str | None = None
description: str | None = None
completed: bool | None = None

Todoの基本スキーマ(レスポンス用)

class TodoBase(BaseModel):
title: str
description: str | None = None
completed: bool = False # completedを追加

Todoレスポンス用スキーマ

class Todo(TodoBase): # TodoBase を継承
id: int
owner_id: int

class Config:
    orm_mode = True
    from_attributes = True

————- User 関連 ————-

UserCreate, UserBase, User はそのまま (User モデルのリレーション名itemsをtodosに変更した場合はこちらも修正)

例:Userレスポンス用スキーマ

class User(BaseModel):
id: int
email: str
is_active: bool
todos: list[Todo] = [] # リレーションシップ名を todos に変更

class Config:
    orm_mode = True
    from_attributes = True

“`

TodoUpdate スキーマを追加しました。これはPUTやPATCHリクエストで、一部のフィールドのみを更新する場合に使えます。フィールドをOptional (| None) にすることで、リクエストボディに含まれないフィールドは更新しないようにできます。
User スキーマでは、関連するアイテムのリスト名を todos に変更しました。

3. CRUD関数 (crud.py)

Todo関連のCRUD関数を実装します。

“`python

crud.py

from sqlalchemy.orm import Session
from . import models, schemas

————- User 関連 CRUD (省略、必要に応じて修正) ————-

def get_user…

def get_user_by_email…

def get_users…

def create_user…

————- Todo 関連 CRUD ————-

特定のユーザーのTODOリストを取得

def get_user_todos(db: Session, user_id: int, skip: int = 0, limit: int = 100):
return db.query(models.Todo).filter(models.Todo.owner_id == user_id).offset(skip).limit(limit).all()

TODOを作成

def create_user_todo(db: Session, todo: schemas.TodoCreate, user_id: int):
# TodoCreate スキーマから ORM モデルインスタンスを作成
db_todo = models.Todo(todo.model_dump(), owner_id=user_id) # Pydantic v2
# Pydantic v1: db_todo = models.Todo(
todo.dict(), owner_id=user_id)
db.add(db_todo)
db.commit()
db.refresh(db_todo)
return db_todo

IDでTODOを取得

def get_todo(db: Session, todo_id: int):
return db.query(models.Todo).filter(models.Todo.id == todo_id).first()

全TODOを取得 (任意、通常はユーザーごとで十分だがデバッグなどに)

def get_todos(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Todo).offset(skip).limit(limit).all()

TODOを更新

def update_todo(db: Session, db_todo: models.Todo, todo_update: schemas.TodoUpdate):
# Pydanticモデルのデータから、ORMモデルの属性を更新
update_data = todo_update.model_dump(exclude_unset=True) # Pydantic v2: exclude_unset=True でNoneでないフィールドだけ更新
# Pydantic v1: update_data = todo_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_todo, field, value) # ORMオブジェクトの属性を更新

db.add(db_todo) # 変更を追跡させるためaddするか、または自動追跡に任せる
db.commit()
db.refresh(db_todo)
return db_todo

TODOを削除

def delete_todo(db: Session, db_todo: models.Todo):
db.delete(db_todo)
db.commit()
return {“ok”: True} # 削除成功のレスポンス例
“`

update_todo 関数では、todo_update Pydanticモデルのデータを使って、既存の db_todo ORMオブジェクトの属性を更新しています。exclude_unset=True を使うことで、Pydanticモデルで値が設定されていないフィールド(クライアントがリクエストボディに含めなかったフィールド)は更新対象から除外されます。

4. FastAPIエンドポイント (main.py)

Todo関連のエンドポイントを追加します。

“`python

main.py

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

DBテーブル作成 (開発時のみ)

models.Base.metadata.create_all(bind=engine)

app = FastAPI()

DBセッション依存関数 (変更なし)

def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

————- User エンドポイント (省略、必要に応じて修正) ————-

@app.post(“/users/”)…

@app.get(“/users/”)…

@app.get(“/users/{user_id}”)…

————- Todo エンドポイント ————-

@app.post(“/users/{user_id}/todos/”, response_model=schemas.Todo)
def create_todo_for_user(
user_id: int, todo: schemas.TodoCreate, db: Session = Depends(get_db)
):
# ユーザーが存在するか確認 (オプション)
user = crud.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)

# 特定ユーザー向けのTODOを作成
return crud.create_user_todo(db=db, todo=todo, user_id=user_id)

@app.get(“/users/{user_id}/todos/”, response_model=list[schemas.Todo])
def read_todos_for_user(
user_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)
):
# 特定ユーザーのTODOリストを取得
# ユーザーが存在するか確認 (オプション)
user = crud.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)

todos = crud.get_user_todos(db, user_id=user_id, skip=skip, limit=limit)
return todos

@app.get(“/todos/{todo_id}”, response_model=schemas.Todo)
def read_todo(todo_id: int, db: Session = Depends(get_db)):
# IDでTODOを取得
db_todo = crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
return db_todo

@app.put(“/todos/{todo_id}”, response_model=schemas.Todo)
def update_todo(todo_id: int, todo: schemas.TodoUpdate, db: Session = Depends(get_db)):
# IDでTODOを取得
db_todo = crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)

# TODOを更新
return crud.update_todo(db=db, db_todo=db_todo, todo_update=todo)

@app.delete(“/todos/{todo_id}”)
def delete_todo(todo_id: int, db: Session = Depends(get_db)):
# IDでTODOを取得
db_todo = crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)

# TODOを削除
crud.delete_todo(db=db, db_todo=db_todo)
return {"ok": True} # 削除成功を示すJSONレスポンス

“`

これで、簡単なTODOアプリのAPIが完成しました。

  • /users/{user_id}/todos/ POST: 特定のユーザーに新しいTODOを作成します。
  • /users/{user_id}/todos/ GET: 特定のユーザーのTODOリストを取得します。
  • /todos/{todo_id} GET: 特定のTODOの詳細を取得します。
  • /todos/{todo_id} PUT: 特定のTODOを更新します。
  • /todos/{todo_id} DELETE: 特定のTODOを削除します。

各エンドポイントは Depends(get_db) を通じてデータベースセッションを取得し、crud.py の対応する関数を呼び出しています。また、存在しないIDへのアクセスなど、エラーが発生しうる箇所では HTTPException を発生させています。

非同期処理とDB操作

前述の例では、FastAPIの非同期エンドポイント(async def)内で、同期版SQLAlchemyのセッション(SessionLocal)を使用しました。FastAPIはこれを検知すると、そのブロッキング処理を別のスレッドプールで実行するように内部的に処理します。これにより、同期DB操作中でもメインのイベントループが完全にブロックされるのを避けることができます。しかし、このスレッドプールへの切り替えにはオーバーヘッドがあり、真の非同期SQLAlchemyに比べるとパフォーマンスは劣る可能性があります。

SQLAlchemy 1.4以降では、非同期I/O (asyncio) に完全に対応したAPIが提供されています。これにより、FastAPIの async def エンドポイント内で、ブロッキングなしでデータベース操作を実行できます。

非同期SQLAlchemyの基本

非同期SQLAlchemyを使うには、以下の要素が必要です。

  1. 非同期DBドライバー: asyncpg (PostgreSQL), aiomysql (MySQL), aiosqlite (SQLite) などの非同期DBドライバーをインストールします。
  2. AsyncEngine: create_async_engine を使って非同期Engineを作成します。
  3. AsyncSession: async_sessionmaker を使って非同期Sessionを作成します。
  4. await を使用した操作: 非同期セッションに対するクエリやコミット操作は、await を付けて実行する必要があります。

“`python

database_async.py (非同期版のデータベース設定)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base # ORMモデル定義は同期版と同じBaseを使用
from sqlalchemy.orm import sessionmaker

非同期データベースURL (asyncpgドライバの例)

DATABASE_URL = “postgresql+asyncpg://user:password@host:port/database”

aiosqlite ドライバの例 (SQLite)

DATABASE_URL = “sqlite+aiosqlite:///./sql_app_async.db”

AsyncEngineの作成

echo=True で実行されるSQLが表示される

async_engine = create_async_engine(DATABASE_URL, echo=True)

非同期SessionLocal クラスを作成

AsyncSessionLocal = sessionmaker(
async_engine, expire_on_commit=False, class_=AsyncSession
)

ORMモデル定義用のBase (これは同期版のBaseと同じ)

Base = declarative_base()

非同期セッションを取得するための依存関数 (async def になる)

async def get_async_db():
async with AsyncSessionLocal() as session: # async with 文でセッションを管理
yield session

“`

非同期CRUD関数の実装

CRUD関数も非同期 (async def) に変更し、データベース操作に await を付けます。

“`python

crud_async.py

from sqlalchemy import select # 非同期ORMではselect()を使うことが多い
from sqlalchemy.orm import Session # 型ヒント用だが、実際はAsyncSession
from sqlalchemy.ext.asyncio import AsyncSession

from . import models, schemas

非同期版 get_user

async def get_user(db: AsyncSession, user_id: int):
# await db.query(models.User)… ではなく、 await db.execute(select(models.User).filter(…)) の形式になる
result = await db.execute(select(models.User).filter(models.User.id == user_id))
return result.scalars().first() # scalars()でORMオブジェクトを取得

非同期版 create_user

async def create_user(db: AsyncSession, user: schemas.UserCreate):
fake_hashed_password = user.password + “notreallyhashed”
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
await db.commit() # await が必要
await db.refresh(db_user) # await が必要
return db_user

非同期版 get_user_todos

async def get_user_todos(db: AsyncSession, user_id: int, skip: int = 0, limit: int = 100):
result = await db.execute(
select(models.Todo)
.filter(models.Todo.owner_id == user_id)
.offset(skip)
.limit(limit)
)
return result.scalars().all()

非同期版 create_user_todo

async def create_user_todo(db: AsyncSession, todo: schemas.TodoCreate, user_id: int):
db_todo = models.Todo(**todo.model_dump(), owner_id=user_id)
db.add(db_todo)
await db.commit()
await db.refresh(db_todo)
return db_todo

… 他のCRUD関数も同様にasync def に変更し、await をつける …

async def get_todo(db: AsyncSession, todo_id: int):
result = await db.execute(select(models.Todo).filter(models.Todo.id == todo_id))
return result.scalars().first()

async def update_todo(db: AsyncSession, db_todo: models.Todo, todo_update: schemas.TodoUpdate):
update_data = todo_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_todo, field, value)
# db.add(db_todo) # updateの場合、セッションが追跡しているので不要なことが多い
await db.commit()
await db.refresh(db_todo)
return db_todo

async def delete_todo(db: AsyncSession, db_todo: models.Todo):
await db.delete(db_todo)
await db.commit()
return {“ok”: True}
“`

非同期SQLAlchemy ORMでは、同期版で使っていた db.query(...).all()db.query(...).first() の代わりに、await db.execute(select(...)) を使ってクエリを実行し、結果から result.scalars().all()result.scalars().first() などを使ってORMオブジェクトを取得する形式が推奨されます(SQLAlchemy 2.0のスタイル)。

非同期FastAPIエンドポイント

FastAPIエンドポイントは元々 async def なので、変更は少なく済みます。データベースセッションの依存性注入を、非同期版の get_async_db に変更するだけです。

“`python

main_async.py

from fastapi import FastAPI, Depends, HTTPException

from sqlalchemy.orm import Session # 同期Sessionは不要

from sqlalchemy.ext.asyncio import AsyncSession # 非同期Sessionの型ヒント用

from . import crud_async, models, schemas # 非同期版CRUDをインポート
from .database_async import get_async_db, async_engine # 非同期版DB設定からインポート

データベーステーブル作成 (開発時のみ、非同期で実行する必要がある)

async def create_db_tables_async():
async with async_engine.begin() as conn: # async with engine.begin() で非同期トランザクションを開始
# await conn.run_sync(models.Base.metadata.create_all) # 同期的なcreate_allを非同期で実行
# または、SQLAlchemy 2.0のAsyncDDLを使う (より複雑)
# 簡単のため、同期スクリプトで一度だけ作成する方法も検討
# ここではデモンストレーションのため、起動時に毎回実行する例を示す
# (本番ではAlembicなどを使うべき)
# 非同期版のcreate_allは提供されていないため、run_syncを使う
from sqlalchemy import inspect
async with async_engine.connect() as conn:
# データベースが存在するか確認し、存在しない場合のみ作成
inspector = inspect(conn.engine)
# 注意: SQLiteのasyncioはinspectorをasyncで実行できない可能性がある
# 実際のアプリケーションでは、事前に同期スクリプトで作成するのが最も簡単
# 以下は理論的な非同期でのテーブル存在チェック&作成例 (SQLite aiosqliteでは動作しない可能性高)
# exists = await inspector.has_table(models.User.tablename)
# if not exists:
# await conn.run_sync(models.Base.metadata.create_all)
# 実際には、create_db.py の同期スクリプトを一度実行する方が簡単で確実です。
# または Alembic の async モードを使用します。

        # デモ目的のため、起動時にテーブルが存在することを前提とするか、
        # または同期スクリプトを事前に実行してください。

        # もし必要であれば、async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) を使って、起動時にテーブル作成を試みることは可能です。
        # ただし、これはSQLiteのaiosqliteではうまく動作しないことがあります。

        # 簡単な方法として、起動時にチェックせず常に create_all を試みる (冪等なので問題ないことが多い)
        # async with async_engine.begin() as conn:
        #     await conn.run_sync(models.Base.metadata.create_all)
        pass # テーブル作成部分は別途同期スクリプトで実行済みとする

app = FastAPI()

アプリケーション起動時にテーブル作成を試みる場合 (非同期)

@app.on_event(“startup”)

async def startup_event():

await create_db_tables_async()

非同期DBセッションをDependsで取得

@app.post(“/users/”, response_model=schemas.User)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_async_db)):
db_user = await crud_async.get_user_by_email(db, email=user.email) # await が必要
if db_user:
raise HTTPException(status_code=400, detail=”Email already registered”)
return await crud_async.create_user(db=db, user=user) # await が必要

他のエンドポイントも同様に async def に変更し、

依存性を Depends(get_async_db) に、CRUD関数の呼び出しに await をつける

例: GET /users/{user_id}

@app.get(“/users/{user_id}”, response_model=schemas.User)
async def read_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
db_user = await crud_async.get_user(db, user_id=user_id) # await が必要
if db_user is None:
raise HTTPException(status_code=404, detail=”User not found”)
return db_user

例: POST /users/{user_id}/todos/

@app.post(“/users/{user_id}/todos/”, response_model=schemas.Todo)
async def create_todo_for_user(
user_id: int, todo: schemas.TodoCreate, db: AsyncSession = Depends(get_async_db)
):
user = await crud_async.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)
return await crud_async.create_user_todo(db=db, todo=todo, user_id=user_id)

例: GET /users/{user_id}/todos/

@app.get(“/users/{user_id}/todos/”, response_model=list[schemas.Todo])
async def read_todos_for_user(
user_id: int, skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)
):
user = await crud_async.get_user(db, user_id=user_id)
if user is None:
raise HTTPException(status_code=404, detail=”User not found”)
todos = await crud_async.get_user_todos(db, user_id=user_id, skip=skip, limit=limit)
return todos

例: GET /todos/{todo_id}

@app.get(“/todos/{todo_id}”, response_model=schemas.Todo)
async def read_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
db_todo = await crud_async.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
return db_todo

例: PUT /todos/{todo_id}

@app.put(“/todos/{todo_id}”, response_model=schemas.Todo)
async def update_todo(todo_id: int, todo: schemas.TodoUpdate, db: AsyncSession = Depends(get_async_db)):
db_todo = await crud_async.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
return await crud_async.update_todo(db=db, db_todo=db_todo, todo_update=todo)

例: DELETE /todos/{todo_id}

@app.delete(“/todos/{todo_id}”)
async def delete_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
db_todo = await crud_async.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=404, detail=”Todo not found”)
await crud_async.delete_todo(db=db, db_todo=db_todo)
return {“ok”: True}

“`

非同期版に移行することで、I/O処理中にイベントループがブロックされることなく、より高い並行性を実現できます。ただし、非同期SQLAlchemy ORMは同期版に比べてまだ新しい機能であり、書き方が少し異なります(特にクエリ部分)。また、サポートされるDBドライバーの種類も同期版より限られます。

一般的には、簡単なアプリケーションであれば同期版SQLAlchemyでも十分なパフォーマンスが得られることが多いです。しかし、多数の同時接続を捌く必要がある、またはデータベース操作がレスポンスタイムのボトルネックになっているようなケースでは、非同期SQLAlchemyへの移行を検討する価値があります。

発展的なトピック

  • マイグレーション (Alembic): データベーススキーマの変更(テーブルやカラムの追加・削除など)を管理するには、Alembicのようなマイグレーションツールを使用するのが一般的です。これにより、安全かつ段階的にデータベースの変更を適用できます。
  • リレーションシップの扱い: SQLAlchemyのリレーションシップ機能は非常に強力です。一対一、一対多、多対多などのリレーションシップを定義し、関連オブジェクトを簡単にロードしたり操作したりできます。ロード方法(Lazy Loading, Eager Loading, Joined Loadingなど)を適切に選択することで、パフォーマンスを最適化できます。
  • エラーハンドリング: データベース関連のエラー(一意性制約違反、外部キー制約違反など)を捕捉し、FastAPIの HTTPException を使って適切なエラーレスポンス(例: 409 Conflict, 400 Bad Request)を返す必要があります。
  • 認証・認可: FastAPIには、OAuth2などの認証フローを簡単に実装できる機能が組み込まれています。ユーザー認証を行い、認証されたユーザーに関連するデータのみにアクセスを制限する認可ロジックを実装する必要があります。
  • テスト: FastAPIアプリケーションとSQLAlchemy連携部分の単体テスト・結合テストを記述することが重要です。テスト用にインメモリデータベース(SQLite)を使用したり、トランザクションを使ってテスト後にデータベースの状態をロールバックしたりする方法が一般的です。

まとめ

この記事では、Pythonの高速APIフレームワークであるFastAPIと、強力なSQLツールキット・ORMであるSQLAlchemyを組み合わせたアプリケーション開発の基礎を詳細に解説しました。

FastAPIは、そのモダンな設計、非同期処理への対応、Pydanticによる強力なデータ検証、そして自動APIドキュメント生成といった特徴により、API開発の生産性とパフォーマンスを同時に向上させます。SQLAlchemyは、ORMを通じてPythonオブジェクトとしてデータベースを操作できるため、SQLを直接書くよりも直感的でメンテナンスしやすいコードを書くことができます。

両者を組み合わせる際の鍵となるのは、FastAPIの依存性注入システムを活用したデータベースセッションの管理です。リクエストごとにセッションを取得し、処理後に適切に閉じることで、リソースを効率的に利用できます。また、PydanticモデルとSQLAlchemy ORMモデルを連携させることで、APIのリクエスト・レスポンス形式とデータベーススキーマの間のマッピングを明確に定義できます。

CRUD操作を専用のモジュール (crud.py) に分離し、APIエンドポイント (main.py) はそのCRUD関数を呼び出すようにすることで、コードの見通しが良くなり、関心の分離が実現されます。

さらに、SQLAlchemyの非同期APIを活用することで、FastAPIの非同期処理能力を最大限に引き出し、高い並行性とスループットを実現できる可能性があることも紹介しました。

FastAPIとSQLAlchemyの組み合わせは、Pythonで高性能かつ保守性の高いAPIを開発するための非常に強力な選択肢です。この記事で紹介した基礎知識を土台に、さらに高度な機能を実装したり、プロジェクトの規模に合わせて設計を洗練させていくことで、より堅牢なアプリケーションを構築できるでしょう。

今後の学習ステップとしては、Alembicによるマイグレーション、SQLAlchemyのリレーションシップのより詳細な使い方、認証・認可の実装、テストの書き方などを学ぶことをお勧めします。FastAPIとSQLAlchemyの公式ドキュメントは非常に充実しており、これらを深く理解するための最良のリソースとなります。

この組み合わせを使いこなし、素晴らしいAPIを開発してください!


コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

上部へスクロール