Mustafa Bingül

I Rebuilt the Core of My Data Engine — Here's What Changed in v1.3.0


What is Nexus Core?

Nexus Core is a standalone Java application — a central data orchestration engine that sits between your distributed server infrastructure and your persistence layer. All data operations (reads, writes, deletes, increments) are routed through it via a Redis pub/sub message bus. Your servers stay thin. Your data logic lives in one place.

This release focused on three things: live health signaling, intelligent cache management, and thread isolation.


🫀 Live Protocol — Heartbeat Every Second

The most important addition in v1.3.0 is LIVE — a new RequestType in DataAddon that represents Nexus announcing its own presence on the network.

Every second, RedisDataContainer schedules a broadcast to the darkland_nexus_live channel:

// RedisDataContainer.java
rm.scheduleTask(this::sendNetworkLiveBroadcast, 1, 1, TimeUnit.SECONDS);

The scheduled method builds a small JSON payload and publishes it:

private void sendNetworkLiveBroadcast() {
    NexusApplication.getApplication().getRedisManager().processTask(() -> {
        NexusJsonDataContainer jsonDataContainer = new NexusJsonDataContainer();
        jsonDataContainer.set("type", "LIVE");
        jsonDataContainer.set("source", "nexus");
        jsonDataContainer.set("time", System.currentTimeMillis() / 1000L);

        NexusApplication.getApplication().getRedisManager()
            .publish("darkland_nexus_live", jsonDataContainer.toFullJson());
    });
}

Any subscriber listening to darkland_nexus_live can now verify that Nexus is alive before sending requests. If no heartbeat arrives within a timeout the client chooses (a few seconds, for example), the client can treat the engine as down and react accordingly instead of firing requests into silence.
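
On the client side, the liveness check could look roughly like the sketch below. HeartbeatMonitor is a hypothetical helper, not part of Nexus Core, and the 3-second timeout is an assumed value. It compares the "time" field of the LIVE message with the client's own clock, so it assumes both clocks are roughly in sync.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical client-side helper (not part of Nexus Core): remembers the
 * newest LIVE heartbeat and reports whether Nexus should be treated as alive.
 */
final class HeartbeatMonitor {
    private final long timeoutSeconds;
    // Epoch seconds of the most recent heartbeat; -1 means "none seen yet".
    private final AtomicLong lastSeenEpochSeconds = new AtomicLong(-1);

    HeartbeatMonitor(long timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    /** Call from the subscriber callback with the "time" field of a LIVE message. */
    void onHeartbeat(long epochSeconds) {
        // Keep the newest timestamp even if messages arrive out of order.
        lastSeenEpochSeconds.accumulateAndGet(epochSeconds, Math::max);
    }

    /** True if a heartbeat was seen no more than timeoutSeconds before nowEpochSeconds. */
    boolean isAlive(long nowEpochSeconds) {
        long last = lastSeenEpochSeconds.get();
        return last >= 0 && nowEpochSeconds - last <= timeoutSeconds;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(3); // assumed timeout
        long now = System.currentTimeMillis() / 1000L;
        monitor.onHeartbeat(now);
        System.out.println("alive right after heartbeat: " + monitor.isAlive(now));
        System.out.println("alive 10s later without heartbeat: " + monitor.isAlive(now + 10));
    }
}
```

If clock skew between servers is a concern, recording the local receive time in onHeartbeat instead of the payload timestamp would be a reasonable variation.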

The LIVE type is also formally part of the RequestType enum in DataAddon:

public enum RequestType {
    SET_DATA, GET_DATA, UPDATE_DATA, REMOVE_DATA,
    BROADCAST, LOAD_CACHE, INCREMENT_DATA, LIVE
}

🗄️ Cache — Dynamic TTL per DataAddon

Previously there was no way to define per-module cache lifetimes. v1.3.0 adds an abstract method to DataAddon:

public abstract int getCacheTTL();

Every addon now declares its own TTL. When a key is written to Redis, that TTL is applied directly:

// RedisManager.java
public void setData(String key, String json, DataAddon addon) {
    processTask(() -> {
        try (Jedis jedis = pool.getResource()) {
            SetParams params = new SetParams().ex(addon.getCacheTTL());
            jedis.set(key, json, params);
        }
    });
}

And on every cache hit, the TTL is renewed (touch-to-renew / sliding expiration):

// DataAddon.java — inside getData()
if (app.getRedisManager().exists(keyTag)) {
    String redisJson = app.getRedisManager().getData(keyTag).get();
    DataModel m = new DataModel(keyTag, UUID.randomUUID().toString(),
            modelInitComp(redisJson), this, specificValue);
    app.getDataContainer().addModelFix(keyTag, m);
    app.getRedisManager().renewTTL(keyTag, getCacheTTL()); // sliding expiration
    return Optional.of(m);
}
// RedisManager.java
public void renewTTL(String key, int seconds) {
    processTask(() -> {
        try (Jedis jedis = pool.getResource()) {
            jedis.expire(key, seconds);
        }
    });
}

Active data stays warm. Unused data evicts itself. No manual intervention needed.
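
To make the sliding-expiration behaviour concrete, here is a small in-memory model of it. This is an illustration only: SlidingTtlCache, the key name, and the 60-second TTL are made up for the example, time is passed in explicitly instead of being read from a clock, and the class is not thread-safe. In Nexus the same effect comes from Redis SET with EX on write and EXPIRE on every hit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * In-memory model of sliding expiration (illustration only, single-threaded).
 * Timestamps are plain seconds supplied by the caller.
 */
final class SlidingTtlCache {
    private record Entry(String value, long expiresAt) {}

    private final Map<String, Entry> entries = new HashMap<>();

    /** Write with a TTL, comparable to SET key value EX ttlSeconds. */
    void set(String key, String value, int ttlSeconds, long now) {
        entries.put(key, new Entry(value, now + ttlSeconds));
    }

    /** Read; on a hit the deadline is pushed forward, comparable to EXPIRE key ttlSeconds. */
    Optional<String> get(String key, int ttlSeconds, long now) {
        Entry e = entries.get(key);
        if (e == null || e.expiresAt() <= now) {
            entries.remove(key); // expired or never written
            return Optional.empty();
        }
        entries.put(key, new Entry(e.value(), now + ttlSeconds)); // renew on hit
        return Optional.of(e.value());
    }

    public static void main(String[] args) {
        SlidingTtlCache cache = new SlidingTtlCache();
        int ttl = 60; // hypothetical value an addon might return from getCacheTTL()
        cache.set("player:1", "{\"coins\":5}", ttl, 0);
        System.out.println(cache.get("player:1", ttl, 50).isPresent());  // hit, deadline moves to 110
        System.out.println(cache.get("player:1", ttl, 100).isPresent()); // hit, deadline moves to 160
        System.out.println(cache.get("player:1", ttl, 200).isPresent()); // idle past the deadline: miss
    }
}
```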


🔄 L1 / L2 Auto-Invalidation via Keyspace Notifications

When Nexus connects to Redis, it automatically enables keyspace notifications:

// RedisManager.java
public void enableKeyspaceNotifications() {
    try (Jedis jedis = pool.getResource()) {
        jedis.configSet("notify-keyspace-events", "Ex");
        System.out.println("Nexus: Keyspace Notifications (Expired) enabled via Jedis.");
    }
}

A dedicated thread (Nexus-L1-Sync-Thread) subscribes to the expiry channel and immediately purges the in-process L1 cache when a key expires in Redis:

// RedisDataContainer.java
public void startL1SyncListener() {
    new Thread(() -> {
        String expiredChannel = "__keyevent@0__:expired";
        while (!Thread.currentThread().isInterrupted()) {
            try (Jedis jedis = pool.getResource()) {
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        removeModel(message);
                        LOGGER.warning("[Nexus] Key expired: " + message
                            + ", Removing from L1 cache to maintain data integrity.");
                    }
                }, expiredChannel);
            } catch (Exception e) {
                try { Thread.sleep(5000); } catch (InterruptedException ie) { break; }
            }
        }
    }, "Nexus-L1-Sync-Thread").start();
}

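The result: when a key expires in Redis (L2), the matching entry is dropped from L1 (RAM) shortly afterwards, which removes most stale reads. Redis delivers keyspace notifications over fire-and-forget pub/sub and emits the expired event when it actually deletes the key, so an event can be missed while the listener is reconnecting, and eviction can lag slightly behind the nominal TTL.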
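
The eviction step itself is small. The sketch below models only the L1 side and leaves Redis out. L1Cache and its methods are hypothetical names, not the actual RedisDataContainer API. It also shows that the expiry channel name carries the Redis database index, so a deployment that does not use database 0 would need a different channel than the one hard-coded in the listener above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical stand-in for the L1 side of the sync: a map that drops an
 * entry when it is told that the matching Redis key expired.
 */
final class L1Cache {
    private final ConcurrentMap<String, String> models = new ConcurrentHashMap<>();

    /** Redis publishes key-event notifications per database, e.g. "__keyevent@0__:expired". */
    static String expiredChannel(int dbIndex) {
        return "__keyevent@" + dbIndex + "__:expired";
    }

    void put(String key, String json) {
        models.put(key, json);
    }

    String get(String key) {
        return models.get(key);
    }

    /**
     * Meant to be called from a pub/sub onMessage(channel, message) callback;
     * for key-event notifications the message is the name of the key.
     * Returns true if an entry was actually evicted.
     */
    boolean onExpired(String channel, String expiredKey) {
        if (!channel.endsWith(":expired")) {
            return false; // ignore other event types
        }
        return models.remove(expiredKey) != null;
    }

    public static void main(String[] args) {
        L1Cache l1 = new L1Cache();
        l1.put("player:1", "{\"coins\":5}");
        boolean evicted = l1.onExpired(expiredChannel(0), "player:1");
        System.out.println("evicted=" + evicted + ", stillCached=" + (l1.get("player:1") != null));
    }
}
```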


⚡ Dual-Queue Worker Architecture

Before v1.3.0, inbound messages and outbound tasks competed on the same thread pool. v1.3.0 separates them completely with two independent LinkedBlockingQueue instances:

// RedisManager.java
private final BlockingQueue<String>   messageQueue      = new LinkedBlockingQueue<>(50000);
private final BlockingQueue<Runnable> internalTaskQueue = new LinkedBlockingQueue<>(50000);

Inbound workers (one per CPU core) pull from messageQueue and handle incoming Redis messages:

private void startInboundWorkers() {
    int cores = Runtime.getRuntime().availableProcessors();
    for (int i = 0; i < cores; i++) {
        new Thread(() -> {
            NexusReceiver receiver = new NexusReceiver(this);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String msg = messageQueue.take();
                    receiver.handleSyncMessage(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Nexus-Inbound-Worker-" + i).start();
    }
}

Outbound workers pull from internalTaskQueue and handle Redis/Mongo write operations:

private void startOutboundWorkers() {
    int workers = Math.max(2, Runtime.getRuntime().availableProcessors());
    for (int i = 0; i < workers; i++) {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Runnable task = internalTaskQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Nexus-Outbound-Worker-" + i).start();
    }
}
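
One thing to watch in loops like these: only InterruptedException is caught, so an unchecked exception thrown by a task would propagate out of the lambda and end that worker thread. A possible hardening, shown here as a standalone sketch rather than the actual Nexus code, is to catch RuntimeException per task and keep looping. ResilientWorker and the demo thread name are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ResilientWorker {
    /** Starts one daemon worker that keeps draining the queue even if a task throws. */
    static Thread start(BlockingQueue<Runnable> queue, String name) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    queue.take().run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                    break;
                } catch (RuntimeException e) {
                    // A failing task is reported and skipped; the worker stays alive.
                    System.err.println(name + ": task failed: " + e);
                }
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Enqueues a failing task, then a normal one; true if the second one still ran. */
    static boolean survivesFailingTask() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(16);
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = start(queue, "Demo-Outbound-Worker-0");
        queue.offer(() -> { throw new IllegalStateException("boom"); });
        queue.offer(done::countDown); // only runs if the worker survived the first task
        try {
            return done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.interrupt(); // stop the demo worker
        }
    }

    public static void main(String[] args) {
        System.out.println("second task ran: " + survivesFailingTask());
    }
}
```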

An inbound spike can no longer delay outbound responses. If the outbound queue is full, the task falls back to the ScheduledExecutorService:

public void processTask(Runnable task) {
    if (!internalTaskQueue.offer(task)) {
        scheduler.execute(task); // fallback
    }
}
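
The offer-then-fallback pattern can be tried in isolation with a tiny queue. In this sketch OverflowDispatcher is a hypothetical class, the capacity of 2 is chosen only to force an overflow, and the fallback is a plain Executor standing in for the ScheduledExecutorService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

final class OverflowDispatcher {
    private final BlockingQueue<Runnable> queue;
    private final Executor fallback;

    OverflowDispatcher(int capacity, Executor fallback) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.fallback = fallback;
    }

    /** Returns true if the task was queued, false if it overflowed to the fallback. */
    boolean submit(Runnable task) {
        if (queue.offer(task)) { // offer() never blocks; false means the queue is full
            return true;
        }
        fallback.execute(task);
        return false;
    }

    int queued() {
        return queue.size();
    }

    public static void main(String[] args) {
        List<Runnable> overflowed = new ArrayList<>();
        // Demo fallback: just record the task instead of running it.
        OverflowDispatcher dispatcher = new OverflowDispatcher(2, overflowed::add);
        for (int i = 0; i < 3; i++) {
            boolean queuedNormally = dispatcher.submit(() -> { });
            System.out.println("task " + i + " queued normally: " + queuedNormally);
        }
        System.out.println("queued=" + dispatcher.queued() + ", overflowed=" + overflowed.size());
    }
}
```

A consequence of this design is that a task taking the fallback path may run before tasks that were queued earlier, so it suits operations that do not depend on strict ordering.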

🔍 Reflection Caching

DataAddon uses annotations (@DbDataModels) to define its schema. Previously these fields were scanned via reflection on every request. v1.3.0 caches the result with double-checked locking:

// DataAddon.java
private volatile Field[] cachedAnnotatedFields = null;
private final Object fieldCacheLock = new Object();

private Field[] getAnnotatedFields() {
    if (cachedAnnotatedFields != null) return cachedAnnotatedFields;
    synchronized (fieldCacheLock) {
        if (cachedAnnotatedFields != null) return cachedAnnotatedFields;
        cachedAnnotatedFields = java.util.Arrays.stream(getClass().getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DbDataModels.class))
                .peek(f -> f.setAccessible(true))
                .toArray(Field[]::new);
    }
    return cachedAnnotatedFields;
}

Same pattern for the ID field — scanned once, cached for all subsequent requests:

private volatile String cachedIdFieldName = null;
private volatile Class<?> cachedIdClass = null;
private final Object idCacheLock = new Object();

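Reflection runs once per addon instance, because the cache lives in instance fields; if each addon class is instantiated only once, that amounts to once per class. Every request after that hits the cache directly.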
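
Because the cached fields are instance fields, the scan is stored per DataAddon instance. If caching strictly per class is wanted, whatever the number of instances, one option is the JDK's ClassValue, which lazily computes and caches one value per Class. This is an alternative sketch, not what v1.3.0 does. The @Persisted annotation and PlayerStats class are hypothetical stand-ins for @DbDataModels and a real addon.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Arrays;

final class AnnotatedFieldCache {
    // ClassValue computes the value lazily for each class and caches the result.
    private static final ClassValue<Field[]> FIELDS = new ClassValue<Field[]>() {
        @Override
        protected Field[] computeValue(Class<?> type) {
            return Arrays.stream(type.getDeclaredFields())
                    .filter(f -> f.isAnnotationPresent(Persisted.class))
                    .peek(f -> f.setAccessible(true))
                    .toArray(Field[]::new);
        }
    };

    /** Returns the cached annotated fields; callers should treat the array as read-only. */
    static Field[] annotatedFields(Class<?> type) {
        return FIELDS.get(type);
    }

    public static void main(String[] args) {
        Field[] first = annotatedFields(PlayerStats.class);
        Field[] second = annotatedFields(PlayerStats.class);
        System.out.println("annotated fields: " + first.length);
        System.out.println("served from cache: " + (first == second));
    }
}

/** Hypothetical marker annotation standing in for @DbDataModels. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Persisted {}

/** Hypothetical addon-like class used only for the demo. */
final class PlayerStats {
    @Persisted private int coins = 5;
    @Persisted private String name = "demo";
    private long requestCounter; // not annotated, so the scan skips it
}
```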


Architecture Summary

Distributed Servers
  Server #1 ─┐
  Server #2 ──┼──► darkland_nexus (Redis) ──► Nexus-Inbound-Worker-N
  Server #3 ─┘                                        │
                                              NexusReceiver.handleSyncMessage()
                                                        │
                                              Nexus-Outbound-Worker-N
                                                        │
                                         ┌──────────────┴──────────────┐
                                         │  L1 Cache (ConcurrentHashMap)│
                                         │  L2 Cache (Redis + TTL)      │
                                         │  MongoDB (persistence)       │
                                         └─────────────────────────────┘
                                                        │
                            darkland_nexus_live ◄── heartbeat every 1s

Source → github.com/mustafabinguldev/nexus-core
