FastAPIで学ぶ非同期処理入門:async/awaitの使い方とメリット

はい、承知いたしました。FastAPIで学ぶ非同期処理入門として、async/awaitの使い方とメリットを詳細に説明する約5000語の記事を記述します。


FastAPIで学ぶ非同期処理入門:async/awaitの使い方とメリット

はじめに:なぜ今、非同期処理が必要なのか

現代のWebアプリケーション開発において、高いパフォーマンスとスケーラビリティは必須要件です。特に、多数のユーザーからの同時リクエストを効率的に処理できることは、サービスの成功を左右します。しかし、従来の多くのWebフレームワークが採用していた同期処理モデルには、根本的な限界がありました。

データベースへのアクセス、外部APIの呼び出し、ファイルシステム操作、ネットワーク通信など、多くのWebアプリケーションは外部リソースとのI/O(Input/Output)処理に依存しています。これらの処理は、CPUを使った計算処理とは異なり、完了までに時間がかかります。

同期処理モデルでは、1つのリクエストがこれらの時間のかかるI/O処理を実行している間、そのリクエストを処理しているスレッドやプロセスは「待機状態」に入ります。この待機中、スレッドは他の処理を行うことができません。もし多数のリクエストが同時に到着し、それぞれが時間のかかるI/O処理を必要とする場合、利用可能なスレッドは次々と待機状態に入り、新しいリクエストを処理するためのスレッドが枯渇してしまいます。これは、アプリケーション全体の応答速度の低下や、最悪の場合はサービス停止につながる可能性があります。

この「ブロッキング」問題を解決するために、非同期処理が登場しました。非同期処理は、時間のかかるI/O処理を「待つ」間に、別のタスクに切り替えて実行することを可能にします。これにより、限られたリソース(スレッドやプロセス)を最大限に活用し、より多くの同時リクエストを効率的に処理できるようになります。

Pythonの世界では、古くから非同期処理の試みはありましたが、Python 3.4でasyncioライブラリが標準ライブラリとして導入され、Python 3.5でasyncawaitという言語レベルのキーワードが追加されたことで、非同期処理がより容易かつPythonicに記述できるようになりました。

そして、近年急速に普及しているモダンなPython WebフレームワークであるFastAPIは、このasyncioasync/awaitを最大限に活用するように設計されています。FastAPIはASGI(Asynchronous Server Gateway Interface)標準に基づいており、非同期処理に最適化されたWebサーバー(Uvicornなど)と組み合わせて使用することで、従来のフレームワークでは実現が難しかった高いパフォーマンスとスケーラビリティを比較的容易に実現できます。

この記事では、FastAPIを使って非同期処理を始めるために必要な基本的な知識から、実際のコード例、そしてasync/awaitを使うことのメリットと注意点までを詳細に解説します。

この記事で学ぶこと:

  • 同期処理の限界と非同期処理の必要性
  • 非同期処理の基本概念(ノンブロッキングI/O、イベントループ、コルーチンなど)
  • Pythonにおけるasyncioasync/awaitの基本
  • FastAPIにおけるasync/awaitの使い方
  • 具体的な非同期処理の実装例(外部API呼び出し、データベースアクセス)
  • async/awaitのメリットとデメリット
  • 非同期処理における注意点とベストプラクティス

この記事を通じて、FastAPIでの非同期処理の仕組みを理解し、実際にアプリケーションに活用できるようになることを目指します。

同期処理の限界:なぜ「待つ」ことが問題なのか

非同期処理の重要性を理解するためには、まず同期処理の限界を深く理解する必要があります。

同期処理の基本

同期処理では、プログラムの実行は記述された順序通りに進みます。ある処理が完了するまで、次の処理は開始されません。これは、日常的なコード記述においては非常に直感的で分かりやすいモデルです。

“`python
def process_data():
data = fetch_data_from_database() # データ取得(時間かかる)
processed_data = process(data) # データ処理(CPU使う)
save_result(processed_data) # 結果保存(時間かかる)
return processed_data

result = process_data()
print(“処理完了”) # fetch_data_from_databaseとsave_resultが終わってから実行される
“`

この例では、fetch_data_from_database関数が完了するまで、process関数は実行されません。同様に、process関数が完了するまでsave_result関数は実行されず、save_result関数が完了するまでprint関数は実行されません。

ブロッキング処理

問題となるのは、「待機」が発生する処理です。特に、I/Oバウンドな処理がこれにあたります。I/Oバウンド処理とは、CPUによる計算よりも、外部デバイス(ディスク、ネットワーク、データベースなど)とのデータのやり取りにかかる時間の方が支配的な処理です。

  • データベースクエリの実行: データベースサーバーからの応答を待つ
  • 外部APIへのリクエスト: 外部サーバーからの応答を待つ
  • ファイルの読み書き: ディスクI/Oの完了を待つ
  • ネットワーク通信: データの送受信と相手からの応答を待つ

これらの処理は、実行を開始しても、データが準備できるまでの間、CPUはほとんど何もしていません。しかし、同期処理モデルでは、その処理を実行しているスレッドやプロセスは、I/Oが完了するまで「ブロック」されてしまい、他のタスクに切り替えることができません。

対照的に、CPUバウンドな処理は、ほとんどの時間をCPUを使った計算に費やします。例えば、複雑な計算、データ変換、画像処理、機械学習の推論などがこれにあたります。CPUバウンド処理は、基本的に待機時間は発生せず、CPUリソースを使い切るまで実行されます。

Webサーバーにおける同期処理の課題

Webサーバーは、多数のクライアントから同時にリクエストを受け付け、それぞれに応答を返す必要があります。従来の同期処理をベースにしたWebフレームワーク(例えば、多くのWSGIフレームワーク)では、この同時接続に対応するために、リクエストごとに新しいスレッドやプロセスを生成する、またはスレッド/プロセスプールを使用する、という手法が取られてきました。

ユーザーAからのリクエストが入る → スレッドAが生成される(またはプールから割り当てられる)
ユーザーBからのリクエストが入る → スレッドBが生成される(またはプールから割り当てられる)

