When developing applications that rely on multiple AI agents, performance and security become massive challenges. AI tasks are heavy and take time. If we process them on the main application thread, the whole system freezes.
To solve this, I built AegisPipeline a lightweight, event driven Java engine. It uses a Scatter Gather architectural pattern to run AI tasks in the background safely and efficiently.
Let’s break down the main layers, the Security Gateway, the Orchestrator, the Dispatcher, and the Job Tracker, and see how they interact to process files.
1. Security & Ingestion (The Gateway)
This is the entry point for user uploads. We cannot simply trust user-provided file extensions (like .pdf or .png) because hackers can easily spoof them to upload malicious scripts.
How it works: This layer uses Apache Tika to read the file’s “magic bytes” (the raw binary signature) to determine its true MIME type.
Example: If a user uploads a file named document.pdf but it's actually an executable script, the Gateway detects the mismatch, fails fast, and rejects the payload before it ever reaches our AI agents. It also generates a random UUID for the filename to prevent Path Traversal attacks.
2. Core Orchestrator (The Brain)
Once a file is secured and stored, the Orchestrator takes over. It contains the JobManager and the WorkflowStrategy. It acts as the "traffic controller" for AI tasks.
How it works: Instead of hardcoding which AI agent processes which file, the Orchestrator looks at the validated file type and dynamically selects a strategy.
Example: If the file is a binary (application/x-binary), the Orchestrator selects the SecurityScanStrategy, which knows it needs both a VIRUS_SCANNER agent and a STATIC_ANALYZER agent.
3. Task Dispatcher (The Muscle)
The dispatcher is where the magic of asynchronous processing happens. It pushes the heavy lifting off the main application thread so the system remains fast and responsive for other users.
How it works: It uses Java’s ExecutorService (a fixed thread pool) and CompletableFuture to scatter the tasks to different AI agents simultaneously.
// We use CompletableFuture to push the heavy AI processing to a background thread.
// This frees up the main application thread to accept more user uploads.
CompletableFuture.supplyAsync(() -> {
try {
// The specific AI agent executes its heavy analysis here
return agent.executeAnalysis(secureFilePath);
} catch(Exception e) {
System.err.println("Agent failed: " + e.getMessage());
return 0.0; // Fail-safe return
}
}, aiThreadPool)
4. Job Tracker (The Memory)
Because multiple AI agents are processing data at the exact same time on different threads, we need a highly secure way to store their results without data corruption.
How it works: The JobTracker acts as an in-memory database using a ConcurrentHashMap. It uses an AtomicInteger to count how many tasks have finished.
Example: When the VIRUS_SCANNER finishes, it saves its score here. Because we use atomic operations, if two agents finish at the exact same millisecond, the system still counts them perfectly.
Let’s look at an example: How all these layers work together
Here is the exact flow when a user uploads a file for AI analysis:
- Client Upload -> The user uploads a file to the system.
- Security Gateway -> The SecureIngestionService validates the magic bytes, assigns a safe UUID, and saves the file via the StorageProvider.
- Core Orchestrator -> The JobManager finds the matching WorkflowStrategy for the file type to see which AI agents are needed.
- Job Tracker -> Initializes a new job session, locking in the exact number of AI agents we expect to hear back from.
- Task Dispatcher (Scatter) -> Throws the tasks into the background ExecutorService thread pool.
- AI Agents -> The required agents analyze the file in parallel.
- Job Tracker (Gather) -> Each agent reports its score back to the thread-safe map.
- Core Orchestrator (Fusion) -> Once the final agent finishes, the WorkflowStrategy calculates a final weighted probability score and cleans up the memory.
Find it on -> GitHub


Top comments (0)