Project Overview
This project demonstrates:
- Reading from multiple CSV files using ResourceAwareItemReaderItemStream
- Performing calculations and writing to an intermediate file
- Reading the intermediate file and writing to a database
Implementation
1. Maven Dependencies (pom.xml)
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2. Domain Model
// Employee.java - Input model
public class Employee {
    private String id;
    private String firstName;
    private String lastName;
    private double salary;
    // Constructors, getters, setters
}
// EmployeeProcessed.java - Intermediate model
public class EmployeeProcessed {
    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;
    // Constructors, getters, setters
}
// EmployeeDB.java - Database model
@Entity
@Table(name = "employees")
public class EmployeeDB {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;
    private Date processedDate;
    // Constructors, getters, setters
}
3. Custom ResourceAwareItemReaderItemStream
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
import org.springframework.core.io.Resource;

/**
 * Custom reader that parses each CSV line of the current resource into an Employee.
 * Implements ResourceAwareItemReaderItemStream so that MultiResourceItemReader
 * can hand it one resource at a time.
 */
public class MultiFileEmployeeReader implements ResourceAwareItemReaderItemStream<Employee> {
    private final FlatFileItemReader<String> lineReader;
    public MultiFileEmployeeReader(FlatFileItemReader<String> lineReader) {
        this.lineReader = lineReader;
    }
    @Override
    public void setResource(Resource resource) {
        // Called by MultiResourceItemReader for each file, before open()
        lineReader.setResource(resource);
    }
    @Override
    public Employee read() throws Exception {
        // Read the next line from the current resource
        String line = lineReader.read();
        // Returning null tells MultiResourceItemReader that the current file
        // is exhausted, so it can move on to the next resource
        if (line == null) {
            return null;
        }
        // Parse the line and create Employee object
        return parseEmployee(line);
    }
    // Expects lines of the form: id,firstName,lastName,salary
    private Employee parseEmployee(String line) {
        String[] fields = line.split(",");
        Employee employee = new Employee();
        employee.setId(fields[0]);
        employee.setFirstName(fields[1]);
        employee.setLastName(fields[2]);
        employee.setSalary(Double.parseDouble(fields[3]));
        return employee;
    }
    @Override
    public void open(ExecutionContext executionContext) {
        // Opens the resource most recently passed to setResource()
        lineReader.open(executionContext);
    }
    @Override
    public void update(ExecutionContext executionContext) {
        // Lets the line reader record its position in the current file
        lineReader.update(executionContext);
    }
    @Override
    public void close() {
        lineReader.close();
    }
}
4. Batch Configuration
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;
    // Define input resources (multiple CSV files)
    @Value("classpath:input/employees*.csv")
    private Resource[] inputResources;
    // Define output file for intermediate processing
    @Value("file:output/processed_employees.csv")
    private Resource outputResource;
    /**
     * Step 1: Read from multiple CSV files, process, and write to intermediate file
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Employee, EmployeeProcessed>chunk(10)
                .reader(multiResourceEmployeeReader())
                .processor(employeeProcessor())
                .writer(employeeFileWriter())
                .build();
    }
    /**
     * Step 2: Read from intermediate file and write to database
     */
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<EmployeeProcessed, EmployeeDB>chunk(10)
                .reader(processedEmployeeReader())
                .processor(employeeDBProcessor())
                .writer(employeeDBWriter())
                .build();
    }
    /**
     * Main job that executes both steps
     */
    @Bean
    public Job processEmployeeJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("processEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .next(step2())
                .end()
                .build();
    }
    /**
     * MultiResourceItemReader that iterates over the input files and
     * delegates line parsing to our custom reader
     */
    @Bean
    public MultiResourceItemReader<Employee> multiResourceEmployeeReader() {
        // Create a line reader that returns each line as a raw string
        FlatFileItemReader<String> lineReader = new FlatFileItemReaderBuilder<String>()
                .name("lineReader")
                .lineMapper((line, lineNumber) -> line) // Return the line unchanged
                .build();
        // The custom reader receives each resource from MultiResourceItemReader
        // through setResource(), so no resources are set on it here
        MultiFileEmployeeReader reader = new MultiFileEmployeeReader(lineReader);
        // Wrap with MultiResourceItemReader
        return new MultiResourceItemReaderBuilder<Employee>()
                .name("multiResourceEmployeeReader")
                .resources(inputResources)
                .delegate(reader)
                .build();
    }
    /**
     * Processor for step 1: Calculate annual salary and tax
     */
    @Bean
    public ItemProcessor<Employee, EmployeeProcessed> employeeProcessor() {
        return employee -> {
            EmployeeProcessed processed = new EmployeeProcessed();
            processed.setEmployeeId(employee.getId());
            processed.setFullName(employee.getFirstName() + " " + employee.getLastName());
            // Calculate annual salary (assuming monthly salary in input)
            double annualSalary = employee.getSalary() * 12;
            processed.setAnnualSalary(annualSalary);
            // Simple tax calculation (20% of annual salary)
            processed.setTaxAmount(annualSalary * 0.2);
            return processed;
        };
    }
    /**
     * File writer for step 1: Write processed data to CSV file
     */
    @Bean
    public FlatFileItemWriter<EmployeeProcessed> employeeFileWriter() {
        return new FlatFileItemWriterBuilder<EmployeeProcessed>()
                .name("employeeFileWriter")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .build();
    }
    /**
     * Reader for step 2: Read from intermediate CSV file
     */
    @Bean
    public FlatFileItemReader<EmployeeProcessed> processedEmployeeReader() {
        return new FlatFileItemReaderBuilder<EmployeeProcessed>()
                .name("processedEmployeeReader")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .targetType(EmployeeProcessed.class)
                .build();
    }
    /**
     * Processor for step 2: Convert to database entity
     */
    @Bean
    public ItemProcessor<EmployeeProcessed, EmployeeDB> employeeDBProcessor() {
        return processed -> {
            EmployeeDB employeeDB = new EmployeeDB();
            employeeDB.setEmployeeId(processed.getEmployeeId());
            employeeDB.setFullName(processed.getFullName());
            employeeDB.setAnnualSalary(processed.getAnnualSalary());
            employeeDB.setTaxAmount(processed.getTaxAmount());
            employeeDB.setProcessedDate(new Date());
            return employeeDB;
        };
    }
    /**
     * Database writer for step 2: Write to database using JdbcBatchItemWriter
     */
    @Bean
    public JdbcBatchItemWriter<EmployeeDB> employeeDBWriter() {
        return new JdbcBatchItemWriterBuilder<EmployeeDB>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO employees (employee_id, full_name, annual_salary, tax_amount, processed_date) " +
                     "VALUES (:employeeId, :fullName, :annualSalary, :taxAmount, :processedDate)")
                .dataSource(dataSource)
                .build();
    }
    /**
     * Initialize in-memory H2 database
     */
    @Bean
    public DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource("schema.sql"));
        initializer.setDatabasePopulator(populator);
        return initializer;
    }
}
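The arithmetic in the step 1 processor can be checked in isolation. The following is a minimal, plain-Java sketch of the same two formulas; the class name SalaryMath and the sample monthly salary of 5000 are hypothetical and not part of the project above.

```java
// Hypothetical helper mirroring the arithmetic in employeeProcessor();
// the class and method names are assumptions made for illustration only.
final class SalaryMath {
    private static final int MONTHS_PER_YEAR = 12;
    private static final double TAX_RATE = 0.2; // flat 20% rate used by the processor

    private SalaryMath() {
    }

    // Annual salary, assuming the input value is a monthly salary
    static double annualSalary(double monthlySalary) {
        return monthlySalary * MONTHS_PER_YEAR;
    }

    // Tax owed on an annual salary at the flat rate
    static double taxAmount(double annualSalary) {
        return annualSalary * TAX_RATE;
    }

    public static void main(String[] args) {
        double annual = annualSalary(5000.0); // sample value, not real data
        System.out.println("annualSalary=" + annual + ", taxAmount=" + taxAmount(annual));
    }
}
```

For a monthly salary of 5000 this gives an annual salary of 60000 and a tax amount of 12000. The sketch keeps double to match the models above; for real monetary values, BigDecimal is the more common choice because it avoids binary floating-point rounding.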
5. Database Schema
-- schema.sql
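-- IF NOT EXISTS keeps the script safe to run more than once, for example when
-- Spring Boot's own schema.sql initialization runs alongside the DataSourceInitializer bean
CREATE TABLE IF NOT EXISTS employees (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    employee_id VARCHAR(50) NOT NULL,
    full_name VARCHAR(100) NOT NULL,
    annual_salary DOUBLE PRECISION NOT NULL,
    tax_amount DOUBLE PRECISION NOT NULL,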
    processed_date DATE NOT NULL
);
6. Job Completion Listener
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");
        }
    }
}
Explanation of ResourceAwareItemReaderItemStream
The ResourceAwareItemReaderItemStream interface extends ItemStreamReader (and therefore both ItemReader and ItemStream) as well as ResourceAware, which adds the ability to receive a resource. Here's how it enables reading from multiple files:
- Resource Awareness: The interface provides a setResource(Resource resource) method that tells the reader which resource it is currently processing.
- MultiResourceItemReader Integration: The MultiResourceItemReader uses the ResourceAwareItemReaderItemStream implementation by:
  - Iterating through all provided resources
  - Setting each resource on the delegate reader using setResource()
  - Delegating the reading to the custom reader for each file
- State Management: The ItemStream methods (open(), update(), close()) are the hooks through which a reader initializes, saves, and releases its state, such as the current resource index and the position within a file.
- Seamless Transition: When the delegate's read() returns null for one resource, the MultiResourceItemReader closes the delegate, sets the next resource on it, and reopens it, so the step sees a single continuous stream of items.
In our implementation:
- MultiFileEmployeeReader implements the resource handling logic
- MultiResourceItemReader manages the resource iteration
- The custom reader processes each line and creates domain objects
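The hand-off between the two readers can be modelled without Spring. The sketch below is a simplified, in-memory imitation of the protocol described above, not Spring Batch's actual code: ResourceAwareReader, InMemoryLineReader, MultiResourceDriver, and the sample file names and lines are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified stand-in for ResourceAwareItemReaderItemStream; a "resource" is just a file name.
interface ResourceAwareReader<T> {
    void setResource(String resourceName);
    void open();
    T read(); // returns null when the current resource is exhausted
    void close();
}

// Delegate: reads the lines of whichever resource it was last given.
final class InMemoryLineReader implements ResourceAwareReader<String> {
    private final Map<String, List<String>> files;
    private String resourceName;
    private Iterator<String> lines;

    InMemoryLineReader(Map<String, List<String>> files) {
        this.files = files;
    }

    @Override public void setResource(String resourceName) { this.resourceName = resourceName; }
    @Override public void open() { lines = files.get(resourceName).iterator(); }
    @Override public String read() { return lines.hasNext() ? lines.next() : null; }
    @Override public void close() { lines = null; }
}

// Plays the role of MultiResourceItemReader: owns the resource list and drives the delegate.
final class MultiResourceDriver<T> {
    private final List<String> resources;
    private final ResourceAwareReader<T> delegate;
    private int current = -1;

    MultiResourceDriver(List<String> resources, ResourceAwareReader<T> delegate) {
        this.resources = resources;
        this.delegate = delegate;
    }

    T read() {
        if (resources.isEmpty() || current >= resources.size()) {
            return null; // nothing to read, or everything already consumed
        }
        if (current == -1) { // first call: open the first resource
            current = 0;
            delegate.setResource(resources.get(current));
            delegate.open();
        }
        T item = delegate.read();
        while (item == null) { // current resource exhausted: switch to the next one
            delegate.close();
            current++;
            if (current >= resources.size()) {
                return null; // all resources processed
            }
            delegate.setResource(resources.get(current));
            delegate.open();
            item = delegate.read();
        }
        return item;
    }
}

final class MultiResourceDemo {
    // Drains the driver and returns every line in resource order
    static List<String> readAll(Map<String, List<String>> files, List<String> order) {
        MultiResourceDriver<String> driver =
                new MultiResourceDriver<>(order, new InMemoryLineReader(files));
        List<String> out = new ArrayList<>();
        for (String line = driver.read(); line != null; line = driver.read()) {
            out.add(line);
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration
        Map<String, List<String>> files = Map.of(
                "employees1.csv", List.of("1,Ada,Lovelace,5000", "2,Alan,Turing,6000"),
                "employees2.csv", List.of("3,Grace,Hopper,7000"));
        readAll(files, List.of("employees1.csv", "employees2.csv")).forEach(System.out::println);
    }
}
```

The point of the split is that the delegate only ever knows about one resource at a time, while the driver alone decides when to switch. That is why a delegate should not keep its own list of resources.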
Running the Application
- Place CSV files in src/main/resources/input/ with names like employees1.csv, employees2.csv, etc.
- Run the Spring Boot application
- The batch job will:
  - Read from all employee CSV files
  - Process and write to an intermediate file
  - Read from the intermediate file and write to the database
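The layout of the input files is not spelled out above. Going by the field order used in parseEmployee, each line is presumably id,firstName,lastName,salary with no header row. Under that assumption, a plain-Java sketch of a slightly more defensive line parser might look like this; EmployeeLineParser and ParsedEmployee are hypothetical names, and the sample line is made up.

```java
// Hypothetical, stdlib-only sketch of the parsing done in parseEmployee(),
// with a field-count check added. Assumes lines of the form: id,firstName,lastName,salary
final class EmployeeLineParser {

    // Minimal stand-in for the Employee model from section 2
    static final class ParsedEmployee {
        final String id;
        final String firstName;
        final String lastName;
        final double salary;

        ParsedEmployee(String id, String firstName, String lastName, double salary) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.salary = salary;
        }
    }

    private EmployeeLineParser() {
    }

    static ParsedEmployee parse(String line) {
        // The -1 limit keeps trailing empty fields, so "1,Ada,Lovelace," still yields 4 fields
        String[] fields = line.split(",", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 fields but found " + fields.length + ": " + line);
        }
        // Double.parseDouble throws NumberFormatException for a non-numeric salary
        return new ParsedEmployee(
                fields[0].trim(),
                fields[1].trim(),
                fields[2].trim(),
                Double.parseDouble(fields[3].trim()));
    }

    public static void main(String[] args) {
        ParsedEmployee e = parse("1,Ada,Lovelace,5000.0"); // sample line, not real data
        System.out.println(e.id + " " + e.firstName + " " + e.lastName + " " + e.salary);
    }
}
```

Failing fast on a malformed line makes the offending record visible in the job's failure message rather than surfacing later as an ArrayIndexOutOfBoundsException. Note that a plain split on commas does not handle quoted fields that contain commas.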
 
 

 
    