ユーザーAのリクエストがデータベースに問い合わせを行う(I/Oバウンド)
→ スレッドAはデータベースからの応答を待ってブロックされる

ユーザーBのリクエストが外部APIを呼び出す(I/Oバウンド)
→ スレッドBは外部APIからの応答を待ってブロックされる

もし同時に1000人のユーザーがそれぞれI/Oバウンドな処理を含むリクエストを行った場合、1000個のスレッド(またはプロセス)が必要になる可能性があります。スレッドやプロセスはOSのリソースを消費します。スレッドの生成や切り替え(コンテキストスイッチ)にはコストがかかり、あまりにも多数のスレッドが存在すると、むしろパフォーマンスが低下することがあります。また、システム全体で生成できるスレッド/プロセス数には上限があります。

この結果、利用可能なスレッド/プロセス数が枯渇すると、新しいリクエストを受け付けることができなくなり、クライアントは応答を得られずにタイムアウトしてしまいます。これが、同期処理モデルにおける「C10K問題」(Connections 10000 Problem、単一サーバーで同時に1万件以上の接続を効率的に処理する問題)のような課題の根源にあります。

まとめると、同期処理の限界は以下の点に集約されます:

  1. ブロッキング: I/Oバウンド処理中にスレッド/プロセスが待機状態になり、他の処理を行えない。
  2. リソース消費: 同時接続に対応するために多数のスレッド/プロセスが必要となり、リソース消費が大きくなる。
  3. スケーラビリティの限界: スレッド/プロセスの生成・管理コストにより、同時接続数の増加に比例してパフォーマンスが向上しにくくなる。

これらの問題を克服するために、非同期処理が有効な手段となります。

非同期処理の基本概念:ノンブロッキング、イベントループ、コルーチン

同期処理の限界を理解したところで、非同期処理がどのように機能するのか、その基本概念を見ていきましょう。

非同期処理 vs 同期処理(再確認)

  • 同期処理: タスクは順番に実行され、前のタスクが完了するまで次のタスクは開始されない。時間のかかる処理は実行をブロックする。
  • 非同期処理: タスクは順番に開始されるが、時間のかかる処理(特にI/O)は完了を待たずに次のタスクに切り替えることができる。完了したタスクは後で通知を受け取るか、実行コンテキストに戻る。

ノンブロッキングI/O

非同期処理の根幹にある考え方の一つが「ノンブロッキングI/O」です。同期的なブロッキングI/Oでは、I/O操作を開始したら完了まで待機します。一方、ノンブロッキングI/Oでは、I/O操作を開始したらすぐに制御を呼び出し元に返し、I/Oが進行中であることを伝えます。呼び出し元は待機せずに別の処理を行い、後でI/Oの完了状態を確認するか、完了時に通知を受け取ります。

例えるなら:

  • ブロッキングI/O: レストランで注文し、料理が来るまで席でじっと待っている。
  • ノンブロッキングI/O: レストランで注文し、「できたら呼んでください」と伝えて、その間他の用事(例えば、別の店のウィンドウショッピング)を済ませている。

並行処理 (Concurrency) vs 並列処理 (Parallelism)

非同期処理を語る上で重要なのが、並行処理と並列処理の違いです。

  • 並列処理 (Parallelism): 複数のタスクが同時に実行されること。これは、複数のCPUコアやプロセッサが存在する場合に可能です。各コアが異なるタスクを同時に処理します。マルチプロセスやマルチスレッドは、しばしば並列処理を実現するために使われます(ただし、GILの制約によりPythonの標準的なスレッドはCPUバウンドな処理で真の並列実行は難しい)。

  • 並行処理 (Concurrency): 複数のタスクが見かけ上同時に進行しているように見えること。実際には、単一のCPUコア上で、タスク間を非常に高速に切り替えることで実現されます。これは、タスクがI/Oなどで待機している間に、別のタスクにCPUを明け渡すことで効率を高めます。非同期I/Oやイベントループはこの並行処理を実現するための仕組みです。

非同期I/Oは主に並行処理を実現するための手法です。I/O待機中にCPUを他のタスクに譲ることで、限られたリソース(例: シングルスレッド)で多数のI/Oバウンドなタスクを効率的に処理することを目指します。これは、CPUバウンドな処理を同時に実行してスループットを向上させる並列処理とは目的が異なります。ただし、非同期処理のフレームワーク(asyncioなど)とマルチプロセスを組み合わせることで、並行処理と並列処理の両方の恩恵を受けることも可能です。FastAPIはASGIサーバー(Uvicornなど)を使うことで、通常は複数のワーカープロセスを起動し、各プロセス内で非同期処理による並行処理を行う構成を取ることが多いです。

コルーチン (Coroutine)

Pythonにおける非同期処理の中核をなすのがコルーチンです。コルーチンは、実行を一時停止し、後でその一時停止した場所から再開できるサブルーチンです。従来の関数は実行を開始したら最後までノンストップで実行されるか、例外を発生させて終了するかのどちらかですが、コルーチンは途中で「中断」して別のコルーチンに実行を譲り、後で「再開」することができます。

Pythonでは、async defキーワードを使ってコルーチンを定義します。そして、コルーチンの中で別のコルーチンの完了を待つ際にawaitキーワードを使用します。

イベントループ (Event Loop)

非同期処理の全体を管理し、コルーチンの実行スケジューリングを行うのがイベントループです。イベントループは基本的に以下の処理を繰り返します:

  1. 実行可能なタスク(コルーチン)があれば、それを実行する。
  2. タスクがI/O操作などで待機状態に入り、awaitで実行を一時停止したら、そのタスクから制御を受け取る。
  3. 待機中のI/O操作が完了したという通知(イベント)を監視する。
  4. I/Oが完了したタスクがあれば、そのタスクを再び実行可能リストに戻す。
  5. これを繰り返す。

イベントループは、ノンブロッキングI/O操作の完了を効率的に監視し、完了したタスクを再び実行キューに戻すことで、多数のタスクを効率的に切り替えながら実行します。これは、シングルスレッドで多数の同時接続を扱うNode.jsのような環境で採用されているモデルと似ています。

Future / Task

