Gabriel Aramburu

Bulkhead pattern: semaphore vs threadPool

Why two different implementations?

In this article I will refer to Spring and Resilience4j.

If you are reading this article, you probably already know the bulkhead pattern, the problem it is meant to solve, and its two most common implementations: semaphore-based and threadPool-based.

At least for me, it was not easy to work out when to use the semaphore implementation and when to use the threadPool one.

I know how a Semaphore works, and I also understand the ThreadPool pattern, so the short and most obvious answer would have been: use a threadPool to limit the number of asynchronous calls and a semaphore to limit synchronous ones.

So what was the difficult part? These two questions:

  • Why not just combine @Async with a semaphore-based bulkhead?

  • Can both annotations be used together? If so, what is the point of a threadPool implementation?

@Async and @Bulkhead combined.

    @Bulkhead(name = "Service3", fallbackMethod = "futureFallback")
    @Async
    public CompletableFuture<String> doSomeWork() {
        // Runs on the thread supplied by the @Async executor.
        System.out.println("Executing service 3 - " + Thread.currentThread().getName());
        // Simulates a slow call to an external HTTP service.
        Util.mockExternalServiceHttpCall(DELAY);
        return CompletableFuture.completedFuture("ok");
    }


Complete Code.

Yes, you can use both annotations together. Combined, they cap the number of concurrent async calls according to your semaphore bulkhead configuration.

However, there are some implications that are useful to know.

SimpleAsyncTaskExecutor.

When you annotate a method with @Async, Spring can use different implementations of the TaskExecutor interface.
By default, when no executor bean is configured, the framework uses the SimpleAsyncTaskExecutor.

This implementation will create a new thread each time the annotated method is invoked; this thread will not be reused.

The problem with this approach is that a new thread is created even when the semaphore counter is 0 (the bulkhead is full).
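The ordering problem described above can be reproduced without Spring. The following is a minimal plain-JDK sketch, not Spring or Resilience4j code: the class name ThreadPerCallDemo and its run method are hypothetical, a raw Thread per call stands in for a thread-per-task executor, and a java.util.concurrent.Semaphore with a non-blocking tryAcquire stands in for a semaphore bulkhead that does not wait for permits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; it only mimics the ordering "thread first, permit check second".
class ThreadPerCallDemo {

    /** Returns {threadsStarted, callsAdmitted} for the given number of calls and permits. */
    static int[] run(int calls, int permits) {
        Semaphore bulkhead = new Semaphore(permits);
        AtomicInteger started = new AtomicInteger();
        AtomicInteger admitted = new AtomicInteger();
        CountDownLatch decided = new CountDownLatch(calls); // every call was admitted or rejected
        CountDownLatch release = new CountDownLatch(1);     // keeps admitted calls in flight

        Thread[] threads = new Thread[calls];
        for (int i = 0; i < calls; i++) {
            // One new thread per call: the thread exists before the permit is checked.
            threads[i] = new Thread(() -> {
                started.incrementAndGet();
                boolean acquired = bulkhead.tryAcquire(); // non-blocking permit check
                decided.countDown();
                if (!acquired) {
                    return; // rejected, but a thread was already spent on this call
                }
                admitted.incrementAndGet();
                try {
                    release.await(); // stands in for a slow external call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    bulkhead.release();
                }
            });
            threads[i].start();
        }
        try {
            decided.await();     // wait until every call has hit the bulkhead
            release.countDown(); // let the admitted calls finish
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {started.get(), admitted.get()};
    }

    public static void main(String[] args) {
        int[] result = run(20, 3);
        System.out.println("threads started: " + result[0] + ", calls admitted: " + result[1]);
    }
}
```

With 20 calls and 3 permits, the sketch starts 20 threads but admits only 3 calls; the other 17 threads exist only to be rejected.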


As the following stack trace shows, the framework first creates a thread and only then calls the bulkhead, which checks whether a permit is available to continue the execution. If the semaphore counter is zero, the bulkhead rejects the method execution.

Thread [_simpleAsyncTask1] (Suspended (breakpoint at line 18 in Service3))  
    Service3.doSomeWork() line: 18  
    Service3$$FastClassBySpringCGLIB$$6085f5a4.invoke(int, Object, Object[]) line: not available    
    MethodProxy.invoke(Object, Object[]) line: 218  
    CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 793 
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 163 
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763 
    MethodInvocationProceedingJoinPoint.proceed() line: 89  
    BulkheadAspect.lambda$handleJoinPointCompletableFuture$0(ProceedingJoinPoint) line: 225 
    1457434357.get() line: not available    
    Bulkhead.lambda$decorateCompletionStage$1(Bulkhead, Supplier) line: 100 
    1251257755.get() line: not available    
    SemaphoreBulkhead(Bulkhead).executeCompletionStage(Supplier<CompletionStage<T>>) line: 557  
    BulkheadAspect.handleJoinPointCompletableFuture(ProceedingJoinPoint, Bulkhead) line: 223    
    BulkheadAspect.proceed(ProceedingJoinPoint, String, Bulkhead, Class<?>) line: 162   
    BulkheadAspect.lambda$bulkheadAroundAdvice$5eb13a26$1(ProceedingJoinPoint, String, Bulkhead, Class) line: 129   
    1746723773.apply() line: not available  
    1746723773(CheckedFunction0<R>).lambda$andThen$ca02ab3$1(CheckedFunction1) line: 265    
    1151489454.apply() line: not available  
    BulkheadAspect.executeFallBack(ProceedingJoinPoint, String, Method, CheckedFunction0<Object>) line: 139 

    ==> here: bulkhead check
    BulkheadAspect.bulkheadAroundAdvice(ProceedingJoinPoint, Bulkhead) line: 128
    NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]  
    NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62  
    DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43  
    Method.invoke(Object, Object...) line: 566  
    AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethodWithGivenArgs(Object[]) line: 634  
    AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethod(JoinPoint, JoinPointMatch, Object, Throwable) line: 624   
    AspectJAroundAdvice.invoke(MethodInvocation) line: 72   
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 175 
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763 
    ExposeInvocationInterceptor.invoke(MethodInvocation) line: 97   
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 186 
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763 
    AnnotationAsyncExecutionInterceptor(AsyncExecutionInterceptor).lambda$invoke$0(MethodInvocation, Method) line: 115  
    1466446116.call() line: not available   
    AsyncExecutionAspectSupport.lambda$doSubmit$3(Callable) line: 278   
    409592088.get() line: not available 
    CompletableFuture$AsyncSupply<T>.run() line: 1700   

    ==> here: thread created by the executor
    SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run() line: 286
    Thread.run() line: 829  


After a 60-second load test that invokes a method annotated with @Async and @Bulkhead, the profiler screenshot shows that the application used only 34 of the 514 threads it created. This is clearly a waste of resources.

[Image: test1, profiler screenshot of the load test with SimpleAsyncTaskExecutor]

ThreadPoolTaskExecutor

Another option is to use the ThreadPoolTaskExecutor implementation.
After running the same test with this implementation, the number of created threads dropped sharply, to 41.

[Image: test2, profiler screenshot of the same load test with ThreadPoolTaskExecutor]

However, the problem with this approach is redundancy: the thread pool already bounds the number of concurrent calls, so in my opinion using a thread pool and a semaphore together has no real advantage.
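To make the two layers concrete, here is a small plain-JDK sketch of the pool-plus-semaphore combination. It is an analogy under stated assumptions, not Spring's ThreadPoolTaskExecutor: the class PooledCallDemo is hypothetical, Executors.newFixedThreadPool stands in for the pooled executor, and a Semaphore stands in for the bulkhead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class: a fixed pool in front of a semaphore.
class PooledCallDemo {

    /** Returns how many distinct threads ended up running the calls. */
    static int run(int calls, int permits, int poolSize) {
        Semaphore bulkhead = new Semaphore(permits);               // second limit
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize); // first limit

        for (int i = 0; i < calls; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                if (!bulkhead.tryAcquire()) {
                    return; // rejected, but on a reused pool thread
                }
                try {
                    Thread.sleep(5); // stands in for a slow external call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    bulkhead.release();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads used: " + run(200, 3, 4));
    }
}
```

However many calls are submitted, the number of threads never exceeds the pool size, which is consistent with the drop in created threads. The concurrency is now limited twice, once by the pool and once by the semaphore, which is the redundancy in question.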

Conclusion.

For limiting asynchronous calls, use the threadPool bulkhead implementation instead of combining @Async with a semaphore bulkhead.
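The idea behind a thread-pool bulkhead can be sketched with the JDK alone: a bounded pool with a bounded queue runs the calls and rejects the overflow by itself, without creating extra threads. This is only an illustration of the concept, not Resilience4j's implementation; the class BoundedPoolDemo and its parameters are hypothetical. In Resilience4j's annotation support, to my knowledge, the thread-pool variant is selected with type = Bulkhead.Type.THREADPOOL on @Bulkhead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class: one bounded pool acts as executor and limiter.
class BoundedPoolDemo {

    /** Returns {callsAccepted, callsRejected, threadsCreated}. */
    static int[] run(int calls, int poolSize, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),   // bounded waiting room
                new ThreadPoolExecutor.AbortPolicy());     // reject when pool and queue are full
        CountDownLatch release = new CountDownLatch(1);    // keeps accepted calls in flight

        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < calls; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // stands in for a slow external call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                rejected++; // rejected on the caller's thread; no new thread was created
            }
        }
        int threadsCreated = pool.getLargestPoolSize();

        release.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {accepted, rejected, threadsCreated};
    }

    public static void main(String[] args) {
        int[] r = run(10, 2, 1);
        System.out.println("accepted: " + r[0] + ", rejected: " + r[1] + ", threads created: " + r[2]);
    }
}
```

With 10 calls, a pool of 2 and a queue of 1, the sketch accepts 3 calls, rejects 7 and creates only 2 threads. Unlike the thread-per-call setup, a rejected call never costs a thread.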
