<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Diags</title>
    <description>The latest articles on DEV Community by Diags (@diags).</description>
    <link>https://dev.to/diags</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1005514%2F355b9721-2254-4b21-820e-77647c91964e.png</url>
      <title>DEV Community: Diags</title>
      <link>https://dev.to/diags</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/diags"/>
    <language>en</language>
    <item>
      <title>Building Reactive Applications with Hexagonal Architecture in Spring Boot</title>
      <dc:creator>Diags</dc:creator>
      <pubDate>Fri, 15 Nov 2024 14:37:14 +0000</pubDate>
      <link>https://dev.to/diags/building-reactive-applications-with-hexagonal-architecture-in-spring-boot-e9m</link>
      <guid>https://dev.to/diags/building-reactive-applications-with-hexagonal-architecture-in-spring-boot-e9m</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6r6081rhftyq7jp5l1wk.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6r6081rhftyq7jp5l1wk.png" alt="Image description" width="800" height="464"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Hexagonal Architecture, also known as Ports and Adapters, is a design pattern that emphasizes separating the business logic from infrastructure and frameworks, making the application core independent of external dependencies. This approach allows you to plug and replace various parts of the system (e.g., databases, APIs, or UI frameworks) without changing the business logic.&lt;/p&gt;

&lt;p&gt;In this article, we will explore:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;What Hexagonal Architecture is.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The advantages of using Hexagonal Architecture.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A step-by-step implementation in Reactive Spring Boot.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;What is Hexagonal Architecture?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Hexagonal Architecture aims to create loosely coupled components, where the core domain logic is surrounded by interfaces known as Ports and implemented by external components called Adapters.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Core Domain Logic: Represents the core business logic of your application.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Ports: Interfaces that expose core functionalities and facilitate interactions with external systems.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Primary Ports: Used by external systems to communicate with the application (e.g., REST APIs).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Secondary Ports: Used by the core logic to communicate with external systems (e.g., databases).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Adapters: Implementations that connect the ports to actual infrastructure or external systems.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Primary Adapters: Entry points for external clients.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Secondary Adapters: Implementations to connect the core to external resources like databases.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Advantages of Hexagonal Architecture&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Independence: Business logic is separated from technology concerns.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Adaptability: Easily replace any component (e.g., swap out databases or web frameworks).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Testability: The independent nature makes testing easier by isolating logic from infrastructure.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Scalability: Loosely coupled architecture allows the system to scale without major restructuring.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Implementation of Hexagonal Architecture in Reactive Spring Boot&lt;br&gt;
We’ll implement a sample Product Management service using Reactive Spring Boot with Hexagonal Architecture.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Domain Layer: Core Logic&lt;/strong&gt;
The domain layer consists of entities and business logic. It is the heart of the system and doesn’t depend on any external technology.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Product Entity&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`
public class Product {
    private final String id;
    private final String name;
    private final double price;

    public Product(String id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    // Getters and additional domain logic can be added here
}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Product Repository Port (Interface)&lt;/p&gt;

&lt;p&gt;This is the secondary port — an interface that defines operations the core logic requires for persistence.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`public interface ProductRepository {
    Mono&amp;lt;Product&amp;gt; findById(String id);
    Mono&amp;lt;Product&amp;gt; save(Product product);
}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;2.** Primary Adapter: &lt;strong&gt;REST API&lt;/strong&gt;&lt;br&gt;
The primary adapter serves as the entry point to interact with the domain logic. We’ll use a REST controller in Spring Boot to act as the primary adapter.&lt;/p&gt;