asyncioライブラリでは、非同期処理の結果を表すためにFutureTaskという概念が使われます。

  • Future: 将来完了するであろう処理の結果を表すプレースホルダーのようなものです。結果がまだ得られていない状態、結果が得られた状態、エラーが発生した状態など、処理の様々な状態を持つことができます。
  • Task: イベントループで実行されるようにスケジュールされたコルーチンをラップしたものです。イベントループはTaskを管理し、その実行をスケジュールします。awaitキーワードは、内部的にはFutureTaskの完了を待機するために使用されます。

これらの概念(ノンブロッキングI/O、並行処理、コルーチン、イベントループ、Future/Task)が組み合わさることで、Pythonの非同期処理フレームワークasyncioは機能しています。

Pythonにおけるasync/await:コルーチンと非同期I/O

Python 3.5以降で導入されたasyncawaitキーワードは、asyncioを使った非同期処理の記述を格段に分かりやすくしました。これらは、非同期処理における「協調的なマルチタスク」を表現するためのシンタックスシュガーです。

asyncキーワード

asyncキーワードは、その後に続く関数がコルーチンであることを宣言します。

python
async def my_async_function():
# この関数はコルーチンです
pass

async defで定義された関数(コルーチン関数)を呼び出すと、すぐにその中身が実行されるわけではなく、コルーチンオブジェクトが返されます。

python
coroutine_obj = my_async_function()
print(coroutine_obj) # <coroutine object my_async_function at ...>

このコルーチンオブジェクトを実行するには、イベントループ上で明示的にスケジュールするか、別のコルーチンの中でawaitする必要があります。

asyncはクラスメソッドやジェネレータと組み合わせることもできます (async def my_method(self):, async def my_generator(): yield ...)。また、非同期コンテキストマネージャ (async with) や非同期イテレータ (async for) といった概念も存在します。

awaitキーワード

awaitキーワードは、コルーチンの中で別の「awaitable」オブジェクト(主に他のコルーチン、またはasyncioFuture/Taskなど)の完了を待つために使用されます。

“`python
import asyncio

async def say_hello():
print(“Hello”)
await asyncio.sleep(1) # 1秒待機する(非同期I/Oに見立てる)
print(“World”)

async def main():
print(“Starting…”)
await say_hello() # say_helloコルーチンの完了を待つ
print(“Finished.”)

イベントループを取得し、mainコルーチンを実行する

Python 3.7以降では asyncio.run() が推奨される

asyncio.run(main())
“`

この例では、mainコルーチンの中でawait say_hello()を実行しています。say_helloコルーチンがawait asyncio.sleep(1)で一時停止すると、イベントループはsay_helloの実行を中断し、別の実行可能なタスクがあればそちらに切り替えます。1秒後、asyncio.sleepが完了すると、イベントループはsay_helloを再開し、中断された場所(await asyncio.sleep(1)の次)から実行を続けます。say_helloが完了すると、制御はawait say_hello()の次の行に戻り、mainコルーチンが実行を続けます。

重要なのは、awaitは「ブロッキングせずに待つ」ということです。awaitしている間、スレッド全体が停止するわけではありません。イベントループがそのコルーチンから制御を受け取り、他のタスクを実行する機会を得ます。

asyncioライブラリ

asyncioはPython標準ライブラリとして、非同期処理のための様々なツールを提供しています。

  • イベントループの管理: asyncio.get_event_loop(), asyncio.run()など
  • コルーチンの実行: loop.run_until_complete(), asyncio.run()
  • タスクの作成: asyncio.create_task(), asyncio.ensure_future()
  • 複数のコルーチンの並行実行: asyncio.gather(), asyncio.wait()
  • 非同期I/O操作: asyncio.sleep(), asyncio.open_connection(), asyncio.start_server()など
  • 同期処理の非同期実行: loop.run_in_executor()

FastAPIは内部的にasyncioを利用しています。FastAPIのパスオペレーション関数をasync defで定義すると、FastAPI(そしてその基盤となるASGIサーバー)は、その関数をasyncioのコルーチンとしてイベントループ上で実行します。

FastAPIにおけるasync/awaitの使い方

FastAPIは非同期処理に特化した設計になっており、async/awaitを非常に自然な形で利用できます。FastAPIはASGI(Asynchronous Server Gateway Interface)という新しいPython Web標準を実装しており、UvicornやHypercornといったASGIサーバー上で動作します。これらのサーバーは非同期I/Oとイベントループを活用して、高い並行処理能力を実現しています。

FastAPIの基本的な構造は、HTTPリクエストを受け付けて、対応するパスオペレーション関数を実行し、HTTPレスポンスを返すことです。このパスオペレーション関数を定義する際に、defを使うかasync defを使うかを選択できます。

パスオペレーション関数の定義:def vs async def

FastAPIは、定義されたパスオペレーション関数が通常のdef関数(同期関数)なのか、async def関数(コルーチン関数)なのかを自動的に判別します。

  • async defで定義する場合: 関数はコルーチンとして扱われ、asyncioのイベントループ上で実行されます。この関数内でawaitを使って他のコルーチン(非同期I/O操作など)の完了を待つことができます。
  • defで定義する場合: 関数は通常の同期関数として扱われます。この関数内でawaitを使うことはできません。もし時間のかかるブロッキング処理(同期I/Oなど)が含まれる場合、FastAPIはデフォルトではその関数をスレッドプール(通常はExecutor)で実行します。これにより、その同期関数がイベントループをブロックしてしまうのを防ぎます。

どちらを使うべきか?

基本的なルールは以下の通りです:

  • 関数内でawaitを使う必要がある場合: async defで定義する必要があります。これは、外部API呼び出し(httpxなど非同期ライブラリを使用)、非同期DBドライバでのデータベースアクセス、asyncio.sleepなど、非同期I/Oバウンドな処理を行う場合です。
  • 関数内でブロッキングI/Oを含む同期処理を実行する場合: defで定義するのが適切です。FastAPIが自動的にスレッドプールで実行してくれるため、イベントループをブロックしません。例えば、requestsを使った同期的な外部API呼び出し、psycopg2など同期DBドライバでのデータベースアクセス、またはCPUバウンドな重い計算処理を行う場合です。
  • 関数内にI/O処理や時間のかかる計算処理が一切含まれず、非常に高速に完了する場合: defでもasync defでも構いません。ただし、慣習としてI/Oが含まれない場合はdefを使うことが多いかもしれません。

