DEV Community

SUNIL KUMAR L
SUNIL KUMAR L

Posted on

Merge results of parallel service requests using CompletableFuture

Here is an approach of using java CompletableFuture to ensure multiple service requests are run/executed in parallel to collect the Service/API response data and aggregate them.

Github location

GitHub logo SUNIL-KUMAR-L / JavaCompletableFutureDemo

Merge results of parallel service requests using CompletableFuture

Step by Step Guide

  • Approach 1 : Plain old simple java way

let us introduce CompletableFuture with different approaches

  • Approach 2 : Parallel service calls and collect data using CompletableFuture::get()

  • Approach 3 : Parallel service calls and collect data using CompletableFuture::join()

  • Approach 4 : Parallel service calls and group futures using CompletableFuture::allOf() and then collect each data

  • Approach 5 (My Preference) : Parallel service calls and group futures using CompletableFuture::allOf() and then assemble using CompletableFuture::thenApply()

Approach 1 :

Goal is to get make multiple service calls in sequence (one after other) and then collect the response and aggregate to get final composite response.

pseudo code

  • Client inputs ID to get Person Info + Address Info (Composite PersonWithAddressInfo)

  • To achieve this -> make sequential service requests

  • trigger a Person service call which interacts with PersonRepository by ID to get Person Info

  • trigger a Address service call which interacts with AddressRepository by ID to get Address Info

  • finally, return PersonWithAddressInfo where Person Info + Address Info is aggregated.

code



public class Address {

    private String addressId;
    private String addressLine1;
    private String addressLine2;
    private String addressLine3;
    private String zipCode;
    private String city;
    private String state;
    private String country;

    public Address(
         String addressId, 
         String addressLine1, 
         String addressLine2, 
         String addressLine3, 
         String zipCode, 
         String city, 
         String state, 
         String country) {
        this.addressId = addressId;
        this.addressLine1 = addressLine1;
        this.addressLine2 = addressLine2;
        this.addressLine3 = addressLine3;
        this.zipCode = zipCode;
        this.city = city;
        this.state = state;
        this.country = country;
    }

// write toString, hashCode or use lombok annotation

}


Enter fullscreen mode Exit fullscreen mode


public class AddressRepository {

    public Address getAddressById(String id){
// hard coded ... change it to real data // fetch from DB or API 
        return new Address( "1","add1", "add2", "add3","55305",
                "Mpls", "MN","USA");
    }
}


Enter fullscreen mode Exit fullscreen mode


public class AddressService {

    private AddressRepository addressRepository;

    public AddressService(AddressRepository addressRepository) {
        this.addressRepository = addressRepository;
    }

    public Address getAddressById(String id) {
        return addressRepository.getAddressById(id);
    }
}


Enter fullscreen mode Exit fullscreen mode


public class Person {

    public Person(String id, String firstName, String age) {
        this.id = id;
        this.firstName = firstName;
        this.age = age;
    }

    private String id;
    private String firstName;
    private String age;
   // write toString, hashCode or use lombok annotation
}


Enter fullscreen mode Exit fullscreen mode


public class PersonRepository {

    public Person getPersonById(String id) {
// hard coded ... change it to real data // fetch from DB or API 
        return new Person("1", "hello", "22");
    }
}


Enter fullscreen mode Exit fullscreen mode


public class PersonService {

    private PersonRepository personRepository;

    public PersonService(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    public Person getPersonById(String id){
        return personRepository.getPersonById(id);
    }
}


Enter fullscreen mode Exit fullscreen mode


public class PersonWithAddress {
    private Person person;
    private Address address;

    public PersonWithAddress(Person person, Address address) {
        this.person = person;
        this.address = address;
    }
// write toString, hashCode or use lombok annotation
}


Enter fullscreen mode Exit fullscreen mode


public class CFMain {
    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void sequencial(String personId) {
        final Person personById = 
personService.getPersonById(personId);
        final Address addressById = 
addressService.getAddressById(personId);
        final PersonWithAddress personWithAddress = 
new PersonWithAddress(personById, addressById);
        System.out.println(personWithAddress);
    }

    public static void main(String[] args) {
        sequencial("1");
    }
}


Enter fullscreen mode Exit fullscreen mode

Good that we were able to achieve that goal but can this be improved?
if yes how? Answer : execute each service call in parallel to improve response time (see Approach 2)

Approach 2 :

Goal is to execute multiple service calls in parallel using Completable future and collect data from each CompletableFuture using get()
get() throws InterruptedException, ExecutionException
(CheckedException)

pseudo code

  • Client inputs ID to get Person Info + Address Info (Composite PersonWithAddressInfo)

  • To achieve this -> make parallel service requests

  • trigger a Person service call which interacts with PersonRepository by ID to get Person Info using java CompletableFuture (say CompletableFuture 1)

  • trigger a Address service call which interacts with AddressRepository by ID to get Address Info using java CompletableFuture (say CompletableFuture 2)

  • now, block each CompletableFuture to get the data (one by one / sequential) use CompletableFuture::get method

  • finally, return PersonWithAddressInfo where Person Info + Address Info is aggregated.

code improvement



import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = new AddressService(addressRepository);

    public static void main(String[] args) {
        parallelServiceCallWithCFUsingGet("1");
    }


