
なぜSpring Batchのスキップとリスナーが必要なのか?
バッチ処理を運用していると、「CSVファイルに不正なデータが混じっていて処理が止まってしまった…」
「一部のデータのエラーで、せっかくのバッチ処理が完走できない…」
といった経験はありませんか?
たった1件の不良データのために、何時間もかかるバッチ処理全体を停止させてしまうのは、非常に非効率的です。
この記事を読むことで、あなたは以下のメリットを得られます。
- バッチ処理のエラーで夜中に呼び出されることがなくなります。
- データ品質に左右されない、堅牢なシステムを構築するスキルが身につきます。
- エラー発生時にも処理を止めず、問題箇所を正確に特定・追跡できる運用を実現できます。
本記事では、Spring Batchの強力なスキップ機能とリスナーを組み合わせて、これらの課題を解決し、エラー耐性の高い堅牢なバッチ処理を構築する方法をハンズオン形式で学びます。
具体的には、以下の実践的なバッチジョブを構築します。
- CSVファイルから商品データを読み込む。
- 読み込み中に発生するフォーマットエラーをスキップする。
- 処理中に発生する業務エラーをスキップする。
- スキップしたアイテムの情報を、別のテーブルに永続的に記録する。
- 正常なデータのみDBに書き込む。
目次
- 1. スキップとリスナーの基本概念
- 1-1. スキップとは?その目的
- 1-2. リスナーとは?その目的
- 1-3. スキップとリスナーの強力な連携
- 2. ハンズオンで学ぶ!堅牢なバッチ処理の構築
- 2-1. ハンズオンアプリケーションの目的
- 2-2. アプリケーションの概要
- 2-3. 主要な仕様
- 2-4. フォルダ構成
- 3. Step 1: 準備 – モデルとリポジトリの作成
Product.javaSkippedItem.java
- 4. Step 2: エラーの準備 – 不正なデータとProcessorの実装
products.csvSkippableException.javaProductProcessor.java
- 5. Step 3: ジョブの構築 – ConfigとListenerの連携
SkipListenerJobConfig.javaCustomSkipListenerとSkippedItemService
- 6. ジョブ実行用インタフェースの作成
- 6-1. APIエンドポイントの追加
- 6-2. Webブラウザからの実行
- 7. 実行方法
- 7-1. Web UIからの実行
- 7-2. APIでの直接実行
- 8. Step 3: 実行と結果解説
- 8-1. 実行ログの確認
- 8-2. H2コンソールでの結果確認
- まとめ:エラーに強いバッチ処理の実現へ
対象読者
- Spring Batchの基本的な概念を理解している開発者
- バッチ処理におけるエラーハンドリングのベストプラクティスを学びたい方
- 堅牢で運用しやすいSpring Batchアプリケーションの構築を目指している方
- Spring BootとSpring Batchを用いた開発経験がある方
1. スキップとリスナーの基本概念
1-1. スキップとは?その目的
「スキップ」とは、バッチ処理の実行中に特定の例外やエラーが発生した際、そのエラーを発生させたアイテム(レコード)の処理を意図的にスキップし、残りの処理を続行するためのメカニズムです。
これにより、一部のデータに問題があっても、バッチ処理全体が中断されることなく最後まで実行されることを保証します。
例えるなら、製造ラインで不良品が見つかっても、ライン全体を止めずに不良品だけを取り除き、残りの製品の生産を続けるようなものです。
スキップは、以下のような目的のために使用されます。
- バッチ処理の継続性:
- 一部の不良データや一時的な外部システムの問題によって処理全体が停止することを防ぎ、可能な限り多くのアイテムを処理します。
- エラー耐性 (Fault Tolerance):
- 予期せぬエラーが発生してもシステムが回復し、処理を継続できる能力を高めます。
- 問題のあるアイテムの隔離:
- 問題を発生させたアイテムを特定し、後で手動での修正や、別の専用プロセスで再処理するための仕組みを提供します。これにより、メインのバッチ処理の複雑性を軽減します。
1-2. リスナーとは?その目的
「リスナー」とは、Spring Batchのジョブやステップの様々なライフサイクルイベント(開始、終了、読み込み、処理、書き込みなど)に対応して、カスタムの処理ロジックを挿入するためのインターフェース群です。
これにより、バッチ処理の実行状況を監視したり、特定のイベントに応じて追加の処理を実行したりすることが可能になります。
例えるなら、製造ラインで不良品が取り除かれた際に、その不良品の種類や原因を記録する品質管理担当者のような役割を担います。
リスナーは、以下のような目的のために広く利用されます。
- 処理状況の可視化:
- ジョブやステップの開始・終了時、またはチャンク処理の完了時などにログを出力したり、進捗状況を追跡したりします。
- エラーハンドリングの拡張:
- エラー発生時に、標準のエラー処理に加えてカスタムの緊急通知を行ったり、エラーの詳細情報をデータベースに記録したりすることができます。
- リソース管理:
- ジョブやステップの開始前にリソース(データベース接続、ファイルハンドラなど)を初期化し、終了後にそれらをクリーンアップする用途に利用されます。
- ビジネスロジックの追加:
- 特定のライフサイクルイベント(例: ジョブの完了)をトリガーとして、レポート生成、後続処理のキックオフ、外部システムへの通知などのビジネスロジックを実行します。
1-3. スキップとリスナーの強力な連携
スキップとリスナーは、エラーハンドリング戦略において密接に連携します。
特にSkipListenerは、スキップ機能と組み合わせることで、エラー発生時の詳細な情報収集と対応を可能にします。
バッチ処理中にスキップ対象の例外が発生した場合、Spring Batchはエラーの原因となったアイテムをスキップし、処理を続行します。
このとき、SkipListenerに実装されたonSkipInRead()、onSkipInProcess()、onSkipInWrite()メソッドが呼び出され、スキップされたアイテムや例外の情報を捕捉できます。
この連携により、バッチ処理は継続性を保ちつつ、運用担当者は後からエラーの詳細を分析し、適切な対策を講じることが可能になります。
図:Spring Batchにおけるスキップとトランザクション分離のシーケンス
このシーケンス図は、Spring Batchジョブがアイテムを処理する際のライフサイクル、特にエラー発生時のスキップ処理とトランザクションの分離メカニズムを視覚的に示しています。
Stepがアイテムの読み込み、処理、書き込みを内部で管理し、不正なアイテムが検出された際にはSkipListenerが起動します。SkipListenerはSkippedItemServiceを呼び出し、@Transactional(propagation = Propagation.REQUIRES_NEW)によってメインのチャンクトランザクションとは独立した新しいトランザクションでスキップ情報をデータベースに永続化します。- これにより、メインのバッチ処理がロールバックされても、スキップ情報は確実に記録されます。