注意点: async def関数内でawaitを使わずにブロッキングI/O(例: requests.get(...))を実行すると、そのブロッキング処理が完了するまでイベントループ全体が停止し、他のリクエスト処理もブロックされてしまいます。これは非同期処理のメリットを完全に打ち消してしまうため、絶対に避けるべきです。

基本的なFastAPI + async/await の例

非常にシンプルなFastAPIアプリケーションでasync defを使ってみましょう。

まず、FastAPIとUvicornをインストールします。

bash
pip install fastapi uvicorn httpx

main.pyというファイルを作成します。

“`python
from fastapi import FastAPI
import asyncio
import httpx

app = FastAPI()

非同期関数でパスオペレーションを定義

@app.get(“/”)
async def read_root():
# 非同期I/O操作(ここでは擬似的に1秒待つ)
print(“リクエスト受信、1秒待機開始”)
await asyncio.sleep(1)
print(“1秒待機完了”)
return {“Hello”: “World”}

非同期関数で外部APIを呼び出す例

@app.get(“/fetch_external_data/”)
async def fetch_external_data():
# httpxは非同期と同期の両方に対応したHTTPクライアント
# AsyncClientを使うことで非同期でリクエストを送信できる
async with httpx.AsyncClient() as client:
print(“外部API呼び出し開始”)
# awaitを使って非同期I/Oの完了を待つ
response = await client.get(“https://httpbin.org/delay/2”) # 2秒遅延するAPI
print(“外部API呼び出し完了”)
data = response.json()
return {“external_data”: data}

同期関数でパスオペレーションを定義(FastAPIが自動でスレッドプールで実行)

非推奨だが、requestsなど同期ライブラリを使わざるを得ない場合の例として

@app.get(“/fetch_external_data_sync/”)

def fetch_external_data_sync():

import requests # requestsは同期ライブラリ

print(“同期外部API呼び出し開始”)

# awaitできない。ブロッキング処理

response = requests.get(“https://httpbin.org/delay/2”) # この行でスレッドがブロックされる

print(“同期外部API呼び出し完了”)

data = response.json()

return {“external_data_sync”: data}

↑ これはイベントループをブロックする可能性があるため、通常は async/await + httpx が推奨される

“`

上記のコードをUvicornで実行します。

bash
uvicorn main:app --reload

ブラウザまたはcurlで http://127.0.0.1:8000/ および http://127.0.0.1:8000/fetch_external_data/ にアクセスしてみてください。

複数のタブで同時に/または/fetch_external_data/にアクセスすると、各リクエストがI/O待機中(asyncio.sleepawait client.get)に、サーバーが他のリクエストの処理を開始している様子が分かります。サーバーのログ出力を見て、リクエストの受信と処理完了のタイミングを確認すると、非同期処理によって複数のリクエストが並行して処理されていることがより明確に理解できます。

例えば、同時に3つのリクエストを送信した場合、同期処理であれば1つ目が完了してから2つ目、2つ目が完了してから3つ目と順番に処理されるため、全体でかかる時間は3つの処理時間の合計になります。一方、非同期処理であれば、3つのリクエストがほぼ同時に開始され、それぞれのI/O待機中に他のリクエストの処理が進むため、全体にかかる時間は最も時間のかかる処理の時間+α程度になります。

この例は非同期処理の基本を示していますが、実際のアプリケーションではデータベースアクセスや他のバックエンドサービスとの連携など、より複雑なI/O処理が関わってきます。

非同期処理の具体的な実装例 (FastAPI)

ここでは、Webアプリケーションでよく遭遇するI/Oバウンドな処理、特に外部API呼び出しとデータベースアクセスについて、FastAPIとasync/awaitを使った具体的な実装例を見ていきます。

例1: 外部API呼び出し

多くのWebアプリケーションは、認証サービス、決済サービス、データプロバイダーなどの外部APIと連携します。これらのAPI呼び出しはネットワーク通信を伴うため、典型的なI/Oバウンド処理です。

同期的な外部API呼び出し (requestsを使用)

Pythonで同期的なHTTPリクエストを行う場合、requestsライブラリが広く使われます。

“`python

main_sync.py

from fastapi import FastAPI
import requests # 同期ライブラリ
import time

app = FastAPI()

@app.get(“/fetch_external_sync/”)
def fetch_external_sync():
“””同期的に外部APIを呼び出す(非推奨の書き方)”””
print(f”[{time.time()}] 同期API呼び出し開始”)
# requests.get() はブロッキングコール
# 外部APIの応答を待っている間、このスレッドは完全にブロックされる
response = requests.get(“https://httpbin.org/delay/3″) # 3秒遅延
print(f”[{time.time()}] 同期API呼び出し完了”)
return response.json()

この例をUvicornで実行する場合: uvicorn main_sync:app –reload –workers 1 (ワーカーを1つに制限してブロッキングの影響を見る)

“`

この同期的な例をUvicornのワーカー数を1にして実行し、複数のリクエストを同時に送ってみてください。1つのリクエストが3秒待機している間、次のリクエストは開始されず、合計処理時間は各リクエストの処理時間の合計(約3秒 * リクエスト数)になることが観察できます。FastAPIがデフォルトでdef関数をスレッドプールで実行する場合でも、スレッドプールのサイズには限りがあるため、多数の同時接続ではやはりリソースが枯渇する可能性があります。

非同期的な外部API呼び出し (httpxまたはaiohttpを使用)

Pythonで非同期的なHTTPリクエストを行うには、httpx(非同期・同期両対応)やaiohttp(非同期専用)といったライブラリが使われます。ここでは、FastAPI公式ドキュメントでも推奨されているhttpxを使います。

