In the last post, we used io.ebean's ChangeLog annotation to log entity changes in a synchronous, blocking I/O application. In this post, we will see how that works in a multithreaded or async (non-blocking) I/O application.
If your system uses a separate thread pool or executor context for I/O tasks, such as making an HTTP API request or running a database transaction, the audit log prepare class will not be able to detect the user context for the model entities being updated, because the entity is saved on a different thread or thread pool. Hence, we need to ensure that the thread context is not lost when switching executors or when switching threads within the same thread pool.
To do so, we first need to understand how to store data in the thread context. For this we will use ThreadLocals. The ThreadLocal construct lets us store data that is accessible only to a specific thread. Each thread sees its own independent copy of a ThreadLocal's value, so when switching context we need to ensure that the context of the current thread is propagated to the next thread.
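To make the isolation concrete, here is a minimal sketch using only the JDK. The class name ThreadLocalIsolationDemo and the CURRENT_USER holder are hypothetical and exist only for this illustration; they are not part of the application code in this post.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalIsolationDemo {
    // Hypothetical holder used only for this illustration.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Reads CURRENT_USER on a separate worker thread and returns what that thread sees.
    static String readOnWorkerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CURRENT_USER::get;
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_USER.set("some user");
        System.out.println("caller sees: " + CURRENT_USER.get());   // caller sees: some user
        System.out.println("worker sees: " + readOnWorkerThread()); // worker sees: null
        CURRENT_USER.remove();
    }
}
```

The worker thread gets null because the value was set on the calling thread only. This is exactly the gap we have to close when a save happens on another thread.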
STORING USER CONTEXT IN THREAD LOCALS
We can store the user info in a ThreadLocal and read it in our ChangeLogPrepare implementation to set it on the change set.
ThreadLocalManager
import java.util.HashMap;
import java.util.Map;

public class ThreadLocalManager {
    // Per-thread context map; each thread sees only its own value.
    private static final ThreadLocal<Map<String, Object>> context = new ThreadLocal<>();

    public static void addToContext(String key, Object value) {
        Map<String, Object> currentContext = context.get() == null ? new HashMap<>() : context.get();
        currentContext.put(key, value);
        context.set(currentContext);
    }

    public static void setContext(Map<String, Object> contextMap) {
        context.set(contextMap);
    }

    public static Map<String, Object> getContext() {
        return context.get();
    }

    // Nullifier is a helper that is not shown in this post (presumably a null-safe getter).
    public static Object getFromContext(String key) {
        return Nullifier.get(() -> context.get().getOrDefault(key, "NA"));
    }

    // Call this once the task is done so pooled threads do not keep stale context.
    public static void clearContext() {
        context.remove();
    }
}
Before saving an entity, we add the user info to the thread context:
ThreadLocalManager.addToContext("userContext", new HashMap<String, String>() {{
    put("userName", "some user");
    put("userEmail", "some.user@company.com");
}});
entity.save();
Now we'll modify our ChangeLogPrepare to read user data from thread context
Custom ChangeLogPrepare
public class AuditLogPrepare implements ChangeLogPrepare {
    private final play.Logger.ALogger logger = Logger.of(this.getClass());

    @Override
    public boolean prepare(ChangeSet changes) {
        // Read the user details stored in the thread context.
        Map<String, String> userContext = Nullifier.get(() -> (Map<String, String>) ThreadLocalManager.getContext().get("userContext"), new HashMap<>());
        if (userContext.isEmpty()) logger.warn("[ALERT] userContext is empty for changeset: " + changes.toString());
        // DEFAULT_USER_NAME and DEFAULT_USER_EMAIL are constants that are not shown in this post.
        changes.getUserContext().put("userName", userContext.getOrDefault("userName", DEFAULT_USER_NAME));
        changes.getUserContext().put("userEmail", userContext.getOrDefault("userEmail", DEFAULT_USER_EMAIL));
        changes.setSource("MyApp");
        return true;
    }
}
Since the user data is taken from a thread local, we need to ensure that the thread context is maintained when switching threads. For that, we'll create a utility class that propagates the thread context to the next Runnable/Callable.
ContextUtility
public class ContextUtility {

    public static <T> Callable<T> wrapWithContext(Callable<T> task) {
        // Captured on the submitting thread, at wrap time.
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null) {
            return task;
        } else {
            return () -> {
                // Runs on the worker thread.
                ThreadLocalManager.setContext(previousContext);
                try {
                    return task.call();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
        }
    }

    public static Runnable wrapWithContext(Runnable task) {
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null) {
            return task;
        } else {
            return () -> {
                ThreadLocalManager.setContext(previousContext);
                try {
                    task.run();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
        }
    }
}
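To see the effect of wrapping, here is a small self-contained sketch. ContextPropagationDemo, its CONTEXT holder, and the helper methods are hypothetical stand-ins for ThreadLocalManager and ContextUtility, reduced to what the demo needs. Copying the map into a snapshot is our own addition, assuming you do not want the submitting thread and the worker to share one mutable HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationDemo {
    // Stand-in for ThreadLocalManager (hypothetical, for illustration only).
    static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    // Capture on the submitting thread, install on the worker, clear afterwards.
    static <T> Callable<T> wrapWithContext(Callable<T> task) {
        Map<String, Object> captured = CONTEXT.get();
        if (captured == null) {
            return task;
        }
        // Assumption: a defensive copy, so the two threads never mutate the same map.
        Map<String, Object> snapshot = new HashMap<>(captured);
        return () -> {
            CONTEXT.set(snapshot);
            try {
                return task.call();
            } finally {
                CONTEXT.remove();
            }
        };
    }

    // Returns the userName visible to the current thread, or null if there is no context.
    static Object readUserName() {
        Map<String, Object> ctx = CONTEXT.get();
        return ctx == null ? null : ctx.get("userName");
    }

    // Runs the task on a fresh single-thread pool and returns its result.
    static <T> T runOnWorker(Callable<T> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> ctx = new HashMap<>();
        ctx.put("userName", "some user");
        CONTEXT.set(ctx);
        Callable<Object> read = ContextPropagationDemo::readUserName;
        System.out.println("unwrapped: " + runOnWorker(read));                // unwrapped: null
        System.out.println("wrapped: " + runOnWorker(wrapWithContext(read))); // wrapped: some user
        CONTEXT.remove();
    }
}
```

Note that the context is captured when the task is wrapped, not when it runs, so the wrapping has to happen on the thread that owns the context.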
Using the methods from ContextUtility, we create a CustomThreadPoolExecutor that overrides the submit and execute methods to attach the thread context before tasks are handed to the pool.
CustomThreadPoolExecutor
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    @NotNull TimeUnit unit,
                                    @NotNull BlockingQueue<Runnable> workQueue,
                                    @NotNull ThreadFactory threadFactory,
                                    @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public <T> @NotNull Future<T> submit(@NotNull Callable<T> task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }

    @Override
    public <T> @NotNull Future<T> submit(@NotNull Runnable task, T result) {
        return super.submit(ContextUtility.wrapWithContext(task), result);
    }

    @Override
    public @NotNull Future<?> submit(@NotNull Runnable task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }

    @Override
    public void execute(@NotNull Runnable task) {
        super.execute(ContextUtility.wrapWithContext(task));
    }
}
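As a side note, the submit methods inherited from AbstractExecutorService hand their work to execute, so a leaner variant can override execute alone. The sketch below shows that idea with JDK classes only. ContextAwareExecutorDemo, ContextAwareExecutor, and the single-value USER holder are hypothetical names for this illustration, not the classes used in the application.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ContextAwareExecutorDemo {
    // Hypothetical single-value context standing in for ThreadLocalManager.
    static final ThreadLocal<String> USER = new ThreadLocal<>();

    // Only execute() is overridden; the inherited submit() methods route through it.
    static class ContextAwareExecutor extends ThreadPoolExecutor {
        ContextAwareExecutor(int poolSize) {
            super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        public void execute(Runnable task) {
            String captured = USER.get(); // read on the submitting thread
            super.execute(() -> {
                USER.set(captured); // install on the worker thread
                try {
                    task.run();
                } finally {
                    USER.remove(); // never leave context behind on a pooled thread
                }
            });
        }
    }

    // Submits a Callable that reads USER on a pool thread and returns the value it saw.
    static String readUserVia(ThreadPoolExecutor pool) {
        Callable<String> read = USER::get;
        try {
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        USER.set("some user");
        ContextAwareExecutor pool = new ContextAwareExecutor(1);
        try {
            System.out.println("worker sees: " + readUserVia(pool)); // worker sees: some user
        } finally {
            pool.shutdown();
            USER.remove();
        }
    }
}
```

Overriding both submit and execute, as in CustomThreadPoolExecutor, also works. The task is simply wrapped twice with the same context, which is redundant but harmless.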
Now, we will use this executor in our custom MessageDispatcherConfigurator (MDC) so that we can create custom dispatchers.
CustomDispatcherConfigurator
public class CustomDispatcherConfigurator extends MessageDispatcherConfigurator {
    private final CustomDispatcher instance;

    public CustomDispatcherConfigurator(Config config, DispatcherPrerequisites prerequisites) {
        super(config, prerequisites);
        Config threadPoolConfig = config.getConfig("thread-pool-executor");
        int fixedPoolSize = threadPoolConfig.getInt("fixed-pool-size");
        instance = new CustomDispatcher(
                this,
                config.getString("id"),
                config.getInt("throughput"),
                Duration.create(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
                // Every executor created for this dispatcher is a context-propagating CustomThreadPoolExecutor.
                (id, threadFactory) -> () -> new CustomThreadPoolExecutor(fixedPoolSize,
                        fixedPoolSize,
                        threadPoolConfig.getDuration("keep-alive-time", TimeUnit.MILLISECONDS),
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingDeque<>(),
                        new ThreadFactory() {
                            private int threadId = 1;

                            @Override
                            public Thread newThread(@NotNull Runnable r) {
                                Thread thread = new Thread(r);
                                thread.setName(config.getString("name") + "-" + threadId++);
                                return thread;
                            }
                        }),
                Duration.create(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
        );
    }

    @Override
    public MessageDispatcher dispatcher() {
        return instance;
    }
}

class CustomDispatcher extends Dispatcher {
    public CustomDispatcher(MessageDispatcherConfigurator _configurator,
                            String id,
                            int throughput,
                            Duration throughputDeadlineTime,
                            ExecutorServiceFactoryProvider executorServiceFactoryProvider,
                            scala.concurrent.duration.FiniteDuration shutdownTimeout) {
        super(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout);
    }
}
Now we can create different dispatchers in our actor system using this custom MDC and define their config in application.conf
db-io-dispatcher {
  type = "contexts.CustomDispatcherConfigurator"
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 11
  }
  throughput = 1
  shutdown-timeout = 60s
}
web-io-dispatcher {
  type = "contexts.CustomDispatcherConfigurator"
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 20
  }
  throughput = 1
  shutdown-timeout = 60s
}
To create an execution context for database operations on top of this dispatcher:
DatabaseIODispatcher
public class DatabaseIODispatcher extends CustomExecutionContext {

    @Inject
    public DatabaseIODispatcher(ActorSystem actorSystem) {
        super(actorSystem, "db-io-dispatcher");
    }
}
This allows us to switch context from one executor to another without losing thread’s context.
Points to Remember
In async implementations, if execution moves from a custom executor to the default ThreadPoolExecutor or ForkJoinPool partway through, the thread context is lost. We therefore need to make sure the context is preserved wherever a library method uses a default pool.
We need to clear the thread context after the task is complete; otherwise, stale context can leak into later tasks on pooled threads and can cause memory leaks or OOM issues.
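The first point is easy to hit with CompletableFuture, whose async methods fall back to a default pool when no executor is passed. Below is a minimal sketch of the pitfall and one possible remedy, using only the JDK. DefaultPoolPitfallDemo, the USER holder, and the withContext helper are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DefaultPoolPitfallDemo {
    // Hypothetical single-value context standing in for ThreadLocalManager.
    static final ThreadLocal<String> USER = new ThreadLocal<>();

    // Hypothetical helper: wraps any Executor so each task carries the submitter's USER value.
    static Executor withContext(Executor delegate) {
        return task -> {
            String captured = USER.get(); // read on the submitting thread
            delegate.execute(() -> {
                USER.set(captured); // install on the worker thread
                try {
                    task.run();
                } finally {
                    USER.remove(); // second point above: always clean up
                }
            });
        };
    }

    // No executor given, so the task runs on CompletableFuture's default pool.
    static String readOnDefaultPool() {
        return CompletableFuture.supplyAsync(USER::get).join();
    }

    // The task runs on the executor we pass in.
    static String readOn(Executor executor) {
        return CompletableFuture.supplyAsync(USER::get, executor).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        USER.set("some user");
        try {
            System.out.println("default pool: " + readOnDefaultPool());
            System.out.println("context-aware: " + readOn(withContext(pool)));
        } finally {
            pool.shutdown();
            USER.remove();
        }
    }
}
```

The first read runs on a thread that never received the context, while the second goes through the wrapping executor and sees the submitter's value. The same applies to dependent stages: thenApplyAsync and similar methods also use the default pool unless an executor is passed explicitly.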
Thanks for reading. I hope this helps the community to provide more transparency in their applications.