Anuj Singh

Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java [Part-2/2]

In the last post, we used io.ebean's ChangeLog annotation to log entity changes in a synchronous blocking I/O application. In this post, we will see how that works in a multithreaded or async (non-blocking) I/O application.

If your system uses a separate thread pool or executor context for I/O tasks, such as making an HTTP API request or running a database transaction, the ChangeLogPrepare implementation will not be able to detect the user context for the model entities being updated, because the entity is saved on a different thread than the one where the context was set. Hence, we need to ensure that the thread context is not lost when switching executors or when switching threads within the same thread pool.

To do so, we first need to understand how to store data in the thread context. For this we will use ThreadLocal. A ThreadLocal variable holds data that is accessible only by the thread that set it: each thread gets its own independent copy of the value. Hence, while switching context, we need to ensure that the context of the current thread is propagated to the next thread.
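To make the isolation concrete, here is a minimal sketch using only the JDK. The class name `ThreadLocalIsolationDemo` and the `USER` holder are made up for illustration and are not part of the original setup. It sets a value on the calling thread and then reads the same ThreadLocal from a pool thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalIsolationDemo {
    // Hypothetical holder used only for this illustration.
    private static final ThreadLocal<String> USER = new ThreadLocal<>();

    /** Sets a value on the calling thread, then reads the same ThreadLocal from a pool thread. */
    static String readFromOtherThread() {
        USER.set("some user");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The pool thread has its own, still empty, slot for USER.
            Callable<String> readTask = USER::get;
            return pool.submit(readTask).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            USER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("value seen by pool thread: " + readFromOtherThread());
    }
}
```

The pool thread sees `null` even though the caller set a value, which is exactly the situation the audit log runs into when the save happens on another thread.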

STORING USER CONTEXT IN THREAD LOCALS

We can store the user info in a ThreadLocal and read it in the ChangeLogPrepare implementation to set it on the change set.

ThreadLocalManager

public class ThreadLocalManager {
    private static ThreadLocal<Map<String, Object>> context = new ThreadLocal<>();
    public static void addToContext(String key, Object value) {
        Map<String, Object> currentContext = ThreadLocalManager.context.get() == null ? new HashMap<>() : context.get();
        currentContext.put(key, value);
        context.set(currentContext);
    }
    public static void setContext(Map<String, Object> contextMap) {
        context.set(contextMap);
    }
    public static Map<String, Object> getContext() {
        return context.get();
    }
    public static Object getFromContext(String key) {
        return Nullifier.get(() -> context.get().getOrDefault(key, "NA"));
    }
    public static void clearContext() {
        ThreadLocalManager.context.remove();
    }
}

We can add the user info to the thread context before saving the entity:

ThreadLocalManager.addToContext("userContext", new HashMap<String, String>() {{
    put("userName", "some user");
    put("userEmail", "some.user@company.com");
}});
entity.save();
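Since pool threads are reused, whatever is set on a thread stays there until it is removed. The following is a minimal sketch of scoping the context with try/finally. It uses a hypothetical `CONTEXT` holder and `runAs` helper rather than the ThreadLocalManager above, so that it runs on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ScopedContextDemo {
    // Hypothetical stand-in for ThreadLocalManager's map-based context.
    private static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    /** Runs the task with the given user attached and always clears the context afterwards. */
    static <T> T runAs(String userName, Callable<T> task) throws Exception {
        Map<String, Object> ctx = new HashMap<>();
        ctx.put("userName", userName);
        CONTEXT.set(ctx);
        try {
            return task.call(); // e.g. entity.save() in the setup described here
        } finally {
            CONTEXT.remove(); // without this, the next task on this thread inherits the user
        }
    }

    static String currentUser() {
        Map<String, Object> ctx = CONTEXT.get();
        return ctx == null ? "NA" : String.valueOf(ctx.get("userName"));
    }

    /** Runs two tasks back to back on one reused pool thread; returns what the second one sees. */
    static String userSeenByNextTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> first = () -> runAs("some user", ScopedContextDemo::currentUser);
            Callable<String> second = ScopedContextDemo::currentUser;
            pool.submit(first).get();
            return pool.submit(second).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task sees: " + userSeenByNextTask());
    }
}
```

Because `runAs` removes the context in `finally`, the second task falls back to "NA" instead of picking up the previous user.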

Now we'll modify our ChangeLogPrepare to read the user data from the thread context.

Custom ChangeLogPrepare

public class AuditLogPrepare implements ChangeLogPrepare {
    private final play.Logger.ALogger logger = Logger.of(this.getClass());
    @Override
    public boolean prepare(ChangeSet changes) {
        // Read the user context stored on the current thread; fall back to an empty map if none is set
        Map<String, String> userContext = Nullifier.get(() -> (Map<String, String>) ThreadLocalManager.getContext().get("userContext"), new HashMap<>());
        if (userContext.isEmpty()) logger.warn("[ALERT] userContext is empty for changeset: " + changes.toString());
        // DEFAULT_USER_NAME and DEFAULT_USER_EMAIL are fallback constants assumed to be defined elsewhere in the application
        changes.getUserContext().put("userName", userContext.getOrDefault("userName", DEFAULT_USER_NAME));
        changes.getUserContext().put("userEmail", userContext.getOrDefault("userEmail", DEFAULT_USER_EMAIL));
        changes.setSource("MyApp");
        return true;
    }
}

Since the user data is read from a thread local, we need to ensure that the thread context is maintained when switching threads. For that, we'll create a utility class that propagates the thread context to the next Runnable/Callable.

ContextUtility

public class ContextUtility {
    public static <T> Callable<T> wrapWithContext(Callable<T> task) {
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null)
            return task;
        else
            return () -> {
                ThreadLocalManager.setContext(previousContext);
                try {
                    return task.call();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
    }
    public static Runnable wrapWithContext(Runnable task) {
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null) {
            return task;
        } else
            return () -> {
                ThreadLocalManager.setContext(previousContext);
                try {
                    task.run();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
    }
}
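The effect of wrapping can be checked with a small standalone sketch. It mirrors the capture-and-install idea of ContextUtility but uses a hypothetical `CONTEXT` holder and `wrap` method so that it compiles without the rest of the application. Unlike the version above, it copies the captured map, which is a choice made in this sketch to avoid sharing one mutable map across threads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextPropagationDemo {
    // Hypothetical stand-in for ThreadLocalManager.
    private static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    /** Captures the context on the submitting thread and installs it on the worker thread. */
    static <T> Callable<T> wrap(Callable<T> task) {
        Map<String, Object> captured = CONTEXT.get(); // evaluated on the submitting thread
        if (captured == null) {
            return task;
        }
        // Copy so the worker does not mutate the caller's map (an assumption of this sketch).
        Map<String, Object> snapshot = new HashMap<>(captured);
        return () -> {
            CONTEXT.set(snapshot);
            try {
                return task.call();
            } finally {
                CONTEXT.remove();
            }
        };
    }

    /** Returns {userName seen without wrapping, userName seen with wrapping}. */
    static String[] compare() {
        Map<String, Object> ctx = new HashMap<>();
        ctx.put("userName", "some user");
        CONTEXT.set(ctx);
        Callable<String> readUser = () -> {
            Map<String, Object> current = CONTEXT.get();
            return current == null ? "NA" : String.valueOf(current.get("userName"));
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            String plain = pool.submit(readUser).get();
            String wrapped = pool.submit(wrap(readUser)).get();
            return new String[] {plain, wrapped};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        String[] seen = compare();
        System.out.println("plain=" + seen[0] + ", wrapped=" + seen[1]);
    }
}
```

The unwrapped task falls back to "NA", while the wrapped task sees the user that was set on the submitting thread.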

Using the methods from ContextUtility, we create a CustomThreadPoolExecutor that overrides the submit/execute methods to attach the thread context to every task before it is handed to the pool.

CustomThreadPoolExecutor

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            @NotNull TimeUnit unit,
            @NotNull BlockingQueue<Runnable> workQueue,
            @NotNull ThreadFactory threadFactory,
            @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
    @Override
    public <T> @NotNull Future<T> submit(@NotNull Callable<T> task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }
    @Override
    public <T> @NotNull Future<T> submit(@NotNull Runnable task, T result) {
        return super.submit(ContextUtility.wrapWithContext(task), result);
    }
    @Override
    public @NotNull Future<?> submit(@NotNull Runnable task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }
    @Override
    public void execute(@NotNull Runnable task) {
        super.execute(ContextUtility.wrapWithContext(task));
    }
}
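One detail worth knowing: `ThreadPoolExecutor` inherits `submit` from `AbstractExecutorService`, which wraps the task in a `FutureTask` and passes it to `execute`. Overriding all four methods as above therefore wraps submitted tasks twice, which is harmless but redundant. The sketch below, with a hypothetical `ContextAwareExecutor` and a single-value `USER` holder, overrides only `execute` and still propagates the context through `submit`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteOnlyExecutorDemo {
    // Hypothetical single-value context used only for this illustration.
    static final ThreadLocal<String> USER = new ThreadLocal<>();

    /** Wraps tasks in execute() only; submit() is inherited and ends up calling execute(). */
    static final class ContextAwareExecutor extends ThreadPoolExecutor {
        ContextAwareExecutor(int poolSize) {
            super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        public void execute(Runnable task) {
            String captured = USER.get(); // read on the submitting thread
            super.execute(() -> {
                USER.set(captured);
                try {
                    task.run();
                } finally {
                    USER.remove();
                }
            });
        }
    }

    /** Submits a Callable through the inherited submit() and returns the user it observed. */
    static String userSeenViaSubmit() {
        ContextAwareExecutor pool = new ContextAwareExecutor(1);
        USER.set("some user");
        try {
            Callable<String> readUser = USER::get;
            return pool.submit(readUser).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            USER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("seen via submit: " + userSeenViaSubmit());
    }
}
```

Keeping the explicit `submit` overrides is still a reasonable defensive choice if the executor may later be swapped for one that does not route `submit` through `execute`.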

Now, we will use this executor in a custom MessageDispatcherConfigurator (MDC), which lets us create custom dispatchers.

CustomDispatcherConfigurator

public class CustomDispatcherConfigurator extends MessageDispatcherConfigurator {
    private final CustomDispatcher instance;
    public CustomDispatcherConfigurator(Config config, DispatcherPrerequisites prerequisites) {
        super(config, prerequisites);
        Config threadPoolConfig = config.getConfig("thread-pool-executor");
        int fixedPoolSize = threadPoolConfig.getInt("fixed-pool-size");
        instance = new CustomDispatcher(
                this,
                config.getString("id"),
                config.getInt("throughput"),
                Duration.create(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
                (id, threadFactory) -> () -> new CustomThreadPoolExecutor(fixedPoolSize,
                                                                          fixedPoolSize,
                                                                          threadPoolConfig.getDuration("keep-alive-time", TimeUnit.MILLISECONDS),
                                                                          TimeUnit.MILLISECONDS,
                                                                          new LinkedBlockingDeque<>(),
                                                                          new ThreadFactory() {
                                                                              private int threadId = 1;
                                                                              @Override
                                                                              public Thread newThread(@NotNull Runnable r) {
                                                                                  Thread thread = new Thread(r);
                                                                                  thread.setName(config.getString("name") + "-" + threadId++);
                                                                                  return thread;
                                                                              }
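                                                                          },
                                                                          // The constructor above requires a RejectedExecutionHandler; AbortPolicy is ThreadPoolExecutor's default
                                                                          new ThreadPoolExecutor.AbortPolicy()),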
                Duration.create(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
        );
    }
    @Override
    public MessageDispatcher dispatcher() {
        return instance;
    }
}
class CustomDispatcher extends Dispatcher {
    public CustomDispatcher(MessageDispatcherConfigurator _configurator,
                            String id,
                            int throughput,
                            Duration throughputDeadlineTime,
                            ExecutorServiceFactoryProvider executorServiceFactoryProvider,
                            scala.concurrent.duration.FiniteDuration shutdownTimeout) {
        super(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout);
    }
}

Now we can create different dispatchers in our actor system using this custom MDC and define their config in application.conf

db-io-dispatcher {
    type = "contexts.CustomDispatcherConfigurator"
    executor = "thread-pool-executor"
    thread-pool-executor {
        fixed-pool-size = 11
    }
    throughput = 1
    shutdown-timeout = 60s
}
web-io-dispatcher {
    type = "contexts.CustomDispatcherConfigurator"
    executor = "thread-pool-executor"
    thread-pool-executor {
        fixed-pool-size = 20
    }
    throughput = 1
    shutdown-timeout = 60s
}

To create an execution context backed by the db dispatcher:

DatabaseIODispatcher

public class DatabaseIODispatcher extends CustomExecutionContext {
    @Inject
    public DatabaseIODispatcher(ActorSystem actorSystem) {
        super(actorSystem, "db-io-dispatcher");
    }
}

This allows us to switch from one executor to another without losing the thread's context.

Points to Remember

  • In async implementations, if execution moves from a custom executor to the default ThreadPoolExecutor or ForkJoinPool, the thread context is lost. Hence, we need to ensure that the context is preserved wherever a library method uses the default pools.

  • We need to clear the thread context after the task is complete. Otherwise it can cause memory leaks or OOM issues, and a reused pool thread can expose one task's context to the next task.
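The first point can be reproduced with `CompletableFuture`, whose `supplyAsync(Supplier)` overload runs on the JVM's default async pool (normally `ForkJoinPool.commonPool()`). This is a minimal sketch under stated assumptions: the `USER` holder and the `withContext` decorator are hypothetical names, not part of the setup above. Passing an explicit, context-aware executor keeps the context.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DefaultPoolDemo {
    // Hypothetical single-value context used only for this illustration.
    static final ThreadLocal<String> USER = new ThreadLocal<>();

    static String currentUser() {
        String user = USER.get();
        return user == null ? "NA" : user;
    }

    /** Decorates any Executor so each task runs with the submitter's USER value, then clears it. */
    static Executor withContext(Executor delegate) {
        return task -> {
            String captured = USER.get(); // read on the submitting thread
            delegate.execute(() -> {
                USER.set(captured);
                try {
                    task.run();
                } finally {
                    USER.remove();
                }
            });
        };
    }

    /** Returns {user seen on the default pool, user seen on the context-aware executor}. */
    static String[] compare() {
        USER.set("some user");
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // No executor argument: the task runs on the default async pool, which knows nothing about USER.
            String onDefaultPool = CompletableFuture.supplyAsync(DefaultPoolDemo::currentUser).join();
            // Explicit executor: the decorator carries USER over to the worker thread.
            String onWrappedPool =
                    CompletableFuture.supplyAsync(DefaultPoolDemo::currentUser, withContext(pool)).join();
            return new String[] {onDefaultPool, onWrappedPool};
        } finally {
            pool.shutdown();
            USER.remove();
        }
    }

    public static void main(String[] args) {
        String[] seen = compare();
        System.out.println("default pool=" + seen[0] + ", wrapped pool=" + seen[1]);
    }
}
```

The same applies to any library call that accepts an optional executor: when the argument is omitted, the work usually lands on a pool that has no knowledge of the context.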

Thanks for reading. I hope this helps the community to provide more transparency in their applications.