“`python

main.py (前の例に追記)

from fastapi import FastAPI, Depends # Imports from earlier example

import asyncio

import httpx

app = FastAPI() # Instance from earlier example

@app.get(“/fetch_external_async/”)
async def fetch_external_async():
“””非同期的に外部APIを呼び出す”””
print(f”[{time.time()}] 非同期API呼び出し開始”)
# httpx.AsyncClientは非同期操作のためのクライアント
# async withを使うことで、クライアントのリソース管理を適切に行える
async with httpx.AsyncClient() as client:
# await client.get() は非同期コール
# 外部APIの応答を待っている間、イベントループは他のタスクに切り替わる
response = await client.get(“https://httpbin.org/delay/3″) # 3秒遅延
print(f”[{time.time()}] 非同期API呼び出し完了”)
return response.json()

この例をUvicornで実行する場合: uvicorn main:app –reload

デフォルトでは複数のワーカープロセスが起動する可能性あり。

各ワーカープロセスは通常1つのイベントループを持つ。

“`

この非同期的な例をUvicornで実行し、複数のリクエストを同時に送ってみてください。複数のリクエストがほぼ同時に開始され、それぞれのI/O待機中に他のリクエストの処理が進むため、全体としてより短い時間で多数のリクエストを処理できることが観察できます。例えば、3つのリクエストを同時に送信した場合、それぞれが3秒遅延しても、ほぼ3秒強で全ての応答が返ってくる可能性があります(ネットワーク遅延などにも依存します)。

例2: データベースアクセス

データベースアクセスも、ネットワークやディスクI/Oが伴うため、典型的なI/Oバウンド処理です。従来のORM(Object-Relational Mapper)やDBドライバの多くは同期処理を前提としていましたが、asyncioの普及に伴い、非同期対応のライブラリも増えています。

非同期対応DBライブラリの例:

  • PostgreSQL: asyncpg
  • MySQL: aiomysql, asyncmy
  • SQLite: aiosqlite
  • Redis: asyncio-redis, aioredis
  • SQLAlchemy: バージョン1.4以降で非同期対応のエグゼキューターが追加され、バージョン2.0で完全な非同期ORMが利用可能

同期的なデータベースアクセス (psycopg2 + PostgreSQLを使用)

同期DBドライバを使う場合、FastAPIはdef関数内で実行されることを想定し、スレッドプールで処理します。

“`python

main_sync_db.py

from fastapi import FastAPI
import psycopg2 # 同期PostgreSQLドライバ
import time
from typing import List, Dict

app = FastAPI()

データベース接続情報 (仮)

DATABASE_URL = “dbname=test user=user password=password host=localhost port=5432”

def get_sync_db():
“””同期データベース接続を返す(Dependsで使用)”””
conn = psycopg2.connect(DATABASE_URL)
try:
yield conn
finally:
conn.close()

@app.get(“/items_sync/”, response_model=List[Dict])
def read_items_sync(db=Depends(get_sync_db)):
“””同期的にデータベースからアイテムを取得する(FastAPIがスレッドプールで実行)”””
print(f”[{time.time()}] 同期DBアクセス開始”)
cursor = db.cursor()
# cursor.execute() や cursor.fetchall() はブロッキングコール
cursor.execute(“SELECT id, name FROM items”)
items = [{“id”: row[0], “name”: row[1]} for row in cursor.fetchall()]
cursor.close()
print(f”[{time.time()}] 同期DBアクセス完了”)
return items

この例をUvicornで実行する場合: uvicorn main_sync_db:app –reload –workers 1

“`

この場合、read_items_sync関数はdefで定義されているため、FastAPIはこれをスレッドプールで実行します。これにより、psycopg2のブロッキングコールがイベントループを直接ブロックするのを防ぎます。しかし、スレッドプールのサイズには限界があり、多数の同時DBアクセスリクエストがあると、利用可能なスレッドが枯渇し、新しいリクエストが待たされることになります。

非同期的なデータベースアクセス (asyncpg + PostgreSQLを使用)

非同期DBドライバを使う場合は、パスオペレーション関数をasync defで定義し、ドライバの非同期関数をawaitします。

“`python

main.py (前の例に追記)

from fastapi import FastAPI, Depends

import asyncio

import httpx

import asyncpg # 非同期PostgreSQLドライバ
import time
from typing import List, Dict

app = FastAPI()

DATABASE_URL = “…” # データベース接続情報 (前の例に合わせる)

async def fetch_external_async(): … # 前の例の関数

非同期データベース接続プール

アプリケーション起動時に作成し、終了時に閉じるのが一般的

FastAPIでは lifespan イベントハンドラを使うと良い

https://fastapi.tiangolo.com/advanced/events/

async def startup_event():
global db_pool # グローバル変数または他の方法で管理
db_pool = await asyncpg.create_pool(DATABASE_URL)
print(“DBプール作成完了”)

async def shutdown_event():
await db_pool.close()
print(“DBプール閉鎖完了”)

FastAPIアプリケーションにイベントハンドラを登録 (FastAPI 0.95.0以降)

app = FastAPI(lifespan={

‘startup’: [startup_event],

‘shutdown’: [shutdown_event]

})

より古いバージョンのFastAPI (<0.95.0) では on_event を使う

@app.on_event(“startup”)
async def startup_event_old():
global db_pool
db_pool = await asyncpg.create_pool(DATABASE_URL)
print(“DBプール作成完了 (old)”)

@app.on_event(“shutdown”)
async def shutdown_event_old():
await db_pool.close()
print(“DBプール閉鎖完了 (old)”)

async def get_async_db():
“””非同期データベース接続を返す(Dependsで使用)”””
# プールから接続を取得。取得できるまで非同期で待機
conn = await db_pool.acquire()
try:
yield conn
finally:
# 接続をプールに解放
await db_pool.release(conn)

@app.get(“/items_async/”, response_model=List[Dict])
async def read_items_async(db=Depends(get_async_db)):
“””非同期的にデータベースからアイテムを取得する”””
print(f”[{time.time()}] 非同期DBアクセス開始”)
# await db.fetch() は非同期コール
# DBからの応答を待っている間、イベントループは他のタスクに切り替わる
items = await db.fetch(“SELECT id, name FROM items”)
print(f”[{time.time()}] 非同期DBアクセス完了”)
# asyncpgのfetch結果はリストオブレコードなので、辞書リストに変換
return [dict(item) for item in items]

この例をUvicornで実行する場合: uvicorn main:app –reload

“`

