I Rebuilt the Core of My Data Engine — Here's What Changed in v1.3.0
What is Nexus Core?
Nexus Core is a standalone Java application — a central data orchestration engine that sits between your distributed server infrastructure and your persistence layer. All data operations (reads, writes, deletes, increments) are routed through it via a Redis pub/sub message bus. Your servers stay thin. Your data logic lives in one place.
This release focused on three things: live health signaling, intelligent cache management, and thread isolation.
🫀 Live Protocol — Heartbeat Every Second
The most important addition in v1.3.0 is LIVE — a new RequestType in DataAddon that represents Nexus announcing its own presence on the network.
Every second, RedisDataContainer schedules a broadcast to the darkland_nexus_live channel:
```java
// RedisDataContainer.java
rm.scheduleTask(this::sendNetworkLiveBroadcast, 1, 1, TimeUnit.SECONDS);

private void sendNetworkLiveBroadcast() {
    NexusApplication.getApplication().getRedisManager().processTask(() -> {
        NexusJsonDataContainer jsonDataContainer = new NexusJsonDataContainer();
        jsonDataContainer.set("type", "LIVE");
        jsonDataContainer.set("source", "nexus");
        jsonDataContainer.set("time", System.currentTimeMillis() / 1000L);
        NexusApplication.getApplication().getRedisManager()
                .publish("darkland_nexus_live", jsonDataContainer.toFullJson());
    });
}
```
Any subscriber listening to darkland_nexus_live can now verify that Nexus is alive before sending requests. If the heartbeat stops, clients know the engine is down and can react accordingly — no more firing requests into silence.
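On the client side, that check can be as simple as remembering the last heartbeat timestamp. The following is a minimal, self-contained sketch of such a watchdog; the class and method names (`NexusLiveWatchdog`, `onHeartbeat`, `isNexusAlive`) are illustrative and not part of the Nexus API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical client-side watchdog: record when each LIVE message arrived
// and treat Nexus as down once no heartbeat is seen within a grace window.
public class NexusLiveWatchdog {
    private final AtomicLong lastHeartbeatMillis = new AtomicLong(0);
    private final long graceMillis;

    public NexusLiveWatchdog(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    // Call from the darkland_nexus_live subscriber's onMessage handler.
    public void onHeartbeat(long nowMillis) {
        lastHeartbeatMillis.set(nowMillis);
    }

    // True only if a heartbeat was seen within the grace window.
    public boolean isNexusAlive(long nowMillis) {
        long last = lastHeartbeatMillis.get();
        return last != 0 && nowMillis - last <= graceMillis;
    }

    public static void main(String[] args) {
        // With a 1 s heartbeat, a 3 s grace window tolerates two missed beats.
        NexusLiveWatchdog watchdog = new NexusLiveWatchdog(3_000);
        watchdog.onHeartbeat(10_000);
        System.out.println(watchdog.isNexusAlive(11_000)); // true: within window
        System.out.println(watchdog.isNexusAlive(14_500)); // false: heartbeat stopped
    }
}
```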
The LIVE type is also formally part of the RequestType enum in DataAddon:
```java
public enum RequestType {
    SET_DATA, GET_DATA, UPDATE_DATA, REMOVE_DATA,
    BROADCAST, LOAD_CACHE, INCREMENT_DATA, LIVE
}
```
🗄️ Cache — Dynamic TTL per DataAddon
Previously there was no way to define per-module cache lifetimes. v1.3.0 adds an abstract method to DataAddon:
```java
public abstract int getCacheTTL();
```
Every addon now declares its own TTL. When a key is written to Redis, that TTL is applied directly:
```java
// RedisManager.java
public void setData(String key, String json, DataAddon addon) {
    processTask(() -> {
        try (Jedis jedis = pool.getResource()) {
            SetParams params = new SetParams().ex(addon.getCacheTTL());
            jedis.set(key, json, params);
        }
    });
}
```
And on every cache hit, the TTL is renewed (touch-to-renew / sliding expiration):
```java
// DataAddon.java — inside getData()
if (app.getRedisManager().exists(keyTag)) {
    String redisJson = app.getRedisManager().getData(keyTag).get();
    DataModel m = new DataModel(keyTag, UUID.randomUUID().toString(),
            modelInitComp(redisJson), this, specificValue);
    app.getDataContainer().addModelFix(keyTag, m);
    app.getRedisManager().renewTTL(keyTag, getCacheTTL()); // sliding expiration
    return Optional.of(m);
}

// RedisManager.java
public void renewTTL(String key, int seconds) {
    processTask(() -> {
        try (Jedis jedis = pool.getResource()) {
            jedis.expire(key, seconds);
        }
    });
}
```
Active data stays warm. Unused data evicts itself. No manual intervention needed.
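To make the per-addon contract concrete, here is a minimal sketch of two addons declaring different lifetimes. `EconomyAddon`, `SettingsAddon`, and the TTL values are hypothetical examples, not modules from the Nexus codebase; `DataAddonSketch` stands in for the real `DataAddon` base class.

```java
// Stand-in for the DataAddon abstract class shown above.
abstract class DataAddonSketch {
    public abstract int getCacheTTL(); // seconds
}

// Hypothetical addon with frequently-touched data: short sliding TTL.
class EconomyAddon extends DataAddonSketch {
    @Override
    public int getCacheTTL() {
        return 300; // 5 minutes, renewed on every cache hit
    }
}

// Hypothetical addon with rarely-changing data: longer TTL.
class SettingsAddon extends DataAddonSketch {
    @Override
    public int getCacheTTL() {
        return 3600; // 1 hour
    }
}

public class TtlDemo {
    public static void main(String[] args) {
        DataAddonSketch economy = new EconomyAddon();
        DataAddonSketch settings = new SettingsAddon();
        System.out.println(economy.getCacheTTL());  // 300
        System.out.println(settings.getCacheTTL()); // 3600
    }
}
```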
🔄 L1 / L2 Auto-Invalidation via Keyspace Notifications
When Nexus connects to Redis, it automatically enables keyspace notifications:
```java
// RedisManager.java
public void enableKeyspaceNotifications() {
    try (Jedis jedis = pool.getResource()) {
        jedis.configSet("notify-keyspace-events", "Ex");
        System.out.println("Nexus: Keyspace Notifications (Expired) enabled via Jedis.");
    }
}
```
A dedicated thread (Nexus-L1-Sync-Thread) subscribes to the expiry channel and immediately purges the in-process L1 cache when a key expires in Redis:
```java
// RedisDataContainer.java
public void startL1SyncListener() {
    new Thread(() -> {
        String expiredChannel = "__keyevent@0__:expired";
        while (!Thread.currentThread().isInterrupted()) {
            try (Jedis jedis = pool.getResource()) {
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        removeModel(message);
                        LOGGER.warning("[Nexus] Key expired: " + message
                                + ", Removing from L1 cache to maintain data integrity.");
                    }
                }, expiredChannel);
            } catch (Exception e) {
                try { Thread.sleep(5000); } catch (InterruptedException ie) { break; }
            }
        }
    }, "Nexus-L1-Sync-Thread").start();
}
```
The result: L1 (RAM) and L2 (Redis) are always in sync. Stale reads are gone.
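The purge path itself is easy to reason about in isolation. Below is a self-contained sketch of the L1 side, with a plain `ConcurrentHashMap` standing in for the container and `onRedisKeyExpired` playing the role of the `onMessage` handler; `L1Cache` and its key format are illustrative, not Nexus classes.

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the L1 purge path: when Redis reports an expired key, the same
// key is evicted from the in-process map, so RAM never serves data that
// Redis (L2) has already dropped.
public class L1Cache {
    private final ConcurrentHashMap<String, String> models = new ConcurrentHashMap<>();

    public void addModel(String key, String json) { models.put(key, json); }

    // Mirrors the listener's reaction to __keyevent@0__:expired.
    public void onRedisKeyExpired(String key) { models.remove(key); }

    public boolean contains(String key) { return models.containsKey(key); }

    public static void main(String[] args) {
        L1Cache l1 = new L1Cache();
        l1.addModel("player:uuid-123", "{\"coins\":42}");
        System.out.println(l1.contains("player:uuid-123")); // true

        // Redis expires the key -> keyspace notification -> L1 purge.
        l1.onRedisKeyExpired("player:uuid-123");
        System.out.println(l1.contains("player:uuid-123")); // false
    }
}
```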
⚡ Dual-Queue Worker Architecture
Before v1.3.0, inbound messages and outbound tasks competed on the same thread pool. v1.3.0 separates them completely with two independent LinkedBlockingQueue instances:
```java
// RedisManager.java
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(50000);
private final BlockingQueue<Runnable> internalTaskQueue = new LinkedBlockingQueue<>(50000);
```
Inbound workers (one per CPU core) pull from messageQueue and handle incoming Redis messages:
```java
private void startInboundWorkers() {
    int cores = Runtime.getRuntime().availableProcessors();
    for (int i = 0; i < cores; i++) {
        new Thread(() -> {
            NexusReceiver receiver = new NexusReceiver(this);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String msg = messageQueue.take();
                    receiver.handleSyncMessage(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Nexus-Inbound-Worker-" + i).start();
    }
}
```
Outbound workers pull from internalTaskQueue and handle Redis/Mongo write operations:
```java
private void startOutboundWorkers() {
    int workers = Math.max(2, Runtime.getRuntime().availableProcessors());
    for (int i = 0; i < workers; i++) {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Runnable task = internalTaskQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Nexus-Outbound-Worker-" + i).start();
    }
}
```
An inbound spike can no longer delay outbound responses. If the outbound queue is full, the task falls back to the ScheduledExecutorService:
```java
public void processTask(Runnable task) {
    if (!internalTaskQueue.offer(task)) {
        scheduler.execute(task); // fallback
    }
}
```
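The offer-with-fallback pattern is worth seeing in action. This sketch uses a deliberately tiny queue so the fallback path triggers quickly; in Nexus the fallback target is the `ScheduledExecutorService`, while here we simply count which path each task took.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Demonstrates non-blocking offer() with a fallback path: once the bounded
// queue is full, further tasks are diverted instead of being dropped or
// blocking the caller.
public class BackpressureDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(2);
        int queued = 0, fellBack = 0;

        for (int i = 0; i < 5; i++) {
            Runnable task = () -> { /* pretend Redis/Mongo write */ };
            if (queue.offer(task)) {
                queued++;   // normal path: task waits for an outbound worker
            } else {
                fellBack++; // queue full: would be handed to the scheduler
            }
        }
        System.out.println(queued + " queued, " + fellBack + " fell back");
    }
}
```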
🔍 Reflection Caching
DataAddon uses annotations (@DbDataModels) to define its schema. Previously these fields were scanned via reflection on every request. v1.3.0 caches the result with double-checked locking:
```java
// DataAddon.java
private volatile Field[] cachedAnnotatedFields = null;
private final Object fieldCacheLock = new Object();

private Field[] getAnnotatedFields() {
    if (cachedAnnotatedFields != null) return cachedAnnotatedFields;
    synchronized (fieldCacheLock) {
        if (cachedAnnotatedFields != null) return cachedAnnotatedFields;
        cachedAnnotatedFields = java.util.Arrays.stream(getClass().getDeclaredFields())
                .filter(f -> f.isAnnotationPresent(DbDataModels.class))
                .peek(f -> f.setAccessible(true))
                .toArray(Field[]::new);
    }
    return cachedAnnotatedFields;
}
```
Same pattern for the ID field — scanned once, cached for all subsequent requests:
```java
private volatile String cachedIdFieldName = null;
private volatile Class<?> cachedIdClass = null;
private final Object idCacheLock = new Object();
```
Reflection runs once per addon class. Every request after that hits the cache directly.
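Here is a standalone, runnable reproduction of that double-checked cache. The `@DbField` annotation and `FieldCacheDemo` class are stand-ins for `@DbDataModels`/`DataAddon`; the caching logic itself matches the pattern above, including the volatile fast path and the second check inside the lock.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;
import java.util.Arrays;

public class FieldCacheDemo {
    @Retention(RetentionPolicy.RUNTIME)
    @interface DbField {}

    @DbField private String name;
    @DbField private int coins;
    private int notPersisted; // no annotation -> filtered out of the scan

    private volatile Field[] cachedFields = null;
    private final Object lock = new Object();

    private Field[] getAnnotatedFields() {
        if (cachedFields != null) return cachedFields;     // fast path, no lock
        synchronized (lock) {
            if (cachedFields != null) return cachedFields; // lost the race
            cachedFields = Arrays.stream(getClass().getDeclaredFields())
                    .filter(f -> f.isAnnotationPresent(DbField.class))
                    .peek(f -> f.setAccessible(true))
                    .toArray(Field[]::new);
        }
        return cachedFields;
    }

    public static void main(String[] args) {
        FieldCacheDemo demo = new FieldCacheDemo();
        System.out.println(demo.getAnnotatedFields().length); // 2 annotated fields
        // Every later call returns the exact same cached array instance.
        System.out.println(demo.getAnnotatedFields() == demo.getAnnotatedFields()); // true
    }
}
```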
Architecture Summary
```
Distributed Servers

Server #1 ─┐
Server #2 ─┼──► darkland_nexus (Redis) ──► Nexus-Inbound-Worker-N
Server #3 ─┘                                          │
                                    NexusReceiver.handleSyncMessage()
                                                      │
                                         Nexus-Outbound-Worker-N
                                                      │
                               ┌──────────────────────┴───────┐
                               │ L1 Cache (ConcurrentHashMap) │
                               │ L2 Cache (Redis + TTL)       │
                               │ MongoDB (persistence)        │
                               └──────────────────────┬───────┘
                                                      │
                          darkland_nexus_live ◄── heartbeat every 1s
```