Spring Batchのスキップとリスナーの詳細については、以下の記事で詳細に解説していますので、是非ご覧ください。
2. ハンズオンで学ぶ!堅牢なバッチ処理の構築
2-1. ハンズオンアプリケーションの目的
本アプリケーションは、Spring Batchにおけるエラーハンドリングの中でも特に重要な「スキップ」と「リスナー」の連携を実践的に学ぶことを目的としています。
データ品質に問題がある場合でもバッチ処理を止めずに完走させ、かつ問題があった箇所を正確に記録・追跡するための堅牢な実装パターンを習得します。
2-2. アプリケーションの概要
CSVファイル(products.csv)から商品データを読み込み、データベース(H2)に保存するバッチジョブを構築します。
このCSVファイルには、意図的に不正なフォーマットの行や、業務エラーを引き起こすデータが含まれています。
Spring Batchのスキップ機能を用いてこれらのエラーをハンドリングし、SkipListener を介してスキップされたアイテムの詳細情報(どのステップで、どのデータが、なぜスキップされたか)を別の専用テーブル(SKIPPED_ITEMS)に記録します。
2-3. 主要な仕様
- 使用技術: Spring Boot, Spring Batch, JPA(Hibernate), H2 Database, Java。
- 入力データ:
products.csv。価格が数値でない行や、処理エラーを引き起こすフラグが立てられた行を含む。 - Job構成:
skipListenerJobは以下の主要なStepで構成されます。skipListenerStep: メインの処理ステップ。- Reader:
FlatFileItemReaderを使用し、CSVからProductオブジェクトへのマッピングを行う。 - Processor:
ProductProcessorで、特定の条件下でカスタム例外SkippableExceptionをスローする。 - Writer:
JpaItemWriterを使用し、正常に処理されたProductをDBに保存する。 - エラーハンドリング:
faultTolerant()を有効化。- 読み込み時の
FlatFileParseExceptionと処理時のSkippableExceptionをスキップ対象に設定。 - スキップ上限を
skipLimit(5)に設定。
- リスナー:
CustomSkipListener: スキップ発生時にSkippedItemServiceを呼び出し、スキップ情報をSKIPPED_ITEMSテーブルに永続化する。
- Reader:
- 実行方法:
- Web UI (
http://localhost:8080/) から提供されるボタンを通じてJobを起動できます。 - REST API (
curl -X POST http://localhost:8080/launch/jobs/skip-listener-job) を使用してJobを直接実行することも可能です。
- Web UI (
2-4. フォルダ構成
主要なソースコードはsrc/main/java/com/example/springbatchh2crud/配下に配置され、関心事ごとに以下のパッケージに分割されています。
src/main/java/com/example/springbatchh2crud/
├── config/
│ └── SkipListenerJobConfig.java // スキップ関連のJobおよびStep定義
├── exception/
│ └── SkippableException.java // スキップ対象のカスタム例外
├── listener/
│ └── CustomSkipListener.java // スキップ情報を永続化するリスナー
├── model/
│ ├── Product.java // 処理対象のデータモデル
│ └── SkippedItem.java // スキップ情報を記録するモデル
├── processor/
│ └── ProductProcessor.java // 意図的に例外を発生させるプロセッサー
└── service/
└── SkippedItemService.java // スキップ情報をDBに保存するサービスSpring Batchのサンプルコードのベースを、以下の記事で提供していますので、本記事の実装をする際に、是非ご活用ください。
3. Step 1: 準備 – モデルとリポジトリの作成
まず、処理対象となる商品データ Product と、スキップされたアイテムの情報を記録する SkippedItem の2つのエンティティを定義します。
Product.java
処理対象のデータモデルです。CSVファイルの一行に対応します。
package com.example.springbatchh2crud.model;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Entity
@Table(name = "products")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
@Id
private Long id;
private String name;
private BigDecimal price;
private boolean invalid;
}SkippedItem.java
スキップが発生した際に、その詳細情報を保存するためのエンティティです。
package com.example.springbatchh2crud.model;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Lob;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Entity
@Table(name = "skipped_items")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SkippedItem {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String jobName;
private String stepName;
private String phase; // "read", "process", "write"
@Lob
@Column(length = 2048)
private String item;
@Lob
@Column(length = 4096)
private String cause;
}これらのエンティティに対応するJPAリポジトリも作成します。中身は空のインターフェースで問題ありません。
ProductRepository.java
package com.example.springbatchh2crud.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import com.example.springbatchh2crud.model.Product;
@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
}SkippedItemRepository.java
package com.example.springbatchh2crud.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import com.example.springbatchh2crud.model.SkippedItem;
@Repository
public interface SkippedItemRepository extends JpaRepository<SkippedItem, Long> {
}4. Step 2: エラーの準備 – 不正なデータとProcessorの実装
スキップ機能の動作を確認するため、意図的にエラーを発生させる仕組みを準備します。
products.csv
入力となるCSVファイルです。読み込みエラー(価格が数値でない)と処理エラー(invalidフラグがtrue)を引き起こすデータを含んでいます。
※ コメントは削除してから、利用してください。
src/main/resources/input/products.csv
id,name,price,invalid
1,Book,19.99,false
2,Pencil,0.99,false
3,Notebook,not_a_number,false # Read-Error
4,Eraser,1.49,false
5,Marker,2.99,true # Process-Error
6,Stapler,12.99,false
7,Invalid-Record # Read-Error
8,Ruler,3.49,false
9,Highlighter,1.99,true # Process-Error
10,Tape,4.99,falseSkippableException.java
処理エラー(業務エラー)を表現するためのカスタム例外クラスです。
package com.example.springbatchh2crud.exception;
public class SkippableException extends Exception {
public SkippableException(String message) {
super(message);
}
}ProductProcessor.java
ItemProcessorの実装です。invalidフラグがtrueの商品データを受け取った場合に、SkippableExceptionをスローします。
package com.example.springbatchh2crud.processor;
import org.springframework.batch.item.ItemProcessor;
import com.example.springbatchh2crud.exception.SkippableException;
import com.example.springbatchh2crud.model.Product;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ProductProcessor implements ItemProcessor<Product, Product> {
@Override
public Product process(Product item) throws Exception {
log.debug("Processing product: {}", item);
if (item.isInvalid()) {
log.warn("Invalid product found: {}", item.getId());
throw new SkippableException("Skipping invalid product with id=" + item.getId());
}
// In a real scenario, some transformation logic would go here.
return item;
}
}5. Step 3: ジョブの構築 – ConfigとListenerの連携
いよいよ、スキップ機能とリスナーを組み込んだバッチジョブを定義します。
SkipListenerJobConfig.java
今回の機能の心臓部です。ジョブ、ステップ、Reader/Processor/Writer、そして各種リスナーの定義をここで行います。
package com.example.springbatchh2crud.config;
import com.example.springbatchh2crud.exception.SkippableException;
import com.example.springbatchh2crud.listener.CustomSkipListener;
import com.example.springbatchh2crud.listener.LoggingChunkListener;
import com.example.springbatchh2crud.listener.LoggingItemProcessListener;
import com.example.springbatchh2crud.listener.LoggingItemReadListener;
import com.example.springbatchh2crud.listener.LoggingItemWriteListener;
import com.example.springbatchh2crud.listener.LoggingStepExecutionListener;
import com.example.springbatchh2crud.model.Product;
import com.example.springbatchh2crud.processor.ProductProcessor;
import com.example.springbatchh2crud.tasklet.TruncateTablesTasklet;
import java.util.Arrays;
import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileParseException;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@RequiredArgsConstructor
public class SkipListenerJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final DataSource dataSource;
private final JdbcTemplate jdbcTemplate;
// Listeners
private final CustomSkipListener customSkipListener;
private final LoggingStepExecutionListener loggingStepExecutionListener;
private final LoggingChunkListener loggingChunkListener;
private final LoggingItemReadListener<Product> loggingItemReadListener;
private final LoggingItemProcessListener<
Product,
Product
> loggingItemProcessListener;
private final LoggingItemWriteListener<Product> loggingItemWriteListener;
@Bean
public Job skipListenerJob() {
return new JobBuilder("skipListenerJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(truncateTablesStep())
.next(skipListenerStep())
.build();
}
@Bean
public Step truncateTablesStep() {
return new StepBuilder("truncateTablesStep", jobRepository)
.tasklet(
new TruncateTablesTasklet(
jdbcTemplate,
Arrays.asList("products", "skipped_items")
),
transactionManager
)
.build();
}
@Bean
public Step skipListenerStep() {
return new StepBuilder("skipListenerStep", jobRepository)
.<Product, Product>chunk(3, transactionManager)
.reader(productReader())
.processor(productProcessor())
.writer(productWriter())
.faultTolerant()
.skip(FlatFileParseException.class)
.skip(SkippableException.class)
.skipLimit(5) // 4 errors in csv, so 5 is safe
.listener(customSkipListener)
.listener(loggingStepExecutionListener)
.listener(loggingChunkListener)
.listener(loggingItemReadListener)
.listener(loggingItemProcessListener)
.listener(loggingItemWriteListener)
.build();
}
@Bean
public FlatFileItemReader<Product> productReader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource("input/products.csv"))
.delimited()
.names("id", "name", "price", "invalid")
.fieldSetMapper(
new BeanWrapperFieldSetMapper<Product>() {
{
setTargetType(Product.class);
}
}
)
.linesToSkip(1) // Skip header
.build();
}
@Bean
public ProductProcessor productProcessor() {
return new ProductProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> productWriter() {
return new JdbcBatchItemWriterBuilder<Product>()
.itemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<>()
)
.sql(
"INSERT INTO products (id, name, price, invalid) VALUES (:id, :name, :price, :invalid)"
)
.dataSource(dataSource)
.build();
}
}ポイント解説:Stepの定義
.faultTolerant(): これを呼び出すことで、後続の.skip()や.retry()といったエラーハンドリング機能が有効になります。.skip(Exception.class): スキップ対象とする例外クラスを指定します。指定された例外、またはそのサブクラスの例外が発生した場合にスキップ処理が実行されます。.skipLimit(int): ステップ全体で許容されるスキップの総数を設定します。この回数を超えてスキップが発生すると、ジョブは失敗します。.listener(...): ステップに各種リスナーを登録します。
CustomSkipListener と SkippedItemService
スキップが発生したアイテムをDBに記録するためのリスナーとサービスです。
CustomSkipListener.java
package com.example.springbatchh2crud.listener;
import com.example.springbatchh2crud.model.Product;
import com.example.springbatchh2crud.model.SkippedItem;
import com.example.springbatchh2crud.service.SkippedItemService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@StepScope
@RequiredArgsConstructor
public class CustomSkipListener implements SkipListener<Product, Product> {
private final SkippedItemService skippedItemService;
private final ObjectMapper objectMapper;
@Value("#{stepExecution.jobExecution.jobInstance.jobName}")
private String jobName;
@Value("#{stepExecution.stepName}")
private String stepName;
@Override
public void onSkipInRead(Throwable t) {
log.warn("Skipped in read due to: {}", t.getMessage());
// FlatFileParseException does not give us the item easily.
// We will log what we can.
SkippedItem skippedItem = new SkippedItem(
null,
jobName,
stepName,
"read",
"N/A",
t.toString()
);
skippedItemService.saveSkippedItem(skippedItem);
}
@Override
public void onSkipInProcess(Product item, Throwable t) {
log.warn("Skipped in process: item={}, cause={}", item, t.getMessage());
String itemAsString = convertItemToString(item);
SkippedItem skippedItem = new SkippedItem(
null,
jobName,
stepName,
"process",
itemAsString,
t.toString()
);
skippedItemService.saveSkippedItem(skippedItem);
}
@Override
public void onSkipInWrite(Product item, Throwable t) {
log.warn("Skipped in write: item={}, cause={}", item, t.getMessage());
String itemAsString = convertItemToString(item);
SkippedItem skippedItem = new SkippedItem(
null,
jobName,
stepName,
"write",
itemAsString,
t.toString()
);
skippedItemService.saveSkippedItem(skippedItem);
}
private String convertItemToString(Object item) {
try {
return objectMapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
log.error("Failed to serialize item: {}", item, e);
return item.toString();
}
}
}SkippedItemService.java
package com.example.springbatchh2crud.service;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.example.springbatchh2crud.model.SkippedItem;
import com.example.springbatchh2crud.repository.SkippedItemRepository;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class SkippedItemService {
private final SkippedItemRepository skippedItemRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void saveSkippedItem(SkippedItem item) {
skippedItemRepository.save(item);
}
}ポイント解説:トランザクションの分離
[!IMPORTANT]
SkippedItemServiceのsaveSkippedItemメソッドに付与されている@Transactional(propagation = Propagation.REQUIRES_NEW)が非常に重要です。Spring Batchのチャンク処理は、単一のトランザクション内で実行されます。もしスキップが発生すると、そのチャンクはロールバックされ、新しいトランザクションでリトライ(またはスキップを考慮した再処理)が行われます。
このとき、
SkipListener内の処理も同じトランザクションで実行されていると、ロールバックに巻き込まれてしまい、スキップ情報をDBに保存する処理もキャンセルされてしまいます。
Propagation.REQUIRES_NEWを指定することで、saveSkippedItemメソッドは常に新しいトランザクションで実行されるようになります。これにより、メインのチャンク処理がロールバックされても、スキップ情報の保存処理は独立してコミットされ、確実にDBに記録を残すことができます。
6. ジョブ実行用インタフェースの作成
実装したジョブフローを手動で実行して動作を確認するための仕組みを整備します。
6-1. APIエンドポイントの追加
JobLaunchControllerに、作成したskipListenerJobを起動するための@PostMappingエンドポイントを追加します。
src/main/java/com/example/springbatchh2crud/controller/JobLaunchController.java(抜粋)
// ... (existing code)
private final Job skipListenerJob;
public JobLaunchController(
@Qualifier("skipListenerJob") Job skipListenerJob,
) {
this.skipListenerJob = skipListenerJob;
}
// ... (existing methods)
// --- Skip Listener Job ---
@PostMapping("/jobs/skip-listener-job")
public ResponseEntity<String> launchSkipListenerJob() {
return launchJob(skipListenerJob);
}6-2. Webブラウザからの実行
Webブラウザからジョブを簡単に実行できるよう、index.htmlに起動ボタンを追加します。
src/main/resources/static/index.html
<!-- ... (existing content) -->
<h2 class="group-title">Error Handling and Retry Jobs</h2>
<div class="job-card">
<h3>Skip Listener Job</h3>
<p>
<code>skipListenerJob</code>:
CSV読み込み時のエラーや処理中の業務エラーをスキップし、その内容をDBに記録する堅牢なデータ処理ジョブ。
</p>
<form action="/launch-job/jobs/skip-listener-job" method="POST">
<button type="submit">Launch skipListenerJob</button>
</form>
</div>
<!-- ... (existing content) -->7. 実行方法
実装した skipListenerJob は、以下のいずれかの方法で実行できます。
7-1. Web UIからの実行
アプリケーションを起動し、ブラウザで http://localhost:8080/ にアクセスします。
「Error Handling and Retry Jobs」セクションにある “Launch skipListenerJob” ボタンをクリックしてください。
7-2. APIでの直接実行
ターミナルから以下のcurlコマンドを実行します。
curl -X POST http://localhost:8080/launch/jobs/skip-listener-jobどちらの方法でも、アプリケーションのコンソールにログが出力され、スキップとリスナーの動作を実際に確認できます。
8. Step 3: 実行と結果解説
JobLaunchController と index.html にジョブを起動するためのエンドポイントとUIを追加した後、アプリケーションを実行してブラウザからジョブを起動します。
8-1. 実行ログの確認
ジョブを実行すると、コンソールに以下のようなログが出力されます。これらのログは、バッチ処理の内部で何が起こっているかを正確に示しています。
// ...
INFO --- [nio-8080-exec-1] c.e.s.l.LoggingStepExecutionListener : skipListenerStep started.
INFO --- [nio-8080-exec-1] c.e.s.listener.LoggingChunkListener : チャンク処理を開始します。
// ...
// 3行目の "not_a_number" で読み込みエラーが発生
WARN --- [nio-8080-exec-1] c.e.s.listener.LoggingItemReadListener : Error while reading an item: Parsing error at line: 4 in resource=[file [/.../input/products.csv]], input=[3,Notebook,not_a_number,false]
// SkipListenerが呼び出され、スキップ情報をDBに保存
WARN --- [nio-8080-exec-1] c.e.s.listener.CustomSkipListener : Skipped in read due to: Parsing error at line: 4 ...
Hibernate: insert into skipped_items (cause, item, job_name, phase, step_name) values (?, ?, ?, ?, ?)
// ...
// 5番目のアイテム "Marker" (invalid=true) で処理エラーが発生
WARN --- [nio-8080-exec-1] c.e.s.processor.ProductProcessor : Invalid product found: 5
WARN --- [nio-8080-exec-1] c.e.s.l.LoggingItemProcessListener : Error while processing item Product(id=5, name=Marker, price=2.99, invalid=true): Skipping invalid product with id=5
// SkipListenerが呼び出され、スキップ情報をDBに保存
WARN --- [nio-8080-exec-1] c.e.s.listener.CustomSkipListener : Skipped in process: item=Product(id=5, name=Marker, price=2.99, invalid=true), cause=Skipping invalid product with id=5
Hibernate: insert into skipped_items (cause, item, job_name, phase, step_name) values (?, ?, ?, ?, ?)
// ...
// 同様に7行目と9番目のアイテムもスキップされる
// ...
// 最終結果のサマリー
INFO --- [nio-8080-exec-1] c.e.s.l.LoggingStepExecutionListener : skipListenerStep finished. Status: COMPLETED, Read: 8, Write: 6, Skip: 4 | Summary: ...
INFO --- [nio-8080-exec-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=skipListenerJob]] completed with the following status: [COMPLETED]ログ解説:
- 読み込みエラー (
Read-Error):products.csvの3行目と7行目でFlatFileParseExceptionが発生します。これは.skip(FlatFileParseException.class)の定義によりスキップ対象となります。onSkipInReadがトリガーされ、SKIPPED_ITEMSテーブルに「phase: read」として記録されます。 - 処理エラー (
Process-Error): 5番目と9番目のアイテム(Marker, Highlighter)はinvalidフラグがtrueのため、ProductProcessorでSkippableExceptionがスローされます。これは.skip(SkippableException.class)の定義によりスキップ対象となります。onSkipInProcessがトリガーされ、SKIPPED_ITEMSテーブルに「phase: process」として、どのアイテムが原因だったかの情報と共に記録されます。 - 最終ステータス: 合計4回のスキップが発生しましたが、
skipLimit(5)の範囲内であるため、ジョブはCOMPLETEDステータスで正常に終了します。Read: 8,Write: 6,Skip: 4という最終カウントからも、10件の入力のうち2件が読み込み自体に失敗し(Readカウントに含まれない)、8件が読み込まれ、そのうち2件が処理でスキップされ、最終的に6件が書き込まれたことがわかります。
8-2. H2コンソールでの結果確認
http://localhost:8080/h2-console にアクセスして、テーブルの状態を確認します。
SELECT * FROM PRODUCTS
ID | NAME | PRICE | INVALID
---|-------------|-------|--------
1 | Book | 19.99 | false
2 | Pencil | 0.99 | false
4 | Eraser | 1.49 | false
6 | Stapler | 12.99 | false
8 | Ruler | 3.49 | false
10 | Tape | 4.99 | false
(6 rows)解説: 正常に処理が完了した6件のデータのみがPRODUCTSテーブルに永続化されています。スキップされたアイテムはチャンク処理がロールバックされるため、書き込み対象に含まれません。
SELECT * FROM SKIPPED_ITEMS
ID | JOB_NAME | STEP_NAME | PHASE | ITEM | CAUSE
---|-----------------|------------------|---------|------------------------------------------------------|----------------------------------
1 | skipListenerJob | skipListenerStep | read | N/A | org.springframework.batch.item.file.FlatFileParseException: Parsing error at line: 4...
2 | skipListenerJob | skipListenerStep | process | {"id":5,"name":"Marker","price":2.99,"invalid":true} | com.example.springbatchh2crud.exception.SkippableException: Skipping invalid product with id=5
3 | skipListenerJob | skipListenerStep | read | N/A | org.springframework.batch.item.file.FlatFileParseException: Parsing error at line: 8...
4 | skipListenerJob | skipListenerStep | process | {"id":9,"name":"Highlighter","price":1.99,"invalid":true} | com.example.springbatchh2crud.exception.SkippableException: Skipping invalid product with id=9
(4 rows)解説: スキップされた4件すべての情報が、SKIPPED_ITEMSテーブルに正確に記録されています。
PHASEカラムを見れば、どの処理段階(readまたはprocess)でエラーが発生したかが一目瞭然です。ITEMカラムには、処理中にスキップされたアイテムのJSON表現が記録されており、どのデータに問題があったかを特定できます(読み込みエラー時はアイテムを特定できないためN/A)。CAUSEカラムには、スキップの原因となった例外の詳細が記録されており、エラー解析に役立ちます。
まとめ:エラーに強いバッチ処理の実現へ
このハンズオンを通じて、Spring Batchのスキップ機能とリスナー機能がいかに強力であるかを学びました。
実際の開発現場では、データ品質のばらつきや外部システム連携の不安定さなど、予期せぬエラーが頻繁に発生します。
本記事で紹介したスキップ機能とリスナーを適切に活用することで、単にデータを処理するだけでなく、エラーが発生しても安定して動作し、かつ何が起きたのかを正確に追跡できる、堅牢で実践的なバッチアプリケーションを構築することが可能になります。
.faultTolerant()を起点としたエラーハンドリング設定により、予期せぬエラーに対する耐障害性を向上させることができます。SkipListenerを活用することで、スキップしたアイテムの情報を確実に捕捉できます。- リスナー内でのDB操作は、トランザクションの伝播設定(
Propagation.REQUIRES_NEW)を適切に行うことで、メイン処理のロールバックから保護し、信頼性の高いログ記録を実現できます。
次のステップへ
あなたのプロジェクトでは、現在どのようなエラーハンドリングを行っていますか?
本記事で学んだスキップとリスナーの機能を活用することで、どのような改善が考えられるでしょうか?
ぜひ、あなたのプロジェクトに合わせた応用例を考えてみてください。
免責事項
本記事の内容は、執筆時点での情報に基づいており、その正確性、完全性、有用性を保証するものではありません。
本記事の情報に基づいて発生したいかなる損害についても、著者および公開元は一切の責任を負いません。
技術仕様やライブラリのバージョンアップにより、記載内容が古くなる可能性があります。
ご自身の責任において、内容の検証およびご利用をお願いいたします。