この非同期的な例では、async def関数内でawait db.fetch()を実行しています。asyncpgfetchメソッドはコルーチンであり、データベースからの応答を待つ間、ノンブロッキングでイベントループに制御を返します。これにより、同じイベントループ上の他のタスク(別のユーザーからのリクエストなど)が実行可能になります。

データベース接続にはコストがかかるため、通常はアプリケーション起動時に接続プールを作成し、各リクエストでプールから接続を取得・解放するパターンが推奨されます。FastAPIでは、@app.on_event("startup")@app.on_event("shutdown")(または新しいlifespanイベント)を使って、アプリケーションの起動・終了時に接続プールを管理できます。Dependsを使って非同期DB接続を注入する際には、async defジェネレータ関数を使います。

SQLAlchemy 2.0の非同期対応

SQLAlchemy 1.4以降で非同期実行機能が導入され、2.0で非同期ORMが安定しました。これは、多くの開発者が使い慣れたSQLAlchemyの構文で非同期データベース操作が可能になったことを意味します。

“`python

main_sqlalchemy_async.py (非同期SQLAlchemyの例、モデル定義は省略)

from fastapi import FastAPI, Depends
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select # SQLAlchemy 2.0 スタイル

app = FastAPI()

非同期対応DB URL (例: asyncpg+PostgreSQL)

ASYNC_DATABASE_URL = “postgresql+asyncpg://user:password@localhost:5432/test”

非同期エンジンを作成

engine = create_async_engine(ASYNC_DATABASE_URL)

非同期セッションファクトリを作成

AsyncSessionLocal = sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)

async def get_async_session():
“””非同期データベースセッションを返す(Dependsで使用)”””
async with AsyncSessionLocal() as session:
yield session

仮のモデル定義 (実際のコードでは別途定義します)

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import Column, Integer, String

Base = declarative_base()

class Item(Base):

tablename = “items”

id = Column(Integer, primary_key=True, index=True)

name = Column(String, index=True)

@app.get(“/items_sqlalchemy_async/”, response_model=List[Dict]) # response_modelは適切なPydanticモデルを指定するのがより良い
async def read_items_sqlalchemy_async(session: AsyncSession = Depends(get_async_session)):
“””非同期SQLAlchemyを使ってアイテムを取得する”””
print(f”[{time.time()}] 非同期SQLAlchemy DBアクセス開始”)
# SQLAlchemy 2.0 スタイルの非同期クエリ
# await session.execute() は非同期コール
result = await session.execute(select(Item)) # Itemは適切なモデルクラス
items = result.scalars().all() # レコードを取得
print(f”[{time.time()}] 非同期SQLAlchemy DBアクセス完了”)
# 取得したオブジェクトリストを辞書リストに変換 (Pydanticモデルを使う場合は不要)
return [{“id”: item.id, “name”: item.name} for item in items]

この例をUvicornで実行する場合: uvicorn main_sqlalchemy_async:app –reload

“`

SQLAlchemy 2.0の非同期機能を使うことで、ORMの抽象化を利用しつつ、非同期処理の恩恵を受けることができます。非同期セッション(AsyncSession)を取得し、その上で非同期対応のクエリ実行メソッド(session.execute(), session.get(), session.add(), session.commit()など)をawaitして使用します。依存性注入(Depends)を使って非同期セッションを取得するパターンは、FastAPIで非同期DBアクセスを実装する際の一般的な方法です。

例3: 複数の非同期処理を並行して実行

非同期処理の強力なメリットの一つは、複数の独立した非同期タスクを並行して実行し、すべての完了を待つことができる点です。これにより、複数の外部サービスからデータを取得したり、複数のデータベースクエリを同時に実行したりといったシナリオで、全体の処理時間を大幅に短縮できます。

asyncioライブラリのasyncio.gather()関数は、複数のコルーチンを受け取り、それらを同時に(イベントループ上で並行して)実行し、すべての結果をリストとして返す便利な関数です。

“`python

main.py (前の例に追記)

from fastapi import FastAPI, Depends

import asyncio

import httpx

import asyncpg

import time

from typing import List, Dict

app = FastAPI()

… (他の関数やイベントハンドラ)

async def fetch_user_data(user_id: int):
“””擬似的なユーザーデータ取得API呼び出し”””
print(f”Fetching user {user_id} data…”)
await asyncio.sleep(1 + user_id * 0.5) # ユーザーIDごとに異なる時間待機
print(f”Finished fetching user {user_id} data.”)
return {“id”: user_id, “name”: f”User {user_id}”, “data”: f”Data for {user_id}”}

async def fetch_product_data(product_id: int):
“””擬似的な商品データ取得DBアクセス”””
print(f”Fetching product {product_id} data…”)
await asyncio.sleep(2 – product_id * 0.3) # 商品IDごとに異なる時間待機
print(f”Finished fetching product {product_id} data.”)
return {“id”: product_id, “name”: f”Product {product_id}”, “price”: (product_id + 1) * 100}

@app.get(“/combined_data/”)
async def get_combined_data(user_id: int = 1, product_id: int = 10):
“””複数の非同期処理を並行して実行し、結果を結合する”””
print(f”[{time.time()}] 結合データ取得開始”)

# 複数のコルーチンオブジェクトを作成
user_task = fetch_user_data(user_id)
product_task = fetch_product_data(product_id)

# asyncio.gatherを使って、これらのコルーチンを並行して実行し、両方の完了を待つ
# asyncio.gatherは引数で渡されたコルーチンを同時にスケジュールし、
# 全て完了した時点でその結果をリストで返す。
# 例外が発生した場合の挙動なども指定可能。
user_data, product_data = await asyncio.gather(user_task, product_task)

print(f"[{time.time()}] 結合データ取得完了")

# 取得したデータを結合して返す
return {"user": user_data, "product": product_data, "combined_at": time.time()}

この例をUvicornで実行する場合: uvicorn main:app –reload

“`

