はい、承知いたしました。【初心者向け】Spring Batch 入門ガイドとして、約5000語の詳細な説明を含む記事を作成し、直接表示します。
【初心者向け】Spring Batch 入門ガイド
大量データのバッチ処理を効率的、堅牢に実現するためのフレームワーク、Spring Batch。このガイドでは、Spring Batchの基本的な概念から、簡単なバッチジョブの作成方法、実行方法までを、初心者の方にも分かりやすく詳細に解説します。
さあ、Spring Batchの世界へ一緒に踏み込みましょう!
1. バッチ処理とは? なぜSpring Batchが必要なのか?
1.1. バッチ処理の定義
まずは「バッチ処理」とは何かを明確にしましょう。バッチ処理とは、コンピュータで実行される処理の形態の一つで、あらかじめ定めた一連の処理をまとめて(一括して)実行する方式です。
インタラクティブな(対話的な)処理とは異なり、ユーザーの操作を待つことなく、決められた時間に、あるいは決められたトリガーで自動的に開始され、完了するまで実行されます。
身近な例としては、以下のようなものがあります。
- 日次/月次でのレポート生成: 前日の売上データや顧客データを集計し、レポートファイルを作成する。
- データのクレンジングや移行: データベースの古いデータを整理したり、別のシステムへ移行したりする。
- 大量の請求書発行: 顧客データに基づいて、何万件もの請求書ファイルを生成する。
- 在庫の一括更新: POSシステムから送られてきた販売データに基づき、商品在庫をまとめて減らす。
これらの処理に共通するのは、「大量のデータを扱う」「一定の周期で実行される」「人間が逐一介入しない」という点です。
1.2. シンプルなバッチ処理の問題点
もしあなたが、このようなバッチ処理をSpring Batchを使わずに、例えばJavaの簡単なスクリプトやシェルスクリプトで実装しようとした場合、以下のような課題に直面する可能性があります。
- エラーハンドリング: 処理中にエラーが発生した場合、どうするのか?どこまで処理が進んだのか?最初からやり直すのか?
- 再実行・リトライ: エラーで停止した場合、途中から再開できるのか?冪等性(べきとうせい:同じ処理を何度実行しても同じ結果になること)をどう担保するのか?
- 処理状況の監視: 今、全体で何件中何件のデータを処理しているのか?あとどれくらいで終わるのか?
- パフォーマンス: 大量のデータを効率的に処理するには?メモリに乗らない場合は?
- 拡張性: データ量が増えた場合に、並列処理や分散処理でスケールさせるには?
- コードの管理: これらの機能を全て自作すると、コードが複雑になり、保守が困難になる。
これらの課題は、バッチ処理を本番環境で安定して運用するために非常に重要です。特に、夜間に実行される重要な処理が、エラーで停止し、朝になっても完了していない、しかもどこまで進んだか分からない…といった事態は避けたいものです。
1.3. Spring Batchとは? そのメリット
ここで登場するのが Spring Batch です。
Spring Batchは、Spring Frameworkのサブプロジェクトの一つで、上記のようなバッチ処理の課題を解決するために設計された、堅牢性、拡張性、管理性に優れたバッチ処理フレームワークです。
Spring Batchを利用することで、あなたはバッチ処理の「ビジネスロジック」(例:データをどう加工するか、どんな集計をするか)に集中することができます。エラー時の再実行、処理状況の記録、トランザクション管理、大量データ処理のためのテクニック(チャンク処理など)といった、バッチ処理に共通して必要となる複雑な制御部分は、Spring Batchが面倒を見てくれます。
Spring Batchの主なメリットは以下の通りです。
- 堅牢性: エラー発生時のリトライやスキップ、途中からの再実行(リスタート)が容易に実現できます。処理状況はデータベースに記録されるため、クラッシュしても状態を復元できます。
- 拡張性: 大量のデータを効率的に処理するための仕組み(チャンク処理)が用意されています。また、パーティショニングや並列処理などにより、処理能力をスケールさせることが可能です。
- 管理性: ジョブの実行状況、成功/失敗、処理件数などのメタデータがデータベースに記録されます。これにより、ジョブの監視や運用が容易になります。
- 宣言的な設定: SpringのIoCコンテナを利用して、ジョブやステップの構成をXMLやJava Configで宣言的に定義できます。ビジネスロジックとフレームワークの制御が分離され、コードの見通しが良くなります。
- 豊富なコンポーネント: ファイル(CSV, XML, フラットファイル)、データベース(JDBC, JPA, Hibernate)、メッセージキューなど、様々なデータソースに対応したReaderやWriterが提供されています。
- Springエコシステムとの連携: Spring Frameworkの他のプロジェクト(Spring Boot, Spring Data, Spring Integrationなど)とシームレスに連携できます。
Spring Batchは、複雑で手間のかかるバッチ処理基盤の実装から私たちを解放し、安定したバッチ処理システムの構築を強力にサポートしてくれるフレームワークです。
2. Spring Batchのアーキテクチャとコア概念
Spring Batchは、いくつかの重要なコンポーネントから構成されています。これらのコンポーネントが連携して、バッチ処理を実行・管理します。
主要なコンポーネントと概念を見ていきましょう。
2.1. 主要コンポーネント
- Job: バッチ処理全体を表す最上位の概念です。一つ以上のStepで構成されます。「日次売上集計」や「顧客データ移行」といった具体的な処理単位に対応します。
- Step: Jobを構成する単一の処理フェーズです。データを読み込み、処理し、書き出す一連の流れ(Chunk Oriented Step)や、特定のタスクを実行するだけの単純な処理(Tasklet Step)があります。JobはStepを順番に、あるいは条件分岐させながら実行していきます。
- ItemReader: Stepにおいて、処理対象のデータを1件ずつ読み込む役割を担います。ファイルから1行読む、データベースから1レコード取得するなど。
- ItemProcessor: ItemReaderが読み込んだデータを、必要に応じて変換したり、フィルタリングしたりする役割を担います(省略可)。
- ItemWriter: ItemProcessorで処理されたデータ、あるいはItemReaderから直接読み込まれたデータを、まとめてチャンク単位で書き込む役割を担います。ファイルに書き出す、データベースに一括登録するなど。
- Chunk: ItemReaderで読み込まれ、ItemProcessorで処理されたデータを、ItemWriterが書き込む際の処理単位です。例えば「100件まとめて書き込む」といった設定を行います。後述するチャンク指向処理の核となります。
- Tasklet: ItemReader/Processor/Writerという流れに当てはまらない、単一のタスクを実行するためのインターフェースです。ファイルの削除、特定のコマンド実行、ストアドプロシージャの呼び出しなど、比較的単純な処理に使用します。StepはChunk Oriented StepかTasklet Stepのいずれかになります。
- JobRepository: バッチ処理に関するメタデータ(ジョブの名前、実行日時、パラメータ、成功/失敗ステータス、各Stepの実行状況、読み書きした件数など)を記録・管理するコンポーネントです。通常はデータベースにこれらの情報が格納されます。Spring Batchの再実行機能はこのメタデータを利用します。
- JobLauncher: Jobを実行するためのインターフェースです。JobとJobParametersを受け取り、JobRepositoryを使用してJobExecutionを作成し、Jobを開始します。
- JobExplorer: JobRepositoryに格納されたメタデータを参照するためのインターフェースです。実行済みのジョブのステータスを確認する際などに使用します。
2.2. Chunk指向処理 (Chunk Oriented Processing)
Spring BatchのStepの最も一般的なタイプが「Chunk指向処理」です。これは、ItemReader, ItemProcessor (optional), ItemWriter を連携させて、データをチャンク単位で処理するモデルです。
(図は概念的なものであり、実際のSpring Batchの内部実装はより複雑ですが、考え方は同じです)
処理の流れは以下のようになります。
- トランザクション開始: 一つのチャンク処理の開始時にトランザクションが開始されます。
- 読み込み (Read): ItemReaderがデータを1件ずつ読み込みます。
- 処理 (Process): ItemProcessorが読み込んだデータを1件ずつ処理します。処理後のデータは内部的にリストに保持されます。
- チャンクサイズに達するまで繰り返し: 2と3の処理を、あらかじめ設定したチャンクサイズ(例えば100件)に達するまで繰り返します。
- 書き込み (Write): チャンクサイズ分のデータが溜まったら、ItemWriterがそのチャンク分のデータをまとめて書き込みます。
- トランザクションコミット: 書き込みが成功したら、トランザクションがコミットされます。
- 繰り返し: 処理対象のデータがなくなるまで、1〜6のサイクルを繰り返します。
- トランザクション終了: 全てのデータ処理が終わったら、最後のトランザクションが終了します。
このチャンク指向処理のメリットは、メモリ効率が良い点です。全てのデータを一度にメモリに読み込むのではなく、チャンクサイズ分のデータだけをメモリに乗せて処理・書き込み、トランザクションをコミットします。これにより、非常に大きなデータセットでも、メモリを大量に消費することなく処理できます。
また、チャンク単位でトランザクションがコミットされるため、途中で処理が中断した場合でも、最後に成功したチャンクの次から再開することが容易になります。これはSpring Batchの強力な再実行機能の基盤となっています。
2.3. Tasklet処理
Tasklet Stepは、Chunk Oriented Stepとは異なり、ItemReader, ItemProcessor, ItemWriterを持ちません。Tasklet
インターフェースを実装したクラスが、一つのタスクを一度だけ実行するシンプルなステップです。
例えば、
- バッチ処理の開始前に一時ファイルを削除する
- バッチ処理の終了後にメールを送信する
- 特定のシェルコマンドを実行する
- データベースのメンテナンス処理を実行する
といった、データ自体をItem単位で読み書きするわけではないが、バッチ処理の一部分として実行したい処理に適しています。
3. Spring Batchプロジェクトのセットアップ
Spring Batchを使った簡単なバッチ処理を実装してみましょう。ここでは、Spring Bootを利用してプロジェクトをセットアップするのが最も手軽です。
3.1. プロジェクト作成 (Spring Initializr)
Spring Bootのプロジェクトは、Spring Initializr を使うのが便利です。
- Project: Maven Project または Gradle Project を選択します。ここではMavenを例に進めます。
- Language: Java を選択します。
- Spring Boot: 最新の安定版を選択します。
- Project Metadata: Group, Artifact, Name, Description, Package Nameなどを適切に入力します。
- Packaging: Jar を選択します。
- Java: 利用したいJavaのバージョンを選択します。
- Dependencies: 以下の依存関係を追加します。
Spring Batch
H2 Database
(JobRepositoryのメタデータを保存するDBとして、開発用途で手軽なH2を使用します)Spring Data JPA
(データベースアクセスを簡潔にするために使用します – 今回の例ではJDBCを使うため必須ではありませんが、一般的なSpring Bootアプリでは使うことが多いです)Spring Web
(任意ですが、バッチを手動実行するRESTエンドポイントなどを後から追加する際に便利です)
Generateボタンをクリックして、プロジェクトをダウンロードし、お好みのIDE(IntelliJ IDEA, Eclipseなど)で開いてください。
3.2. 依存関係の確認 (pom.xml)
ダウンロードしたプロジェクトのpom.xml
には、以下のような依存関係が追加されているはずです。
“`xml
<properties>
<java.version>17</version> <!-- Javaバージョンに合わせてください -->
</properties>
<dependencies>
<!-- Spring Batch Core -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Database for JobRepository (H2 for development) -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Spring Data JPA (Optional for this example's JDBC Writer) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Spring Web (Optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Database Driver for JDBC Writer (e.g., H2 is already there, or add PostgreSQL, MySQL etc.) -->
<!-- <dependency> ... your database driver ... </dependency> -->
<!-- Test Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
“`
spring-boot-starter-batch
を追加するだけで、Spring Batchを利用するために必要なコアライブラリやSpringの自動設定が有効になります。
3.3. 基本設定 (application.properties)
JobRepositoryのメタデータを保存するためのデータベース設定が必要です。H2データベースを使用する場合、src/main/resources/application.properties
に以下のような設定を追加します。
“`properties
H2 Database Settings
spring.datasource.url=jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
JPA Settings (for schema generation, not strictly needed for JDBC Writer but good practice)
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.hibernate.ddl-auto=update # or create, create-drop
Spring Batch Settings
spring.batch.job.enabled=false # バッチ起動時に自動実行しないように設定 (手動で実行するため)
spring.batch.jdbc.initialize-schema=always # JobRepositoryのテーブルを自動生成
“`
spring.batch.job.enabled=false
: この設定は重要です。Spring Bootのデフォルトでは、アプリケーション起動時にJobが見つかると自動的に実行しようとします。今回は、後ほどJobLauncherを使って明示的に実行するため、自動実行を無効化しています。spring.batch.jdbc.initialize-schema=always
: Spring BatchがJobRepository用のテーブル(BATCH_JOB_INSTANCE
,BATCH_STEP_EXECUTION
など)を、アプリケーション起動時に自動的に作成するように設定します。H2のインメモリDBを使う場合はalways
が便利です。永続化DBを使う場合はembedded
やnever
を検討します。
これでSpring Batchを使うための基本的なセットアップは完了です。
4. シンプルなバッチジョブの作成
CSVファイルからデータを読み込み、加工して、データベースに書き込むという、よくあるバッチ処理を実装してみましょう。
シナリオ:
- 入力:
input.csv
ファイル (名前と年齢のCSV) - 処理: 名前の文字列を大文字に変換する
- 出力: データベースのテーブルに書き込む
4.1. 入力CSVファイルと出力テーブルの準備
src/main/resources
ディレクトリに input.csv
ファイルを作成します。
csv
名前,年齢
Alice,30
Bob,25
Charlie,35
出力先のデータベーステーブルを作成します。H2データベースの場合、スキーマは自動生成されますが、今回はJDBCを使って書き込むため、テーブルが存在する必要があります。SQLファイルを用意してアプリケーション起動時に実行させるか、Spring BootのJPA機能でエンティティから自動生成させる方法などがあります。ここでは、JDBC WriterのSQLで直接テーブル名を指定することにします。テーブルはJdbcBatchItemWriterがインサートする前に作成しておいてください(または起動時のスキーマ実行ファイルで作成)。
例えば、以下のようなテーブルを想定します。
sql
CREATE TABLE people (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
age INT
);
4.2. データモデルの定義
入力CSVのデータと、出力するデータベースのデータに対応するJavaのPOJOクラスを定義します。
com.example.springbatchbeginner.model.PersonInput.java
“`java
package com.example.springbatchbeginner.model;
// CSVから読み込むためのモデル
public class PersonInput {
private String name;
private int age;
public PersonInput() {
}
public PersonInput(String name, int age) {
this.name = name;
this.age = age;
}
// Getters and Setters
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "PersonInput{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
“`
com.example.springbatchbeginner.model.PersonOutput.java
“`java
package com.example.springbatchbeginner.model;
// 処理後にデータベースに書き込むためのモデル
public class PersonOutput {
private String name; // 大文字に変換された名前を格納
private int age;
public PersonOutput() {
}
public PersonOutput(String name, int age) {
this.name = name;
this.age = age;
}
// Getters and Setters
public String getName() {
return name;
}
public void setName(String name) {
name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "PersonOutput{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
“`
4.3. ItemReaderの実装 (CSVファイル読み込み)
CSVファイルからPersonInput
オブジェクトを1件ずつ読み込むItemReader
を定義します。Spring Batchは様々な形式のReaderを提供しており、CSVのようなフラットファイルにはFlatFileItemReader
が便利です。
設定クラス内にBeanとして定義します。
“`java
package com.example.springbatchbeginner.config;
import com.example.springbatchbeginner.model.PersonInput;
import com.example.springbatchbeginner.model.PersonOutput;
import com.example.springbatchbeginner.processor.PersonItemProcessor;
import com.example.springbatchbeginner.listener.JobCompletionNotificationListener; // 後述
import javax.sql.DataSource; // JavaSE 11以降は jakarta.sql.DataSource を使用
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder; // Spring Batch 5.0+
import org.springframework.batch.core.repository.JobRepository; // Spring Batch 5.0+
import org.springframework.batch.core.step.builder.StepBuilder; // Spring Batch 5.0+
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate; // H2DBでテーブル作成などに利用する場合
import org.springframework.transaction.PlatformTransactionManager; // Spring Batch 5.0+
@Configuration
// Spring Boot 3.x, Spring Batch 5.x を想定しています
// @EnableBatchProcessing は Spring Boot 3.x から不要になりました
public class BatchConfig {
// Spring Batch 5.x から JobBuilderFactory, StepBuilderFactory は非推奨となり、
// JobRepository と PlatformTransactionManager をコンストラクタインジェクションして使用します。
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final DataSource dataSource; // ItemWriterで使用するためDataSourceも注入
public BatchConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager, DataSource dataSource) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.dataSource = dataSource;
}
// --- ItemReader ---
@Bean
public FlatFileItemReader<PersonInput> reader() {
return new FlatFileItemReaderBuilder<PersonInput>()
.name("personItemReader") // Readerに名前をつけます
.resource(new ClassPathResource("input.csv")) // 読み込むCSVファイルを指定 (src/main/resources 配下)
.linesToSkip(1) // ヘッダー行をスキップします
.delimited() // デリミタ区切りであることを指定
.names(new String[]{"name", "age"}) // CSVの各列に名前を割り当てます (PersonInputのフィールド名に対応)
.fieldSetMapper(new BeanWrapperFieldSetMapper<PersonInput>() {{ // 読み込んだ値をPersonInputオブジェクトにマッピング
setTargetType(PersonInput.class);
}})
.build();
}
// --- ItemProcessor ---
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
// --- ItemWriter ---
@Bean
public JdbcBatchItemWriter<PersonOutput> writer() {
// SQLクエリを指定。パラメータはBeanプロパティ名でマッピングします。
return new JdbcBatchItemWriterBuilder<PersonOutput>()
.dataSource(dataSource) // 書き込み先のDataSourceを指定
.sql("INSERT INTO people (name, age) VALUES (:name, :age)") // INSERTクエリ
.beanMapped() // Beanのプロパティ名をパラメータ名として使用することを指定
.build();
}
// --- Step ---
@Bean
public Step step1(FlatFileItemReader<PersonInput> reader,
PersonItemProcessor processor,
JdbcBatchItemWriter<PersonOutput> writer) {
return new StepBuilder("csvToDatabaseStep", jobRepository) // Stepに名前をつけ、JobRepositoryを指定
.<PersonInput, PersonOutput>chunk(10) // チャンクサイズを10に設定 (ItemReaderから10件読み込み、Processorで処理し、Writerで10件まとめて書き込む)
.reader(reader) // 使用するItemReaderを指定
.processor(processor) // 使用するItemProcessorを指定 (省略可)
.writer(writer) // 使用するItemWriterを指定
.transactionManager(transactionManager) // トランザクションマネージャーを指定
.build();
}
// --- Job ---
@Bean
public Job importPersonJob(Step step1, JobCompletionNotificationListener listener) {
return new JobBuilder("importPersonJob", jobRepository) // Jobに名前をつけ、JobRepositoryを指定
.repository(jobRepository) // JobRepositoryを明示的に指定 (Builderコンストラクタで指定済みだが、慣例的に記述)
.start(step1) // 最初に実行するStepを指定
.listener(listener) // Jobのライフサイクルイベントを捕捉するリスナーを指定 (後述)
.build();
}
// --- JDBC Template (Optional: テーブル作成などに利用する場合) ---
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
// 必要に応じて、ここでテーブル作成用のSQLを実行するなどできます。
// @PostConstruct や CommandLineRunner を使う方法もあります。
}
“`
FlatFileItemReaderBuilder
の設定詳細:
.name("personItemReader")
: Readerのインスタンス名を設定します。これはSpring Batchのメタデータ内で使用されます。.resource(new ClassPathResource("input.csv"))
: 読み込むファイルのリソースを指定します。ClassPathResource
はsrc/main/resources
ディレクトリからのパスを指定するのに便利です。.linesToSkip(1)
: ファイルの先頭からスキップする行数を指定します。ヘッダー行をスキップするために1を設定しています。.delimited()
: ファイルがデリミタ(区切り文字)で区切られた形式であることを示します。.csv
ファイルなのでこれを選択します。他に.fixedLength()
(固定長)などがあります。.names(new String[]{"name", "age"})
:.delimited()
と組み合わせて、各列に名前を割り当てます。この名前は後述のFieldSetMapperが使用します。CSVファイルにヘッダー行がない場合でも、ここで列名を定義できます。ヘッダー行がある場合は、names()
で指定した名前とヘッダー行の列名をマッピングするか、setNames()
の代わりにsetNames()
に渡す配列と同じ順序で列データを取得する設定を行います。今回はヘッダー行をスキップし、このnames
で列名を定義しています。.fieldSetMapper(...)
: 読み込んだ行のデータを、どのようにJavaオブジェクト(PersonInput
)にマッピングするかを定義します。BeanWrapperFieldSetMapper
: FieldSet(読み込んだ1行分のデータ)を、指定したJava Beanのプロパティに自動的にマッピングしてくれます。.names()
で指定した名前が、Beanのプロパティ名と一致している必要があります。setTargetType(PersonInput.class)
: マッピング先のJava Beanクラスを指定します。
.build()
: Builderパターンで設定を完了し、FlatFileItemReader
のインスタンスを作成します。
ItemReaderには、他にも様々な種類があります。
- データベース系:
JdbcCursorItemReader
,JdbcPagingItemReader
,JpaItemReader
,HibernateCursorItemReader
,HibernatePagingItemReader
,MongoItemReader
, etc. - ファイル系:
StaxEventItemReader
(XML), etc. - その他:
AmqpItemReader
(メッセージキュー),KafkaItemReader
, etc.
4.4. ItemProcessorの実装 (データ加工)
ItemReader
が読み込んだPersonInput
オブジェクトを受け取り、PersonOutput
オブジェクトに変換します。ここでは、名前を大文字に変換する処理を行います。
com.example.springbatchbeginner.processor.PersonItemProcessor.java
“`java
package com.example.springbatchbeginner.processor;
import com.example.springbatchbeginner.model.PersonInput;
import com.example.springbatchbeginner.model.PersonOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
// ItemProcessor<入力オブジェクトの型, 出力オブジェクトの型>
public class PersonItemProcessor implements ItemProcessor
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public PersonOutput process(final PersonInput personInput) throws Exception {
final String name = personInput.getName().toUpperCase(); // 名前を大文字に変換
final int age = personInput.getAge();
final PersonOutput transformedPerson = new PersonOutput(name, age);
// 処理ログを出力 (任意)
log.info("Converting (" + personInput + ") into (" + transformedPerson + ")");
return transformedPerson; // 変換後のオブジェクトを返す
}
}
“`
ItemProcessor
は ItemProcessor<I, O>
インターフェースを実装します。process
メソッドは、ItemReader
から渡された入力オブジェクト(I
型)を受け取り、処理後の出力オブジェクト(O
型)を返します。
- 変換が不要な場合は、
ItemProcessor
を定義せず、Step
のchunk()
設定で型パラメータを<PersonInput, PersonInput>
のように同じにして、ReaderからWriterに直接渡すことも可能です。 - データをスキップしたい場合は、
process
メソッドからnull
を返します。null
が返されたアイテムはWriterに渡されません。
4.5. ItemWriterの実装 (データベース書き込み)
処理済みのPersonOutput
オブジェクトを、データベーステーブルにまとめて書き込むItemWriter
を定義します。データベースへの書き込みにはJdbcBatchItemWriter
が便利です。
前述のBatchConfig
クラス内にBeanとして定義済みです。
java
// --- ItemWriter ---
@Bean
public JdbcBatchItemWriter<PersonOutput> writer() {
// SQLクエリを指定。パラメータはBeanプロパティ名でマッピングします。
return new JdbcBatchItemWriterBuilder<PersonOutput>()
.dataSource(dataSource) // 書き込み先のDataSourceを指定
.sql("INSERT INTO people (name, age) VALUES (:name, :age)") // INSERTクエリ
.beanMapped() // Beanのプロパティ名をパラメータ名として使用することを指定
.build();
}
JdbcBatchItemWriterBuilder
の設定詳細:
.dataSource(dataSource)
: 使用するデータベースのDataSource
を指定します。Spring Bootの自動設定により作成されたDataSourceがインジェクションされます。.sql("INSERT INTO people (name, age) VALUES (:name, :age)")
: 実行するSQLのINSERTクエリを指定します。パラメータプレースホルダーは:propertyName
の形式で記述します。.beanMapped()
: 書き込み対象のオブジェクト(ここではPersonOutput
)のプロパティを、SQLのパラメータプレースホルダーにマッピングすることを指定します。:name
はPersonOutput
のgetName()
メソッド、:age
はgetAge()
メソッドから値を取得してSQLにバインドします。.build()
:JdbcBatchItemWriter
のインスタンスを作成します。
ItemWriter
にも様々な種類があります。
- データベース系:
JdbcCursorItemReader
,JdbcPagingItemReader
,JpaItemReader
,HibernateCursorItemReader
,HibernatePagingItemReader
,MongoItemReader
, etc. - ファイル系:
FlatFileItemWriter
,StaxEventItemWriter
(XML), etc. - その他:
AmqpItemWriter
,KafkaItemWriter
, etc.
注意点として、ItemWriter
はチャンク単位でアイテムのリストを受け取ります。JdbcBatchItemWriter
はこのリストを受け取り、JDBCのバッチ更新機能を使ってまとめてSQLを実行します。
4.6. Stepの定義
ItemReader, ItemProcessor, ItemWriterを組み合わせて、一つのStepを定義します。前述のBatchConfig
クラス内にBeanとして定義済みです。
java
// --- Step ---
@Bean
public Step step1(FlatFileItemReader<PersonInput> reader,
PersonItemProcessor processor,
JdbcBatchItemWriter<PersonOutput> writer) {
return new StepBuilder("csvToDatabaseStep", jobRepository) // Stepに名前をつけ、JobRepositoryを指定
.<PersonInput, PersonOutput>chunk(10) // チャンクサイズを10に設定
.reader(reader) // 使用するItemReaderを指定
.processor(processor) // 使用するItemProcessorを指定 (省略可)
.writer(writer) // 使用するItemWriterを指定
.transactionManager(transactionManager) // トランザクションマネージャーを指定
.build();
}
StepBuilder
の使い方:
StepBuilder("csvToDatabaseStep", jobRepository)
: Stepに名前をつけ、JobRepositoryを渡してBuilderのインスタンスを作成します。.<PersonInput, PersonOutput>chunk(10)
: このStepがチャンク指向処理であることを宣言し、入力型(PersonInput
)、出力型(PersonOutput
)、およびチャンクサイズ(10件)を指定します。この設定により、Spring Batchは内部でItemReaderから10件読み込み、ItemProcessorで処理し、ItemWriterにまとめて渡す、というループを実行します。.reader(reader)
: 使用するItemReaderのBeanを指定します。.processor(processor)
: 使用するItemProcessorのBeanを指定します。Processorがない場合はこの行は不要です。.writer(writer)
: 使用するItemWriterのBeanを指定します。.transactionManager(transactionManager)
: このStepで使用するトランザクションマネージャーを指定します。Spring Bootのデフォルトのトランザクションマネージャーをインジェクションして使用します。.build()
: Stepのインスタンスを作成します。
4.7. Jobの定義
一つ以上のStepを組み合わせて、Jobを定義します。前述のBatchConfig
クラス内にBeanとして定義済みです。
java
// --- Job ---
@Bean
public Job importPersonJob(Step step1, JobCompletionNotificationListener listener) {
return new JobBuilder("importPersonJob", jobRepository) // Jobに名前をつけ、JobRepositoryを指定
.repository(jobRepository) // JobRepositoryを明示的に指定
.start(step1) // Jobの最初のStepを指定
// .next(step2) // 続けて実行する別のStepを指定することも可能
// .end() // ジョブの終了を示す (start()で始まりstepが一つだけなら省略可)
.listener(listener) // Jobのライフサイクルイベントを捕捉するリスナーを指定
.build();
}
JobBuilder
の使い方:
JobBuilder("importPersonJob", jobRepository)
: Jobに名前をつけ、JobRepositoryを渡してBuilderのインスタンスを作成します。Job名は実行時にJobRepositoryに記録されます。Job名は、パラメータが完全に一致しない限り、同じ名前のJobでも異なるJobInstanceとして扱われます。 (JobParametersについては後述).repository(jobRepository)
: JobRepositoryを明示的に指定します。.start(step1)
: このJobが最初に実行するStepを指定します。.next(anotherStep)
: 続けて実行するStepを指定します。.start().next().next()...
のように複数のStepを直列に繋げることができます。.from(step).on(ExitStatus).to(anotherStep)
: Stepの実行結果(ExitStatus)に応じて、次に実行するStepを分岐させることも可能です。.listener(listener)
: Jobの開始前や終了後に特定の処理を実行するためのJobExecutionListener
を指定します。
4.8. Job実行完了を通知するリスナー (Optional)
Jobの実行が完了した際に、成功か失敗かをログ出力するなど、特定の処理を実行したい場合があります。これにはJobExecutionListener
インターフェースを実装したクラスを使用します。
com.example.springbatchbeginner.listener.JobCompletionNotificationListener.java
“`java
package com.example.springbatchbeginner.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component; // コンポーネントとして管理
@Component // Springコンテナに管理させる
public class JobCompletionNotificationListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate; // 結果確認のためにJdbcTemplateを使用
@Autowired // JdbcTemplateをインジェクション
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
// Job実行前に行う処理
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("!!! JOB STARTED! Job Name: {}", jobExecution.getJobInstance().getJobName());
}
// Job実行後に行う処理
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Status: COMPLETED");
// 書き込まれたデータを確認 (任意)
jdbcTemplate.query("SELECT name, age FROM people",
(rs, row) -> "Found <" + rs.getString(1) + "> in the database.")
.forEach(log::info);
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
log.error("!!! JOB FAILED! Status: FAILED");
// エラー情報の取得なども可能
jobExecution.getAllFailureExceptions().forEach(e -> log.error("Failure cause:", e));
} else {
log.warn("!!! JOB FINISHED with Status: {}", jobExecution.getStatus());
}
}
}
“`
JobExecutionListener
インターフェースにはbeforeJob
とafterJob
メソッドがあり、それぞれJobの実行前と実行後に呼び出されます。JobExecution
オブジェクトを通じて、実行中のJobに関する様々な情報(ステータス、パラメータ、実行時間など)にアクセスできます。
このリスナーは @Component
アノテーションを付けてSpringコンテナに管理させ、BatchConfig
クラスのJob定義で .listener(listener)
として紐づけています。
また、ItemReader/Processor/WriterやStepに対しても、それぞれのライフサイクルに対応したリスナー(ItemReadListener
, ItemProcessListener
, ItemWriteListener
, StepExecutionListener
, ChunkListener
など)が用意されています。
これで、シンプルなバッチジョブを構成する全てのコンポーネントが揃いました。
5. Spring Batchジョブの実行
作成したバッチジョブを実行する方法はいくつかあります。
5.1. Spring Bootアプリケーション起動時に自動実行 (非推奨 – 設定済み)
application.properties
でspring.batch.job.enabled=true
(デフォルト)に設定している場合、Spring Bootアプリケーションの起動時に、コンテナ内で見つかったJobのBeanが自動的に実行されます。
properties
spring.batch.job.enabled=true # 自動実行を有効にする
この方法の欠点は、同じパラメータで複数回実行しようとしたり、特定のJobだけを選択して実行したり、実行パラメータを動的に変更したりするのが難しい点です。通常は、false
に設定し、後述する方法で明示的に実行を制御します。
5.2. CommandLineRunnerまたはApplicationRunnerで実行
Spring Bootアプリケーション起動後、特定の処理を実行したい場合にCommandLineRunner
やApplicationRunner
を使用します。これらを利用して、JobLauncher
を使ってJobを起動するのが一般的です。
com.example.springbatchbeginner.runner.JobRunner.java
“`java
package com.example.springbatchbeginner.runner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component // Springコンテナに管理させる
public class JobRunner implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(JobRunner.class);
private final JobLauncher jobLauncher; // Jobを実行するためのコンポーネント
private final Job importPersonJob; // 実行したいJobのBean
@Autowired // JobLauncherとJobをインジェクション
public JobRunner(JobLauncher jobLauncher, Job importPersonJob) {
this.jobLauncher = jobLauncher;
this.importPersonJob = importPersonJob;
}
@Override
public void run(String... args) throws Exception {
log.info("JobRunner started. Launching the batch job.");
// JobParametersを作成
// 同一Job名で異なるJobParametersで実行すると、新しいJobInstanceが作成されます。
// リスタート可能なJobParametersで再度実行すると、既存のJobInstanceがリスタートされます。
JobParameters jobParameters = new JobParametersBuilder()
.addLong("start_time", System.currentTimeMillis()) // 実行ごとにユニークなパラメータ
.toJobParameters();
try {
// JobLauncherを使ってJobを実行
jobLauncher.run(importPersonJob, jobParameters);
log.info("Job execution requested.");
} catch (Exception e) {
log.error("Job execution failed.", e);
}
}
}
“`
@Component
: このクラスをSpringコンテナにBeanとして登録します。CommandLineRunner
: Spring Bootアプリケーション起動後、run
メソッドが実行されます。JobLauncher
: Spring Batchが提供する、Jobを実行するためのインターフェースです。Spring Bootの自動設定によりBeanが作成されています。Job
: 実行したいJobのBeanをインジェクションします。Job名はBatchConfig
で定義したBean名(ここではimportPersonJob
)ではなく、JobBuilder
で設定した名前(”importPersonJob”)でルックアップされることに注意してください。Bean名と同じであることが多いですが、異なる場合もあります。JobParameters
: Jobの実行ごとに異なる値を与えるためのパラメータです。JobParametersBuilder
を使って作成します。例えば、処理対象のファイル名、日付、IDなどをパラメータとして渡すことで、同じJob定義を使って異なるデータセットを処理したり、特定の実行を識別したりできます。JobParameters
が完全に一致する同一名のJobは、Spring Batchによって同一のJob Instanceとみなされます。JobParametersBuilder
にaddLong("key", value)
のように追加していきます。今回は、実行ごとにユニークなstart_time
パラメータを追加して、毎回新しいJob Instanceとして実行されるようにしています。jobLauncher.run(importPersonJob, jobParameters)
: 指定したJobを、指定したパラメータで実行します。
このJobRunner
クラスをプロジェクトに追加すると、アプリケーションを起動するだけでバッチジョブが実行されるようになります。
5.3. RESTエンドポイントから実行
Webアプリケーションの一部としてSpring Batchを使用している場合、RESTfulなAPIエンドポイントからJobをトリガーすることもよくあります。
com.example.springbatchbeginner.controller.JobController.java
“`java
package com.example.springbatchbeginner.controller;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController // RESTfulコントローラーとして公開
@RequestMapping(“/batch”) // ベースパス
public class JobController {
private final JobLauncher jobLauncher;
private final Job importPersonJob; // 実行したいJobのBean
@Autowired
public JobController(JobLauncher jobLauncher, Job importPersonJob) {
this.jobLauncher = jobLauncher;
this.importPersonJob = importPersonJob;
}
@PostMapping("/run-person-import") // POSTリクエストでバッチを実行
public String runPersonImportJob(@RequestParam(required = false) String runId) {
try {
// JobParametersを作成
// 実行を識別するためのユニークなパラメータを含めることが推奨されます
JobParameters jobParameters = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis()) // 常にユニークにするためのタイムスタンプ
.addString("runId", runId != null ? runId : "manual-trigger") // 任意で渡せるパラメータ
.toJobParameters();
// JobLauncherを使ってJobを実行
jobLauncher.run(importPersonJob, jobParameters);
return "Batch job 'importPersonJob' triggered successfully!";
} catch (JobExecutionException e) {
return "Failed to trigger batch job: " + e.getMessage();
}
}
}
“`
このコントローラーを配置すると、アプリケーション起動後にPOST /batch/run-person-import
というリクエストを送ることで、バッチジョブを実行できるようになります。(spring-boot-starter-web
依存関係が必要です。)
5.4. スケジューラーと連携
日次や月次でバッチを実行したい場合は、Spring Scheduler (@Scheduled
) やQuartzなどのスケジューリングライブラリと連携させます。
“`java
// スケジューリングを有効にするアノテーションをメインクラスまたは設定クラスに追加
// @EnableScheduling
// スケジュール設定クラス(例)
@Configuration
public class SchedulerConfig {
private final JobLauncher jobLauncher;
private final Job importPersonJob;
@Autowired
public SchedulerConfig(JobLauncher jobLauncher, Job importPersonJob) {
this.jobLauncher = jobLauncher;
this.importPersonJob = importPersonJob;
}
// 毎日午前2時に実行する場合
@Scheduled(cron = "0 0 2 * * ?")
public void runPersonImportJobScheduled() {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis()) // 実行ごとにユニークなパラメータ
.toJobParameters();
jobLauncher.run(importPersonJob, jobParameters);
} catch (JobExecutionException e) {
// 例外処理
e.printStackTrace(); // ログ出力など適切に
}
}
}
“`
@Scheduled
アノテーションを使って、指定した時刻や周期でrunPersonImportJobScheduled
メソッドが呼び出されるように設定します。メソッド内でJobLauncher
を使ってJobを実行します。ここでも、実行ごとにユニークなJobParametersを生成することが重要です。
6. Spring Batchの高度な機能 (入門レベルの紹介)
これまでに、Spring Batchの基本的な仕組みとシンプルなジョブの作成方法を見てきました。Spring Batchには、さらに強力な機能が多数備わっています。入門レベルでは全ての詳細を把握する必要はありませんが、どのようなことができるのかを知っておくと、今後の学習に役立ちます。
6.1. 再実行 (Restartability)
Spring Batchの最も重要な機能の一つが再実行可能性です。Jobがエラーで中断した場合、JobParametersが同じであれば、JobRepositoryに保存されたメタデータに基づいて、中断したStepの、中断したチャンクの次から処理を再開できます。
これは、JobRepository
が各JobInstance、StepExecution、ExecutionContext(Stepの実行状態を保存するコンテキスト)の情報をデータベースに記録しているおかげです。ItemReaderの実装によっては、最後に読み込んだ位置(例: ファイルの行番号、データベースのカーソル位置やページ番号)をExecutionContextに保存することで、再開時にその位置から読み込みを再開するようになっています。
ただし、Reader, Processor, Writerが冪等性を持つように設計されていることが前提となります。例えば、Writerが単なるINSERTではなくUPSERT(存在すれば更新、なければ挿入)を行うようにするなど、再実行されてもデータが重複したり不整合が発生したりしないように注意が必要です。
6.2. エラーハンドリングとスキップ・リトライ
バッチ処理では、数万件、数百万件のデータの中に、ごく一部だけ不正なデータが含まれていることがあります。そのようなデータのためにJob全体が失敗するのは避けたい場合、特定のアイテムに対する処理エラーをスキップしたり、一時的なエラーに対してリトライしたりする機能が役立ちます。
Chunk指向Stepの.chunk()
設定に、エラーハンドリングに関する設定を追加できます。
java
@Bean
public Step step1(...) {
return new StepBuilder("csvToDatabaseStep", jobRepository)
.<PersonInput, PersonOutput>chunk(10, transactionManager)
.reader(...)
.processor(...)
.writer(...)
// エラーハンドリング設定例
.faultTolerant() // エラー許容モードを有効化
.skipLimit(10) // スキップできるエラーの上限数を設定
.skip(FlatFileParseException.class) // この例外が発生したアイテムはスキップ
.skip(DataIntegrityViolationException.class) // この例外が発生したアイテムもスキップ
.noSkip(ItemProcessorException.class) // この例外はスキップしない(Job失敗となる)
.retryLimit(3) // リトライの上限数を設定
.retry(TemporaryDataSourceException.class) // この例外が発生した場合はリトライ
.build();
}
.faultTolerant()
: エラー許容モードを有効にします。これを設定しないと、デフォルトでは ItemReader/Processor/Writer で例外が発生するとStepが失敗します。.skipLimit(10)
: スキップできるアイテムの総数を制限します。この数を超えるとStepは失敗します。.skip(ExceptionType.class)
: 指定した例外タイプが発生した場合、そのアイテムの処理をスキップします。.noSkip(ExceptionType.class)
: 特定の例外タイプはスキップしないように強制します。.retryLimit(3)
: リトライ可能な例外が発生した場合の最大リトライ回数を設定します。.retry(ExceptionType.class)
: 指定した例外タイプが発生した場合、そのアイテムの処理をリトライします。
これらの設定により、一時的な問題や一部の不良データによるJob全体の失敗を防ぎ、バッチ処理の完了率を高めることができます。
6.3. Listeners (詳細)
前述のJobCompletionNotificationListener
のように、Spring BatchではJobやStep、Chunk、Itemの様々なライフサイクルイベントに対してリスナーを設定し、独自の処理を差し込むことができます。
主なリスナーインターフェース:
JobExecutionListener
:beforeJob
,afterJob
(Job単位)StepExecutionListener
:beforeStep
,afterStep
(Step単位)ChunkListener
:beforeChunk
,afterChunk
,afterChunkError
(Chunk単位)ItemReadListener
:beforeRead
,afterRead
,onReadError
(ItemReaderが1件読むごと)ItemProcessListener
:beforeProcess
,afterProcess
,onProcessError
(ItemProcessorが1件処理するごと)ItemWriteListener
:beforeWrite
,afterWrite
,onWriteError
(ItemWriterがChunkを書き込むごと)SkipListener
:onSkipInRead
,onSkipInProcess
,onSkipInWrite
(スキップが発生した時)RetryListener
:open
,close
,onError
(リトライが発生した時)
これらのリスナーを使って、処理の開始・終了ログ出力、リソースの後処理、エラー発生時の通知、処理件数の集計などを実装できます。
6.4. JobParametersとJobInstance
JobParameters
は、Jobの実行ごとに異なる入力を与えるためのキー/バリューペアのセットです。Job名とJobParameters
の組み合わせがJob Instanceを一意に識別します。
- Job Instance: 特定のJob定義と特定の
JobParameters
の組み合わせです。例: “importPersonJob” + { “timestamp”: 1678886400000, “file”: “input1.csv” } は一つのJob Instance。 - Job Execution: 特定のJob Instanceの一回の実行です。Job Instanceが途中で失敗した場合、同じJob Instanceに対して複数回のJob Executionが発生する可能性があります(再実行した場合)。
Spring BatchはJobRepositoryにJob InstanceとJob Executionの情報を記録します。同じJob Instanceを再度実行しようとした場合、そのInstanceが前回COMPLETED
で終了していれば、デフォルトでは新しいJob Instanceとして起動するか、またはエラーになります(JobExecutionAlreadyCompletedException
)。FAILED
やSTOPPED
で終了している場合は、そのJob Instanceをリスタートしようとします。
そのため、JobParametersは「このデータセットに対するこのJobの処理」を一意に識別できるように設定することが重要です。例えば、日次バッチであれば実行日付、ファイル処理であればファイルパスなどを含めることが考えられます。
6.5. FlowとDecision
複数のStepを実行する際に、単に直列に実行するだけでなく、前のStepの実行結果に応じて次に実行するStepを変えたい場合があります。これは、Flow
とDecision
を使って実現できます。
“`java
@Bean
public Job complexJob(Step stepA, Step stepB, Step stepC, JobRepository jobRepository, JobExecutionDecider decider) {
return new JobBuilder(“complexJob”, jobRepository)
.start(stepA) // Step A を開始
.next(decider) // Step A の次に Decision を実行
.from(decider).on(“CONTINUE”).to(stepB) // Decisionが”CONTINUE”を返したら Step B を実行
.from(decider).on(“COMPLETE”).to(stepC) // Decisionが”COMPLETE”を返したら Step C を実行
.end() // Job 終了
.build();
}
@Bean
public JobExecutionDecider decider() {
// CustomJobExecutionDecider は JobExecutionDecider インターフェースを実装し、
// 前のStepの結果などに基づいて ExitStatus を返すクラス
return new CustomJobExecutionDecider();
}
“`
JobExecutionDecider
:decide
メソッドを持ち、前のStepのExitStatus
などを受け取って、次に進むためのカスタムステータス(文字列)を返します。JobBuilderの.on()
メソッドはこのカスタムステータス文字列とマッチングして、次に実行するStepを決定します。Flow
: 複数のStepやDecisionをグループ化し、名前を付けて再利用可能にしたものです。より複雑なJob構造を組み立てる際に役立ちます。
6.6. Tasklet (詳細)
Chunk Oriented Stepに馴染まない処理(ファイル削除、シェルコマンド実行など)には、Tasklet
インターフェースを実装したクラスを使います。
“`java
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
// ファイルを削除するTaskletの例
public class FileCleanupTasklet implements Tasklet {
private String filePath;
public void setFilePath(String filePath) {
this.filePath = filePath;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// ファイル削除などの処理をここに記述
System.out.println("Cleaning up file: " + filePath);
// new File(filePath).delete(); // 実際の削除処理
// Taskletは一度実行されれば完了
return RepeatStatus.FINISHED;
// RepeatStatus.CONTINUABLE を返すと、executeメソッドが繰り返し実行されます
}
}
“`
このTaskletをStepとして定義するには、StepBuilder
の.tasklet()
メソッドを使用します。
“`java
@Bean
public Step cleanupStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
FileCleanupTasklet tasklet = new FileCleanupTasklet();
tasklet.setFilePath(“/path/to/your/temp/file”); // パスを設定
return new StepBuilder("fileCleanupStep", jobRepository)
.tasklet(tasklet, transactionManager) // Taskletを指定
.build();
}
@Bean
public Job jobWithCleanup(Step step1, Step cleanupStep, JobRepository jobRepository) {
return new JobBuilder("jobWithCleanup", jobRepository)
.start(step1) // まずデータ処理ステップを実行
.next(cleanupStep) // その後ファイルクリーンアップステップを実行
.build();
}
“`
Tasklet StepはChunk Oriented Stepとは異なり、デフォルトではトランザクションはTaskletのexecute
メソッドの呼び出しごと(Tasklet全体)にかかります。RepeatStatus.CONTINUABLE
を返して複数回実行されるようにした場合でも、各実行サイクルでトランザクションはコミット・再開されます。
6.7. スケーリング
Spring Batchは、大量データを高速に処理するために、様々なスケーリング手法をサポートしています。
- Multi-threaded Step: 単一のStep内でItemReader/Processor/Writerを複数のスレッドで並列実行します。最も簡単な方法ですが、Reader/Writerがスレッドセーフである必要があります(例:
JdbcCursorItemReader
はスレッドセーフではないため注意が必要です)。 - Parallel Steps: 複数のStepを並列に実行します。互いに依存しないStepがある場合に有効です。
- Remote Chunking: ItemReader/Processorを実行するプロセスと、ItemWriterを実行するプロセスを分離し、メッセージキューなどを介してChunkデータをやり取りします。これにより、Writerの負荷が高い場合にスケールアウトできます。
- Partitioning: 処理対象データを複数の区画(Partition)に分割し、各区画の処理を独立したStep実行として、同じプロセス内またはリモートプロセスで並列実行します。例えば、データベースのデータをIDレンジや日付で分割し、それぞれのレンジを別のStepExecutionで処理するといった使い方ができます。これは非常に強力なスケーリング手法です。
これらのスケーリング機能を利用するには、Spring Batch Integrationなどの追加のコンポーネントや、分散環境の構築が必要になる場合があります。
7. 実運用における考慮事項
Spring Batchアプリケーションを本番環境で運用する際には、いくつかの重要な考慮事項があります。
7.1. JobRepositoryの永続化
開発中は手軽なH2のインメモリデータベースでJobRepositoryを運用できますが、本番環境ではデータベースの永続化が必須です。これにより、アプリケーションが停止してもJobの実行履歴やステータスが失われず、再実行機能が正しく機能します。
application.properties
で、H2以外のデータベース(PostgreSQL, MySQL, Oracleなど)への接続設定を行います。
“`properties
spring.datasource.url=jdbc:postgresql://localhost:5432/batch_db
spring.datasource.username=your_user
spring.datasource.password=your_password
spring.datasource.driverClassName=org.postgresql.Driver
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect # JPAを使う場合
spring.batch.jdbc.initialize-schema=embedded # or never (手動でテーブルを作成する場合)
“`
spring.batch.jdbc.initialize-schema=always
は、開発環境やテスト環境で手軽にスキーマを構築するのに便利ですが、本番環境では通常never
に設定し、Spring Batchが提供するSQLスクリプトを使って事前にテーブルを作成しておくことが推奨されます。これらのSQLスクリプトはSpring BatchのJARファイル内(例: org/springframework/batch/core/schema-@@platform@@.sql
)に含まれています。
7.2. 監視とロギング
バッチ処理は非同期で実行されるため、処理状況や結果を適切に監視することが重要です。
- JobRepositoryの活用:
JobExplorer
を使ってJobRepositoryに格納されたメタデータ(実行ステータス、処理件数、開始/終了時刻、エラー情報など)をプログラムから参照できます。独自の監視ツールやUIを開発したり、既存の監視システムと連携させたりできます。Spring Batch Adminというプロジェクトもありましたが、現在はメンテナンスされていません。Spring Cloud TaskやSpring Batch Explorerといった代替手段が検討できます。 - ロギング: SLF4jやLogbackなどのロギングライブラリを使用して、ItemReader/Processor/Writerの処理状況、エラー、スキップ、リトライなどの情報を詳細にログ出力します。ログレベルを適切に設定し、ログ集約システム(Elasticsearch/Kibana, Splunkなど)と連携させると、監視が容易になります。前述のリスナーを活用して、重要なイベント発生時に特定のログレベルで出力することも有効です。
7.3. テスト
バッチジョブのテストは重要ですが、結合するコンポーネントが多いため少し複雑になります。
- ユニットテスト: ItemReader, ItemProcessor, ItemWriterなど、個々のコンポーネントのビジネスロジックを単体でテストします。SpringのDIを利用して依存関係をモック化するなどします。
- ステップテスト: 特定のStepだけを起動してテストします。Spring Batch Testモジュールが提供する
StepRunner
などが利用できます。 - ジョブテスト: Job全体を起動してテストします。
JobLauncherTestUtils
やJobOperator
などが利用できます。テスト用のインメモリデータベースやテストデータを準備して、期待通りの結果(テーブルへの書き込み内容、ファイル出力内容、実行ステータスなど)が得られるかを確認します。
7.4. デプロイとスケジューリング
Spring Batchアプリケーションは、通常Javaの実行可能JARファイルとしてパッケージングされます。これをどのようにデプロイし、いつ実行するかはシステムによって異なります。
- 手動実行: コマンドラインから
java -jar your-batch-app.jar jobName param1=value1 param2=value2
のように起動する。 - シェルスクリプト/バッチファイル: 起動コマンドをスクリプトに記述し、タスクスケジューラ(cron, Windows タスクスケジューラなど)で定期実行する。
- 専用スケジューラ: Quartz, Control-M, JP1などのエンタープライズ向けジョブスケジューラから起動する。REST APIをトリガーとして使用したり、コマンドラインから直接起動したりします。
- コンテナ環境: Dockerコンテナとしてビルドし、Kubernetesなどのコンテナオーケストレーションプラットフォーム上でPodとして実行する。KubernetesのCronJobリソースを利用して定期実行することも可能です。
- クラウドサービス: AWS Batch, Google Cloud Batch, Azure Batchなどのクラウドのバッチ実行サービスを利用する。
どの方法を採用するにしても、JobParametersを適切に渡し、JobRepositoryにアクセスできる環境を用意し、ログ出力や監視ができるように構成することが重要です。
8. まとめと次のステップ
このガイドでは、Spring Batchの基本的な概念、アーキテクチャ、そして簡単なバッチジョブの作成から実行までを詳細に解説しました。
重要なポイントの振り返り:
- バッチ処理は大量データをまとめて処理する形態であり、堅牢性や運用管理が重要。
- Spring Batchはこれらの課題を解決する強力なフレームワーク。
- Job, Step, ItemReader, ItemProcessor, ItemWriter, Chunk, Tasklet, JobRepository, JobLauncher, JobExplorerといったコアコンポーネントを理解することが出発点。
- 特にチャンク指向処理(Reader -> Processor -> Writerをチャンク単位で繰り返し)はSpring Batchの基本かつ強力なモデル。
- Spring Bootを使えば簡単にセットアップできる。
spring-boot-starter-batch
とJobRepository用のデータベースが必要。 @EnableBatchProcessing
はSpring Boot 3.xから不要になった。JobRepositoryとPlatformTransactionManagerは依存性注入で取得する。- ItemReader/Processor/WriterはSpringが提供する豊富な実装を利用するか、独自に実装する。
- JobとStepはJavaConfig (
@Bean
) で宣言的に定義する。JobBuilder
,StepBuilder
を使う。 - Jobの実行は
JobLauncher
を使用し、JobParameters
で実行を識別・制御する。 - エラー時の再実行、スキップ、リトライ、リスナーによるイベント処理など、堅牢性を高めるための機能が充実している。
- 実運用ではJobRepositoryの永続化、監視、テスト、デプロイ方法などを考慮する必要がある。
次のステップ:
- 実際に手を動かす: このガイドのコード例を実際にSpring Bootプロジェクトとして実装し、実行してみてください。様々なデータで試したり、エラーを発生させてみたりして、挙動を確認することが理解を深める最良の方法です。
- 他のItemReader/Writerを試す: データベース(JDBC, JPA)からの読み込み/書き込み、XMLファイルの処理など、他のReader/Writerを使って別のシナリオのバッチ処理を実装してみましょう。
- Taskletを試す: ファイル操作やコマンド実行など、Taskletを使ったシンプルなStepを持つJobを作成してみましょう。
- エラーハンドリングを実装する: スキップやリトライの設定を実際にコードに追加し、意図的にエラーを発生させて期待通りの挙動になるか確認しましょう。
- JobParametersを活用する: 処理対象のファイルパスをJobParametersから受け取るようにするなど、JobParametersを積極的に利用してみましょう。
- 公式ドキュメントを読む: Spring Batchの公式ドキュメントは非常に詳細で正確な情報源です。分からないことや、より高度な機能(パーティショニング、リモートチャンキングなど)について調べる際には参照しましょう。(英語ですが、頑張って読んでみる価値はあります。)
- 関連書籍やオンラインリソースを参考にする: Spring Batchに関する解説記事や書籍は多数存在します。様々な解説に触れることで、理解が深まることがあります。
Spring Batchは強力で多機能なフレームワークですが、その分学ぶべきことも多いです。しかし、一度基本を習得すれば、大量データ処理のバッチアプリケーション開発が格段に効率的かつ堅牢になります。
このガイドが、あなたのSpring Batch学習の第一歩として役立つことを願っています。頑張ってください!