Why two different implementations?
In this article I will refer to Spring and Resilence4j.
If you are reading this article you probably already know about the bulkhead pattern, what is the problem that it intends to solve and the most common implementations: semaphore based and threadPool based.
At least for me, it was not easy to realize when to use a semaphore implementation and when to use a threadPool one.
I know how a Semaphore works, and I also understand the ThreadPool pattern, therefore a short answer and the most obvious could have been: use a threadPool for limiting the number of asynchronous calls and use a semaphore for limiting synchronous ones.
So why was the difficult part? The reason was these questions:
Why not just combine @Async with a bulkhead semaphore based implementation?
Can both annotations be used together? If yes, why is the reason for a threadPool implementation?
@Async and @Bulkhead combined.
@Bulkhead(name = "Service3", fallbackMethod = "futureFallback")
@Async
public CompletableFuture<String> doSomeWork() {
System.out.println("Excecuting service 3 - " + Thread.currentThread().getName());
Util.mockExternalServiceHttpCall(DELAY);
return CompletableFuture.completedFuture("ok");
}
Yes, you can use both annotations together. They allow you to generate a limited number of async calls based on your bulkhead semaphore configuration.
However, there are some implications that are useful to know.
SimpleAsyncTaskExecutor.
When you annotate a method with the @Async annotation, Spring can use different implementations of the TaskExecutor interface.
By default, the framework uses the SimpleAsyncTaslExecutor.
This implementation will create a new thread each time the annotated method is invoked; this thread will not be reused.
The problem with this approach is that you will create a new thread even if the semaphore counter is 0 (bulkhead is full)
As you can see in the following stacktrace, the framework first creates a thread and then calls the bulkhead pattern which determines if there are available permissions to continue the execution. If the semaphore counter is cero, the bulkhead will reject the method execution.
Thread [_simpleAsyncTask1] (Suspended (breakpoint at line 18 in Service3))
Service3.doSomeWork() line: 18
Service3$$FastClassBySpringCGLIB$$6085f5a4.invoke(int, Object, Object[]) line: not available
MethodProxy.invoke(Object, Object[]) line: 218
CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 793
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 163
CglibAopProxy$CglibMethodInvocation.proceed() line: 763
MethodInvocationProceedingJoinPoint.proceed() line: 89
BulkheadAspect.lambda$handleJoinPointCompletableFuture$0(ProceedingJoinPoint) line: 225
1457434357.get() line: not available
Bulkhead.lambda$decorateCompletionStage$1(Bulkhead, Supplier) line: 100
1251257755.get() line: not available
SemaphoreBulkhead(Bulkhead).executeCompletionStage(Supplier<CompletionStage<T>>) line: 557
BulkheadAspect.handleJoinPointCompletableFuture(ProceedingJoinPoint, Bulkhead) line: 223
BulkheadAspect.proceed(ProceedingJoinPoint, String, Bulkhead, Class<?>) line: 162
BulkheadAspect.lambda$bulkheadAroundAdvice$5eb13a26$1(ProceedingJoinPoint, String, Bulkhead, Class) line: 129
1746723773.apply() line: not available
1746723773(CheckedFunction0<R>).lambda$andThen$ca02ab3$1(CheckedFunction1) line: 265
1151489454.apply() line: not available
BulkheadAspect.executeFallBack(ProceedingJoinPoint, String, Method, CheckedFunction0<Object>) line: 139
==> here
BulkheadAspect.bulkheadAroundAdvice(ProceedingJoinPoint, Bulkhead) line: 128
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
Method.invoke(Object, Object...) line: 566
AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethodWithGivenArgs(Object[]) line: 634
AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethod(JoinPoint, JoinPointMatch, Object, Throwable) line: 624
AspectJAroundAdvice.invoke(MethodInvocation) line: 72
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 175
CglibAopProxy$CglibMethodInvocation.proceed() line: 763
ExposeInvocationInterceptor.invoke(MethodInvocation) line: 97
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 186
CglibAopProxy$CglibMethodInvocation.proceed() line: 763
AnnotationAsyncExecutionInterceptor(AsyncExecutionInterceptor).lambda$invoke$0(MethodInvocation, Method) line: 115
1466446116.call() line: not available
AsyncExecutionAspectSupport.lambda$doSubmit$3(Callable) line: 278
409592088.get() line: not available
CompletableFuture$AsyncSupply<T>.run() line: 1700
==> here
SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run() line: 286
Thread.run() line: 829
After executing a load test that invokes for 60 seconds an @Aync and @Bulkhead annotated method, you can see from a profiling tool picture, that the application only used 34 out of 514 created threads. This obviously represents a waste of resources.
ThreadPoolTaskExecutor
Another option could be to use the TreadPoolTaskExecutor implementation.
After executing the same test using this implementation, the number of created threads decreased a lot (41).
However, the problem with this approach is that we are using unnecessary redundancy. Using a thread pool and a semaphore together, in my opinion, has no real advantage.
Conclusion.
For limiting asynchronous calls, use a Bulkhead threadPool implementation instead of a combination of @Async and Bulkhead semaphore.
Top comments (0)