この例では、/combined_data/エンドポイントへのリクエスト時に、fetch_user_dataコルーチンとfetch_product_dataコルーチンをasyncio.gatherを使って同時に実行しています。もしこれらの処理を順番にawaitした場合、合計時間はfetch_user_dataの時間 + fetch_product_dataの時間になります。しかし、asyncio.gatherを使うことで、2つの処理が並行して実行されるため、合計時間は2つの処理のうち長い方+αで済みます。

例えば、user_id=1, product_id=10で呼び出すと、user_taskは約1.5秒、product_taskは約-1秒(これは例なので負の時間はありえないですが、計算式的に)となります。もしuser_id=5, product_id=1で呼び出すと、user_taskは約3.5秒、product_taskは約1.7秒となります。asyncio.gatherを使った場合、約3.5秒+αで両方の処理が完了します。これを順番に実行した場合は、3.5秒 + 1.7秒 = 5.2秒+αかかります。

このように、asyncio.gatherは、複数の独立したI/Oバウンド処理を効率的に並行実行するために非常に役立ちます。

async/awaitのメリットとデメリット

FastAPIでasync/awaitを活用することには、明確なメリットといくつかのデメリットが存在します。

メリット

  1. 高いスループットとスケーラビリティ:

    • 最も重要なメリットです。I/O待機中に他のタスクに切り替えることができるため、限られたスレッド/プロセスで多数の同時接続を効率的に処理できます。
    • これにより、従来の同期フレームワークに比べて、同等のハードウェアリソースでより多くのリクエストを捌けるようになり、特にI/Oバウンドなワークロードにおいて高いスループットを実現できます。
  2. リソース効率:

    • 同期処理のようにリクエストごとに新しいスレッドやプロセスを多数生成する必要がないため、メモリ使用量やCPUオーバーヘッドを削減できます。
    • スレッド/プロセスのコンテキストスイッチはコストがかかりますが、イベントループ内のコルーチン間の切り替えはより軽量です。
  3. パフォーマンスの向上(I/Oバウンド処理において):

    • 複数のI/O操作を並行して実行できるため(例: asyncio.gather)、複数の外部サービスへの問い合わせや複数のDBクエリを含むリクエスト全体の処理時間を短縮できます。
  4. モダンなPython機能の活用:

    • async/awaitはPython 3.5以降のモダンな構文であり、コードをより直感的かつPythonicに非同期処理を記述できます。コールバック地獄のような古い非同期パラダイムに比べて、可読性が高いコードになります。
  5. FastAPIとの親和性:

    • FastAPIはasyncioを前提に設計されているため、async/awaitを使うことが自然であり、フレームワークの機能を最大限に引き出すことができます。

デメリット

  1. 学習コスト:

    • 非同期処理の概念(イベントループ、コルーチン、ノンブロッキングなど)は、同期処理に慣れた開発者にとって最初は難解に感じられることがあります。
    • 同期的な思考から非同期的な思考への切り替えが必要です。
  2. エコシステムの成熟度(かつて):

    • 登場当初は、非同期対応しているライブラリが少なく、同期ライブラリを非同期コンテキストで使う際に工夫が必要でした。現在では、主要なデータベースドライバやHTTPクライアントなど、多くのライブラリが非同期対応を進めていますが、古いライブラリや特定のライブラリにはまだ同期版しかない場合があります。
    • 同期ライブラリを非同期処理で使うには、asyncio.to_thread() (Python 3.9+) や loop.run_in_executor() を使う必要がありますが、これは事実上その部分をスレッドプールで実行する同期処理として扱うことになり、非同期処理のメリットが薄れます。
  3. デバッグの難しさ:

    • 複数のコルーチンが切り替わりながら実行されるため、実行フローを追跡するのが同期処理に比べて難しくなることがあります。スタックトレースも同期処理とは異なります。
  4. CPUバウンド処理には不向き:

    • 非同期処理はI/O待機中に効果を発揮しますが、CPUを使い続ける計算処理(CPUバウンド処理)においては、awaitでイベントループに制御を戻す機会がありません。
    • もしasync def関数内で時間のかかるCPUバウンド処理を行うと、その処理が完了するまでイベントループ全体がブロックされてしまい、他のタスクの実行が停止します。これは同期処理におけるブロッキングと同様の問題を引き起こします。
    • CPUバウンド処理は、別プロセス(マルチプロセッシング)で実行するか、loop.run_in_executor()を使ってスレッドプールで実行するなどの対策が必要です。
  5. 同期処理との混在:

    • 非同期関数 (async def) からは同期関数を直接awaitすることはできません。また、同期関数 (def) の中では非同期関数をawaitすることはできません。
    • 非同期コードと同期コードが混在するプロジェクトでは、インターフェースの設計や呼び出し規約に注意が必要になり、コードが複雑になる可能性があります。

デメリットは存在しますが、FastAPIでWebアプリケーションを開発する場合、特にI/Oバウンドなワークロードが中心であれば、async/awaitを活用するメリットはデメリットを大きく上回ることが多いです。重要なのは、非同期処理の特性を理解し、適切な場所で適切に使うことです。

非同期処理における注意点とベストプラクティス

