In this part of the series, we'll walk through how to set up and run a distributed Spring Batch job using the open-source database-backed coordination framework we introduced earlier.
✅ By the end of this guide, you’ll have a working multi-node Spring Batch cluster that can process large datasets in parallel — with no messaging queues or external coordination middleware required.
📦 Step 1: Add the Framework Dependency
First, include the published artifact from Maven Central.
<dependency>
  <groupId>io.github.jchejarla</groupId>
  <artifactId>spring-batch-db-cluster-core</artifactId>
  <version>2.0.0</version>
</dependency>
<!-- XML Writing Support -->
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
  <groupId>com.thoughtworks.xstream</groupId>
  <artifactId>xstream</artifactId>
  <version>1.4.21</version>
</dependency>
Make sure you're using Spring Batch 5+ and Spring Boot 3+.
🧠 Step 2: Configure Your Cluster
Each node in the cluster is a regular Spring Boot application that shares access to the same relational database (PostgreSQL, Oracle, or MySQL).
In your application.yml, configure node identity and coordination settings:
spring:
   batch:
      enabled: true
      node-id: worker-${random.uuid} 
      grid-size: 4 # This is 
      heartbeat-interval: 5000
      task-polling-interval: 2000
📝 Note: While using worker-${random.uuid} helps during local testing, it's recommended to assign static node IDs (e.g., worker-1, worker-2, etc.) in production environments. This improves observability, debugging, and partition tracking across runs.
Make sure all nodes point to the same database and include the required tables.
⚠️ Important: This framework extends Spring Batch — so make sure to initialize the core Spring Batch schema before applying the coordination schema.
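The official Spring Batch schema scripts ship inside the spring-batch-core JAR as org/springframework/batch/core/schema-<database>.sql (for example, schema-postgresql.sql).
Then execute the coordination framework's schema script for your database, which is available in the framework's repository.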
📝 Note: We currently support Oracle, MySQL, PostgreSQL, and H2 (for testing purposes).
If you need support for another database, please open an issue on the Feature Request page.
⚙️ Step-by-Step: CSV to XML Job
⚙️ Step 1: Sample CSV File
We’ll use a sample file with 10,000 customer records, which you can download from the GitHub example repo. The first few lines look like this:
customer_id,first_name,last_name,email,signup_date
C00001,Michael,Miller,michael.miller1@test.net,2023-01-03
C00002,David,Brown,david.brown2@test.net,2023-10-01
...
⚙️ Step 2: Define the Customer POJO
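import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
// Mutable bean: BeanWrapperFieldSetMapper populates it through the generated setters.
@Getter
@Setter
@ToString
public class Customer {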
    private String customerId;
    private String firstName;
    private String lastName;
    private String email;
    private String signupDate;
}
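To make the relationship between a CSV line and the fields above concrete, here is a minimal plain-Java sketch. CsvLineSketch, CustomerRow, and parse are hypothetical names used only for illustration; the actual job relies on Spring Batch's line mapper rather than hand-written parsing, and the sketch assumes that fields never contain quoted or embedded commas.

```java
// Hypothetical illustration only: mirrors the five columns of the sample CSV.
final class CsvLineSketch {

    // Immutable stand-in for the Customer POJO (an assumption, not part of the job code).
    record CustomerRow(String customerId, String firstName, String lastName,
                       String email, String signupDate) {}

    // Splits one data line on commas; assumes no quoted or embedded commas.
    static CustomerRow parse(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 5) {
            throw new IllegalArgumentException(
                    "Expected 5 columns but got " + parts.length + ": " + line);
        }
        return new CustomerRow(parts[0].trim(), parts[1].trim(), parts[2].trim(),
                parts[3].trim(), parts[4].trim());
    }

    public static void main(String[] args) {
        CustomerRow row = parse("C00001,Michael,Miller,michael.miller1@test.net,2023-01-03");
        System.out.println(row.customerId() + " -> " + row.email());
    }
}
```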
⚙️ Step 3: ItemReader Setup
@Bean("customerReader")
@StepScope
public FlatFileItemReader<Customer> customerReader(
        @Value("#{stepExecutionContext['startRow']}") long startIndex,
        @Value("#{stepExecutionContext['endRow']}") long endIndex,
        @Value("#{jobParameters['inputFile']}") String inputFile,
        LineMapper<Customer> lineMapper) {
    // Every partition reads the same file but only returns rows in [startIndex, endIndex].
    FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>() {
        // 1-based count of data rows read so far (the header is not counted).
        private long currentLine = 0;
        @Override
        public Customer read() throws Exception {
            Customer customer;
            while ((customer = super.read()) != null) {
                currentLine++;
                if (currentLine < startIndex) {
                    continue; // Row belongs to an earlier partition
                }
                if (currentLine > endIndex) {
                    return null; // End of this partition's range
                }
                return customer;
            }
            return null; // End of file
        }
    };
    reader.setResource(new FileSystemResource(inputFile));
    reader.setLinesToSkip(1); // Skip header
    reader.setLineMapper(lineMapper);
    return reader;
}
@Bean
public LineMapper<Customer> lineMapper() {
    DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    // Column names follow the CSV header; the field-set mapper matches them to the camelCase bean properties.
    tokenizer.setNames("customer_id", "first_name", "last_name", "email", "signup_date");
    BeanWrapperFieldSetMapper<Customer> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Customer.class);
    mapper.setLineTokenizer(tokenizer);
    mapper.setFieldSetMapper(fieldSetMapper);
    return mapper;
}
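The overridden read() above boils down to a window filter: count every data row, drop rows before startRow, and stop after endRow. The following plain-Java sketch isolates that logic so it can be checked without Spring. RowWindow and select are hypothetical names, and row numbers are assumed to be 1-based and inclusive, matching the startRow/endRow values used in this guide.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper that mimics the reader's start/end filtering on any row source.
final class RowWindow {

    // Returns the rows whose 1-based position lies in [startRow, endRow].
    static <T> List<T> select(Iterator<T> rows, long startRow, long endRow) {
        List<T> selected = new ArrayList<>();
        long current = 0;
        while (rows.hasNext()) {
            T row = rows.next();
            current++;
            if (current < startRow) {
                continue; // row belongs to an earlier partition
            }
            if (current > endRow) {
                break; // past this partition's window, stop reading
            }
            selected.add(row);
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r1", "r2", "r3", "r4", "r5", "r6");
        System.out.println(select(rows.iterator(), 3, 4)); // prints [r3, r4]
    }
}
```

One consequence of this design is that every partition still scans the file from the top, so later partitions pay the cost of reading and discarding the earlier rows.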
⚙️ Step 4: Optional Processor
@Bean
public ItemProcessor<Customer, Customer> customerProcessor() {
    return item -> item; // No-op for now
}
⚙️ Step 5: XML Writer
@StepScope
@Bean
public StaxEventItemWriter<Customer> customerXmlWriter(
    @Value("#{jobParameters['outputDir']}") String outputDir,
    @Value("#{stepExecutionContext['partitionId']}") String partitionId
) {
    StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
    writer.setRootTagName("customers");
    writer.setResource(new FileSystemResource(outputDir + "/customers-part-" + partitionId + ".xml"));
    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(Map.of("customer", Customer.class));
    writer.setMarshaller(marshaller);
    return writer;
}
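Because every partition writes its own file, the file name has to be unique per partition. The sketch below builds that path with java.nio.file.Path instead of string concatenation, which sidesteps issues such as a trailing slash in outputDir. PartitionOutput and fileFor are hypothetical names introduced for illustration.

```java
import java.nio.file.Path;

// Hypothetical helper that derives one output file per partition.
final class PartitionOutput {

    // Resolves customers-part-<partitionId>.xml inside the given output directory.
    static Path fileFor(String outputDir, int partitionId) {
        return Path.of(outputDir).resolve("customers-part-" + partitionId + ".xml");
    }

    public static void main(String[] args) {
        // Partitions 0..3 map to four distinct files in the same directory.
        for (int partitionId = 0; partitionId < 4; partitionId++) {
            System.out.println(fileFor("output", partitionId));
        }
    }
}
```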
⚙️ Step 6: Job Config with Partitioning Strategy
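The partitioner gives each partition a contiguous range of rows, and the framework distributes the partitions across nodes using round-robin or fixed-node allocation. Each partition's ExecutionContext contains:
  context.putInt("startRow", i * range + 1); // First data row of the partition (1-based; the reader skips the header)
  context.putInt("endRow", (i + 1) * range); // Last data row of the partition (inclusive)
  context.putInt("partitionId", i);          // Used to name the partition's output file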
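The range arithmetic is easy to check in isolation. The sketch below is plain Java with hypothetical names (PartitionRanges, Range, split); it assumes 1-based, inclusive row numbers and, as one possible way to avoid dropping rows when the row count is not evenly divisible, lets the last partition absorb the remainder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that computes the row window of each partition.
final class PartitionRanges {

    // Inclusive, 1-based row window for one partition.
    record Range(int partitionId, int startRow, int endRow) {}

    static List<Range> split(int totalRows, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        int range = totalRows / partitionCount; // integer division drops the remainder
        List<Range> ranges = new ArrayList<>(partitionCount);
        for (int i = 0; i < partitionCount; i++) {
            int startRow = i * range + 1;
            // The last partition runs to totalRows so the remainder is not lost.
            int endRow = (i == partitionCount - 1) ? totalRows : (i + 1) * range;
            ranges.add(new Range(i, startRow, endRow));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10,000 rows over 3 partitions: 1-3333, 3334-6666, 6667-10000
        split(10_000, 3).forEach(System.out::println);
    }
}
```

With a plain (i + 1) * range for every partition, 10,000 rows split three ways would end at row 9,999 and silently skip the last record.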
Complete Job Config:
import io.github.jchejarla.springbatch.clustering.api.ClusterAwarePartitioner;
import io.github.jchejarla.springbatch.clustering.api.PartitionStrategy;
import io.github.jchejarla.springbatch.clustering.autoconfigure.conditions.ConditionalOnClusterEnabled;
import io.github.jchejarla.springbatch.clustering.partition.ClusterAwarePartitionHandler;
import io.github.jchejarla.springbatch.clustering.partition.PartitionTransferableProp;
import io.github.jchejarla.springbatch.clustering.partition.PartitioningMode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.StepExecutionAggregator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Configuration
@EnableBatchProcessing
@ConditionalOnClusterEnabled
public class ETLJobConfig {
    @Autowired
    private FlatFileItemReader<Customer> customerItemReader;
    // Matches the ItemProcessor<Customer, Customer> bean defined in Step 4.
    @Autowired
    private ItemProcessor<Customer, Customer> customerProcessor;
    @Autowired
    private StaxEventItemWriter<Customer> customerXmlWriter;
    @Autowired
    @Qualifier("etlJobPartitioner")
    private Partitioner partitioner;
    @Autowired
    private ClusterAwarePartitionHandler partitionHandler;
    @Bean("etlClusteredJob")
    public Job clusteredJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager, @Qualifier("multiStepAggregator") StepExecutionAggregator clusterAwareAggregator) {
        return new JobBuilder("etl-clustered-job", jobRepository)
                .incrementer(new RunIdIncrementer())
                .preventRestart()
                .start(multiNodeExecutionStep(jobRepository, platformTransactionManager, clusterAwareAggregator))
                .build();
    }
    public Step multiNodeExecutionStep(JobRepository jobRepository, PlatformTransactionManager txnManager, StepExecutionAggregator clusterAwareAggregator) {
        return new StepBuilder("etlStep.manager", jobRepository)
                .partitioner("multiNodeExecStep", partitioner)
                .partitionHandler(partitionHandler)
                .step(etlReaderWriterStep(jobRepository, txnManager))
                .aggregator(clusterAwareAggregator)
                .build();
    }
    @Bean
    public Step etlReaderWriterStep(JobRepository jobRepository, PlatformTransactionManager txnManager) {
        return new StepBuilder("etlReaderWriterStep", jobRepository).<Customer, Customer>chunk(100, txnManager)
                .reader(customerItemReader)
                .processor(customerProcessor)
                .writer(customerXmlWriter)
                .build();
    }
    @Bean("etlJobPartitioner")
    @StepScope
    public Partitioner etlJobPartitioner(@Value("#{jobParameters['rows']}") Integer rows) {
        return new ClusterAwarePartitioner() {
            @Override
            public List<ExecutionContext> createDistributedPartitions(int availableNodeCount) {
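                int range = rows / availableNodeCount;
                List<ExecutionContext> partitions = new ArrayList<>(availableNodeCount);
                for (int i = 0; i < availableNodeCount; i++) {
                    ExecutionContext context = new ExecutionContext();
                    // Row numbers are 1-based; the reader skips the header line itself.
                    context.putInt("startRow", i * range + 1);
                    // The last partition also takes the remainder when rows is not evenly divisible.
                    boolean lastPartition = (i == availableNodeCount - 1);
                    context.putInt("endRow", lastPartition ? rows : (i + 1) * range);
                    context.putInt("partitionId", i);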
                    partitions.add(context);
                }
                return partitions;
            }
            @Override
            public PartitionTransferableProp arePartitionsTransferableWhenNodeFailed() {
                return PartitionTransferableProp.YES;
            }
            @Override
            public PartitionStrategy buildPartitionStrategy() {
                return PartitionStrategy.builder().partitioningMode(PartitioningMode.ROUND_ROBIN).build();
            }
        };
    }
}
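Conceptually, round-robin allocation hands partitions to the available nodes in turn. The sketch below illustrates the idea in plain Java. It is not the framework's implementation, and RoundRobinSketch, assign, and the node IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of round-robin partition assignment.
final class RoundRobinSketch {

    // Maps each node ID to the partition IDs it would receive, in node order.
    static Map<String, List<Integer>> assign(List<String> nodeIds, int partitionCount) {
        if (nodeIds.isEmpty()) {
            throw new IllegalArgumentException("At least one node is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String nodeId : nodeIds) {
            assignment.put(nodeId, new ArrayList<>());
        }
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            // Cycle through the nodes: partition i goes to node (i mod nodeCount).
            String nodeId = nodeIds.get(partitionId % nodeIds.size());
            assignment.get(nodeId).add(partitionId);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // prints {worker-1=[0, 3], worker-2=[1], worker-3=[2]}
        System.out.println(assign(List.of("worker-1", "worker-2", "worker-3"), 4));
    }
}
```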
⚙️ Step 7: Launching the Job with Parameters
// These parameters are resolved by the reader, writer, and partitioner via jobParameters[...]
JobParameters parameters = new JobParametersBuilder()
        .addString("RUN_TIME", LocalDateTime.now().toString(), true) // Identifying, so each launch creates a new job instance
        .addLong("rows", rows)
        .addString("inputFile", ETL_JOB_INPUT_FILE)
        .addString("outputDir", ETL_JOB_OUTPUT_DIR)
        .toJobParameters();
Job job = applicationContext.getBean("etlClusteredJob", Job.class);
try {
    JobExecution jobExecution = jobLauncher.run(job, parameters);
} catch (Exception e) {
    throw new RuntimeException("Exception occurred when launching the Job", e);
}
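The rows parameter has to match the number of data rows in the input file. This guide does not show how that value is obtained, so the following is only one possible sketch: count the lines of the CSV and subtract the header. CsvRowCounter and countDataRows are hypothetical names, and the sketch assumes one record per line with no multi-line quoted fields.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical helper for deriving the "rows" job parameter from the input file.
final class CsvRowCounter {

    // Counts the lines in the file and subtracts one for the header.
    static long countDataRows(Path csvFile) throws IOException {
        try (Stream<String> lines = Files.lines(csvFile)) {
            return Math.max(0, lines.count() - 1);
        }
    }

    public static void main(String[] args) throws IOException {
        Path sample = Files.createTempFile("customers", ".csv");
        try {
            Files.writeString(sample, String.join("\n",
                    "customer_id,first_name,last_name,email,signup_date",
                    "C00001,Michael,Miller,michael.miller1@test.net,2023-01-03",
                    "C00002,David,Brown,david.brown2@test.net,2023-10-01") + "\n");
            System.out.println(countDataRows(sample)); // prints 2
        } finally {
            Files.deleteIfExists(sample);
        }
    }
}
```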
✅ What’s Next (Part 5 Preview)
In the next part, we’ll cover:
- 📈 Monitoring batch executions (e.g., via /actuator/batch-cluster)
- ⚠️ Failure handling and retries
- 📊 Best practices for large-scale ingestion
- 🚀 Future roadmap (e.g., metrics, better dashboards)
 

 
    