&lt;p&gt;Product Controller&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`
@RestController
@RequestMapping("/products")
public class ProductController {

    private final ProductService productService;

    public ProductController(ProductService productService) {
        this.productService = productService;
    }

    @GetMapping("/{id}")
    public Mono&amp;lt;ResponseEntity&amp;lt;Product&amp;gt;&amp;gt; getProductById(@PathVariable String id) {
        return productService.getProductById(id)
                .map(product -&amp;gt; ResponseEntity.ok(product))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PostMapping
    public Mono&amp;lt;ResponseEntity&amp;lt;Product&amp;gt;&amp;gt; createProduct(@RequestBody Product product) {
        return productService.createProduct(product)
                .map(savedProduct -&amp;gt; ResponseEntity.status(HttpStatus.CREATED).body(savedProduct));
    }
}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here, the controller acts as a primary adapter by exposing the product services to the user.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Service Layer&lt;/strong&gt;
The service layer is responsible for interacting with the domain layer and serves as a bridge between adapters and the core logic.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Product Service&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`
@Service
public class ProductService {

    private final ProductRepository productRepository;

    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    public Mono&amp;lt;Product&amp;gt; getProductById(String id) {
        return productRepository.findById(id);
    }

    public Mono&amp;lt;Product&amp;gt; createProduct(Product product) {
        return productRepository.save(product);
    }
}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;ProductService relies on the ProductRepository port, which ensures that the domain logic remains independent from database implementations.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Secondary Adapter: Persistence Layer&lt;/strong&gt;
The secondary adapter is responsible for the implementation of the persistence logic. We will use Reactive Spring Data for a non-blocking interaction with a MongoDB database.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Reactive Product Repository Adapter&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`
@Repository
public class ProductReactiveRepositoryAdapter implements ProductRepository {

    private final ReactiveMongoRepository&amp;lt;ProductDocument, String&amp;gt; mongoRepository;

    public ProductReactiveRepositoryAdapter(ReactiveMongoRepository&amp;lt;ProductDocument, String&amp;gt; mongoRepository) {
        this.mongoRepository = mongoRepository;
    }

    @Override
    public Mono&amp;lt;Product&amp;gt; findById(String id) {
        return mongoRepository.findById(id)
                .map(document -&amp;gt; new Product(document.getId(), document.getName(), document.getPrice()));
    }

    @Override
    public Mono&amp;lt;Product&amp;gt; save(Product product) {
        ProductDocument document = new ProductDocument(product.getId(), product.getName(), product.getPrice());
        return mongoRepository.save(document)
                .map(savedDocument -&amp;gt; new Product(savedDocument.getId(), savedDocument.getName(), savedDocument.getPrice()));
    }
}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The ProductReactiveRepositoryAdapter is a secondary adapter. It maps the core Product to a MongoDB-compatible document and uses the reactive persistence APIs.&lt;/p&gt;

&lt;p&gt;Product Document for MongoDB&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`
@Document(collection = "products")
public class ProductDocument {
    @Id
    private String id;
    private String name;
    private double price;

