DEV Community

Sadiul Hakim

Spring Batch Project with Multi-File Processing

Project Overview

This project demonstrates:

  1. Reading from multiple CSV files using MultiResourceItemReader with a custom ResourceAwareItemReaderItemStream delegate
  2. Performing calculations and writing to an intermediate file
  3. Reading the intermediate file and writing to a database

Implementation

1. Maven Dependencies (pom.xml)

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Domain Model

// Employee.java - Input model
public class Employee {
    private String id;
    private String firstName;
    private String lastName;
    private double salary;

    // Constructors, getters, setters
}

// EmployeeProcessed.java - Intermediate model
public class EmployeeProcessed {
    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;

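    // No-arg constructor, getters, setters
    // (step 2 maps CSV fields onto this class via targetType(), which needs the no-arg constructor and setters)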
}

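// EmployeeDB.java - Database model
// Imports: java.util.Date, plus @Entity, @Table, @Id, @GeneratedValue and GenerationType
// from javax.persistence (Spring Boot 2.x) or jakarta.persistence (Spring Boot 3.x)
@Entity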
@Table(name = "employees")
public class EmployeeDB {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;
    private Date processedDate;

    // Constructors, getters, setters
}

3. Custom ResourceAwareItemReaderItemStream

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
import org.springframework.core.io.Resource;

/**
 * Custom reader that converts each CSV line into an Employee.
 * Implements ResourceAwareItemReaderItemStream so that MultiResourceItemReader
 * can hand it one resource at a time.
 */
public class MultiFileEmployeeReader implements ResourceAwareItemReaderItemStream<Employee> {

    private final FlatFileItemReader<String> lineReader;

    public MultiFileEmployeeReader(FlatFileItemReader<String> lineReader) {
        this.lineReader = lineReader;
    }

    @Override
    public void setResource(Resource resource) {
        // Called by MultiResourceItemReader before it opens each file
        lineReader.setResource(resource);
    }

    @Override
    public Employee read() throws Exception {
        String line = lineReader.read();

        // null means the current file is exhausted; MultiResourceItemReader
        // then moves on to the next resource
        if (line == null) {
            return null;
        }

        // Parse the line and create Employee object
        return parseEmployee(line);
    }

    private Employee parseEmployee(String line) {
        // Expected columns: id,firstName,lastName,salary
        String[] fields = line.split(",");
        if (fields.length < 4) {
            throw new IllegalArgumentException("Expected 4 columns but got: " + line);
        }
        Employee employee = new Employee();
        employee.setId(fields[0].trim());
        employee.setFirstName(fields[1].trim());
        employee.setLastName(fields[2].trim());
        employee.setSalary(Double.parseDouble(fields[3].trim()));
        return employee;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Opens the delegate on the resource set by the last setResource() call
        lineReader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Lets the delegate record its position so a restart can resume
        lineReader.update(executionContext);
    }

    @Override
    public void close() {
        lineReader.close();
    }
}
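The article does not show an input file, but parseEmployee() implies the column order id, firstName, lastName, salary. The following is a minimal stand-alone sketch of that parsing step in plain Java, so it can be tried without Spring. The class EmployeeLine and the sample row are invented for illustration and are not part of the project.

```java
// Hypothetical stand-alone version of the parsing step; EmployeeLine mirrors the Employee fields.
final class EmployeeLine {
    final String id;
    final String firstName;
    final String lastName;
    final double salary;

    private EmployeeLine(String id, String firstName, String lastName, double salary) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
        this.salary = salary;
    }

    // Expects exactly four comma-separated columns: id,firstName,lastName,salary
    static EmployeeLine parse(String line) {
        // The -1 limit keeps trailing empty columns, so the column count check is reliable
        String[] fields = line.split(",", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 columns but found " + fields.length + ": " + line);
        }
        // Double.parseDouble throws NumberFormatException for a non-numeric salary
        return new EmployeeLine(fields[0].trim(), fields[1].trim(), fields[2].trim(),
                Double.parseDouble(fields[3].trim()));
    }

    public static void main(String[] args) {
        EmployeeLine e = parse("E001, Jane, Doe, 5000"); // sample row, invented for illustration
        System.out.println(e.id + " " + e.firstName + " " + e.lastName + " " + e.salary);
    }
}
```

A plain split on commas cannot handle quoted values that contain commas. If the input may contain such values, a tokenizer-based line mapper is probably the safer choice.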

4. Batch Configuration

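// Targets Spring Batch 4.x: JobBuilderFactory and StepBuilderFactory are deprecated as of Spring Batch 5.0
@Configuration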
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // Define input resources (multiple CSV files)
    @Value("classpath:input/employees*.csv")
    private Resource[] inputResources;

    // Define output file for intermediate processing
    @Value("file:output/processed_employees.csv")
    private Resource outputResource;

    /**
     * Step 1: Read from multiple CSV files, process, and write to intermediate file
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Employee, EmployeeProcessed>chunk(10)
                .reader(multiResourceEmployeeReader())
                .processor(employeeProcessor())
                .writer(employeeFileWriter())
                .build();
    }

    /**
     * Step 2: Read from intermediate file and write to database
     */
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<EmployeeProcessed, EmployeeDB>chunk(10)
                .reader(processedEmployeeReader())
                .processor(employeeDBProcessor())
                .writer(employeeDBWriter())
                .build();
    }

    /**
     * Main job that executes both steps
     */
    @Bean
    public Job processEmployeeJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("processEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .next(step2())
                .end()
                .build();
    }

    /**
     * MultiResourceItemReader that iterates over the input files and
     * hands each one to our custom reader through setResource()
     */
    @Bean
    public MultiResourceItemReader<Employee> multiResourceEmployeeReader() {
        // Line reader that returns each line as a raw string;
        // its resource is set later, one file at a time
        FlatFileItemReader<String> lineReader = new FlatFileItemReaderBuilder<String>()
                .name("lineReader")
                .lineMapper((line, lineNumber) -> line) // Pass the line through unchanged
                .build();

        // Custom delegate that converts each line into an Employee
        MultiFileEmployeeReader reader = new MultiFileEmployeeReader(lineReader);

        // MultiResourceItemReader owns the resource list and the iteration
        return new MultiResourceItemReaderBuilder<Employee>()
                .name("multiResourceEmployeeReader")
                .resources(inputResources)
                .delegate(reader)
                .build();
    }

    /**
     * Processor for step 1: Calculate annual salary and tax
     */
    @Bean
    public ItemProcessor<Employee, EmployeeProcessed> employeeProcessor() {
        return employee -> {
            EmployeeProcessed processed = new EmployeeProcessed();
            processed.setEmployeeId(employee.getId());
            processed.setFullName(employee.getFirstName() + " " + employee.getLastName());

            // Calculate annual salary (assuming monthly salary in input)
            double annualSalary = employee.getSalary() * 12;
            processed.setAnnualSalary(annualSalary);

            // Simple tax calculation (20% of annual salary)
            processed.setTaxAmount(annualSalary * 0.2);

            return processed;
        };
    }

    /**
     * File writer for step 1: Write processed data to CSV file
     */
    @Bean
    public FlatFileItemWriter<EmployeeProcessed> employeeFileWriter() {
        return new FlatFileItemWriterBuilder<EmployeeProcessed>()
                .name("employeeFileWriter")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .build();
    }

    /**
     * Reader for step 2: Read from intermediate CSV file
     */
    @Bean
    public FlatFileItemReader<EmployeeProcessed> processedEmployeeReader() {
        return new FlatFileItemReaderBuilder<EmployeeProcessed>()
                .name("processedEmployeeReader")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .targetType(EmployeeProcessed.class)
                .build();
    }

    /**
     * Processor for step 2: Convert to database entity
     */
    @Bean
    public ItemProcessor<EmployeeProcessed, EmployeeDB> employeeDBProcessor() {
        return processed -> {
            EmployeeDB employeeDB = new EmployeeDB();
            employeeDB.setEmployeeId(processed.getEmployeeId());
            employeeDB.setFullName(processed.getFullName());
            employeeDB.setAnnualSalary(processed.getAnnualSalary());
            employeeDB.setTaxAmount(processed.getTaxAmount());
            employeeDB.setProcessedDate(new Date());
            return employeeDB;
        };
    }

    /**
     * Database writer for step 2: Write to database using JdbcBatchItemWriter
     */
    @Bean
    public JdbcBatchItemWriter<EmployeeDB> employeeDBWriter() {
        return new JdbcBatchItemWriterBuilder<EmployeeDB>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO employees (employee_id, full_name, annual_salary, tax_amount, processed_date) " +
                     "VALUES (:employeeId, :fullName, :annualSalary, :taxAmount, :processedDate)")
                .dataSource(dataSource)
                .build();
    }

    /**
     * Initialize in-memory H2 database
     */
    @Bean
    public DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);

        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource("schema.sql"));
        initializer.setDatabasePopulator(populator);

        return initializer;
    }
}
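The arithmetic in employeeProcessor() can be checked in isolation. The sketch below repeats the processor's two formulas (monthly salary times 12, then 20% of the annual figure) in plain Java. The class name SalaryMath and the sample salary of 5000 are illustrative assumptions, not part of the project.

```java
// Stand-alone sketch of the step 1 arithmetic; SalaryMath is a hypothetical helper.
final class SalaryMath {

    // Same flat rate the processor uses
    static final double TAX_RATE = 0.2;

    // The input salary is assumed to be monthly, as in the processor
    static double annualSalary(double monthlySalary) {
        return monthlySalary * 12;
    }

    // Tax is a flat share of the annual salary
    static double taxAmount(double annualSalary) {
        return annualSalary * TAX_RATE;
    }

    public static void main(String[] args) {
        double annual = annualSalary(5000.0); // 5000 * 12
        double tax = taxAmount(annual);       // 20% of the annual salary
        System.out.println("annual=" + annual + ", tax=" + tax);
    }
}
```

The project uses double for money, which is fine for a demo. For real payroll figures, BigDecimal would avoid binary rounding effects.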

5. Database Schema

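-- schema.sql
-- IF NOT EXISTS keeps the script safe to run more than once
-- (Spring Boot can run schema.sql itself, in addition to the DataSourceInitializer bean)
CREATE TABLE IF NOT EXISTS employees (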
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    employee_id VARCHAR(50) NOT NULL,
    full_name VARCHAR(100) NOT NULL,
    annual_salary DOUBLE NOT NULL,
    tax_amount DOUBLE NOT NULL,
    processed_date DATE NOT NULL
);

6. Job Completion Listener

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");
        }
    }
}

Explanation of ResourceAwareItemReaderItemStream

The ResourceAwareItemReaderItemStream interface combines ItemReader and ItemStream (through ItemStreamReader) and adds a setResource(Resource resource) method. Here's how it enables reading from multiple files:

  1. Resource Awareness: setResource(Resource resource) tells the reader which file to read next, so one reader instance can be reused for every file.

  2. MultiResourceItemReader Integration: The MultiResourceItemReader drives the ResourceAwareItemReaderItemStream implementation by:

    • Iterating through all provided resources
    • Setting each resource on the delegate reader using setResource()
    • Delegating the reading to the custom reader for each file

  3. State Management: The ItemStream methods (open(), update(), close()) let the readers record their progress (current resource index, position in file) in the ExecutionContext, which is what makes a restart possible.

  4. Seamless Transition: When the delegate returns null for the current resource, the MultiResourceItemReader closes it, sets the next resource, and reopens the delegate. It returns null itself only after the last resource is exhausted.

In our implementation:

  • MultiResourceItemReader manages the resource iteration
  • MultiFileEmployeeReader is the ResourceAwareItemReaderItemStream delegate that turns CSV lines into Employee objects
  • A FlatFileItemReader<String> inside the custom reader does the raw line reading
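The hand-off between the two readers can be modeled without Spring. The following is a simplified sketch, not the framework's code: MiniDelegate and MiniMultiReader are hypothetical stand-ins for a ResourceAwareItemReaderItemStream and MultiResourceItemReader, and each "resource" is just an in-memory list of lines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the delegate: it only knows about one resource at a time.
final class MiniDelegate {
    private List<String> resource;
    private Iterator<String> lines;

    void setResource(List<String> resource) { this.resource = resource; }
    void open() { lines = resource.iterator(); }
    // null signals that the current resource is exhausted
    String read() { return lines.hasNext() ? lines.next() : null; }
    void close() { lines = null; }
}

// Hypothetical stand-in for the multi-resource reader: it owns the list and the index.
final class MiniMultiReader {
    private final List<List<String>> resources;
    private final MiniDelegate delegate;
    private int current = -1;

    MiniMultiReader(List<List<String>> resources, MiniDelegate delegate) {
        this.resources = resources;
        this.delegate = delegate;
    }

    String read() {
        // Nothing to read, or every resource has already been consumed
        if (resources.isEmpty() || current >= resources.size()) {
            return null;
        }
        if (current == -1) {
            switchTo(0);
        }
        String item = delegate.read();
        while (item == null) {
            delegate.close();
            if (current + 1 >= resources.size()) {
                current = resources.size();
                return null; // last resource exhausted
            }
            switchTo(current + 1);
            item = delegate.read();
        }
        return item;
    }

    // Point the delegate at the resource with the given index and open it
    private void switchTo(int index) {
        current = index;
        delegate.setResource(resources.get(index));
        delegate.open();
    }
}

final class DelegationDemo {
    // Drains the multi-reader and returns every item in the order it was read
    static List<String> readAll(List<List<String>> resources) {
        MiniMultiReader reader = new MiniMultiReader(resources, new MiniDelegate());
        List<String> items = new ArrayList<String>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            items.add(item);
        }
        return items;
    }

    public static void main(String[] args) {
        List<List<String>> files = new ArrayList<List<String>>();
        files.add(Arrays.asList("1,Ann,Lee,5000", "2,Bob,Ray,4200")); // invented sample rows
        files.add(Arrays.asList("3,Cy,Moss,3900"));
        System.out.println(readAll(files));
    }
}
```

The point of the split is that the delegate never tracks a resource index. It reports "nothing left" by returning null, and the outer reader decides whether that means "next file" or "end of input".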

Running the Application

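  1. Place CSV files in src/main/resources/input/ with names like employees1.csv, employees2.csv, etc. Each line holds id, first name, last name, and monthly salary, separated by commas, with no header row (the reader does not skip one)
  2. Run the Spring Boot application
  3. The batch job will:
    • Read from all employee CSV files
    • Process and write to the intermediate file output/processed_employees.csv
    • Read from the intermediate file and write to the employees table in the database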
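Which files get picked up, and in what order, is worth checking before the first run. The sketch below is an approximation under stated assumptions: it uses the JDK's glob matcher rather than Spring's Ant-style resource resolver (the two agree for a simple pattern like employees*.csv), and it sorts by file name because MultiResourceItemReader's default comparator orders resources by file name. The class InputFileCheck and the sample file names are hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that previews which input files match and in what order.
final class InputFileCheck {

    // Returns the names matching employees*.csv, sorted by plain string comparison
    static List<String> matchedInOrder(List<String> fileNames) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:employees*.csv");
        List<String> matched = new ArrayList<String>();
        for (String name : fileNames) {
            if (matcher.matches(Paths.get(name))) {
                matched.add(name);
            }
        }
        Collections.sort(matched);
        return matched;
    }

    public static void main(String[] args) {
        // Invented file names for illustration
        List<String> names = Arrays.asList(
                "employees2.csv", "employees10.csv", "employees1.csv",
                "departments.csv", "employees.txt");
        System.out.println(matchedInOrder(names));
    }
}
```

Because the ordering is a plain string comparison, employees10.csv sorts before employees2.csv. Zero-padded names such as employees01.csv and employees02.csv keep the processing order intuitive if it matters.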
