DEV Community

Sadiul Hakim
Sadiul Hakim

Posted on

Spring Batch Tutorial Part #2

Introduction to Step Listeners in Spring Batch

Listeners are an important part of Spring Batch's event-driven architecture. They allow you to hook into the lifecycle of a batch job and execute custom logic at specific points, such as before a step starts, after a chunk is processed, or when an item is skipped due to an error.

Here's a breakdown of the key listeners, their purpose, and examples.


StepExecutionListener

This listener is invoked at the start and end of a step. It's primarily used for step-level setup and teardown, like initializing a resource or performing final logging.

  • Purpose: To perform logic before a step begins and after it ends.
  • Methods:
    • beforeStep(StepExecution stepExecution): Called before a step's execution.
    • afterStep(StepExecution stepExecution): Called after a step's execution. This method can return an ExitStatus to override the step's final status.

Example: Logging the start and end time of a step.

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import java.util.Date;

public class MyStepListener implements StepExecutionListener {
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("Step '" + stepExecution.getStepName() + "' is starting at " + new Date());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("Step '" + stepExecution.getStepName() + "' finished with status " + stepExecution.getExitStatus().getExitCode() + " at " + new Date());
        return stepExecution.getExitStatus();
    }
}
Enter fullscreen mode Exit fullscreen mode

ChunkListener

A chunk-oriented step is where the read-process-write cycle occurs. This listener is invoked at the start and end of each chunk.

  • Purpose: To perform actions for each batch or "chunk" of items being processed. A common use case is tracking the progress of a large job.
  • Methods:
    • beforeChunk(ChunkContext context): Called before a chunk's transaction begins.
    • afterChunk(ChunkContext context): Called after a chunk's transaction has committed successfully.
    • afterChunkError(ChunkContext context): Called after a chunk fails.

Example: Counting and logging the number of items processed in each chunk.

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class MyChunkListener implements ChunkListener {
    @Override
    public void beforeChunk(ChunkContext context) {
        // You could log a message here
    }

    @Override
    public void afterChunk(ChunkContext context) {
        int itemsWritten = context.getStepContext().getReadCount();
        System.out.println("Chunk processed! Items written: " + itemsWritten);
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        System.err.println("Error processing chunk!");
    }
}
Enter fullscreen mode Exit fullscreen mode

ItemReadListener, ItemProcessListener, ItemWriteListener

These listeners are more granular and operate at the individual item level. They provide hooks for success and failure events for each read, process, and write operation.

  • Purpose: To perform specific actions on an item, like logging a failure or performing a retry.

Common Methods:

  • onReadError(Exception ex)
  • onProcessError(T item, Exception e)
  • onWriteError(Exception exception, List<? extends S> items)

Example:
This code snippet shows how to log a message for each successful read and log a skipped item if an error occurs.

import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;

public class MyItemListener implements ItemReadListener<String>, ItemProcessListener<String, String>, ItemWriteListener<String> {

    @Override
    public void onReadError(Exception ex) {
        System.err.println("Failed to read item: " + ex.getMessage());
    }

    @Override
    public void onProcessError(String item, Exception e) {
        System.err.println("Failed to process item: " + item + " - " + e.getMessage());
    }

    @Override
    public void onWriteError(Exception exception, List<? extends String> items) {
        System.err.println("Failed to write chunk of " + items.size() + " items: " + exception.getMessage());
    }

    // You can also add `after` methods for success cases, e.g., onRead, afterProcess, beforeWrite
}
Enter fullscreen mode Exit fullscreen mode

SkipListener

This is a specialized listener that is only invoked when a skippable exception occurs, allowing you to handle the skipped item explicitly.

  • Purpose: To perform actions when an item is skipped. This is critical for data integrity and error handling.
  • Methods:
    • onSkipInRead(Throwable t): Called when an error occurs during reading, and the item is skipped.
    • onSkipInProcess(T item, Throwable t): Called when an error occurs during processing, and the item is skipped.
    • onSkipInWrite(S item, Throwable t): Called when an error occurs during writing, and the item is skipped.

Example: Logging a skipped record to a separate file for later review.

import org.springframework.batch.core.SkipListener;

public class MySkipListener implements SkipListener<String, String> {
    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println("Skipped reading due to: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(String item, Throwable t) {
        System.out.println("Skipped processing item: " + item + " due to: " + t.getMessage());
    }

    @Override
    public void onSkipInWrite(String item, Throwable t) {
        System.out.println("Skipped writing item: " + item + " due to: " + t.getMessage());
    }
}
Enter fullscreen mode Exit fullscreen mode

How to Use Listeners in a Step

To use any of these listeners, you must register them with the step configuration. This is typically done within the StepBuilder.

Example Configuration:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.SkipListener;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Bean
    public Step myStep() {
        return new StepBuilder("myStep", jobRepository)
                .<String, String>chunk(10, transactionManager)
                .reader(...)
                .processor(...)
                .writer(...)
                // Registering the listeners
                .listener(new MyStepListener())
                .listener(new MyChunkListener())
                .listener(new MySkipListener())
                .build();
    }
}
Enter fullscreen mode Exit fullscreen mode

By using listeners, you can build more resilient and observable batch jobs, separating your core business logic from cross-cutting concerns like logging, monitoring, and error handling.

Top comments (0)