The problem
Imagine we develop a SaaS platform that analyzes users' activity data - think fraud detection, behavioral analytics, compliance reporting. Our task is to create the integration pipeline that ingests and normalizes raw client data before it reaches those systems.
Our clients collect data, store it in CSV files, and upload them to their S3 buckets. Then they create a mapping for their data in our application - select a datatype for each field, some of them can be serialized JSON objects, others parsed with complex regular expressions or base64 decoded, etc. Our task is to download these files, parse each row, apply custom mapping, find corrupted records, and store the converted data on our side.
From a bird's-eye view, the whole integration task is nothing more than three steps: download, map, and store.
In more detail it works like this:
- Client stores CSV files in the S3 bucket. Each row represents one item
- Client gives us credentials for accessing these files
- Client creates mapping for rows
- Client schedules integration runs
When the scheduled time comes, we execute the integration by performing the following steps:
- push task to a queue
- receive task by the integration pipeline
- download file from client's S3
- get mapping
- apply mapping per row
- store new file to our S3 for further processing (uploading to the database, used by analytics tools, etc.)
A first, naive version of the code:
const rawData = await downloadFile(file.url);
const processed = applyMapping(rawData, clientMapping);
await saveToS3(file.s3Key, processed);
Introducing a message queue (SQS, RabbitMQ, or any other) helps with a few things:
- fully asynchronous execution
- queue messages can be distributed between any number of consumers, so scaling is flexible
- the queue's acknowledgment mechanism can be used for managing retries
- queues are great for managing backpressure
A message in the queue can look something like this:
{
  "clientConnection": { ...S3Credentials, bucketName, etc. },
  "mappingId": ...,
  "pathToCSV": ...
}
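Since the message crosses a process boundary, it is worth validating before use. Below is a minimal sketch, assuming the three top-level fields from the message above; the `IntegrationTask` type and the `parseTask` helper are hypothetical names, and the only credential field checked is `bucketName`.

```typescript
// Hypothetical shape of a queue message. Top-level field names follow the
// sketch above; everything inside clientConnection besides bucketName is left open.
interface IntegrationTask {
  clientConnection: { bucketName: string; [key: string]: unknown };
  mappingId: string;
  pathToCSV: string;
}

// Narrow an untrusted, already JSON-parsed message body to IntegrationTask.
function parseTask(body: unknown): IntegrationTask {
  if (typeof body !== 'object' || body === null) {
    throw new Error('Task must be an object');
  }
  const { clientConnection, mappingId, pathToCSV } = body as Record<string, unknown>;
  if (typeof clientConnection !== 'object' || clientConnection === null) {
    throw new Error('Task is missing clientConnection');
  }
  const connection = clientConnection as Record<string, unknown>;
  const bucketName = connection.bucketName;
  if (typeof bucketName !== 'string') {
    throw new Error('Task is missing clientConnection.bucketName');
  }
  if (typeof mappingId !== 'string' || typeof pathToCSV !== 'string') {
    throw new Error('Task is missing mappingId or pathToCSV');
  }
  return { clientConnection: { ...connection, bucketName }, mappingId, pathToCSV };
}
```

Rejecting a malformed message early keeps a bad task from reaching the download step, where the failure would be harder to attribute.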
At this point we observe two issues:
- if the file is relatively big (> 100 MB), we hold all of it in memory at once, so RAM usage grows with file size;
- we split the processing into distinct phases (download, map, upload) which are sequential, and that increases execution time.
But both issues are quite easy to fix - we just need to introduce streams and process data chunk by chunk, row by row.
Streams are good for two reasons:
- they allow us to keep memory usage flat by processing data chunk-by-chunk. Major blob storages have streaming APIs, as do many HTTP-based services;
- they handle backpressure automatically: when streams are connected with pipeline(), a slow destination pauses the source, so data keeps flowing from source to destination through mapping even with network delays or CPU-intensive processing.
Code is nothing more than:
import { pipeline } from 'node:stream/promises';
import { createDownloadStream } from './download';
import { createMappingTransform } from './mapping';
import { createS3UploadStream } from './upload';
await pipeline(
  createDownloadStream(file.url),
  createMappingTransform(clientMapping),
  createS3UploadStream(file.s3Key),
);
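The helper modules imported above are not shown in this article. To give a feeling for the middle stage, here is a minimal sketch of an object-mode Transform that maps one row at a time. It assumes rows arrive already parsed into objects; `createUppercaseTransform` and `mapRows` are hypothetical stand-ins, not the real `createMappingTransform`.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

type Row = Record<string, string>;

// Hypothetical stand-in for a mapping transform: upper-cases every value.
// Only one row is held by the transform at a time, which keeps memory flat.
function createUppercaseTransform(): Transform {
  return new Transform({
    objectMode: true,
    transform(row: Row, _encoding, callback) {
      const mapped: Row = {};
      for (const [key, value] of Object.entries(row)) {
        mapped[key] = value.toUpperCase();
      }
      callback(null, mapped);
    },
  });
}

// Runs rows through the transform and collects the output (for demonstration only;
// a real pipeline would write to an upload stream instead of an array).
async function mapRows(rows: Row[]): Promise<Row[]> {
  const out: Row[] = [];
  const sink = new Writable({
    objectMode: true,
    write(row: Row, _encoding, callback) {
      out.push(row);
      callback();
    },
  });
  await pipeline(Readable.from(rows), createUppercaseTransform(), sink);
  return out;
}
```

The sink only calls its callback when it is ready for more, and pipeline() propagates that signal upstream, which is the backpressure mechanism described above.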
And that's it. Many production systems work fine with this simple architecture. For sure, a few more things should be added like failure handling and reporting results, but overall it is a valid way to build a typical integration pipeline.
We delivered the integration code packaged as AWS Lambda and started monitoring the system.
Problems started to show up.
The first issue arose when the integration started to fail because some clients' CSV files were really big and it took a long time just to transfer gigabytes of data over the network. Lambda functions have a limited execution time (15 minutes at most), and we simply didn't fit within this timeframe.
We made a decision to ship our code as a regular service to good old EC2 instances.
It worked fine until...
The second issue arose when one client requested batch processing. Their CSV files were huge, so they had split them into multiple smaller files. They wanted the integration to download all of them but treat the whole bunch as one logical unit.
Our intuition told us that we could just process all files in parallel and combine the execution results:
const results = await Promise.all(
  files.map(async file => {
    await pipeline(
      createDownloadStream(file.url),
      createMappingTransform(clientMapping),
      createS3UploadStream(file.s3Key),
    );
    return { fileId: file.id, status: 'ok' };
  })
);
It worked great: for each file, the V8 engine sat idle most of the time, waiting for network responses, so we could process multiple files in parallel without increasing processing time.
But then we had clients with a huge number of fields that required parsing: JSON strings, base64-encoded text, complex regular expressions. Monitoring showed that processing time was increasing and the Node.js process had started to use noticeable CPU time.
The worst thing was that we were paying for a multicore EC2 instance while the Node.js process struggled along on a single core.
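To see where the CPU time goes, consider a sketch of what a per-row mapping might do. The `FieldType` values, `convertField`, and `applyMappingToRow` are hypothetical names chosen for illustration; the real mapping format is not shown in this article.

```typescript
// Hypothetical set of field types a client could choose in a mapping.
type FieldType = 'string' | 'number' | 'json' | 'base64';
type Mapping = Record<string, FieldType>;

// Convert one raw CSV cell. Every branch is synchronous JavaScript,
// so it runs on the thread that calls it.
function convertField(raw: string, type: FieldType): unknown {
  switch (type) {
    case 'number': {
      const parsed = Number(raw);
      if (raw.trim() === '' || Number.isNaN(parsed)) {
        throw new Error(`Not a number: "${raw}"`);
      }
      return parsed;
    }
    case 'json':
      return JSON.parse(raw); // throws SyntaxError on malformed input
    case 'base64':
      return Buffer.from(raw, 'base64').toString('utf8');
    default:
      return raw;
  }
}

// Apply the mapping to a whole row; fields missing from the row are treated as ''.
function applyMappingToRow(row: Record<string, string>, mapping: Mapping): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const [field, type] of Object.entries(mapping)) {
    result[field] = convertField(row[field] ?? '', type);
  }
  return result;
}
```

None of these conversions yields to the event loop, so a file with millions of rows and dozens of such fields turns into one long stretch of synchronous work.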
Where Node.js gets stuck
To understand what's going on we have to set our business logic aside for a bit and think how Node.js handles CPU-intensive tasks.
We all know that Node.js is single-threaded but it's only partially true.
First of all let's recall that Node.js consists of three parts:
- V8 - JavaScript engine responsible for memory management (Heap, Garbage Collector) and code execution (Call Stack)
- libuv - asynchronous operations handler (Event loop, thread pool, async I/O)
- Node.js API - bridge between V8 and libuv
Our application's JS code (business logic) is executed by V8 which is single-threaded and shines for I/O operations when all the heavy lifting is done by libuv and its C/C++ modules.
Libuv maintains a thread pool (4 threads by default, configurable via UV_THREADPOOL_SIZE). Many standard Node.js modules - zlib, crypto, file system operations - delegate the work of their asynchronous APIs to this pool via C/C++ libraries. So even if we perform some heavy encryption or compression through those asynchronous APIs, the event loop stays free.
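The difference between the pooled and the blocking path is visible in the API itself. A small sketch using `pbkdf2` from `node:crypto`; the function names, iteration count, and key length are arbitrary choices for the example.

```typescript
import { pbkdf2, pbkdf2Sync } from 'node:crypto';
import { promisify } from 'node:util';

const pbkdf2Async = promisify(pbkdf2);

// Asynchronous variant: the key derivation runs on libuv's thread pool,
// and the calling thread is free to do other work until it resolves.
function deriveKeyAsync(password: string, salt: string): Promise<Buffer> {
  return pbkdf2Async(password, salt, 100_000, 32, 'sha256');
}

// Synchronous variant: the same computation, but it runs on the calling
// thread and blocks it until the key is ready.
function deriveKeySync(password: string, salt: string): Buffer {
  return pbkdf2Sync(password, salt, 100_000, 32, 'sha256');
}
```

Both functions return the same bytes; only the thread that does the work differs. Our custom mapping has no such asynchronous variant, which is the root of the problem discussed next.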
If a task requires only I/O operations without loading the CPU, Node.js already handles it well. Downloading and uploading multiple files in parallel is done by libuv and doesn't make the event loop busy, so for regular cases wrapping pipelines into Promise.all() would be enough.
However, our issue is rooted in the fact that we are going to perform CPU-intensive mapping on the V8 side and it blocks the single thread. Libuv can't help us with this because there are no native addons for our business logic.
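The blocking effect is easy to reproduce. The sketch below schedules a 0 ms timer and then burns CPU synchronously; `busyWait` and `measureTimerDelay` are illustrative helpers, with `busyWait` standing in for a heavy mapping step.

```typescript
import { performance } from 'node:perf_hooks';

// Burn CPU synchronously for roughly `ms` milliseconds
// (a stand-in for CPU-intensive mapping).
function busyWait(ms: number): void {
  const end = performance.now() + ms;
  while (performance.now() < end) {
    // spin
  }
}

// Schedule a 0 ms timer, then block the thread.
// Resolves with how many milliseconds passed before the timer actually fired.
function measureTimerDelay(blockMs: number): Promise<number> {
  return new Promise((resolve) => {
    const scheduledAt = performance.now();
    setTimeout(() => resolve(performance.now() - scheduledAt), 0);
    busyWait(blockMs);
  });
}
```

A call such as `measureTimerDelay(50)` cannot resolve in under 50 ms, because the timer callback has to wait until the synchronous work is finished. Every other callback on that thread, including stream events of other pipelines, waits the same way.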
And here we have the core question - how to optimize our pipeline for CPU-intensive operations and free up the main thread?
The obvious fix that doesn't help
Node.js provides a few built-in ways to execute CPU-intensive tasks outside the main thread:
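- Worker Threads
- Cluster
- Child Processes (the low-level tool that Cluster is built on)
We will use Worker Threads as the best fit for our goals: unlike the other two, they run inside a single process, so there are no extra processes to spawn and supervise.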
A Worker Thread is a mechanism for creating a separate V8 isolate with its own event loop in a separate OS thread. The great thing about it is that the main thread can pass data to each Worker Thread, and Worker Threads can send data back to the main thread as well as notify it about errors and the end of execution.
The simplest example of the integration pipeline execution can be:
import { Worker, isMainThread, workerData, parentPort } from 'node:worker_threads';
import { pipeline } from 'node:stream/promises';
import { createDownloadStream } from './download';
import { createMappingTransform } from './mapping';
import { createS3UploadStream } from './upload';

// This file runs twice: once as the main thread, once inside the worker.
// isMainThread tells us which context we're in.
if (isMainThread) {
  const runWorker = (file: File, mapping: Mapping): Promise<void> =>
    new Promise((resolve, reject) => {
      // Spawns a new worker thread running this same file.
      // This file is an ES module (it uses top-level await), so __filename
      // is not defined; import.meta.url is used instead.
      // workerData is how we pass data into the worker at startup.
      const worker = new Worker(new URL(import.meta.url), {
        workerData: { file, mapping },
      });
      // Workers communicate back via messages and error events
      worker.once('message', () => resolve());
      worker.once('error', reject);
      // A worker that exits without reporting success must not leave the promise hanging
      worker.once('exit', (code) => {
        if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
      });
    });

  await runWorker(file, clientMapping);
} else {
  // Inside the worker thread: workerData contains what we passed in above
  const { file, mapping } = workerData;
  // No try/catch needed: an uncaught error here is delivered to the
  // main thread as the worker's 'error' event
  await pipeline(
    createDownloadStream(file.url),
    createMappingTransform(mapping),
    createS3UploadStream(file.s3Key),
  );
  // Signal the main thread that we're done
  parentPort!.postMessage('done');
}
For processing multiple files the naive solution is to create a worker per file:
import { Worker } from 'node:worker_threads';

await Promise.all(
  files.map(file => new Promise<void>((resolve, reject) => {
    // Loading a .ts file directly assumes a runtime or loader that understands
    // TypeScript; otherwise point this at the compiled .js file
    const worker = new Worker('./pipeline.ts', {
      workerData: { file, mapping: clientMapping }
    });
    worker.on('message', resolve);
    worker.on('error', reject);
  }))
);
and pipeline.ts is:
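import { workerData, parentPort } from 'node:worker_threads';
import { pipeline } from 'node:stream/promises';
import { createDownloadStream } from './download';
import { createMappingTransform } from './mapping';
import { createS3UploadStream } from './upload';

const { file, mapping } = workerData;

// An uncaught error here reaches the main thread as the worker's 'error' event
await pipeline(
  createDownloadStream(file.url),   // readable stream
  createMappingTransform(mapping),  // transform stream
  createS3UploadStream(file.s3Key), // writable stream
);
parentPort!.postMessage('done');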
However, this solution isn't optimal when the number of worker threads exceeds the number of CPU cores. If we start 8 workers for 8 files but the EC2 instance has only 4 cores, the threads compete for cores and we pay for context switching. That is not the worst part, though: each Worker Thread spins up a full V8 isolate with its own heap and its own event loop. It might not be an issue if the number of files is low, but with high numbers the overhead becomes real.
Two-level concurrency
Now we are ready to build the final solution. Let's first illustrate it with the following diagram:
As discussed above, there is no need to create more Worker Threads than there are CPU cores, so we create exactly N = number of CPU cores. This is our concurrency Level 1.
All N workers can be created during service boot and then wait for tasks from the Main Thread.
If the number of files is higher than the number of workers N we can round-robin them between workers equally. Each worker then processes its assigned files through an async pool of M concurrent pipelines - this is our concurrency Level 2. While Level 1 spreads work across CPU cores, Level 2 keeps each core busy by overlapping I/O wait across multiple files within a single worker.
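The round-robin split itself is only a few lines. This is one possible implementation of the `roundRobin` helper that the Main Thread code calls, written as a sketch rather than the exact function used in production.

```typescript
// Distribute items across n slices: item 0 goes to slice 0, item 1 to slice 1,
// ..., item n to slice 0 again. Slice sizes differ by at most one.
function roundRobin<T>(items: readonly T[], n: number): T[][] {
  if (!Number.isInteger(n) || n < 1) {
    throw new RangeError('n must be a positive integer');
  }
  const slices: T[][] = Array.from({ length: n }, () => []);
  items.forEach((item, index) => {
    slices[index % n]!.push(item);
  });
  return slices;
}
```

For 7 files and 3 workers this gives slices of 3, 2, and 2 files. If there are fewer files than workers, some slices are simply empty. Note that the split balances file counts, not file sizes; balancing by size would need a different strategy.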
We can't just Promise.all() the files - if a worker gets 25 files, that's 25 pipelines fighting for memory and file descriptors at once. An async pool caps concurrency at M simultaneous pipelines per worker, starting the next one only when a slot frees up. When one pipeline waits for a network ACK, another executes CPU-intensive mapping and vice versa.
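An async pool needs no library. Below is a minimal custom sketch with the same `asyncPool(concurrency, items, fn)` argument order used in this article; it is one possible implementation, and stopping early after the first failure is a design choice of this sketch.

```typescript
// Run fn over items with at most `limit` calls in flight at any moment.
// Results keep the order of the input items.
async function asyncPool<T, R>(
  limit: number,
  items: readonly T[],
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }
  const results: R[] = new Array(items.length);
  let next = 0;
  let failed = false;

  // Each runner claims the next unprocessed index until the list is exhausted
  // or another runner has failed.
  async function runner(): Promise<void> {
    while (!failed && next < items.length) {
      const index = next++;
      try {
        results[index] = await fn(items[index] as T);
      } catch (err) {
        failed = true; // stop other runners from starting new items
        throw err;
      }
    }
  }

  const runners = Array.from({ length: Math.min(limit, items.length) }, () => runner());
  await Promise.all(runners);
  return results;
}
```

A slot frees up as soon as any pipeline finishes, so a few slow files do not hold back the rest of the slice. Items already in flight when a failure happens still run to completion; they are just not followed by new ones.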
Let's illustrate this process with a diagram for 6 files and 2 Worker Threads:
Notice that while one pipeline uses the CPU for mapping, data can still be downloaded or uploaded over HTTP: the transfer itself is non-blocking I/O handled by libuv and the operating system, so bytes keep moving while the Worker Thread's JavaScript is busy.
As a result we can:
- process CPU-heavy pipelines for multiple files with real CPU cores parallelization
- utilize multicore server resources effectively
- as files are processed in parallel, we can drastically reduce execution time
- as parallel execution happens inside one Node.js process, orchestration and error handling would be much easier than if it were distributed between multiple processes or even machines
Keep in mind that each worker runs its own V8 isolate - that's roughly 10–30MB of baseline memory per worker, regardless of workload. With N workers, that cost is part of the process's total RSS. On a 4-core instance, you're looking at 40–120MB just for the isolates before any actual data flows through.
When the Main Thread pulls a scheduled integration run message from the queue, it fetches the mapping and S3 credentials and spreads the work between Worker Threads. It waits until all workers report success and only then sends the ACK to the queue.
Files in a batch represent the same dataset - think of a single day's activity split across multiple files for size reasons. If we ACK after processing only half of them, the queue considers the job done, but the downstream system gets an incomplete picture. For some clients, partial data is worse than no data at all. So the rule is simple: ACK fires only after every worker reports success. If anything fails, the message stays in the queue for a retry or lands in a dead-letter queue for investigation.
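The all-or-nothing rule can be expressed as a small pure function over settled results. This is a sketch: `AckDecision` and `decideAck` are hypothetical names, and what "retry" means in practice depends on your queue.

```typescript
type AckDecision =
  | { action: 'ack' }
  | { action: 'retry'; reasons: string[] };

// ACK only if every worker succeeded; otherwise collect all failure reasons
// so they can be logged or reported before the message is retried.
function decideAck(results: readonly PromiseSettledResult<unknown>[]): AckDecision {
  const reasons = results
    .filter((result): result is PromiseRejectedResult => result.status === 'rejected')
    .map((result) =>
      result.reason instanceof Error ? result.reason.message : String(result.reason),
    );
  return reasons.length === 0 ? { action: 'ack' } : { action: 'retry', reasons };
}
```

Feeding it the output of `Promise.allSettled()` instead of using `Promise.all()` means we wait for every worker to finish before deciding, and we see all failures rather than only the first one.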
This is also why we keep a single batch inside a single process. Splitting one batch across multiple EC2 instances adds distributed coordination - every instance must agree on success, handle partial failures, and clean up after crashes. A single multi-core machine already handles the load. If multiple batches arrive at the same time, they queue up - each instance processes one batch at a time and picks up the next one when it's done.
Since N always equals the number of CPU cores, you can size the EC2 instance based on how many files a client's batch typically contains - a 2-core instance for small batches, a 16-core instance for large ones. No code changes required, and it could help optimize infrastructure costs.
Putting it together
The Main Thread will look something like this:
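// (Main Thread)
import * as os from 'node:os';
import { Worker } from 'node:worker_threads';

const N = os.availableParallelism();

// Boot N workers once at process start
const workers = Array.from({ length: N }, () => new Worker('./worker.ts'));

// Listen for queue messages (queue and mappingService are placeholders
// for your queue client and mapping storage)
queue.on('message', async (message) => {
  const { files, mappingId, credentials } = message;
  try {
    // Fetch mapping once - workers don't do this themselves
    const mapping = await mappingService.get(mappingId);
    // Distribute files across workers via round-robin
    const slices = roundRobin(files, N);
    // Dispatch each slice and wait for all workers to finish
    await Promise.all(
      workers.map((worker, i) =>
        dispatch(worker, { files: slices[i], mapping, credentials })
      )
    );
    // All workers done - safe to ACK
    queue.ack(message);
  } catch (err) {
    // No ACK on failure: the message stays in the queue for a retry
    console.error('Integration run failed', err);
  }
});

function dispatch(worker: Worker, payload: unknown): Promise<void> {
  return new Promise((resolve, reject) => {
    const onMessage = () => { worker.off('error', onError); resolve(); };
    const onError = (err: Error) => { worker.off('message', onMessage); reject(err); };
    // Register listeners before posting, and remove the unused one afterwards
    // so listeners don't pile up across batches
    worker.once('message', onMessage);
    worker.once('error', onError);
    worker.postMessage(payload);
  });
}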
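The Worker Thread now listens for messages from the Main Thread:
// (Worker Thread)
import { parentPort } from 'node:worker_threads';
import { pipeline } from 'node:stream/promises';
import { createDownloadStream } from './download';
import { createMappingTransform } from './mapping';
import { createS3UploadStream } from './upload';

// Max concurrent pipelines per worker; 4 is a placeholder, tune it for your workload
const M = 4;

parentPort!.on('message', async ({ files, mapping, credentials }) => {
  // Process assigned files as a pool of M concurrent pipelines
  // asyncPool(concurrency, items, fn) - runs at most M pipelines simultaneously
  // (e.g. tiny-async-pool, or a custom implementation)
  // By default, a rejection here is unhandled: it terminates this worker and
  // surfaces in the Main Thread as the worker's 'error' event
  await asyncPool(M, files, (file) =>
    pipeline(
      createDownloadStream(file.url, credentials),
      createMappingTransform(mapping),
      createS3UploadStream(file.s3Key),
    )
  );
  parentPort!.postMessage({ status: 'done' });
});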
This gives an idea of how to start building CPU-bound integration pipelines without a computation bottleneck.
What this architecture does not solve (and why that's fine)
I intentionally omit a few important parts that will be crucial for a production-ready implementation - they are out of scope of the article and highly depend on your current setup, business needs and preferences. However, let's briefly discuss some of them and mention possible solutions:
- dealing with corrupted rows (could be a client's business choice):
  - logging them and reporting to client after integration is executed
  - failing the whole integration and then reporting
- clean up if integration failed:
  - removing corrupted S3 files or using abortMultipartUpload
  - sending ACK to the queue but reporting failure or retrying the whole process
- storing credentials to the client's storage:
  - specialized service
  - secrets vault
- sending scheduled integration to a queue:
  - Amazon EventBridge Scheduler
  - Apache Airflow
  - Temporal
  - and many other schedulers.
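As an illustration of the first point, the two corrupted-row policies can live in the transform itself. This is a sketch under assumptions: rows arrive as strings, `mapRow` throws on a corrupted row, and the names `Policy`, `RowError`, `createValidatingTransform`, and `mapAll` are hypothetical.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

type Policy = 'skip' | 'fail';
interface RowError { line: number; message: string }

// Wraps a row mapper. With 'skip', corrupted rows are recorded and dropped;
// with 'fail', the first corrupted row aborts the whole pipeline.
function createValidatingTransform(
  mapRow: (row: string) => unknown,
  policy: Policy,
  errors: RowError[],
): Transform {
  let line = 0;
  return new Transform({
    objectMode: true,
    transform(row: string, _encoding, callback) {
      line += 1;
      let mapped: unknown;
      try {
        mapped = mapRow(row);
      } catch (err) {
        const message = err instanceof Error ? err.message : String(err);
        errors.push({ line, message });
        if (policy === 'fail') callback(new Error(`Row ${line}: ${message}`));
        else callback(); // drop the row and keep going
        return;
      }
      callback(null, mapped);
    },
  });
}

// Demonstration helper: maps JSON rows and returns both output and collected errors.
async function mapAll(rows: string[], policy: Policy) {
  const errors: RowError[] = [];
  const out: unknown[] = [];
  const sink = new Writable({
    objectMode: true,
    write(item: unknown, _encoding, callback) { out.push(item); callback(); },
  });
  await pipeline(Readable.from(rows), createValidatingTransform((row) => JSON.parse(row), policy, errors), sink);
  return { out, errors };
}
```

With 'skip', the collected `errors` array is what gets reported to the client after the run; with 'fail', the rejection from pipeline() is the signal to clean up and withhold the ACK.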
Where else this pattern applies
In conclusion, I'd like to mention when this pattern can be applied and when it doesn't make much sense to implement it.
The main benefit of this solution is parallelization of CPU-bound tasks and it can be useful for:
- Complex business rules - scoring, pricing engines, eligibility checks, fraud detection running across hundreds of thousands of rows.
- Statistical computation - rolling windows, percentile calculations, custom aggregations over large in-memory datasets.
- Text processing - heavy regex work, tokenization, parsing of custom or proprietary formats.
- Large JSON processing - JSON.parse and JSON.stringify run synchronously in V8. For small payloads this is negligible, but deserializing a 50MB blob will block noticeably.
- Template rendering - generating thousands of HTML emails or reports in a tight loop.
If you suspect your pipeline is doing heavy computations, profile first; if CPU load is constantly high, consider implementing something similar to the solution above. If you just use the asynchronous APIs of Node.js built-in modules like crypto or zlib, they already solve the issue by delegating work to libuv's thread pool and don't block the event loop. It's always worth checking before implementing.
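One cheap way to check before reaching for a full profiler is event loop utilization from `node:perf_hooks`. A minimal sketch; `sampleUtilization` is a hypothetical helper name, and the window length is up to you.

```typescript
import { performance } from 'node:perf_hooks';

// Sample event loop utilization over a time window.
// Values near 1 mean the loop spent almost the whole window running code
// rather than idling while waiting for I/O.
function sampleUtilization(windowMs: number): Promise<number> {
  const start = performance.eventLoopUtilization();
  return new Promise((resolve) => {
    setTimeout(() => {
      // Passing the earlier reading returns the utilization for the window only
      resolve(performance.eventLoopUtilization(start).utilization);
    }, windowMs);
  });
}
```

Sampling this periodically while the pipeline runs gives a rough signal: a value that stays close to 1 under load suggests the thread is CPU-bound, while a low value suggests the pipeline is mostly waiting on the network and Worker Threads would add little.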
Even if you don't find the described implementation useful, I hope you've gained at least some intuition for how to move big files over the network, how to use Streams and Worker Threads, and how some parts of Node.js work.