    public static void parallelServiceCallWithCFUsingGet(String personId) {
        final CompletableFuture<Person> personCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture<Address> addressCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
addressService.getAddressById(personId));
        try {
            final Person person = 
personCompletableFuture.get(); // until this is not complete below line is not executed
            final Address address = 
addressCompletableFuture.get(); // until this is not complete below line is not executed
            // below line will be executed based on the slowest service response
            System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time
        } catch (Exception exp) {
            System.err.println(exp);
        }
    }

}


Enter fullscreen mode Exit fullscreen mode

Good that we were able to achieve that goal but can this be improved?
if yes how? Answer : use CompletableFuture join() instead of get() (see Approach 3)

Approach 3 :

Goal is to execute multiple service calls in parallel using Completable future and collect data from each CompletableFuture using join()
join() throws UnCheckedException
(RuntimeException)

code improvement



import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void main(String[] args) {
        parallelServiceCallWithCFUsingJoin("1");
    }

    public static void parallelServiceCallWithCFUsingJoin(String personId) {
        final CompletableFuture<Person> personCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture<Address> addressCompletableFuture = 
CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId));
        final Person person = 
personCompletableFuture.join(); // until this is not complete below line is not executed
        final Address address = 
addressCompletableFuture.join(); // until this is not complete below line is not executed
        // below line will be executed based on the slowest service response
        System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time

    }
}


Enter fullscreen mode Exit fullscreen mode

Good that we were able to achieve that goal but can this be improved?
if yes how? Answer : use CompletableFuture allOf(CompletableFuture<?>... cfs) to add all futures (see Approach 4)

Approach 4:

Goal is to execute multiple service calls in parallel using Completable future and use allOf then collect data from each CompletableFuture using join() or get()

code improvement



import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void main(String[] args) {
        parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet("1");
        parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin("1");
    }

    public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndGet(String personId) {
        final CompletableFuture<Person> personCompletableFuture =
 CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture<Address> addressCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
addressService.getAddressById(personId));

        final CompletableFuture<Void> completableFutureAllOf = 
CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture);
        try {
            completableFutureAllOf.get(); //  time taken to return is based on the slowest service response/ slowest future response
            final Person person = 
personCompletableFuture.get();
            final Address address = 
addressCompletableFuture.get();
            System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service return time
        } catch (Exception exp) {
            System.err.println(exp);
        }
    }

public static void parallelServiceCallUseAllToMergeFuturesDataUseAllOfAndJoin(String personId) {
        final CompletableFuture<Person> personCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture<Address> addressCompletableFuture = 
CompletableFuture.supplyAsync(() -> addressService.getAddressById(personId));
        final CompletableFuture<Void> completableFutureAllOf = 
CompletableFuture.allOf(
personCompletableFuture, 
addressCompletableFuture);
        completableFutureAllOf.join(); //  time taken to return is based on the slowest future call
        final Person person = 
personCompletableFuture.join();
        final Address address = 
addressCompletableFuture.join();
        System.out.println(new PersonWithAddress(person, address)); // time taken to return is based on the slowest service response time

    }
}



Enter fullscreen mode Exit fullscreen mode

Good that we were able to achieve that goal but can this be improved?
if yes how? Answer : Refactor Approach 4, assemble CompletableFuture::allOf() return type with thenApply() (see Approach 5)

Approach 5:

Goal is to execute multiple service calls in parallel using Completable future and use allOf then aggregate data using thenApply()

pseudo code

  • Client inputs ID to get Person Info + Address Info (Composite PersonWithAddressInfo)

  • To achieve this -> make parallel service requests

  • trigger a Person service call which interacts with PersonRepository by ID to get Person Info using java CompletableFuture (say CompletableFuture 1)

  • trigger a Address service call which interacts with AddressRepository by ID to get Address Info using java CompletableFuture (say CompletableFuture 2)

  • add these CompletableFutures to CompletableFuture::allOf

  • after the above step, compose/assemble above CompletableFuture using thenApply()

  • now, get PersonWithAddressInfo where Person Info + Address Info data is aggregated using CompletableFuture.join()

code improvement



import java.util.concurrent.CompletableFuture;

public class CFMain {

    static PersonRepository personRepository = 
new PersonRepository();
    static PersonService personService = 
new PersonService(personRepository);

    static AddressRepository addressRepository = 
new AddressRepository();
    static AddressService addressService = 
new AddressService(addressRepository);

    public static void main(String[] args) {
 parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply("1");
    }

    public static void parallelServiceCallUseAllToMergeFuturesDataUseAllAndThenApply(String personId) {
        final CompletableFuture<Person> personCompletableFuture = 
CompletableFuture.supplyAsync(() -> 
personService.getPersonById(personId));
        final CompletableFuture<Address> addressCompletableFuture =
 CompletableFuture.supplyAsync(() -> 
addressService.getAddressById(personId));
        final CompletableFuture<Void> completableFutureAllOf = 
CompletableFuture.allOf(personCompletableFuture, addressCompletableFuture);
        final CompletableFuture<PersonWithAddress> personWithAddressCompletableFuture = 
completableFutureAllOf.thenApply(
                (voidInput) -> 
new PersonWithAddress(
personCompletableFuture.join(), 
addressCompletableFuture.join()));
        System.out.println(personWithAddressCompletableFuture.join()); // time taken to return is based on the slowest service response time

    }
}


Enter fullscreen mode Exit fullscreen mode

Top comments (0)