FastAPIでasync/awaitを使って非同期処理を効果的に行うためには、いくつかの注意点とベストプラクティスがあります。

  1. async def関数内でブロッキング処理をawaitしない:

    • これは最も重要な注意点です。async defで定義されたコルーチン関数内で、awaitを使わずに長時間かかる同期的なブロッキングI/O(例: requests.get(), time.sleep(), 同期DBドライバの呼び出し)を実行すると、そのブロッキング処理が完了するまでイベントループ全体が停止してしまいます。
    • もし同期的なブロッキング処理を実行する必要がある場合は、以下のいずれかの方法を取ります。
      • 非同期対応ライブラリに置き換える: 可能な限り、requestshttpxに、同期DBドライバをasyncpgや非同期SQLAlchemyに、time.sleepasyncio.sleepに置き換えます。
      • asyncio.to_thread() (Python 3.9+) または loop.run_in_executor() を使う: 同期的なブロッキング処理を別スレッド(または別プロセス)で実行し、その完了を非同期に待つことができます。FastAPIのdef関数が自動でスレッドプールで実行されるのもこの仕組みを利用しています。
        python
        # 例: async def 関数内で requests を使う場合(非推奨だが知っておく価値はある)
        # import asyncio
        # import requests
        # async def fetch_sync_in_async():
        # # 同期関数呼び出しだが、await asyncio.to_thread() で別スレッドで実行し、完了を待つ
        # response = await asyncio.to_thread(requests.get, "https://httpbin.org/delay/3")
        # return response.json()

        しかし、これはオーバーヘッドを伴うため、非同期対応ライブラリを使う方が一般的に推奨されます。
  2. CPUバウンド処理への対応:

    • 時間のかかるCPUバウンド処理(例: 大規模なデータ処理、画像処理、重い計算)もイベントループをブロックします。これもasync def関数内で直接実行するべきではありません。
    • CPUバウンド処理は、asyncio.to_thread() (Python 3.9+) または loop.run_in_executor() を使って別スレッド(または別プロセス、デフォルトのExecutorはThreadPoolExecutor)で実行するか、FastAPIの場合はその処理を含む関数をdef関数として定義し、FastAPIにスレッドプールでの実行を任せます。CPUバウンド処理にはProcessPoolExecutorの方が適していることが多いですが、設定が必要です。
  3. 適切な非同期対応ライブラリを選択する:

    • 外部依存関係(データベース、キャッシュ、キュー、外部APIなど)を利用する際は、可能な限りasyncioに対応した非同期ライブラリを選びましょう。これにより、アプリケーション全体を効率的な非同期モデルで統一できます。
  4. 非同期リソースのライフサイクル管理:

    • データベース接続プール、非同期HTTPクライアントなどは、アプリケーションの起動時に作成し、終了時に適切に閉じる必要があります。FastAPIの@app.on_event("startup") / @app.on_event("shutdown")(またはlifespanイベント)や、Dependsを使ったリソース管理(yieldを使うパターン)を活用しましょう。
  5. エラーハンドリング:

    • 非同期処理におけるエラーハンドリングは、同期処理と同様にtry...exceptブロックで行います。ただし、asyncio.gatherなどで複数のタスクを並行実行している場合、いずれかのタスクで例外が発生すると、デフォルトではその例外がgatherを呼び出した場所に伝播し、他の実行中のタスクはキャンセルされないままになることがあります(return_exceptions=Trueを渡さない場合)。必要に応じてエラーハンドリング戦略を検討しましょう。
  6. デバッグ手法を学ぶ:

    • 非同期コードのデバッグは同期コードよりも少し複雑になることがあります。asyncioのデバッグモードを有効にしたり、ログを詳細に出力したり、printデバッグを効果的に使ったりするなどの手法に慣れましょう。
  7. 適切なユースケースの判断:

    • 非同期処理は特にI/Oバウンドなワークロードにおいて効果的です。全てのアプリケーションや処理を無理に非同期化する必要はありません。CPUバウンドな処理が中心のアプリケーションであれば、マルチプロセッシングの方が適している場合もあります。FastAPIは同期・非同期の両方のパスオペレーションを混在させることができるため、処理の内容に応じて適切な方を選択するのがベストプラクティスです。

これらの注意点とベストプラクティスを踏まえることで、FastAPIにおける非同期処理のメリットを最大限に享受し、堅牢でパフォーマンスの高いアプリケーションを構築できます。

まとめ:FastAPIとasync/awaitで未来のWebアプリケーションを構築する

この記事では、Webアプリケーション開発における同期処理の限界から始まり、非同期処理の基本的な概念、Pythonのasyncioasync/awaitキーワード、そしてFastAPIでこれらをどのように活用するかを詳細に解説しました。具体的なコード例を通じて、外部API呼び出しやデータベースアクセスといった一般的なI/Oバウンドなタスクを非同期で実装する方法を示しました。

同期処理はコードが直線的で分かりやすい反面、I/Oバウンドな処理が発生するとスレッド/プロセスがブロックされ、多数の同時接続に対応する際にスケーラビリティやリソース効率に課題がありました。

これに対し、非同期処理はI/O待機中にタスクを切り替えることで、限られたリソースで高い並行処理能力を実現します。Pythonのasync/await構文とasyncioライブラリは、この非同期処理を比較的容易に記述することを可能にしました。

FastAPIは、このモダンな非同期処理パラダイムを基盤として設計されており、async defキーワードを使って非同期対応のエンドポイントを簡単に定義できます。非同期対応ライブラリと組み合わせることで、ノンブロッキングなI/O処理を最大限に活用し、高いスループットと効率性を実現できます。特に、多くのI/Oバウンド処理を含むWeb APIやマイクロサービスにおいては、FastAPIとasync/awaitの組み合わせは非常に強力です。

もちろん、非同期処理には学習コストやデバッグの難しさ、CPUバウンド処理への不向きといったデメリットも存在します。しかし、非同期処理が真価を発揮するI/Oバウンドなシナリオにおいては、これらのデメリットを補って余りあるメリットがあります。FastAPIが同期関数と非同期関数の両方をサポートしているのは、この点を考慮しているためです。処理の内容に応じて、同期と非同期を適切に使い分けることが重要です。特に、async def関数内ではブロッキングI/Oを実行しないという原則は絶対に守るべきです。

FastAPIとasync/awaitを習得することで、あなたは現代のWebアプリケーションに求められる高いパフォーマンスとスケーラビリティを備えたシステムを構築するための強力なツールを手に入れることができます。非同期処理の概念をしっかりと理解し、本記事で紹介したコード例や注意点を参考にしながら、ぜひ実際にコードを書いてみてください。実践を通じて、その強力さと便利さを実感できるはずです。

今後も、非同期対応ライブラリのエコシステムは進化し続けるでしょう。新しい技術やライブラリが登場しても、非同期処理の基本的な概念とasync/awaitの使い方が理解できていれば、新しい知識もスムーズに習得できるはずです。

FastAPIで、より高速でスケーラブル、そして現代的なWebアプリケーション開発の世界へ踏み出しましょう!


コメントする

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

上部へスクロール