はい、承知いたしました。「ZIO Scalaを始めよう:初心者向け導入ガイド」というテーマで、約5000語の詳細な技術記事を記述します。
ZIO Scalaを始めよう:初心者向け導入ガイド
はじめに
Scalaは、オブジェクト指向と関数型プログラミングのパラダイムを統合した強力な言語です。近年、特にバックエンド開発において、関数型プログラミングのアプローチが注目を集めています。その理由の一つは、副作用の管理を徹底することで、より予測可能でテストしやすい、堅牢なコードを記述できるからです。
従来のオブジェクト指向的なアプローチでは、状態の変更や外部とのやり取り(ファイルI/O、ネットワーク通信、データベースアクセスなど)といった「副作用」が、メソッド呼び出しの中で暗黙的に行われることがよくあります。これはコードをシンプルに見せる一方で、実行結果が呼び出し時のコンテキストやグローバルな状態に依存しやすくなり、バグの原因となったり、テストを困難にしたりします。
関数型プログラミングでは、可能な限り「純粋な関数」、すなわち、同じ入力に対して常に同じ出力を返し、外部の状態を変更しない関数を使うことを目指します。しかし、現実のアプリケーション開発において、副作用を完全に避けることは不可能です。データベースへの書き込みや、外部APIへのリクエストは必要です。ここで重要になるのが、副作用を「いつ」「どのように」実行するかを明確に制御し、その発生を型システムで表現することです。
副作用を扱うためのライブラリとして、ScalaではCats Effectのような型クラスベースのアプローチや、今回の主題であるZIOのようなデータ型ベースのアプローチがあります。ZIOは、副作用を持つ操作を値として表現するデータ型 ZIO[R, E, A]
を中心に、堅牢なアプリケーションを構築するための豊富な機能を提供します。このガイドでは、ZIOの基本的な考え方から、実際のコードの書き方、さらには並行処理、エラーハンドリング、依存性の注入といった高度なトピックまで、初心者向けに分かりやすく解説します。
なぜZIOか?
ZIOが提供する主な価値は以下の点に集約されます。
- 副作用の明示的な表現: 副作用を含む計算を
ZIO[R, E, A]
という値として表現します。これにより、コードを読むだけで、その操作が何らかの副作用(例えば、R型の環境に依存し、E型のエラーが発生する可能性があり、成功した場合はA型の値を返す)を持つことが分かります。 - 強力な型安全性:
ZIO
型のパラメータR
,E
,A
によって、依存する環境、発生しうるエラー、そして成功時の結果の型が静的に保証されます。これにより、実行時エラーの多くをコンパイル時に検出できます。 - 構造化された並行処理: 軽量なファイバー(Fiber)を用いた並行処理モデルを提供します。ファイバーはOSスレッドよりもはるかに軽量で、大量の並行タスクを効率的に管理できます。また、ファイバーの構造化された親子関係により、並行タスクのキャンセルやエラー伝播が容易になります。
- 包括的なエラーハンドリング:
ZIO
型のE
パラメータにより、リカバリ可能なビジネスロジック上のエラーを表現できます。ZIOは、例外(非同期例外を含む)とドメインエラーの両方を扱うための、強力で柔軟なエラーハンドリング機能を提供します。 - 洗練された依存性注入:
R
パラメータを用いた環境(Environment)システムにより、モジュール間の依存性を型安全かつ明確に管理できます。これは、特に大規模なアプリケーション開発において、コードのモジュール性、テスト容易性、再利用性を向上させます。 - テスト容易性:
ZIO
が副作用を値として扱うため、副作用を含むコードのテストが容易になります。ZIO Testフレームワークは、ZIOベースのコードをテストするための強力な機能を提供します。
これらの特徴により、ZIOは信頼性が高く、保守しやすい、スケーラブルなアプリケーション開発に適しています。
記事の目的と対象読者
このガイドは、Scalaの基本的な文法は理解しているものの、関数型プログラミングの副作用管理や、ZIOのようなライブラリを使った開発経験がない方を対象としています。ZIOの基本的な概念から始め、実際にコードを書いて動かすところまでを目標とします。最終的には、ZIOを使った堅牢なアプリケーション構築の基礎を身につけることができるように構成しています。
さあ、ZIOの世界へ踏み出しましょう!
1. ZIOの基本概念
ZIOの中心となるのは、計算や処理を表すデータ型 ZIO[R, E, A]
です。この型は、これから実行される「副作用を伴うかもしれない計算」を表現した「値」です。まだ実行されていません。実行は明示的に指示された時に行われます。この遅延評価(Deferred Execution)の仕組みが、副作用を管理するための鍵となります。
ZIO[R, E, A]
は3つの型パラメータを持ちます。
R
(Requirement/Environment): この計算を実行するために必要な「環境」(依存性)の型。計算が特定のサービスや設定を必要とする場合に指定します。依存性がない場合はAny
を指定します。E
(Error): この計算が失敗した場合に返す「エラー」の型。リカバリ可能なドメイン固有のエラーを指定します。エラーが発生しない場合はNothing
を指定します。A
(Success): この計算が成功した場合に返す「結果」の型。計算が値を返さない(副作用だけを実行する)場合はUnit
を指定します。
例を見てみましょう。
ZIO[Any, Nothing, Int]
: 依存性もエラーもなく、成功すると整数値を返す計算。最も純粋な計算に近いです。ZIO[Any, Throwable, String]
: 依存性はないが、Javaの例外 (Throwable
) が発生する可能性があり、成功すると文字列を返す計算。ファイル読み込みなど、一般的なI/O操作によく使われます。ZIO[DatabaseService, DatabaseError, User]
:DatabaseService
という環境に依存し、DatabaseError
というエラーが発生する可能性があり、成功するとUser
オブジェクトを返す計算。依存性の注入とカスタムエラーの例です。ZIO[Console, java.io.IOException, Unit]
:Console
という環境に依存し、IOException
が発生する可能性があり、成功しても値を返さない(コンソール出力など、副作用だけを実行する)計算。
この ZIO[R, E, A]
という型を使うことで、メソッドのシグネチャを見ただけで、そのメソッドがどのような依存性を持ち、どのような種類のエラーが発生しうるか、そして何の結果を返すかが明確になります。これは、従来の「成功時には値を返し、失敗時には例外をスローする」というスタイルに比べて、はるかに情報量が多く、型安全です。
副作用の表現 (Deferred Execution)
ZIOの最も重要な概念の一つは、副作用を伴う操作を「その操作を実行するためのレシピ」として値化し、その値自体は純粋であるという点です。
“`scala
// これは「Hello, world!」と出力するという『計算』を表現した値
val sayHello: ZIO[Any, java.io.IOException, Unit] =
ZIO.debug(“Hello, world!”)
// この時点では、まだ何も出力されていません。sayHelloは単なる値です。
“`
この sayHello
という値は、コンソールに出力するという副作用を含む計算を表していますが、この値が作られた時点では、まだ実際には何も実行されていません。実行は、後述する unsafeRun
(非推奨) や Runtime
, ZIOAppDefault
を使って明示的に指示された時に初めて行われます。
この遅延評価のアプローチにはいくつかの利点があります。
- コンポジション:
ZIO
の値を組み合わせることで、より複雑な計算を構築できます。例えば、複数のI/O操作をシーケンシャルに実行したり、並行して実行したりする計算を、純粋な関数型の手法で記述できます。 - テスト容易性: 副作用を持つ操作自体が値として表現されているため、実際の副作用を実行せずに、その値(計算のレシピ)を検査したり、モック環境下で実行したりすることが容易になります。
- リソース管理: 計算グラフ全体を把握できるため、リソース(ファイルハンドル、ネットワーク接続など)の適切な取得と解放を保証できます。ZIOの
acquireReleaseWith
などのコンストラクタは、このリソース管理を型安全に行うためのものです。 - エラーリカバリとリトライ: 計算が失敗した場合に、そのレシピを使ってエラーから回復したり、操作を自動的にリトライしたりするロジックを、副作用の実行とは分離して記述できます。
不変性 (Immutability)
ZIOのデータ型 ZIO[R, E, A]
は不変です。一度作成された ZIO
の値は、その内部状態が変化することはありません。複数の ZIO
の値を結合して新しい ZIO
の値を作成する際も、元の値は変更されず、新しい値が生成されます。これは、並行処理において共有状態の変更による複雑な問題を回避する上で非常に重要です。
型安全性 (Type Safety)
前述のように、ZIO[R, E, A]
の型パラメータは、計算の依存性、エラー、結果を静的に表現します。これにより、コンパイラがコードの整合性をチェックし、多くの潜在的な問題をコンパイル時に検出できます。例えば、必要な環境が提供されていない場合や、発生しないはずのエラーをハンドリングしようとした場合、あるいは予期しない型の結果を扱おうとした場合などに、コンパイルエラーが発生します。
モジュラリティ (Modularity)
R
パラメータによる環境システムは、依存性注入(DI)の強力な仕組みを提供します。これにより、個々の ZIO
の値や、それらを組み合わせた大きな計算は、特定の具象クラスに直接依存するのではなく、必要なサービスの「型」にのみ依存するようになります。実際のサービス実装は、実行時に「レイヤー」(Layer[RIn, E, ROut]
)として提供されます。これにより、コードの各部分を独立して開発、テスト、再利用することが容易になり、アプリケーション全体のモジュール性が向上します。
並行処理と並列処理 (Concurrency and Parallelism)
ZIOは、軽量なユーザーレベルスレッドである「ファイバー」(Fiber)を基盤とした、構造化された並行処理モデルを提供します。 fork
オペレータを使うと、ある ZIO
の計算を別のファイバーで実行し、現在のファイバーとは独立して(ただし、ZIOランタイムによって協調的に)実行させることができます。
- 並行処理 (Concurrency): 複数のタスクが「同時に進行しているように見える」こと。ZIOは、単一のスレッドプール上で複数のファイバーを効率的に切り替えることで、高い並行性を実現します。これは、I/Oバウンドな処理(ネットワーク通信、ファイルI/Oなど)において、OSスレッドをブロックせずに多数の接続を同時に処理するのに特に有効です。
- 並列処理 (Parallelism): 複数のタスクが「実際に同時に実行される」こと。ZIOは、スレッドプール上の複数のOSスレッドを活用して、CPUバウンドな処理を複数のコアで並列に実行できます。
ZIO.collectAllPar
,ZIO.foreachPar
といった並列コンビネータが提供されています。
ファイバーは親子関係を持つことができ、親ファイバーが子ファイバーの完了を待ったり、子ファイバーをキャンセルしたり、子ファイバーで発生したエラーを伝播させたりすることができます。この「構造化された並行処理」により、並行タスクの管理が容易になり、デッドロックやリソースリークといった並行処理にありがちな問題を回避しやすくなります。
2. 基本的なZIOエフェクトの作成
ZIO[R, E, A]
型の値を作成するには、主にZIOオブジェクトのファクトリメソッドを使用します。ここではいくつかの基本的なメソッドを紹介します。
純粋な値の作成
ZIO.succeed[A](value: A)
: 成功する計算を作成します。依存性もエラーもありません (ZIO[Any, Nothing, A]
)。
scala
val meaningOfLife: ZIO[Any, Nothing, Int] = ZIO.succeed(42)ZIO.fail[E](error: E)
: 指定したエラーで即座に失敗する計算を作成します。依存性も結果もありません (ZIO[Any, E, Nothing]
)。
scala
val failedCalculation: ZIO[Any, String, Nothing] = ZIO.fail("Something went wrong")ZIO.unit
: 成功するが、Unit(値を返さないこと)を結果とする計算を作成します。これは副作用だけを実行する計算によく使われます (ZIO[Any, Nothing, Unit]
)。
scala
val doNothing: ZIO[Any, Nothing, Unit] = ZIO.unit
副作用を持つエフェクトの作成
ZIOは、既に存在する副作用のあるコード(Scalaの標準ライブラリなど)をZIOエフェクトに変換するための様々なメソッドを提供します。
-
ZIO.attempt[A](f: => A)
: 副作用があり、Throwable
(例外)をスローする可能性のあるコードブロックf
を、ZIO[Any, Throwable, A]
に変換します。成功するとA
型の値を返し、例外をスローした場合はThrowable
をエラーとしてラップします。
“`scala
import scala.io.Sourceval readFile: ZIO[Any, Throwable, String] =
ZIO.attempt {
Source.fromFile(“example.txt”).getLines().mkString(“\n”)
}
* `ZIO.attemptBlocking[A](f: => A)`: ブロッキング操作を含むコードブロック `f` をZIOエフェクトに変換します。`attempt` と似ていますが、これはZIOのブロッキング専用スレッドプールで実行されることが意図されています。ファイルI/OやJDBC呼び出しなど、長時間ブロックする可能性のある操作に使います。
scala
val readLargeFileBlocking: ZIO[Any, Throwable, String] =
ZIO.attemptBlocking {
Source.fromFile(“large_file.txt”).getLines().mkString(“\n”)
}
* `ZIO.sync[A](f: => A)`: 副作用があるが、例外をスローしないと保証できるコードブロック `f` を、`ZIO[Any, Nothing, A]` に変換します。注意深く使うべきです。
scala
val calculatePurely: ZIO[Any, Nothing, Int] =
ZIO.sync {
// 例外をスローしない純粋な計算
val x = 10
x * 2
}
* `ZIO.debug[A](message: A)`: デバッグメッセージを標準出力に書き出すエフェクトを作成します。これは `ZIO[Console, IOException, Unit]` に変換されます。(ZIO 2.0以降で推奨)
scala
import zio.Console._ // Console環境に依存するため、これをimportしますval printMessage: ZIO[Console, java.io.IOException, Unit] =
ZIO.debug(“Printing a message”)
``
ZIO.consoleは Console 環境を直接参照するためのものです。コンソール出力自体は
printLine,
readLineなどのメソッドを持つ
Console` サービスを利用します。ZIO 2.0ではこれらのサービスはレイヤーとして提供されます。“`scala
import zio.Consoleval printHello: ZIO[Console, java.io.IOException, Unit] =
Console.printLine(“Hello from ZIO Console!”)val readName: ZIO[Console, java.io.IOException, String] =
Console.readLine(“What’s your name? “)
``
Console.live` レイヤーを環境として提供する必要があります。
これらのコンソール操作を実行するには、後述する
値の変換 (map
, mapError
)
作成したZIOエフェクトの値やエラーを、新しい型に変換するには map
や mapError
といったオペレータを使用します。これらは、元のエフェクトの結果に写像関数を適用して新しいエフェクトを作成します。
map[B](f: A => B)
: 成功時の値A
を、関数f
を使ってB
に変換します。エラー型E
と環境型R
は変わりません。
scala
val meaningOfLife: ZIO[Any, Nothing, Int] = ZIO.succeed(42)
val meaningOfLifeString: ZIO[Any, Nothing, String] =
meaningOfLife.map(i => s"The meaning of life is $i")-
mapError[E2](f: E => E2)
: 失敗時のエラーE
を、関数f
を使ってE2
に変換します。成功時の値型A
と環境型R
は変わりません。
“`scala
case class CustomError(message: String)val failedCalculation: ZIO[Any, String, Int] = ZIO.fail(“Something went wrong”)
val failedWithCustomError: ZIO[Any, CustomError, Int] =
failedCalculation.mapError(errorMessage => CustomError(s”Processing failed: $errorMessage”))
“`
3. エフェクトの結合
ZIOの強力さは、作成した小さなエフェクトを組み合わせて、より複雑な計算フローを構築できる点にあります。ここでは主な結合方法を見ていきます。
シーケンシャルな結合
複数のエフェクトを順番に実行し、前のエフェクトの結果を次のエフェクトで使用する場合に使います。
-
flatMap[R1, E1, B](f: A => ZIO[R1, E1, B])
: 最初のZIOエフェクトが成功した際に得られる値A
を使って、次のZIOエフェクトZIO[R1, E1, B]
を作成する関数f
を適用します。結果はZIO[R with R1, E or E1, B]
となります(環境とエラー型が結合されます)。これは、多くの非同期/副作用ライブラリにおけるthen
やawait
に相当し、シーケンシャル処理の基本です。
“`scala
val readFirstLine: ZIO[Any, Throwable, String] = ZIO.attempt(scala.io.Source.fromFile(“file1.txt”).getLines().next())
val readSecondLine: ZIO[Any, Throwable, String] = ZIO.attempt(scala.io.Source.fromFile(“file2.txt”).getLines().next())val readBothLines: ZIO[Any, Throwable, (String, String)] =
readFirstLine.flatMap { line1 =>
readSecondLine.map { line2 =>
(line1, line2)
}
}
“` -
>>
(andThen): 最初のZIOエフェクトが成功した後、その結果を捨てて、次のZIOエフェクトを実行します。結果は次のエフェクトの結果になります。
“`scala
val step1: ZIO[Any, String, Int] = ZIO.succeed(1)
val step2: ZIO[Any, String, String] = ZIO.succeed(“done”)val combined: ZIO[Any, String, String] = step1 >> step2 // 結果は “done”
* `*>` (zipRight): `>>` と同じですが、ZIO 2.0以降では `*>` が推奨されています。
scala
val step1: ZIO[Any, String, Int] = ZIO.succeed(1)
val step2: ZIO[Any, String, String] = ZIO.succeed(“done”)val combined: ZIO[Any, String, String] = step1 *> step2 // 結果は “done”
* `<<` (zipLeft): 最初のZIOエフェクトと次のZIOエフェクトを順番に実行し、結果は最初のZIOエフェクトの結果になります。
scala
val step1: ZIO[Any, String, Int] = ZIO.succeed(1)
val step2: ZIO[Any, String, String] = ZIO.succeed(“done”)val combined: ZIO[Any, String, Int] = step1 <* step2 // 結果は 1
* `<*` (zipLeft): `<<` と同じですが、ZIO 2.0以降では `<*` が推奨されています。
scala
val step1: ZIO[Any, String, Int] = ZIO.succeed(1)
val step2: ZIO[Any, String, String] = ZIO.succeed(“done”)val combined: ZIO[Any, String, Int] = step1 <* step2 // 結果は 1
“`
for
コンプレッションの使用
flatMap
をチェインしてシーケンシャルな処理を書くのは冗長になりがちです。Scalaの for
コンプレッションは、flatMap
、map
、filter
を含むシーケンシャルな操作を、より読みやすく記述するための糖衣構文(Syntactic Sugar)です。ZIOも for
コンプレッション内で自然に使用できます。
“`scala
import zio.Console
val program: ZIO[Console, java.io.IOException, Unit] =
for {
_ <- Console.printLine(“What’s your name?”) // 結果がUnitなので _ で受ける
name <- Console.readLine // Stringを受け取る
_ <- Console.printLine(s”Hello, $name!”)
} yield () // 全体の結果がUnitなので () を返す
``
printLine
この例では、、
readLine、
printLineという3つのZIOエフェクトがシーケンシャルに実行されています。
<-の左辺は、右辺のZIOエフェクトが成功した際の値を束縛します。最後の
yieldは、
for` コンプレッション全体の成功時の値を指定します。
for
コンプレッションは、複雑なシーケンシャル処理を直感的に書くための強力なツールです。
並列結合
複数のエフェクトを並行して(可能であれば並列に)実行し、それらの結果を組み合わせる場合に使います。
-
zip[R1, E1, B](that: ZIO[R1, E1, B])
: 2つのZIOエフェクトを「並行して」実行し、それぞれの結果をタプルとして結合します。どちらかのエフェクトが失敗した場合、全体の計算も失敗します。
“`scala
val effect1: ZIO[Any, String, Int] = ZIO.succeed(1)
val effect2: ZIO[Any, String, String] = ZIO.succeed(“two”)val combined: ZIO[Any, String, (Int, String)] = effect1.zip(effect2) // 並行実行、結果は (1, “two”)
* `zipPar[R1, E1, B](that: ZIO[R1, E1, B])`: `zip` と似ていますが、より明示的に並列実行を意図していることを示します。ZIOランタイムが最適な実行戦略を選択します。
scala
val effect1: ZIO[Any, String, Int] = ZIO.succeed(1)
val effect2: ZIO[Any, String, String] = ZIO.succeed(“two”)val combined: ZIO[Any, String, (Int, String)] = effect1.zipPar(effect2) // 並列実行、結果は (1, “two”)
* `ZIO.collectAllPar[R, E, A](effects: Iterable[ZIO[R, E, A]])`: ZIOエフェクトのリストをすべて並列に実行し、成功した結果をリストとして返します。いずれか一つでも失敗した場合、全体の計算は失敗し、他の実行中のエフェクトはキャンセルされます。
scala
val effects = List(ZIO.succeed(1), ZIO.succeed(2), ZIO.succeed(3))
val results: ZIO[Any, Nothing, List[Int]] = ZIO.collectAllPar(effects) // 並列実行、結果は List(1, 2, 3)
* `ZIO.foreachPar[R, E, A, B](collection: Iterable[A])(f: A => ZIO[R, E, B])`: コレクションの各要素に関数 `f` を適用して得られるZIOエフェクト群を並列に実行し、その結果をリストとして返します。
scala
val ids = List(1, 2, 3)
def fetchUser(id: Int): ZIO[Any, String, String] =
// ユーザーを取得する処理 (ここではダミー)
ZIO.succeed(s”User $id”)val users: ZIO[Any, String, List[String]] = ZIO.foreachPar(ids)(fetchUser) // 並列実行、結果は List(“User 1”, “User 2”, “User 3”)
“`
並列結合は、複数の独立したI/O操作などを効率的に実行したい場合に非常に有効です。
4. エラーハンドリング
ZIOは、E
型パラメータを使って、リカバリ可能なドメイン固有のエラーを型安全に表現します。これにより、例外処理に比べて、どのようなエラーが発生する可能性があるかをコード上で明確にできます。
E
タイプによるエラー表現
ZIO[R, E, A]
の E
は、ビジネスロジック上で起こりうるエラーの型を表します。例えば、データベースアクセスにおける DatabaseError
や、API呼び出しにおける ApiError
などです。予期しないシステムレベルのエラー(例えば、OOMやStackOverflowErrorなど)は通常 Throwable
として扱われ、ZIOランタイムによって適切に処理されますが、ビジネスロジック上で意図的にエラーを表現する場合は E
を使います。
エラーの変換とリカバリ
ZIOは、エラーから回復したり、エラーを別の型に変換したりするための様々なオペレータを提供します。
-
catchAll[R1, E2, A1 >: A](f: E => ZIO[R1, E2, A1])
: エフェクトが失敗した場合、そのエラーE
を受け取り、別のZIOエフェクトZIO[R1, E2, A1]
を実行してエラーから回復します。成功した場合は元の成功値A
をそのまま返します。
“`scala
val mightFail: ZIO[Any, String, Int] = ZIO.fail(“Operation failed”)val recovered: ZIO[Any, Nothing, Int] =
mightFail.catchAll(error => {
ZIO.debug(s”Caught error: $error”) *> ZIO.succeed(-1) // エラーをログに出して、-1を返して成功させる
}) // エラー型がNothingに変わる
* `orElse[R1, E2, A1 >: A](that: ZIO[R1, E2, A1])`: エフェクトが失敗した場合、代わりに別のZIOエフェクト `that` を実行します。成功した場合は元の成功値 `A` をそのまま返します。
scala
val primary: ZIO[Any, String, Int] = ZIO.fail(“Primary failed”)
val secondary: ZIO[Any, String, Int] = ZIO.succeed(100)val result: ZIO[Any, String, Int] = primary.orElse(secondary) // primaryが失敗したので secondaryが実行され、結果は 100
* `fold[B](failure: E => B, success: A => B)`: エフェクトの実行結果を、失敗した場合と成功した場合で異なる関数を使って、どちらの場合でも同じ型 `B` の値に変換します。これにより、エラー型 `E` を消去し、結果を「成功した値」または「失敗した値」のいずれかの形で単一の型 `B` で表現できます。
scala
val mightFail: ZIO[Any, String, Int] = ZIO.fail(“Operation failed”)val resultString: ZIO[Any, Nothing, String] =
mightFail.fold(
error => s”Failed with error: $error”, // 失敗した場合の変換関数
value => s”Succeeded with value: $value” // 成功した場合の変換関数
) // エラー型がNothingに変わる
``
mapErrorE2
*: 失敗時のエラー型
Eを別の型
E2に変換します。
absolveE, A
*: 結果が
Either[E, A]であるZIOエフェクトを、エラー型が
E、成功値型が
AのZIOエフェクトに変換します。
Eitherで表現されたエラーをZIOのエラーチャネルに移動させます。
either
*:
ZIO[R, E, A]を
ZIO[R, Nothing, Either[E, A]]に変換します。成功も失敗も
Either` の中にラップされ、ZIO自体は決して失敗しないようになります。これは、エラーを値として扱いたい場合に便利です。
リトライ (retry
)
特定の条件で失敗した場合に、自動的に操作を再試行するには retry
オペレータを使用します。retry
は、ZIOが提供する Schedule
というデータ型と組み合わせて使います。Schedule
は、リトライの間隔や最大試行回数などを定義するためのものです。
“`scala
import zio.Schedule
import java.io.IOException
val flakyOperation: ZIO[Any, IOException, String] =
for {
_ <- ZIO.debug(“Attempting flaky operation…”)
_ <- ZIO.fail(new IOException(“Network unreachable”)).when(scala.util.Random.nextBoolean())
res <- ZIO.succeed(“Success!”)
} yield res
val retriedOperation: ZIO[Any, IOException, String] =
flakyOperation.retry(Schedule.recurs(3) && Schedule.spaced(zio.Duration.ofSeconds(1))) // 3回まで、1秒間隔でリトライ
// 上記の Schedule は “3回リトライするか、または各リトライの間に1秒待つ” スケジュールを組み合わせている
// ZIOの Schedule コンビネータは強力で、様々なリトライ戦略を構築できる
``
Schedule
この例では、ランダムに失敗する操作を最大3回まで、かつ各試行の間に1秒待つというスケジュールでリトライしています。リトライスケジュールはオブジェクトを
&&や
||で組み合わせたり、
map,
filter` などで変換したりして柔軟に構築できます。
タイムアウト (timeout
)
操作に時間制限を設けるには timeout
オペレータを使用します。指定した時間内に操作が完了しなかった場合、ZIOエフェクトは中断され、None
を含む Option
を返します。時間内に完了した場合は、Some
にラップされた成功値が返されます。
“`scala
import zio.Duration
val longRunningOperation: ZIO[Any, String, Int] =
ZIO.sleep(Duration.ofSeconds(5)) *> ZIO.succeed(42) // 5秒待ってから成功
val timedOperation: ZIO[Any, String, Option[Int]] =
longRunningOperation.timeout(Duration.ofSeconds(3)) // 3秒でタイムアウト
``
None` となります。
この例では、5秒かかる操作に3秒のタイムアウトを設けているため、結果は
ファイナライゼーション (ensuring
, acquireReleaseWith
)
リソースの解放など、操作が成功しても失敗しても常に実行したいクリーンアップ処理は、ファイナライゼーションを使って安全に実行できます。
-
ensuring(finalizer: ZIO[R1, Nothing, Any])
: 元のエフェクトの実行が完了した後(成功、失敗、中断にかかわらず)、常にfinalizer
エフェクトを実行します。
“`scala
val operation: ZIO[Any, String, Int] =
ZIO.succeed(10).tap(_ => ZIO.debug(“Operation finished”))val finalizedOperation: ZIO[Any, String, Int] =
operation.ensuring(ZIO.debug(“Running finalizer”))
* `acquireReleaseWith[R, E, A, B](acquire: ZIO[R, E, A])(release: A => ZIO[R, Nothing, Any])(use: A => ZIO[R, E, B])`: リソースの取得 (`acquire`)、そのリソースを使った処理 (`use`)、そしてリソースの解放 (`release`) というパターンを安全に実装するためのコンストラクタです。`acquire` が成功した後に `use` が実行され、`use` の結果にかかわらず(成功、失敗、中断)、必ず `release` が実行されます。`release` は `acquire` で取得したリソース `A` を受け取ります。`release` エフェクトはエラー型が `Nothing` であるべきです(解放処理自体が失敗した場合はログ出力などで対処することが多い)。
scala
import java.io.{FileWriter, IOException}def openFile(name: String): ZIO[Any, IOException, FileWriter] =
ZIO.attemptBlockingIO(new FileWriter(name))def closeFile(writer: FileWriter): ZIO[Any, IOException, Unit] =
ZIO.attemptBlockingIO(writer.close())def writeToFile(writer: FileWriter, content: String): ZIO[Any, IOException, Unit] =
ZIO.attemptBlockingIO(writer.write(content))val writeOperation: ZIO[Any, IOException, Unit] =
ZIO.acquireReleaseWith(
openFile(“output.txt”) // リソースの取得
)(
writer => closeFile(writer).catchAll(e => ZIO.debug(s”Error closing file: $e”)) // リソースの解放 (解放エラーはログに出すだけ)
)(
writer => writeToFile(writer, “Hello, ZIO world!”) // リソースを使った処理
)
``
acquireReleaseWith` はファイルやソケット、データベースコネクションなどのリソース管理に不可欠です。ZIOは、このパターンを使うことで、例外や割り込み(ファイバーのキャンセルなど)が発生した場合でもリソースリークを防ぐことを保証します。
5. 環境 (Environment) と依存性注入 (DI)
ZIOの R
型パラメータは、計算が実行されるために必要な「環境」または「依存性」を表します。これは、サービスや設定といった外部リソースへの依存性を型安全に管理するための強力な仕組みです。
R
タイプによる依存性の表現
あるZIOエフェクトが DatabaseService
に依存する場合、その型は ZIO[DatabaseService, E, A]
のようになります。これにより、コードを読むだけで、このエフェクトを実行するには DatabaseService
のインスタンスが必要であることが明確になります。
“`scala
trait DatabaseService {
def getUser(id: Int): ZIO[Any, DatabaseError, User]
}
case class User(id: Int, name: String)
case class DatabaseError(message: String)
def findUser(id: Int): ZIO[DatabaseService, DatabaseError, User] =
ZIO.environment[DatabaseService].flatMap(.get.getUser(id))
// または ZIO.service[DatabaseService].flatMap(.getUser(id)) // ZIO 2.0以降推奨
``
findUser
上記のエフェクトは、まだ特定のデータベース実装を知りません。知っているのは、
DatabaseService` という型のサービスが必要であるということだけです。
環境の提供 (provideLayer
, provideEnvironment
, provideService
)
依存性を解決し、ZIOエフェクトを実行可能にするには、必要な環境を「提供」する必要があります。ZIOでは、「レイヤー」(Layer[RIn, E, ROut]
)という概念を使って環境を構築し、提供します。
Layer[RIn, E, ROut]
: サービスの実装方法を記述したものです。RIn
はこのレイヤー自身が依存する環境、E
はレイヤーの構築中に発生しうるエラー、ROut
はこのレイヤーが提供するサービスを表します。レイヤーは、依存するサービスを別のレイヤーから取得し、それを基に自身のサービスを構築します。ZIO#provideLayer[RIn, E2](layer: Layer[RIn, E2, R])
: ZIOエフェクトZIO[R, E, A]
に対して、必要な環境R
を提供するレイヤーLayer[RIn, E2, R]
を適用します。結果として得られるエフェクトは、そのレイヤーが依存する環境RIn
を必要とし、元のエラーE
またはレイヤー構築時のエラーE2
が発生する可能性があります。これにより、必要な依存性が満たされた新しいZIOエフェクトが得られます。
例:DatabaseService
の具体的な実装を提供するレイヤーを作成します。
“`scala
// DatabaseService の実装例 (これは単純なモック実装)
case class DatabaseServiceLive() extends DatabaseService {
override def getUser(id: Int): ZIO[Any, DatabaseError, User] =
ZIO.succeed(User(id, s”User $id”)) // 実際はDBアクセスなどを行う
}
// DatabaseService のレイヤーを作成
object DatabaseServiceLive {
val layer: Layer[Any, Nothing, DatabaseService] =
ZIO.succeed(DatabaseServiceLive()).toLayer // DatabaseServiceLive のインスタンスを作成し、それを Layer に変換する
}
// findUser エフェクトにレイヤーを提供して、依存性を解決する
val findUserEffect: ZIO[DatabaseService, DatabaseError, User] = findUser(123)
val runnableEffect: ZIO[Any, DatabaseError, User] =
findUserEffect.provideLayer(DatabaseServiceLive.layer) // DatabaseService の依存性を解決
``
runnableEffectは
Any環境に依存するようになりました。これは、実行に必要な
DatabaseServiceが
DatabaseServiceLive.layer` によって提供されたためです。
複数のサービスに依存する場合、それらのサービスを提供する複数のレイヤーを組み合わせる必要があります。ZIOはレイヤーを組み合わせるための強力なコンビネータを提供しています。
Layer.merge[RIn, E, ROut1, ROut2](layer1: Layer[RIn, E, ROut1], layer2: Layer[RIn, E, ROut2])
: 同じ入力環境RIn
を持つ2つのレイヤーを結合し、両方の出力環境ROut1
とROut2
を含む新しいレイヤーを作成します。ROut1
とROut2
が構造的に結合されます(例えばHas[ServiceA] with Has[ServiceB]
のような型になります)。Layer.and[RIn, E, ROut1, ROut2](layer1: Layer[RIn, E, ROut1], layer2: Layer[ROut1, E, ROut2])
:layer1
の出力ROut1
をlayer2
の入力RIn
として使用して、2つのレイヤーをシーケンシャルに結合します。
例:ロギングサービスに依存するデータベースサービスがあり、両方に依存するアプリケーションコードがある場合
“`scala
trait LoggingService {
def log(message: String): ZIO[Any, Nothing, Unit]
}
object LoggingServiceLive {
val layer: Layer[Any, Nothing, LoggingService] =
ZIO.succeed(new LoggingService {
override def log(message: String): ZIO[Any, Nothing, Unit] = ZIO.debug(s”[LOG] $message”)
}).toLayer
}
// DatabaseService が LoggingService に依存する場合
case class DatabaseServiceWithLogging(logging: LoggingService) extends DatabaseService {
override def getUser(id: Int): ZIO[Any, DatabaseError, User] =
logging.log(s”Fetching user $id”) *> // ロギングサービスを使用
ZIO.succeed(User(id, s”User $id”)).mapError(e => DatabaseError(e.getMessage))
}
object DatabaseServiceWithLogging {
// LoggingService レイヤーに依存して、DatabaseService レイヤーを構築
val layer: Layer[LoggingService, Throwable, DatabaseService] =
ZIO.service[LoggingService].map(new DatabaseServiceWithLogging(_)).toLayer
}
// アプリケーションコードは DatabaseService と LoggingService の両方に依存
val appLogic: ZIO[DatabaseService with LoggingService, DatabaseError, User] =
for {
_ <- ZIO.service[LoggingService].flatMap(.log(“Starting app logic”))
user <- findUser(123) // DatabaseService が必要
_ <- ZIO.service[LoggingService].flatMap(.log(s”Fetched user: $user”))
} yield user
// 依存性を提供
val fullLayer: Layer[Any, Throwable, DatabaseService with LoggingService] =
LoggingServiceLive.layer >>> DatabaseServiceWithLogging.layer // Logging の出力が Database の入力になるように結合
val runnableApp: ZIO[Any, Throwable, User] =
appLogic.provideLayer(fullLayer) // 依存性を解決
// 実行(後述)
``
R` 型パラメータで明示され、具体的な実装はレイヤーとして提供されます。テスト時には、本物のサービス実装の代わりにモック実装を提供するレイヤーを簡単に差し替えることができます。
このレイヤーシステムは、SpringやGuiceといった伝統的なDIフレームワークとはアプローチが異なりますが、型安全性とテスト容易性において大きな利点があります。依存性は
ZIO 2.0以降の環境とサービス
ZIO 2.0では、環境の表現とサービスアクセスがさらに改善されました。Has[Service]
という型は Service
そのものに置き換えられ、ZIO.environment[Service]
や ZIO.accessM
の代わりに ZIO.service[Service]
が推奨されています。これにより、環境の型がよりシンプルになりました (ZIO[ServiceA with ServiceB, E, A]
)。
上記の例では、findUser
は ZIO[DatabaseService, DatabaseError, User]
という型になり、内部で ZIO.service[DatabaseService]
を使ってサービスインスタンスを取得しています。
ZIO.service[ServiceType]
は ZIO[ServiceType, Nothing, ServiceType]
というエフェクトを返します。つまり、ServiceType
環境に依存し、成功するとその環境の値(ServiceType
のインスタンス)を返します。これを使ってサービスを取得し、そのメソッドを呼び出すのが ZIO 2.0スタイルのDIです。
6. 実行 (Running ZIO Effects)
ここまで、ZIOエフェクトを作成し、結合し、エラーや依存性を扱う方法を見てきました。しかし、これまでの操作はすべて「計算の定義」であり、実際の副作用は発生していません。ZIOエフェクトを実行し、その結果を得るには、明示的に実行を指示する必要があります。
ZIO.unsafeRun
(非推奨)
開発やデバッグの初期段階で手軽にZIOエフェクトを実行するために unsafeRunSync
, unsafeRunAsync
などのメソッドが提供されていましたが、これらは推奨されません。理由は、ZIOの提供する構造化された並行処理、エラー処理、リソース管理といった安全な仕組みの外で副作用を実行するため、予期しない挙動やリソースリークを引き起こす可能性があるからです。
特別な理由がない限り、アプリケーションのエントリーポイントとして推奨されるのは ZIOAppDefault
の使用です。
アプリケーションのエントリーポイント (ZIOAppDefault
)
ZIOアプリケーションを作成する最も一般的な方法は、ZIOAppDefault
を継承することです。これにより、ZIOランタイムのセットアップやシャットダウン、トップレベルのZIOエフェクトの実行などが自動的に行われます。
“`scala
import zio.
import zio.Console.
// 環境として Console サービスが必要
object MyApp extends ZIOAppDefault {
// run メソッドがアプリケーションのエントリーポイントとなる
override def run: ZIO[Any, Throwable, Unit] =
(for {
_ <- printLine(“Hello, what is your name?”)
name <- readLine
_ <- printLine(s”Hello, $name, welcome to ZIO!”)
} yield ()).provide(Console.live) // Console サービスレイヤーを提供
}
``
ZIOAppDefaultを継承したオブジェクトは、通常のScalaアプリケーションのように
mainメソッドを持ちます(ZIOが自動生成します)。
runメソッドは、アプリケーションのメインロジックを表す
ZIO[Any, Throwable, Any]エフェクトを返します。
Any環境は、
ZIOAppDefaultが基本的なZIOランタイム環境を提供するためです。
Throwableは、アプリケーション全体でリカバリしない非予期的なエラーを表します。結果型は
Anyで、通常は
Unit` を返します。
run
メソッド内で、必要な環境を provide
または provideLayer
メソッドを使って提供します。上記の例では、標準的なコンソール操作を提供する Console.live
レイヤーを提供しています。
Runtime
の使用
より低レベルな実行制御や、既存のアプリケーションの一部としてZIOを使いたい場合は、Runtime
を直接使用することも可能です。しかし、これは通常の上級者向けのトピックです。Runtime
はZIOエフェクトを実行するためのスレッドプールやスケジューラといった基盤を提供します。
“`scala
import zio.
import zio.console. // 2.0未満のインポートスタイル例
// ZIO 2.0以降では zio.Console._ のみで OK
import zio.Console
object ManualRunExample {
def main(args: Array[String]): Unit = {
// デフォルトの Runtime を作成 (プロダクションでは設定をカスタマイズすることが多い)
val runtime = Runtime.default
// 実行したい ZIO エフェクト
val myApp: ZIO[Console, java.io.IOException, Unit] =
Console.printLine("Running manually")
// エフェクトを実行し、結果を得る
// provideLayer で必要な環境 (Console) を提供
val result = runtime.unsafeRun(myApp.provideLayer(Console.live))
// 結果を処理 (ここでは Unit なので何も起こらない)
println(s"Execution finished with result: $result")
}
}
``
Runtime.defaultは、最も基本的な設定でRuntimeインスタンスを作成します。
runtime.unsafeRunは、与えられたZIOエフェクトが完了するまで現在のスレッドをブロックし、結果を返します。繰り返しになりますが、
unsafeRunの使用は注意が必要です。
ZIOAppDefault` を使う方が、Runtimeの管理をZIOに任せられるため、より安全で推奨されます。
7. 並行処理と並列処理
ZIOは、軽量な「ファイバー」(Fiber)を基盤とした、強力で構造化された並行処理モデルを提供します。
Fiberの概念 (fork
)
ZIO[R, E, A]
エフェクトを「実行」すると、その実行はZIOランタイムによって管理される「ファイバー」の中で行われます。通常、エフェクトは親ファイバーと同じファイバーでシーケンシャルに実行されます。
新しいファイバーを作成し、そこで別のエフェクトを並行して実行するには、fork
オペレータを使用します。
“`scala
val effect1: ZIO[Any, String, Int] = ZIO.sleep(zio.Duration.ofSeconds(3)) > ZIO.succeed(1)
val effect2: ZIO[Any, String, String] = ZIO.sleep(zio.Duration.ofSeconds(2)) > ZIO.succeed(“two”)
// effect2 を新しいファイバーで実行
val fiber2: ZIO[Any, Nothing, Fiber[String, String]] = effect2.fork
// 親ファイバーは effect1 を実行しつつ、fiber2 の完了を待つなどの操作が可能になる
val combined: ZIO[Any, String, (Int, String)] =
for {
f2 <- effect2.fork // effect2 を並行して実行開始
res1 <- effect1 // effect1 を親ファイバーで実行し、完了を待つ (3秒かかる)
res2 <- f2.join // fiber2 (effect2) の完了を待ち、結果を取得する (2秒かかるはずだが、effect1 実行中に既に完了している可能性が高い)
} yield (res1, res2) // 結果は (1, “two”)
``
forkは
ZIO[R, E, A]エフェクトを受け取り、そのエフェクトを新しいファイバーで実行する「レシピ」である
ZIO[R, Nothing, Fiber[E, A]]を返します。
Fiber[E, A]は、フォークされたファイバー自体を表すハンドルです。
Fiber[E, A]の
Eと
Aは、フォークされたエフェクトのそれに対応します。
forkエフェクト自身は成功し(
Nothing` エラー)、フォークされたファイバーのハンドルを返します。フォークされたファイバーで発生したエラーは、ハンドルに保持されます。
Fiberの結合 (join
, await
)
フォークしたファイバーの結果を得たり、完了を待ったりするには、Fiber
オブジェクトのメソッドを使います。
fiber.join
: ファイバーが完了するまで現在のファイバーをブロックし、その成功値A
を含むZIO[Any, E, A]
エフェクトを返します。フォークされたファイバーが失敗した場合、join
を呼び出したファイバーも同じエラーで失敗します。fiber.await
: ファイバーが完了するまで現在のファイバーをブロックし、その結果をExit[E, A]
として返すZIO[Any, Nothing, Exit[E, A]]
エフェクトを返します。Exit[E, A]
は、ファイバーが成功したか (Exit.succeed(value)
)、失敗したか (Exit.fail(error)
またはExit.die(throwable)
またはExit.interrupt(fiberId)
) を表すデータ型です。await
は決して失敗しません(エラー型がNothing
)。エラーや中断もExit
値として返されます。
join
はフォークされたエフェクトのエラーを親に伝播させたい場合に、await
はエラーや中断も含めた「結果」を値として扱いたい場合に便利です。
並行処理パターン (race
, merge
)
複数のZIOエフェクトを並行して実行し、特定の条件で全体の計算を完了させるための便利なコンビネータも用意されています。
-
effect1.race(effect2)
: 2つのエフェクトを並行して実行し、最初に完了した方の結果を返します。遅く完了した方のエフェクトは自動的にキャンセルされます。
“`scala
val effectA = ZIO.sleep(zio.Duration.ofSeconds(3)) > ZIO.succeed(“A”)
val effectB = ZIO.sleep(zio.Duration.ofSeconds(2)) > ZIO.succeed(“B”)val winner = effectA.race(effectB) // effectB が先に完了し、結果は “B”
``
effect1.merge(effect2)(f: (Either[E, A], Either[E, B]) => C)
*: 2つのエフェクトを並行して実行し、両方の結果(成功または失敗)が得られたら、関数
fを使ってそれらを結合し、単一の結果
C` を返します。
並列処理コンビネータ
前述の ZIO.collectAllPar
, ZIO.foreachPar
, zipPar
は、複数の独立した計算を効率的に並列実行するための最も一般的な方法です。これらは、計算がCPUバウンドかI/Oバウンドかに応じて、ZIOランタイムが内部的に適切なスレッドプールを使い分けて実行します。
キュー (Queue
) とセマフォ (Semaphore
)
より複雑な並行処理の協調パターン(Producer/Consumerなど)を実装するために、ZIOは型安全な並行プリミティブを提供しています。
Queue[A]
: スレッドセーフなキュー。offer
(要素を追加),take
(要素を取り出す),poll
(要素を取り出すが、なければNoneを即時返す),size
(現在の要素数) などの操作がZIOエフェクトとして提供されます。容量を限定したキュー(bounded
)や無制限のキュー(unbounded
)を作成できます。Semaphore
: 並行アクセスを制御するためのセマフォ。指定した数の「許可」を持ち、acquire
(許可を取得),release
(許可を解放) といった操作がZIOエフェクトとして提供されます。
これらのプリミティブを使うことで、低レベルなロックや同期メカニズムを使うことなく、安全かつ表現力豊かに並行処理を記述できます。
“`scala
import zio._
val producerConsumer: ZIO[Any, Nothing, Unit] =
for {
// 容量3のキューを作成
queue <- Queue.boundedInt
// Producer: 1から10までの数字をキューに入れる
producer = ZIO.foreach(1 to 10) { i =>
queue.offer(i) *> ZIO.debug(s"Produced: $i") *> ZIO.sleep(zio.Duration.ofMillis(100))
}.fork // Producer を別のファイバーで実行
// Consumer: キューから数字を取り出して処理する (3つの Consumer が並列に動く)
consumer = ZIO.foreachPar(1 to 3) { id =>
queue.take.flatMap(i =>
ZIO.debug(s"Consumer $id consumed: $i") *> ZIO.sleep(zio.Duration.ofMillis(300))
).forever // キューが空になるまで(正確にはキューを閉じられるまで)無限に処理を繰り返す
}.fork // Consumer たちを並列で実行
// Producer と Consumer たちの完了を待つ
pFiber <- producer
cFiber <- consumer
_ <- pFiber.join // Producer が終わるのを待つ
_ <- queue.shutdown // Producer が終わったらキューを閉じる
_ <- cFiber.join // Consumer たちがキューの要素を全て処理して終わるのを待つ
} yield ()
“`
この例は、Producer/ConsumerパターンをZIOのQueueとFiberを使って実装したものです。複数のConsumerファイバーが並列に動作し、Queueから要素を取り出して処理します。
8. ストリームとチャンク
リアルタイム処理やバッチ処理、大量のデータを扱う場合、データを一度にメモリにロードするのではなく、要素を逐次処理するストリーム処理が有効です。ZIOは、プルベースのストリームライブラリである ZIO Streams (ZStream
) を提供しています。
ZIO Streams (ZStream
) の導入
ZStream[R, E, A]
は、非同期かつエラーハンドリング可能な要素のシーケンスを表します。R
は必要な環境、E
は発生しうるエラー、A
はストリームから流れる要素の型です。ZIO Streamsは「プルベース」であり、コンシューマーが必要なときに次の要素を要求するというモデルに基づいています。これにより、バックプレッシャーが自然に処理されます。
ストリームを作成するための様々なコンストラクタがあります。
ZStream.succeed(a: A)
: 単一要素のストリームZStream.empty
: 要素を含まないストリームZStream.fail(e: E)
: 指定したエラーで即座に失敗するストリームZStream.fromIterable(iterable: Iterable[A])
: コレクションからストリームを作成ZStream.fromChunk(chunk: Chunk[A])
: ZIOのChunkからストリームを作成ZStream.fromIterator(iterator: Iterator[A])
: イテレータからストリームを作成ZStream.repeatEffect(zio: ZIO[R, E, A])
: 指定したZIOエフェクトを繰り返し実行し、その結果を要素とするストリーム(無限ストリームになる可能性あり)ZStream.fromFile(path: String)
: ファイルからバイトを読み込むストリーム (ZStream[Any, Throwable, Byte]
)ZStream.iterate(initial: A)(f: A => A)
: 初期値から開始し、関数を適用して要素を生成する無限ストリーム
“`scala
import zio.stream._
val numbers: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3, 4, 5)
val fileLines: ZStream[Any, Throwable, String] =
ZStream.fromFileName(“input.txt”).splitLines // ファイルから読み込み、行ごとに分割
“`
ストリームの変換
ストリームを別のストリームに変換するには、様々なオペレータを使用します。これらはZIOエフェクトのオペレータと似ています。
map[B](f: A => B)
: 各要素に関数を適用filter(f: A => Boolean)
: 条件を満たす要素のみを残すtake(n: Long)
: 最初のN個の要素だけを取るdrop(n: Long)
: 最初のN個の要素を捨てるflatMap[R1, E1, B](f: A => ZStream[R1, E1, B])
: 各要素から新しいストリームを作成し、それらを連結するscan[S](initial: S)(f: (S, A) => S)
: 状態を持ちながら要素を処理する(畳み込み)
scala
val processedNumbers: ZStream[Any, Nothing, Int] =
numbers
.filter(_ % 2 == 0) // 偶数だけを残す
.map(_ * 10) // 各要素を10倍する
.take(2) // 最初の2つの要素だけを取る (20, 40)
ストリームの消費 (runCollect
, runForEach
)
ストリームを「消費」して実際の結果を得るには、run
オペレータを使用します。run
は、指定された「シンク」(Sink)を使ってストリームの要素を処理し、最終的な結果を返すZIOエフェクトを生成します。
ZSink[R, E, In, L, Out]
: ストリームの要素In
を消費して、何らかの蓄積L
を行い、最終的な結果Out
を返すためのものです。stream.run(sink: ZSink[R, E, A, L, Out])
: ストリームZStream[R, E, A]
をシンクで処理し、ZIO[R, E, Out]
エフェクトを返します。
一般的なシンクの例:
ZSink.collectAll[A]
: ストリームの全要素をChunk[A]
として収集するシンク (ZSink[Any, Nothing, A, Chunk[A], Chunk[A]]
)ZSink.foreach[R, E, A](f: A => ZIO[R, E, Any])
: ストリームの各要素に関数f
を適用するシンク (ZSink[R, E, A, Any, Unit]
)
“`scala
// ストリームの全要素を収集してリストにする
val listResult: ZIO[Any, Nothing, Chunk[Int]] = numbers.runCollect
// ストリームの各要素をコンソールに出力する
val printEach: ZIO[Console, java.io.IOException, Unit] =
numbers.run(ZSink.foreach(i => Console.printLine(s”Processing: $i”)))
``
runCollectは内部的に
ZSink.collectAllを使用する便利なメソッドです。
runForEachは内部的に
ZSink.foreach` を使用する便利なメソッドです。
チャンク (Chunk
)
ZIO Streamsは、要素を一つずつではなく、「チャンク」(Chunk[A]
) と呼ばれる要素のバッチ単位で効率的に処理します。Chunk
はScala標準ライブラリの Seq
と似ていますが、より効率的な内部表現(特に配列ベース)を持ち、ストリーム処理に適しています。ストリームの多くのオペレータは、内部的にチャンク単位で処理を行います。runCollect
の結果も Chunk
になります。
9. テスト (Testing)
ZIOは、ZIOベースのコードをテストするための専用フレームワーク ZIO Test
を提供しています。ZIO Testは、ZIOのエフェクトモデルと深く統合されており、副作用を含むZIOコードのテストを容易にします。
ZIO Test
フレームワークの紹介
ZIO Testは、ScalaCheckのようなプロパティベーステストと、ScalatestやSpecs2のような例ベーステストの両方をサポートしています。また、ZIOエフェクトの実行を制御し、仮想時間や依存性のモックなどを簡単に設定できます。
build.sbt
に依存性を追加します:
scala
libraryDependencies ++= Seq(
"dev.zio" %% "zio-test" % "2.0.0" % Test,
"dev.zio" %% "zio-test-sbt" % "2.0.0" % Test // sbtとの統合
)
テストスイートは ZIOSpecDefault
を継承して作成します。
“`scala
import zio.test.
import zio.
object MyZIOSpec extends ZIOSpecDefault {
// spec メソッドでテストスイートを定義
override def spec: Spec[Any, Throwable] =
suite(“My ZIO App Tests”)(
test(“a simple ZIO effect should succeed”) {
val effect: ZIO[Any, String, Int] = ZIO.succeed(42)
// assert で ZIO エフェクトの結果を検証
assertZIO(effect)(Assertion.equalTo(42))
},
test("a failing ZIO effect should fail with correct error") {
val effect: ZIO[Any, String, Int] = ZIO.fail("Something went wrong")
// assertZIO で失敗を検証
assertZIO(effect.either)(Assertion.isLeft(Assertion.equalTo("Something went wrong")))
},
test("an effect requiring Console should print and read correctly") {
// TestConsole を使って Console サービスの挙動をシミュレーション
val effect: ZIO[Console, java.io.IOException, String] =
Console.printLine("Enter name:") *> Console.readLine
// TestConsole.feedLines で標準入力を模倣する
for {
_ <- TestConsole.feedLines("Alice")
result <- effect
output <- TestConsole.output
} yield {
// assert で結果や副作用 (出力) を検証
assert(result)(Assertion.equalTo("Alice")) &&
assert(output.mkString)(Assertion.containsString("Enter name:"))
}
}.provideLayer(TestConsole.global) // TestConsole レイヤーを提供する
)
}
``
specメソッドは
Spec[R, E]を返します。これは、テストスイート全体を表すZIOエフェクトのようなものです。
suiteや
testコンストラクタを使って、テストグループや個々のテストケースを定義します。テストケースは
ZIO[R, E, TestResult]エフェクトとして記述します。
TestResult` はテストの成功/失敗を表す型です。
assertZIO(effect)(assertion)
は、与えられたZIOエフェクトを実行し、その成功値が指定されたAssertionを満たすかどうかを検証するための便利なメソッドです。エラーや例外の検証にも使えます。
レイターを使った依存性のモック
ZIO TestとZIOのレイヤーシステムはテストにおいて強力な連携を見せます。テスト対象のコードが特定のサービス(例えばデータベース、外部APIクライアント)に依存している場合、本物のサービス実装の代わりに、テスト用のモック実装を提供するレイヤーを差し込むことができます。
“`scala
// テスト対象のサービスに依存するロジック
trait UserService {
def getUserName(id: Int): ZIO[Any, Throwable, String]
}
def welcomeUser(id: Int): ZIO[UserService, Throwable, String] =
UserService.getUserName(id).map(name => s”Welcome, $name!”)
// UserService のモック実装レイヤーを作成
object MockUserService {
val layer: Layer[Any, Nothing, UserService] =
ZLayer.succeed(new UserService {
// テストデータや検証ロジックを含むモックメソッド
override def getUserName(id: Int): ZIO[Any, Throwable, String] =
if (id == 1) ZIO.succeed(“Alice”)
else ZIO.fail(new NoSuchElementException(s”User $id not found”))
})
}
// テストコード
object UserServiceSpec extends ZIOSpecDefault {
override def spec: Spec[Any, Throwable] =
suite(“UserService Tests”)(
test(“welcomeUser should return correct greeting for existing user”) {
val effect = welcomeUser(1)
assertZIO(effect)(Assertion.equalTo(“Welcome, Alice!”))
}.provideLayer(MockUserService.layer), // モックレイヤーを提供
test("welcomeUser should fail for non-existing user") {
val effect = welcomeUser(999)
assertZIO(effect.either)(Assertion.isLeft(Assertion.isSubtype[NoSuchElementException](Assertion.anything)))
}.provideLayer(MockUserService.layer) // モックレイヤーを提供
)
}
``
welcomeUser
この例では、関数が
UserServiceに依存しています。テストでは、
MockUserService.layerを
provideLayerすることで、本物のデータベースアクセスなどを行わずに、定義済みのテストデータを使って
welcomeUser` の挙動を検証しています。このアプローチにより、テストは高速かつ信頼性が高くなります。
ZIO Testは、他にも仮想時間による時間依存コードのテスト、並行処理のテストヘルパー、プロパティベーステストとの連携など、様々な機能を提供します。
10. 実践的なヒントとベストプラクティス
ZIOを使った開発を円滑に進めるためのいくつかのヒントとベストプラクティスを紹介します。
for
コンプレッションを積極的に使う: シーケンシャルなZIOエフェクトの結合には、flatMap
チェインよりもfor
コンプレッションを使った方が、コードが読みやすく、意図が明確になります。-
適切なエラー型 (
E
) を設計する:E
にThrowable
を使うことは可能ですが、リカバリ可能なビジネスロジック上のエラーには、専用のケースクラスやADT(代数的データ型)を定義することをお勧めします。これにより、どのような種類のエラーが発生しうるかを型で表現でき、エラーハンドリングの網羅性をコンパイラがチェックしてくれます。
“`scala
// 良い例
sealed trait AppError
case object NetworkError extends AppError
case object DatabaseError extends AppError
case class InvalidInput(message: String) extends AppErrorval myEffect: ZIO[Any, AppError, Unit] = ???
// 悪い例 (エラーの種類が不明確)
val myEffect: ZIO[Any, Throwable, Unit] = ???
``
ZLayer.make
* **レイヤーを適切に分割・結合する:** 小さな責任を持つレイヤーを作成し、それらを組み合わせてより大きなレイヤーを構築するアプローチを推奨します。これにより、コードのモジュール性が高まり、テスト時のモック差し替えが容易になります。マクロを使うと、依存関係を自動的に解決してレイヤーを構築できます。
FiberId
* **Fiber ID と Span を活用したデバッグ:** ZIOエフェクトの実行は複数のファイバーに分散されることがあります。デバッグ時には、を使って特定のファイバーの実行経路を追跡したり、ZIOが提供するSpan(操作の開始と終了を示すメタデータ)を活用したりすると、並行処理の問題を特定しやすくなります。ロギングライブラリと連携させることで、ログにFiber IDやSpanを含めることができます。
ZIO.attemptBlocking
* **blocking スレッドプールを適切に使う:** ファイルI/OやJDBCアクセスなど、OSスレッドを長時間ブロックする可能性のある操作は、必ずや
ZIO.acquireReleaseWithのようなブロッキング対応のコンストラクタを使用し、ZIOのデフォルトIOスレッドプールではなく、専用のブロッキングスレッドプールで実行されるようにします。これにより、IOスレッドプールがブロックされて非同期処理のパフォーマンスが低下するのを防ぎます。
ZIO HTTP
* **ZIO Ecosystem を探索する:** ZIOには、HTTPサーバー/クライアント ()、JSON処理 (
ZIO JSON)、データベースアクセス (
ZIO JDBC,
ZIO Quill)、メッセージキュー (
ZIO Kafka,
ZIO AWS SQS`) など、様々な領域をカバーするエコシステムライブラリがあります。これらを活用することで、ZIOのパラダイムで一貫したアプリケーション全体を構築できます。
11. さらなる学習のために
このガイドはZIOの基本的な概念と主要な機能を紹介しましたが、ZIOの可能性はこれだけにとどまりません。さらに深く学習するためのリソースを紹介します。
- ZIO 公式ドキュメント: ZIOのすべての機能、API、概念に関する最も正確で詳細な情報源です。まずは「Learning ZIO」セクションを読むことをお勧めします。 https://zio.dev/
- ZIO コミュニティ: ZIOのDiscordサーバーやGitHub Discussionsは、質問したり他のZIO開発者と交流したりするのに最適な場所です。
- 書籍: ZIOに関する書籍もいくつか出版されています。公式ドキュメントと合わせて読むことで、理解を深めることができます。
- ZIO エコシステム: 前述の通り、ZIOには豊富なエコシステムがあります。興味のある分野のライブラリを探してみてください。
これらのリソースを活用して、ZIOを使った堅牢で表現力豊かなアプリケーション開発のスキルをさらに磨いていってください。
12. まとめ
このガイドでは、Scalaにおける関数型プログラミングの文脈で、ZIOがなぜ重要であるか、そしてその基本的な概念から応用的なトピックまでを幅広く解説しました。
- ZIOは
ZIO[R, E, A]
という型を使って、副作用を伴う計算を値として表現し、遅延評価します。 - 型安全性、構造化された並行処理、包括的なエラーハンドリング、そして洗練された依存性注入システムを提供します。
- 基本的なエフェクトの作成、シーケンシャル/並列な結合方法を学びました。
catchAll
,retry
,timeout
,acquireReleaseWith
といった強力なエラーハンドリングとリソース管理機能を紹介しました。R
型パラメータとレイヤーシステムを使った依存性注入の仕組みを理解しました。- 軽量なファイバーによる並行処理と、
fork
,join
,race
などのコンビネータの使い方を見ました。 - ZIO Streamsによる効率的なストリーム処理の基礎を学びました。
- ZIO Testフレームワークを使ったZIOコードのテスト方法を学びました。
- 実践的な開発のためのヒントとベストプラクティスをいくつか紹介しました。
ZIOは学習コストがないわけではありませんが、一度そのパラダイムを習得すれば、副作用を安全に管理し、テスト可能で保守しやすい、そして並行処理に強いアプリケーションを効率的に構築できるようになります。
このガイドが、あなたがZIOの世界へ一歩踏み出すための助けとなれば幸いです。ぜひ実際にコードを書いて、ZIOの強力さを体験してみてください。
Happy hacking with ZIO Scala!