Introduction to Step Listeners in Spring Batch
Listeners are an important part of Spring Batch's event-driven architecture. They allow you to hook into the lifecycle of a batch job and execute custom logic at specific points, such as before a step starts, after a chunk is processed, or when an item is skipped due to an error.
Here's a breakdown of the key listeners, their purpose, and examples.
StepExecutionListener
This listener is invoked at the start and end of a step. It's primarily used for step-level setup and teardown, like initializing a resource or performing final logging.
- Purpose: To perform logic before a step begins and after it ends.
-
Methods:
-
beforeStep(StepExecution stepExecution)
: Called before a step's execution. -
afterStep(StepExecution stepExecution)
: Called after a step's execution. This method can return anExitStatus
to override the step's final status.
-
Example: Logging the start and end time of a step.
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import java.util.Date;
public class MyStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("Step '" + stepExecution.getStepName() + "' is starting at " + new Date());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("Step '" + stepExecution.getStepName() + "' finished with status " + stepExecution.getExitStatus().getExitCode() + " at " + new Date());
return stepExecution.getExitStatus();
}
}
ChunkListener
A chunk-oriented step is where the read-process-write cycle occurs. This listener is invoked at the start and end of each chunk.
- Purpose: To perform actions for each batch or "chunk" of items being processed. A common use case is tracking the progress of a large job.
-
Methods:
-
beforeChunk(ChunkContext context)
: Called before a chunk's transaction begins. -
afterChunk(ChunkContext context)
: Called after a chunk's transaction has committed successfully. -
afterChunkError(ChunkContext context)
: Called after a chunk fails.
-
Example: Counting and logging the number of items processed in each chunk.
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
public class MyChunkListener implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
// You could log a message here
}
@Override
public void afterChunk(ChunkContext context) {
int itemsWritten = context.getStepContext().getReadCount();
System.out.println("Chunk processed! Items written: " + itemsWritten);
}
@Override
public void afterChunkError(ChunkContext context) {
System.err.println("Error processing chunk!");
}
}
ItemReadListener
, ItemProcessListener
, ItemWriteListener
These listeners are more granular and operate at the individual item level. They provide hooks for success and failure events for each read
, process
, and write
operation.
- Purpose: To perform specific actions on an item, like logging a failure or performing a retry.
Common Methods:
onReadError(Exception ex)
onProcessError(T item, Exception e)
onWriteError(Exception exception, List<? extends S> items)
Example:
This code snippet shows how to log a message for each successful read and log a skipped item if an error occurs.
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
public class MyItemListener implements ItemReadListener<String>, ItemProcessListener<String, String>, ItemWriteListener<String> {
@Override
public void onReadError(Exception ex) {
System.err.println("Failed to read item: " + ex.getMessage());
}
@Override
public void onProcessError(String item, Exception e) {
System.err.println("Failed to process item: " + item + " - " + e.getMessage());
}
@Override
public void onWriteError(Exception exception, List<? extends String> items) {
System.err.println("Failed to write chunk of " + items.size() + " items: " + exception.getMessage());
}
// You can also add `after` methods for success cases, e.g., onRead, afterProcess, beforeWrite
}
SkipListener
This is a specialized listener that is only invoked when a skippable exception occurs, allowing you to handle the skipped item explicitly.
- Purpose: To perform actions when an item is skipped. This is critical for data integrity and error handling.
-
Methods:
-
onSkipInRead(Throwable t)
: Called when an error occurs during reading, and the item is skipped. -
onSkipInProcess(T item, Throwable t)
: Called when an error occurs during processing, and the item is skipped. -
onSkipInWrite(S item, Throwable t)
: Called when an error occurs during writing, and the item is skipped.
-
Example: Logging a skipped record to a separate file for later review.
import org.springframework.batch.core.SkipListener;
public class MySkipListener implements SkipListener<String, String> {
@Override
public void onSkipInRead(Throwable t) {
System.out.println("Skipped reading due to: " + t.getMessage());
}
@Override
public void onSkipInProcess(String item, Throwable t) {
System.out.println("Skipped processing item: " + item + " due to: " + t.getMessage());
}
@Override
public void onSkipInWrite(String item, Throwable t) {
System.out.println("Skipped writing item: " + item + " due to: " + t.getMessage());
}
}
How to Use Listeners in a Step
To use any of these listeners, you must register them with the step configuration. This is typically done within the StepBuilder
.
Example Configuration:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.SkipListener;
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager transactionManager;
@Bean
public Step myStep() {
return new StepBuilder("myStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(...)
.processor(...)
.writer(...)
// Registering the listeners
.listener(new MyStepListener())
.listener(new MyChunkListener())
.listener(new MySkipListener())
.build();
}
}
By using listeners, you can build more resilient and observable batch jobs, separating your core business logic from cross-cutting concerns like logging, monitoring, and error handling.
Top comments (0)