Project Overview
This project demonstrates:
- Reading from multiple CSV files using ResourceAwareItemReaderItemStream
- Performing calculations and writing to an intermediate file
- Reading the intermediate file and writing to a database
Implementation
1. Maven Dependencies (pom.xml)
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2. Domain Model
// Employee.java - Input model
public class Employee {
    private String id;
    private String firstName;
    private String lastName;
    private double salary;
    // Constructors, getters, setters
}

// EmployeeProcessed.java - Intermediate model
public class EmployeeProcessed {
    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;
    // Constructors, getters, setters
}

// EmployeeDB.java - Database model
// javax.persistence matches the Spring Boot 2.x style used in the batch configuration
import java.util.Date;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "employees")
public class EmployeeDB {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;
    private Date processedDate;
    // Constructors, getters, setters
}
3. Custom ResourceAwareItemReaderItemStream
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
import org.springframework.core.io.Resource;

/**
 * Custom reader that parses each line of the current file into an Employee.
 * Implements ResourceAwareItemReaderItemStream so that MultiResourceItemReader
 * can hand it one resource at a time.
 */
public class MultiFileEmployeeReader implements ResourceAwareItemReaderItemStream<Employee> {

    private final FlatFileItemReader<String> lineReader;

    public MultiFileEmployeeReader(FlatFileItemReader<String> lineReader) {
        this.lineReader = lineReader;
    }

    @Override
    public void setResource(Resource resource) {
        // Called by MultiResourceItemReader for each file, before open()
        lineReader.setResource(resource);
    }

    @Override
    public Employee read() throws Exception {
        // Read the next line from the current resource
        String line = lineReader.read();
        // null means the current file is exhausted;
        // MultiResourceItemReader then moves on to the next resource
        if (line == null) {
            return null;
        }
        // Parse the line and create Employee object
        return parseEmployee(line);
    }

    private Employee parseEmployee(String line) {
        String[] fields = line.split(",");
        Employee employee = new Employee();
        employee.setId(fields[0]);
        employee.setFirstName(fields[1]);
        employee.setLastName(fields[2]);
        employee.setSalary(Double.parseDouble(fields[3]));
        return employee;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Open the delegate on the resource set by setResource()
        lineReader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Let the delegate record its position for restarts
        lineReader.update(executionContext);
    }

    @Override
    public void close() {
        lineReader.close();
    }
}
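The parseEmployee() method above assumes that every line holds exactly four well-formed fields. As a minimal sketch, assuming the same id,firstName,lastName,salary column order, a more defensive parser could trim whitespace and reject malformed lines with a descriptive error. The names EmployeeLineParser and ParsedEmployee are hypothetical and not part of the project; ParsedEmployee only stands in for the Employee model so the example runs without Spring.

```java
/** Hypothetical helper: parses one "id,firstName,lastName,salary" line. */
final class EmployeeLineParser {
    private static final int EXPECTED_FIELDS = 4;

    static ParsedEmployee parse(String line) {
        // A limit of -1 keeps trailing empty fields, so "1,A,B," counts as 4 fields
        String[] fields = line.split(",", -1);
        if (fields.length != EXPECTED_FIELDS) {
            throw new IllegalArgumentException(
                    "Expected " + EXPECTED_FIELDS + " fields but found " + fields.length + ": " + line);
        }
        try {
            return new ParsedEmployee(
                    fields[0].trim(),
                    fields[1].trim(),
                    fields[2].trim(),
                    Double.parseDouble(fields[3].trim()));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Salary is not a number: " + line, e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample line, not taken from the project's input files
        ParsedEmployee e = parse("E001, Jane, Doe, 5000.0");
        System.out.println(e.id + " " + e.firstName + " " + e.lastName + " " + e.salary);
    }
}

/** Hypothetical stand-in for the Employee model, kept local so the sketch is self-contained. */
final class ParsedEmployee {
    final String id;
    final String firstName;
    final String lastName;
    final double salary;

    ParsedEmployee(String id, String firstName, String lastName, double salary) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
        this.salary = salary;
    }
}
```

Failing fast on a bad line is one option; in a real job, Spring Batch's skip configuration may be a better fit for tolerating a few bad records.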
4. Batch Configuration
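import java.util.Date;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // Define input resources (multiple CSV files)
    @Value("classpath:input/employees*.csv")
    private Resource[] inputResources;

    // Define output file for intermediate processing
    @Value("file:output/processed_employees.csv")
    private Resource outputResource;

    /**
     * Step 1: Read from multiple CSV files, process, and write to intermediate file
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Employee, EmployeeProcessed>chunk(10)
                .reader(multiResourceEmployeeReader())
                .processor(employeeProcessor())
                .writer(employeeFileWriter())
                .build();
    }

    /**
     * Step 2: Read from intermediate file and write to database
     */
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<EmployeeProcessed, EmployeeDB>chunk(10)
                .reader(processedEmployeeReader())
                .processor(employeeDBProcessor())
                .writer(employeeDBWriter())
                .build();
    }

    /**
     * Main job that executes both steps
     */
    @Bean
    public Job processEmployeeJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("processEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .next(step2())
                .end()
                .build();
    }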

    /**
     * MultiResourceItemReader that delegates to our custom reader
     */
    @Bean
    public MultiResourceItemReader<Employee> multiResourceEmployeeReader() {
        // Create a line reader that reads each line as a string.
        // No resource is set here: MultiResourceItemReader supplies one file at a time.
        FlatFileItemReader<String> lineReader = new FlatFileItemReaderBuilder<String>()
                .name("lineReader")
                .lineMapper((line, lineNumber) -> line) // Return the same line
                .build();
        // Create our custom reader, which parses each line into an Employee
        MultiFileEmployeeReader reader = new MultiFileEmployeeReader(lineReader);
        // Wrap with MultiResourceItemReader, which iterates over the input files
        return new MultiResourceItemReaderBuilder<Employee>()
                .name("multiResourceEmployeeReader")
                .resources(inputResources)
                .delegate(reader)
                .build();
    }

    /**
     * Processor for step 1: Calculate annual salary and tax
     */
    @Bean
    public ItemProcessor<Employee, EmployeeProcessed> employeeProcessor() {
        return employee -> {
            EmployeeProcessed processed = new EmployeeProcessed();
            processed.setEmployeeId(employee.getId());
            processed.setFullName(employee.getFirstName() + " " + employee.getLastName());
            // Calculate annual salary (assuming monthly salary in input)
            double annualSalary = employee.getSalary() * 12;
            processed.setAnnualSalary(annualSalary);
            // Simple tax calculation (20% of annual salary)
            processed.setTaxAmount(annualSalary * 0.2);
            return processed;
        };
    }

    /**
     * File writer for step 1: Write processed data to CSV file
     */
    @Bean
    public FlatFileItemWriter<EmployeeProcessed> employeeFileWriter() {
        return new FlatFileItemWriterBuilder<EmployeeProcessed>()
                .name("employeeFileWriter")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .build();
    }

    /**
     * Reader for step 2: Read from intermediate CSV file
     */
    @Bean
    public FlatFileItemReader<EmployeeProcessed> processedEmployeeReader() {
        return new FlatFileItemReaderBuilder<EmployeeProcessed>()
                .name("processedEmployeeReader")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .targetType(EmployeeProcessed.class)
                .build();
    }

    /**
     * Processor for step 2: Convert to database entity
     */
    @Bean
    public ItemProcessor<EmployeeProcessed, EmployeeDB> employeeDBProcessor() {
        return processed -> {
            EmployeeDB employeeDB = new EmployeeDB();
            employeeDB.setEmployeeId(processed.getEmployeeId());
            employeeDB.setFullName(processed.getFullName());
            employeeDB.setAnnualSalary(processed.getAnnualSalary());
            employeeDB.setTaxAmount(processed.getTaxAmount());
            employeeDB.setProcessedDate(new Date());
            return employeeDB;
        };
    }

    /**
     * Database writer for step 2: Write to database using JdbcBatchItemWriter
     */
    @Bean
    public JdbcBatchItemWriter<EmployeeDB> employeeDBWriter() {
        return new JdbcBatchItemWriterBuilder<EmployeeDB>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO employees (employee_id, full_name, annual_salary, tax_amount, processed_date) " +
                        "VALUES (:employeeId, :fullName, :annualSalary, :taxAmount, :processedDate)")
                .dataSource(dataSource)
                .build();
    }

    /**
     * Initialize in-memory H2 database
     */
    @Bean
    public DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource("schema.sql"));
        initializer.setDatabasePopulator(populator);
        return initializer;
    }
}
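The step 1 processor multiplies the monthly salary by 12 and then applies a flat 20% tax to the result. The arithmetic can be sketched in plain Java, outside Spring, to check it by hand: for a hypothetical monthly salary of 5000, the annual salary is 60000 and the tax is 12000. The class name SalaryCalculation and its constants are illustrative assumptions, not part of the project.

```java
/** Hypothetical helper mirroring the arithmetic of the step 1 processor. */
final class SalaryCalculation {
    static final int MONTHS_PER_YEAR = 12;
    static final double TAX_RATE = 0.2; // flat 20%, as in the processor above

    /** Annual salary, assuming the input salary is monthly. */
    static double annualSalary(double monthlySalary) {
        return monthlySalary * MONTHS_PER_YEAR;
    }

    /** Tax owed on an annual salary at the flat rate. */
    static double taxAmount(double annualSalary) {
        return annualSalary * TAX_RATE;
    }

    public static void main(String[] args) {
        double monthly = 5000.0; // hypothetical input value
        double annual = annualSalary(monthly);
        System.out.println("annual=" + annual + " tax=" + taxAmount(annual));
    }
}
```

Because double arithmetic can introduce small rounding errors, comparisons on the results are safer with a tolerance; for real monetary values BigDecimal may be the better choice.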
5. Database Schema
-- schema.sql
-- IF NOT EXISTS keeps the script safe if it runs more than once at startup
CREATE TABLE IF NOT EXISTS employees (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    employee_id VARCHAR(50) NOT NULL,
    full_name VARCHAR(100) NOT NULL,
    annual_salary DOUBLE NOT NULL,
    tax_amount DOUBLE NOT NULL,
    processed_date DATE NOT NULL
);
6. Job Completion Listener
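import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    @Override
    public void afterJob(JobExecution jobExecution) {
        // Log only when the whole job (both steps) completed successfully
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");
        }
    }
}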
Explanation of ResourceAwareItemReaderItemStream
The ResourceAwareItemReaderItemStream interface extends both the ItemReader and ItemStream interfaces and adds the ability to handle resources. Here's how it enables reading from multiple files:
- Resource Awareness: The interface provides a setResource(Resource resource) method that allows the reader to be aware of the resource it's currently processing.
- MultiResourceItemReader Integration: The MultiResourceItemReader uses the ResourceAwareItemReaderItemStream implementation by:
  - Iterating through all provided resources
  - Setting each resource on the delegate reader using setResource()
  - Delegating the reading to the custom reader for each file
- State Management: The implementation maintains state (current resource index, position in file) through the ItemStream interface methods (open(), update(), close()).
- Seamless Transition: When one resource is exhausted, the MultiResourceItemReader automatically moves to the next resource and updates the delegate reader.
In our implementation:
- MultiFileEmployeeReader implements the resource handling logic
- MultiResourceItemReader manages the resource iteration
- The custom reader processes each line and creates domain objects
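The hand-off described above can be modelled in a few lines of plain Java. This is a simplified sketch of the protocol only, not Spring Batch's actual code: in-memory lists stand in for files, and the names MultiResourceDriver and LineDelegate are hypothetical. The driver plays the role of MultiResourceItemReader, and the delegate plays the role of the resource-aware reader.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Simplified model of the driver role: walks the resources and delegates the reading. */
final class MultiResourceDriver {
    private final List<List<String>> resources;
    private final LineDelegate delegate;
    private int current = -1;      // index of the resource currently open
    private boolean opened;        // whether the delegate is open on a resource
    private boolean exhausted;     // true once every resource has been read

    MultiResourceDriver(List<List<String>> resources, LineDelegate delegate) {
        this.resources = resources;
        this.delegate = delegate;
    }

    /** Returns the next line across all resources, or null when none are left. */
    String read() {
        while (!exhausted) {
            if (opened) {
                String item = delegate.read();
                if (item != null) {
                    return item;
                }
                // Current resource is exhausted: close it before moving on
                delegate.close();
                opened = false;
            }
            current++;
            if (current >= resources.size()) {
                exhausted = true;
            } else {
                // Same order as described above: setResource(), then open()
                delegate.setResource(resources.get(current));
                delegate.open();
                opened = true;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical file contents, one inner list per "file"
        List<List<String>> files = Arrays.asList(
                Arrays.asList("1,Ann,Lee,4000", "2,Bob,Ray,4500"),
                Arrays.asList("3,Cy,Fox,5000"));
        MultiResourceDriver driver = new MultiResourceDriver(files, new LineDelegate());
        for (String line = driver.read(); line != null; line = driver.read()) {
            System.out.println(line);
        }
    }
}

/** Simplified model of a resource-aware delegate: reads one resource at a time. */
final class LineDelegate {
    private List<String> resource;
    private Iterator<String> lines;

    void setResource(List<String> resource) { this.resource = resource; }
    void open() { lines = resource.iterator(); }
    String read() { return lines.hasNext() ? lines.next() : null; }
    void close() { lines = null; }
}
```

The delegate never needs to know how many resources exist; it only signals the end of the current one by returning null.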
Running the Application
- Place CSV files in src/main/resources/input/ with names like employees1.csv, employees2.csv, etc.
- Run the Spring Boot application
- The batch job will:
  - Read from all employee CSV files
  - Process and write to an intermediate file
  - Read from the intermediate file and write to the database
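The two steps only work together if the intermediate file is written and read with the same field order (employeeId, fullName, annualSalary, taxAmount). A rough sketch of that round trip in plain Java, assuming simple comma joining with no quoting, might look like this. The class IntermediateLineFormat and the sample values are hypothetical, and the exact text Spring Batch writes for numeric fields may differ.

```java
/** Hypothetical illustration of the field-order contract between step 1 and step 2. */
final class IntermediateLineFormat {

    /** Joins the fields in the order the step 1 writer is configured with. */
    static String toLine(String employeeId, String fullName, double annualSalary, double taxAmount) {
        return String.join(",",
                employeeId,
                fullName,
                String.valueOf(annualSalary),
                String.valueOf(taxAmount));
    }

    /** Splits a line back into its fields, in the same order the step 2 reader expects. */
    static String[] fromLine(String line) {
        // A limit of -1 keeps trailing empty fields instead of silently dropping them
        return line.split(",", -1);
    }

    public static void main(String[] args) {
        // Hypothetical record, not taken from real project output
        String line = toLine("E001", "Jane Doe", 60000.0, 12000.0);
        System.out.println(line);
        String[] fields = fromLine(line);
        System.out.println("fullName=" + fields[1] + ", taxAmount=" + fields[3]);
    }
}
```

Because this sketch does no quoting, a comma inside a field (for example in a name) would shift the columns on the way back in.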