I invented this architecture while designing a scalable solution for serverless Next.js applications that require AI-powered, long-running processing.
You can think of it as an event-driven, pub/sub-style architecture without a message broker like RabbitMQ or Kafka.
It is just as scalable, but without the limitation of being unable to hold stateful connections to a message broker from the edge.
So it's the best of both worlds: the responsiveness and speed of the edge, with the ability to offload long-running processes to the Notifier or the Processor.
NPC stands for Notifier, Processor, Core — an architecture pattern for orchestrating AI workflows efficiently within serverless environments.
1. Core Idea
At its core, the NPC architecture leverages Redis Lists and atomic operations such as `rPush` and `blPop` to orchestrate communication between:
- Core Backend → where the business logic lives (e.g., edge/serverless functions in Next.js). Any long-running process is enqueued as a job from here, destined for the Notifier or the Processor.
- Processor → server that handles long-running background AI tasks.
- Notifier → server that sends near real-time updates to clients (email, browser, mobile, etc.).
Both the Processor and the Notifier are horizontally scaled servers that run a background job every configurable number of milliseconds, called a tick. Within each tick, jobs are pulled, processed, and retried. This decoupling allows high scalability, fault tolerance, and near-real-time responsiveness.
Jobs can be enqueued from anywhere, but mostly from the Core Backend.
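To make the core primitive concrete, here is a minimal sketch of that handoff over a single Redis List, assuming a local Redis instance and the node-redis client; the queue name and payload are made up for illustration:

```typescript
// handoff.ts: minimal producer/consumer pair over one Redis List
import { createClient } from 'redis';

const redis = createClient();
await redis.connect();

// Hypothetical queue name for this demo
const QUEUE = 'processor:demo:queue';

// Producer side (Core Backend): rPush appends the job atomically.
await redis.rPush(QUEUE, JSON.stringify({ jobId: 'job-1', task: 'generate-menu' }));

// Consumer side (Processor): blPop blocks for up to 5s and pops atomically,
// so even with many workers, each job is delivered to exactly one of them.
const result = await redis.blPop(QUEUE, 5);
if (result) {
  const job = JSON.parse(result.element);
  console.log(`Picked up ${job.jobId}`);
}
```

Everything else in the pattern is layered on top of this one guarantee.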
2. Flow of Execution
The Core Backend enqueues the job into the Notifier's or the Processor's Redis List. Both the Notifier and the Processor run a long-running background job at a configurable tick (e.g., every 30 seconds). When a job is processed, its state is updated in the database and a notifier job is enqueued. The Notifier's responsibility is to update the client after each job a worker completes.
This architecture allows for a predictable, one-directional, left-to-right flow that is less prone to errors and failures.
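A tick can be as simple as a loop that sleeps for the configured interval and then drains whatever is waiting. A minimal sketch, assuming node-redis; `handleJob` is a hypothetical stand-in for the real work:

```typescript
// tick.ts: a worker that wakes up every tick and drains its queue
import { createClient } from 'redis';

const redis = createClient();
await redis.connect();

const TICK_MS = 30_000; // configurable tick, e.g. 30 seconds
const QUEUE = 'processor:bistroai:menu';

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical handler; a real Processor would run the long AI task here,
// update the Jobs DB, and enqueue a notifier job.
const handleJob = async (raw: string): Promise<void> => {
  console.log(`[Tick] handling ${raw}`);
};

while (true) {
  // Drain everything currently queued, then sleep until the next tick.
  let raw = await redis.lPop(QUEUE);
  while (raw) {
    await handleJob(raw);
    raw = await redis.lPop(QUEUE);
  }
  await sleep(TICK_MS);
}
```

A blocking `blPop` loop (as in the samples further down) works too; the tick style simply bounds how often a worker touches Redis.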
3. The Components
🔹 Core Backend
- Lives in serverless edge functions.
- Enqueues jobs into processor queues (e.g., `processor:bistroai:menu`).
- Updates job state in a persistent Jobs DB (both steps are sketched below).
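As an illustration, a Core Backend route handler in Next.js might look like the sketch below; the `jobsDb` helper and the payload fields are hypothetical stand-ins, not part of the original design:

```typescript
// app/api/menu/route.ts: hypothetical Core Backend enqueue endpoint
import { createClient } from 'redis';

const redis = createClient();

export async function POST(req: Request) {
  const { restaurantId } = await req.json(); // hypothetical payload field
  const jobId = crypto.randomUUID();

  if (!redis.isOpen) await redis.connect();

  // 1. Record the job as pending in the Jobs DB (jobsDb is a hypothetical helper):
  // await jobsDb.insert({ jobId, status: 'pending' });

  // 2. Hand the heavy work off to the Processor and return immediately.
  await redis.rPush(
    'processor:bistroai:menu',
    JSON.stringify({ jobId, restaurantId })
  );

  return Response.json({ jobId, status: 'queued' });
}
```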
🔹 Processor
- Stateless and leaderless (scales horizontally).
- Long-running background worker.
- Configurable ticks (millisecond intervals) to pull jobs from Redis.
- Processes jobs from queues such as `processor:bistroai:menu`.
- Implements retry logic with exponential backoff:
  - Example: `retry:jobId → 3` (max 3 retries).
  - Failed jobs are re-enqueued with increasing wait times (e.g., 1s → 2s → 4s); see the sketch below.
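One way to implement the increasing wait is a doubling delay before re-enqueueing. This sketch counts attempts up with `INCR` (the full sample in section 7 counts down instead); the helper name is made up:

```typescript
// backoff.ts: sketch of exponential backoff on re-enqueue
import { createClient } from 'redis';

const redis = createClient();
await redis.connect();

const MAX_RETRY = 3;
const BASE_DELAY_MS = 1_000;

export const retryWithBackoff = async (
  queue: string,
  jobId: string,
  payload: string
): Promise<void> => {
  // retry:<jobId> tracks how many attempts have been used so far.
  const attempt = await redis.incr(`retry:${jobId}`);
  if (attempt > MAX_RETRY) {
    console.log(`[Backoff] ${jobId} permanently failed after ${MAX_RETRY} retries`);
    return;
  }
  // Double the wait each attempt: 1s, 2s, 4s, ...
  const delayMs = BASE_DELAY_MS * 2 ** (attempt - 1);
  await new Promise((resolve) => setTimeout(resolve, delayMs));
  await redis.rPush(queue, payload);
  console.log(`[Backoff] re-enqueued ${jobId} after ${delayMs}ms (attempt ${attempt})`);
};
```

Sleeping in-process is the simplest approach; a production variant might instead park delayed jobs in a sorted set keyed by wake-up time so the worker slot stays free.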
🔹 Notifier
- Stateless and leaderless (scales horizontally).
- Uses `blPop` atomics for safe concurrent processing.
- Listens to queues like `email:bistroai:queue`.
- Sends updates to clients (browser, email, SMS, push, etc.); a worker sketch follows.
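A minimal Notifier worker might look like this; `sendEmail` is a hypothetical stand-in for whatever delivery channel you use:

```typescript
// notifier.ts: sketch of a Notifier worker draining the email queue
import { createClient } from 'redis';

const redis = createClient();
await redis.connect();

const QUEUE = 'email:bistroai:queue';

// Hypothetical delivery helper (SES, SendGrid, SMTP, ...).
const sendEmail = async (to: string, subject: string): Promise<void> => {
  console.log(`[Notifier] email to ${to}: ${subject}`);
};

while (true) {
  // blPop blocks for up to 5s; with many Notifier instances running,
  // each message is still delivered to exactly one worker.
  const result = await redis.blPop(QUEUE, 5);
  if (!result) continue;
  const { to, subject } = JSON.parse(result.element);
  await sendEmail(to, subject);
}
```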
4. Scalability & Resilience
- Horizontal Scaling → Each Notifier/Processor instance handles a few jobs; the rest are distributed across more workers.
- Race Condition Safety → Thanks to `blPop`, multiple workers can listen to the same queue without collisions (demonstrated below).
- Exponential Backoff → Prevents hammering the system with repeated retries.
- Queue Naming → Encodes responsibility (e.g., `email:bistroai:queue`).
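To see the race-condition safety in action, this sketch runs several consumers against one queue; `blPop` guarantees each pushed element is handed to exactly one of them:

```typescript
// race-safety.ts: several workers, one queue, no double delivery
import { createClient } from 'redis';

const QUEUE = 'processor:demo:queue'; // hypothetical queue name

const worker = async (id: number): Promise<void> => {
  // Each worker needs its own connection, since blPop blocks it while waiting.
  const redis = createClient();
  await redis.connect();
  while (true) {
    const result = await redis.blPop(QUEUE, 5);
    if (result) console.log(`[Worker ${id}] got ${result.element}`);
  }
};

// Three concurrent workers; BLPOP hands each pushed element to exactly one.
await Promise.all([worker(1), worker(2), worker(3)]);
```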
5. Benefits of NPC Architecture
✅ Serverless-ready → fits naturally with Next.js & edge functions.
✅ Massive scalability → jobs don’t pile up; they are processed in ticks.
✅ Near real-time notifications → clients aren’t left waiting.
✅ Resilient → safe concurrency & retry strategies.
✅ Decoupled → core, processor, and notifier are modular and independent.
✅ Security → your API keys live on isolated servers, whether for the notifier logic or the long-running processing logic.
6. Example: Email Workflow
- Core enqueues a job → `processor:bistroai:menu`.
- Processor generates an AI-powered menu and stores results in the DB.
- Processor enqueues → `email:bistroai:queue` (this handoff is sketched below).
- Notifier sends an email update to the client.
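Putting the middle two steps together, the Processor's completion path might look like this sketch; `generateMenu`, the payload shape, and the `jobsDb` helper are hypothetical:

```typescript
// processor-complete.ts: sketch of the Processor handing off to the Notifier
import { createClient } from 'redis';

const redis = createClient();
await redis.connect();

// Hypothetical AI step; in practice this is the long-running work.
const generateMenu = async (restaurantId: string): Promise<string[]> => {
  return ['Pasta', 'Salad'];
};

export const completeMenuJob = async (job: {
  jobId: string;
  restaurantId: string;
  email: string;
}): Promise<void> => {
  const menu = await generateMenu(job.restaurantId);

  // Store results / update job state in the Jobs DB (jobsDb is hypothetical):
  // await jobsDb.update(job.jobId, { status: 'done', menu });

  // Hand off to the Notifier so the client gets updated.
  await redis.rPush(
    'email:bistroai:queue',
    JSON.stringify({
      to: job.email,
      subject: `Your menu is ready (${job.jobId})`,
      menu,
    })
  );
};
```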
7. Sample Business Logic for the Notifier or Processor (Node.js)
```typescript
// src/jobs/catAdoptionJob.ts
import { createClient } from 'redis';

// ---- Redis Setup ----
export const redisClient = createClient();
await redisClient.connect();

// ---- Queue Config ----
const QUEUE_KEY = 'adoption:queue';
const MAX_RETRY = 3;

// ---- Types ----
export interface AdoptionPayload {
  applicationId: string;
  applicantName: string;
  catName: string;
}

// ---- Enqueue ----
// Any instance can enqueue a job
export const enqueueAdoptionJob = async (
  payload: AdoptionPayload
): Promise<void> => {
  await redisClient.rPush(QUEUE_KEY, JSON.stringify(payload));
  // initialize the retry counter under retry:<applicationId>
  await redisClient.set(`retry:${payload.applicationId}`, MAX_RETRY.toString());
  console.log(`[Enqueue] Job added: ${payload.applicationId}`);
};

// ---- Retry Logic ----
const isJobRetriable = async (applicationId: string): Promise<boolean> => {
  let currRetry = await redisClient.get(`retry:${applicationId}`);
  if (!currRetry) {
    // no counter yet: start fresh with the maximum number of retries
    await redisClient.set(`retry:${applicationId}`, MAX_RETRY.toString());
    currRetry = MAX_RETRY.toString();
  }
  const retriesLeft = parseInt(currRetry, 10);
  if (retriesLeft <= 0) {
    return false;
  }
  await redisClient.set(`retry:${applicationId}`, (retriesLeft - 1).toString());
  return true;
};

// ---- Processor ----
// Only one consumer gets a job because BLPOP is atomic
export const processAdoptionJob = async (): Promise<void> => {
  try {
    const result = await redisClient.blPop(QUEUE_KEY, 5); // wait up to 5s
    if (!result) return; // no job available

    const payload = JSON.parse(result.element) as AdoptionPayload;
    console.log(`[Processor] Processing adoption: ${payload.applicationId}`);

    try {
      // ---- Core Job Logic ----
      // Example: Simulate long-running work
      console.log(
        `[Processor] Checking background for ${payload.applicantName} adopting ${payload.catName}...`
      );
      await new Promise((resolve) => setTimeout(resolve, 1000)); // fake async work
      console.log(
        `[Processor] Adoption approved: ${payload.applicantName} -> ${payload.catName}`
      );
    } catch (error) {
      console.error(`[Processor] Job failed: ${payload.applicationId}`, error);
      const canRetryJob = await isJobRetriable(payload.applicationId);
      if (canRetryJob) {
        console.log(`[Processor] Retrying job: ${payload.applicationId}`);
        await redisClient.rPush(QUEUE_KEY, JSON.stringify(payload));
      } else {
        console.log(`[Processor] Job permanently failed: ${payload.applicationId}`);
      }
    }
  } catch (error) {
    console.error(`[Processor] Unexpected error:`, error);
  }
};

// ---- Example Usage ----
(async () => {
  // enqueue some jobs
  await enqueueAdoptionJob({
    applicationId: 'job1',
    applicantName: 'Alice',
    catName: 'Whiskers',
  });
  await enqueueAdoptionJob({
    applicationId: 'job2',
    applicantName: 'Bob',
    catName: 'Mittens',
  });

  // continuously process jobs
  while (true) {
    await processAdoptionJob();
  }
})();
```
👉 With this architecture, heavy-traffic AI workloads can be supported by dynamically scaling Processor and Notifier workers, while the Core Backend remains lean, serverless, and responsive.