    public ProductDocument(String id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    // Getters and setters
}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Reactive Programming and Its Role in Hexagonal Architecture&lt;br&gt;
Reactive Spring Boot uses non-blocking, asynchronous APIs, which make it suitable for applications that deal with many simultaneous requests or need to interact with external services without blocking.&lt;/p&gt;

&lt;p&gt;Mono and Flux are used to handle reactive streams, ensuring non-blocking behavior across different layers of the application.&lt;br&gt;
The ProductRepository port returns a Mono to represent asynchronous data retrieval.&lt;br&gt;
Testing Hexagonal Architecture&lt;br&gt;
Hexagonal Architecture makes testing easier by allowing you to test the core logic independently of external systems.&lt;/p&gt;

&lt;p&gt;Unit Test for Product Service&lt;/p&gt;

&lt;p&gt;We can mock the ProductRepository to test the ProductService.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`
@ExtendWith(MockitoExtension.class)
public class ProductServiceTest {

    @Mock
    private ProductRepository productRepository;

    @InjectMocks
    private ProductService productService;

    @Test
    public void testGetProductById() {
        String productId = "123";
        Product product = new Product(productId, "Test Product", 100.0);

        when(productRepository.findById(productId)).thenReturn(Mono.just(product));

        Mono&amp;lt;Product&amp;gt; result = productService.getProductById(productId);

        StepVerifier.create(result)
                .expectNext(product)
                .verifyComplete();
    }
}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;By mocking ProductRepository, the test focuses on the ProductService's behavior, allowing easy verification of core logic without involving actual database operations.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Conclusion&lt;/strong&gt;:&lt;/p&gt;

&lt;p&gt;Hexagonal Architecture allows building flexible and adaptable systems by decoupling business logic from external concerns. By using Reactive Spring Boot, we can leverage the non-blocking, asynchronous nature of reactive streams to create high-performance and scalable applications.&lt;/p&gt;

&lt;p&gt;Hexagonal Architecture in Reactive Spring Boot enables:&lt;/p&gt;

&lt;p&gt;Separation of Concerns: Core business logic remains isolated and unaffected by infrastructure changes.&lt;br&gt;
Independence: Switching between different databases or external systems becomes easier by changing the adapter implementation.&lt;br&gt;
Scalability and Performance: With a reactive approach, the system can handle more concurrent users and external connections effectively.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Spring Batch Remote Chunking with Spring Integration And kafka Broker</title>
      <dc:creator>Diags</dc:creator>
      <pubDate>Tue, 10 Jan 2023 17:40:02 +0000</pubDate>
      <link>https://dev.to/diags/spring-batch-remote-chunk-with-spring-integration-with-kafka-broker-2njj</link>
      <guid>https://dev.to/diags/spring-batch-remote-chunk-with-spring-integration-with-kafka-broker-2njj</guid>
      <description>&lt;blockquote&gt;
&lt;p&gt;Knowledge comes through experience, everything else is just information.&lt;br&gt;
Albert Einstein&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  &lt;u&gt;&lt;strong&gt;Introduction&lt;/strong&gt;&lt;/u&gt;
&lt;/h2&gt;

&lt;p&gt;In this article, I offer you feedback on large volume processing in banking information systems. I will not describe here the definition of certain notions. I assume that these are already acquired. ☺️&lt;br&gt;
  To know :&lt;br&gt;
● Spring batch (itemReader, itemProcessor, itemWriter)&lt;br&gt;
● Apache Kafka&lt;br&gt;
● Mysql database&lt;br&gt;
● Lombok&lt;br&gt;
● Spring Integration.&lt;br&gt;
● Scheduler&lt;br&gt;
If these notions are not yet acquired, it will be imperative to read the official documentation: See the references section&lt;/p&gt;
&lt;h2&gt;
  
  
  &lt;u&gt;&lt;strong&gt;Problem&lt;/strong&gt;&lt;/u&gt;
&lt;/h2&gt;

&lt;p&gt;The management of a mass of deferred data streams is generally managed by a batch system. A batch is a planned collection of data in an information system. Two questions seem to us to be very important in the collection of planned data.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;How to improve performance?&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;How to handle error handling (error recovery)?&lt;br&gt;
Throughout this article, I will answer these two questions.&lt;br&gt;
To better understand the problem, we will use the following&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;scenario:&lt;/strong&gt;&lt;br&gt;
➢ A French bank must digitize a set of documents every day (transaction slips, contracts for opening/closing accounts with legal persons or not). The GED implementation of this data must be managed by a system of fairly specific business rules while respecting the RGPD. The bank would like the GED to be placed at a later time between 10 p.m. and 11:59 p.m. To help us we are going to draw up an architecture to collect this information and put it in GED.&lt;br&gt;
Architecture&lt;br&gt;
To collect the bank's flows, we will use Spring batch in 'remote' mode, coupled with an apache kafka broker, mysql to illustrate our database and Spring integration 🤔.&lt;br&gt;
First of all, you should know that a batch consists of three major parts:&lt;br&gt;
● Reading on external files, or via a database, or via an external API&lt;br&gt;
● A management part of the data read (specific processing linked to business rules, example: the cash code of a bank must be on 5 alphanumeric characters)&lt;br&gt;
● Writing on different supports (in database, in file (xml, csv, text…), apis).&lt;br&gt;
The following architecture will be very useful for the rest of the article:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fd7tbygy64nddcfhtf3bt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fd7tbygy64nddcfhtf3bt.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 1: General Architecture &lt;/p&gt;

&lt;p&gt;We have a KAFKA broker which allows communication between the master and the different partitions.&lt;br&gt;
The following figure (Figure 2) from the general documentation shows us this in depth.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwkl7t5c5fs01sy2k75jf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwkl7t5c5fs01sy2k75jf.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 2 : Architecture générale.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Script&lt;/strong&gt;:&lt;br&gt;
● The master will read the (external) data streams. Once the reading is done,&lt;br&gt;
● The master subscribes to an apache kafka broker to send read data, it will dispatch the flow to different processes called SLAVES.&lt;br&gt;
● Each slave guarantees the data read and its writing.&lt;br&gt;
● The master can launch several slaves in parallel.&lt;br&gt;
● At the end of slave processing&lt;br&gt;
● The master aggregates all of the writes from the slaves to make a single output.&lt;br&gt;
● In the event of a slave error.&lt;br&gt;
● The error is sent to the master which has the possibility of replaying the same stream by launching a new slave.&lt;br&gt;
&lt;strong&gt;Advantage&lt;/strong&gt;&lt;br&gt;
The advantage of this architecture is that in terms of performance the master can scale horizontally. That is to say increased the scalability of slaves. In addition, when a slave is in error, the data is not lost. It is simply sent back to the broker. A new slave can take over the stream and process it automatically. This automation of error recovery avoids physical intervention.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Inconvenience&lt;/strong&gt;&lt;br&gt;
The only problem in this system is that you have to invest in tools:&lt;br&gt;
● Have a server for the broker&lt;br&gt;
● Have competent resources on technologies and tools.&lt;/p&gt;
&lt;h2&gt;
  
  
  &lt;u&gt;&lt;strong&gt;SIDE PROGRAM&lt;/strong&gt;&lt;/u&gt;
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
Technical prerequisites:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Java 17&lt;/li&gt;
&lt;li&gt;Apache kafka &lt;/li&gt;
&lt;li&gt;Spring Integration&lt;/li&gt;
&lt;li&gt;Spring Batch 5.0&lt;/li&gt;
&lt;li&gt;Spring Boot 3&lt;/li&gt;
&lt;li&gt;Mysql&lt;/li&gt;
&lt;li&gt;Docker&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;PROJECT STRUCTURE&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frg41np5vvsls1n5mkamz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frg41np5vvsls1n5mkamz.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 3 : Projet Structure  &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;DATA&lt;/strong&gt;&lt;br&gt;
For the article we will use this csv 🙂 which is on the root of the project:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwf8dyvs0fft952cxtwlj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwf8dyvs0fft952cxtwlj.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 4: data.csv&lt;/p&gt;

&lt;p&gt;We are using a Maven project here are the dependencies for the article 👍&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;

   &amp;lt;version&amp;gt;0.0.1-SNAPSHOT&amp;lt;/version&amp;gt;
   &amp;lt;name&amp;gt;batch&amp;lt;/name&amp;gt;
   &amp;lt;description&amp;gt;Demo project for Spring Boot&amp;lt;/description&amp;gt;
   &amp;lt;properties&amp;gt;
       &amp;lt;java.version&amp;gt;17&amp;lt;/java.version&amp;gt;
   &amp;lt;/properties&amp;gt;
   &amp;lt;dependencies&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.springframework.boot&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;spring-boot-starter-batch&amp;lt;/artifactId&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;com.mysql&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;mysql-connector-j&amp;lt;/artifactId&amp;gt;
           &amp;lt;scope&amp;gt;runtime&amp;lt;/scope&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.springframework.boot&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;spring-boot-starter-data-jpa&amp;lt;/artifactId&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.springframework.boot&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;spring-boot-starter-integration&amp;lt;/artifactId&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.springframework.boot&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;spring-boot-starter-jdbc&amp;lt;/artifactId&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.springframework.integration&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;spring-integration-jpa&amp;lt;/artifactId&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.springframework.integration&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;spring-integration-kafka&amp;lt;/artifactId&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.springframework.kafka&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;spring-kafka&amp;lt;/artifactId&amp;gt;
       &amp;lt;/dependency&amp;gt;
       &amp;lt;dependency&amp;gt;
           &amp;lt;groupId&amp;gt;org.projectlombok&amp;lt;/groupId&amp;gt;
           &amp;lt;artifactId&amp;gt;lombok&amp;lt;/artifactId&amp;gt;
           &amp;lt;optional&amp;gt;true&amp;lt;/optional&amp;gt;
       &amp;lt;/dependency&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We will go over the code part:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;MASTER&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;We will start with the master and its configuration. We will need some dependencies and annotations for its configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Profile("master")
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-master.properties")
@Component
@Data
//@Import({DatabaseConfig.class})
public class MasterJob {
   @Autowired
   private RemoteChunkingManagerStepBuilderFactory remoteChunkingManagerStepBuilderFactory;
   @Autowired
   private KafkaTemplate&amp;lt;String, TransactionDto&amp;gt; masterKafkaTemplate;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Figure 4 : Configuration du master&lt;/p&gt;

&lt;p&gt;“&lt;a class="mentioned-user" href="https://dev.to/profile"&gt;@profile&lt;/a&gt;("master")” is required to run the master. It is injected into the spring component with other annotations essential for the launch of the master such as:&lt;br&gt;
  @Configuration: spring batch configuration&lt;br&gt;
@EnableBatchProcessing: this allows us to manage the batch execution set&lt;br&gt;
@EnableBatchIntegration: enable spring integration.&lt;br&gt;
@PropertySource("classpath:application-master.properties"):&lt;br&gt;
Inject the master configuration file into the batch:&lt;br&gt;
KafkaTemplate is injected into the spring component to allow communication between the master and the partitions.&lt;br&gt;
Now we will see how the job is configured.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Bean(name = "jobMaster")
public Job masterJob(JobRepository jobRepository) {
   return new JobBuilder("jobMaster", jobRepository)
           .incrementer(new RunIdIncrementer())
          // .listener(jobListener)
           .start(masterStep())
           .build();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For the job to run, we inject it with a masterStep():&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/*
* Configure master step components
*/
@Bean(name = "masterStep")
public TaskletStep masterStep() {
   return this.remoteChunkingManagerStepBuilderFactory.get("masterStep")
           .&amp;lt;TransactionDto, TransactionDto&amp;gt;chunk(10)
           .reader(masterReader())
           .outputChannel(outboundChannel()) //produces the chunkRequest&amp;lt;Transaction&amp;gt; to kafka wokers
           .inputChannel(inboundChannel()) //consumes the chunkResponse&amp;lt;Transaction&amp;gt; from kafka workers
           .allowStartIfComplete(Boolean.TRUE)
           .allowStartIfComplete(true)
           .build();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This masterStep will chunk every 10 items read. We read through a csv file which is on the root of the project.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Bean
public FlatFileItemReader&amp;lt;TransactionDto&amp;gt; masterReader() {
   return new FlatFileItemReaderBuilder&amp;lt;TransactionDto&amp;gt;()
           .resource(new ClassPathResource("/data.csv"))
           .name("Transaction")
           .delimited()
           .delimiter(",")
           .names("id", "name", "transactionDate")
           .linesToSkip(1)  //skipping the header of the file
           .fieldSetMapper(fieldSet -&amp;gt; {
               TransactionDto data = new TransactionDto();
               data.setId(fieldSet.readInt("id"));
               data.setName(fieldSet.readString("name"));
               data.setTransactionDate(fieldSet.readString("transactionDate"));
               return data;
           })
           .targetType(TransactionDto.class)
           .build();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then every 10 elements read from the csv file are sent to the partitions channel.&lt;br&gt;
● outputChannel(outboundChannel())&lt;br&gt;
● outboundFlow&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel outboundChannel() {
   return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow() {
   var producerMessageHandler = new KafkaProducerMessageHandler&amp;lt;String, TransactionDto&amp;gt;(this.masterKafkaTemplate);
   producerMessageHandler.setTopicExpression(new LiteralExpression("requestsForWokers"));
   return IntegrationFlow.from(outboundChannel())
           .log(LoggingHandler.Level.WARN)
           .handle(producerMessageHandler)
           .get();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Spring integration allows you to configure exchanges through a Kafka topic.&lt;br&gt;
Then when the wokers have finished their treatment. The master receives the response through another channel from the Kafka broker.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel inboundChannel() {
   return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ConsumerFactory&amp;lt;String, TransactionDto&amp;gt; consumerFactory) {
   return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, "repliesFromWokers"))
           .log(LoggingHandler.Level.WARN)
           .channel(inboundChannel())
           .get();
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Spring integration allows to receive exchanges with wokers.&lt;br&gt;
It should be noted that we can add a listener to the job in order to act on all the states of the job. According to our scenario cited above we can query a database. To retrieve the last execution date of the batch and all the data whose status is “R” as recovery or “NA” not archived.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Override
public void beforeJob(JobExecution jobExecution) {
   // check if we can go to database for somme process.
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can also add to the job a tasklet to summarize at the end of the job. This tasklet will describe all the processing. Or other apis can feed on it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Override
public void afterJob(JobExecution jobExecution) {
   // we can create en resume file
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;u&gt;WOKER&lt;/u&gt;&lt;/strong&gt;&lt;br&gt;
For the woker we will have practically the same configuration.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Profile("woker")
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-worker.properties")
//@Import({DatabaseConfig.class})
@Slf4j
@Component
public class WokerJob {

   @Autowired
   private RemoteChunkingWorkerBuilder&amp;lt;TransactionDto, TransactionDto&amp;gt; remoteChunkingWorkerBuilder;

   @Autowired
   private KafkaTemplate&amp;lt;String, TransactionDto&amp;gt; transactionKafkaTemplate;

   @Autowired
   private DataSource dataSource;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We also use a database so that each woker can write to it. Here is his step.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Bean
public IntegrationFlow workerStep() {
   return this.remoteChunkingWorkerBuilder
           .inputChannel(inboundChannel()) //consumes the chunkRequest&amp;lt;Transaction&amp;gt; from kafka wokers
           .outputChannel(outboundChannel()) //produces the chunkResponse&amp;lt;Transaction&amp;gt; to kafka master
           .itemProcessor(itemProcessor())
           .itemWriter(itemWriter())
           .build();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;First we listen to the flow coming from the master.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel inboundChannel() {
   return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ConsumerFactory&amp;lt;String, TransactionDto&amp;gt; consumerFactory) {
   return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, "requestsForWokers"))
           .log(LoggingHandler.Level.WARN)
           .channel(inboundChannel())
           .get();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, we process this flow via the itemProcessor then the itemWriter.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
/*
* Configure worker components
*/
@Bean
public ItemProcessor&amp;lt;TransactionDto, TransactionDto&amp;gt; itemProcessor() {
   return item -&amp;gt; {
       System.out.println("processing item " + item);
       return item;
   };
}

@Bean
public ItemWriter&amp;lt;TransactionDto&amp;gt; itemWriter() {
   return new JdbcBatchItemWriterBuilder&amp;lt;TransactionDto&amp;gt;()
           .beanMapped()
           .dataSource(dataSource)
           .sql("INSERT INTO TRANSACTDTO (id,name,transactionDate) VALUES (:id, :name, :transactionDate)")
           .build();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;At the end we send to the master information so that the master makes a committee of the chunk for each 10 elements.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt; /*
    * Configure outbound flow (replies going to the manager)
    */
   @Bean
   public DirectChannel outboundChannel() {
       return new DirectChannel();
   }

   @Bean
   public IntegrationFlow outboundFlow() {
       var producerMessageHandler = new KafkaProducerMessageHandler&amp;lt;String, TransactionDto&amp;gt;(transactionKafkaTemplate);
       producerMessageHandler.setTopicExpression(new LiteralExpression("repliesFromWokers"));
       return IntegrationFlow.from(outboundChannel())
               .log(LoggingHandler.Level.WARN)
               .handle(producerMessageHandler)
               .get();
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;**&lt;/p&gt;

&lt;h2&gt;
  
  
  ETAPE CONFIGURATIONS PROJET
&lt;/h2&gt;

&lt;p&gt;**&lt;br&gt;
application-master.properties&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;server.port=8087
###
#spring.kafka.producer.group-id=producer-master-g
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=fr.hn.services.batch.utils.ChunkRequestSerializer
spring.kafka.producer.properties.spring.json.trusted.packages=*
#####
spring.kafka.consumer.group-id=consumer-master-g
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=fr.hn.services.batch.utils.ChunkResponseDeserializer
#####
broker.url=tcp://localhost:8087
application-mastworker.properties:
server.port=8088
spring.kafka.consumer.group-id=consumer-woker-g
#spring.kafka.producer.group-id=producer-woker-g
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=fr.hn.services.batch.utils.ChunkResponseSerializer
#########
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=fr.hn.services.batch.utils.ChunkRequestDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
######

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;docker-compose.yml&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: "3"
services:
 zookeeper:
   image: 'bitnami/zookeeper:latest'
   ports:
     - '2181:2181'
   environment:
     - ALLOW_ANONYMOUS_LOGIN=yes
 kafka:
   image: 'bitnami/kafka:latest'
   ports:
     - '9092:9092'
   environment:
     - KAFKA_BROKER_ID=1
     - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
     - ALLOW_PLAINTEXT_LISTENER=yes
     - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
     - KAFKA_CFG_LISTENERS=CLIENT://:9093,EXTERNAL://:9092
     - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9093,EXTERNAL://localhost:9092
     - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
   depends_on:
     - zookeeper
 kafka-ui:
   image: provectuslabs/kafka-ui:latest
   ports:
     - '8080:8080'
   environment:
     - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093
   depends_on:
     - kafka
 mysql:
   image: mysql
   ports:
     - '3306:3306'
   command: --default-authentication-plugin=mysql_native_password
   restart: always
   environment:
     MYSQL_DATABASE: 'db'
     # So you don't have to use root, but you can if you like
     MYSQL_USER: 'user'
     # You can use whatever password you like
     MYSQL_PASSWORD: 'password'
     # Password for root access
     MYSQL_ROOT_PASSWORD: 'password'
   volumes:
     - "./scripts/schema.sql:/docker-entrypoint-initdb.d/schema.sql"

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our POJO works the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Component
@Getter
@Setter
public class TransactionDto implements Serializable {
   private int id;
   private String name;
   private String transactionDate;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our utility classes:&lt;br&gt;
● To deserialize the objects transferred into the broker from&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Component
public class ChunkRequestDeserializer implements Deserializer&amp;lt;ChunkRequest&amp;lt;TransactionDto&amp;gt;&amp;gt; {
   @Override
   public ChunkRequest deserialize(String s, byte[] bytes) {
       try {
           return (ChunkRequest) new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
       } catch (IOException e) {
           e.printStackTrace();
       }
       return null;
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;● To serialize the objects sent from the woker.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Component
public class ChunkRequestSerializer implements Serializer&amp;lt;ChunkRequest&amp;lt;TransactionDto&amp;gt;&amp;gt; {
   @Override
   public byte[] serialize(String s, ChunkRequest&amp;lt;TransactionDto&amp;gt; chunkRequest) {
       if (chunkRequest == null) {
           return new byte[0];
       }
       return SerializationUtils.serialize(chunkRequest);
   }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;● To serialize the received stream.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Component
public class ChunkResponseSerializer implements Serializer&amp;lt;ChunkResponse&amp;gt; {
   @Override
   public byte[] serialize(String s, ChunkResponse chunkResponse) {
       return SerializationUtils.serialize(chunkResponse);
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;● To deserialize responses.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Component
public class ChunkResponseDeserializer implements Deserializer&amp;lt;ChunkResponse&amp;gt; {
   @Override
   public ChunkResponse deserialize(String s, byte[] bytes) {
       try {
           if (bytes == null) {
               return null;
           }
           return (ChunkResponse) new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
       } catch (IOException e) {
           e.printStackTrace();
       }
       return null;
   }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;u&gt;&lt;strong&gt;BATCH EXECUTION&lt;/strong&gt;&lt;/u&gt;&lt;br&gt;
To run the batch we use a Scheduler launcher to launch the batch every 1 second.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Profile("master")
@Component
@Slf4j
@EnableScheduling
public class BatchLauncher {
   @Autowired
   private Job job;
   @Autowired
   private JobLauncher jobLauncher;

   @Scheduled(cron = "* * * * * ?")
   public void perform() throws Exception {
       JobParameters jobParameters = new JobParametersBuilder()
               .addString("joId", String.valueOf(System.currentTimeMillis()))
               .toJobParameters();
       log.info("", job.getName());
       jobLauncher.run(job, jobParameters);
   }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  &lt;strong&gt;&lt;u&gt;RÉSULTATS&lt;/u&gt;&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F41cykg4keueap2kg8gp2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F41cykg4keueap2kg8gp2.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 5 : Docker Images running&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flz1gormfxnukouc1ko5o.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flz1gormfxnukouc1ko5o.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 6 : Exécution master et Woker.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4g92j68i24jtxdb7x6ju.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4g92j68i24jtxdb7x6ju.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 7 : Tables in db.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg7d5dvpb45omghdc6qcl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fg7d5dvpb45omghdc6qcl.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 8 : Table TRANSACTDTO.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx7bfzygbllfh31equl1e.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx7bfzygbllfh31equl1e.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
Figure 9 : Broker kafka with partitions.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzyuckpngae70xabl85wf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzyuckpngae70xabl85wf.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Figure 10 : Messages in broker.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6rw67y8su2tb0fvldii4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6rw67y8su2tb0fvldii4.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Figure 11 : Example of message in the broker.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;&lt;u&gt;CONCLUSION&lt;/u&gt;&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;In summary, this article shows how you can gain performance while managing error recovery. With this ecosystem, there is spring batch, spring integration and kafka listener. The major interest is that we can always increase the capacity of the wokers for more sizing. We can also partition the broker as many times as we want. In addition, each woker has its own JVM. The choice of broker is important. Because Kafka allows flow recovery and data tolerance compared to other brokers. The only disadvantage is that you have to invest in tools and human resources.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;REFERENCES&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://docs.spring.io/spring-batch/docs/current/reference/html/" rel="noopener noreferrer"&gt;https://docs.spring.io/spring-batch/docs/current/reference/html/&lt;/a&gt;&lt;br&gt;
&lt;a href="https://www.h2database.com/html/main.html" rel="noopener noreferrer"&gt;https://www.h2database.com/html/main.html&lt;/a&gt;&lt;br&gt;
&lt;a href="https://docs.spring.io/spring-batch/docs/current/reference/html/" rel="noopener noreferrer"&gt;https://docs.spring.io/spring-batch/docs/current/reference/html/&lt;/a&gt;&lt;br&gt;
&lt;a href="https://docs.spring.io/spring-batch/docs/current/reference/html/job.html#javaConfig" rel="noopener noreferrer"&gt;https://docs.spring.io/spring-batch/docs/current/reference/html/job.html#javaConfig&lt;/a&gt;&lt;br&gt;
&lt;a href="https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#spring-integration" rel="noopener noreferrer"&gt;https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#spring-integration&lt;/a&gt;&lt;br&gt;
&lt;a href="https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking" rel="noopener noreferrer"&gt;https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking&lt;/a&gt;&lt;br&gt;
&lt;a href="https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderFactory.html" rel="noopener noreferrer"&gt;https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderFactory.html&lt;/a&gt;&lt;br&gt;
&lt;a href="https://github.com/Diags/HNBATCH" rel="noopener noreferrer"&gt;GITHUB&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
