Semaphores are synchronization primitives used to control access to shared resources in concurrent programming. They help coordinate multiple threads, processes, or asynchronous functions, ensuring that operations on shared resources do not conflict and cause unexpected behavior such as race conditions.
What Are Semaphores?
A semaphore is essentially a counter that regulates how many processes or threads can access a particular resource concurrently. Before accessing a resource, a process "acquires" the semaphore (decrementing the counter). After finishing with the resource, it "releases" the semaphore (incrementing the counter). If the counter is zero when a process tries to acquire it, the process must wait until the semaphore is released.
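The counter semantics can be sketched in a few lines. This is an illustrative, synchronous sketch only (the class name and tryAcquire method are hypothetical); real async implementations follow later in the article:

```typescript
// Illustrative sketch of the counter semantics only; a proper async
// implementation with a waiting queue appears later in this article.
class SketchSemaphore {
  constructor(private permits: number) {}

  // Try to take a permit; returns false if the counter is already zero.
  tryAcquire(): boolean {
    if (this.permits > 0) {
      this.permits--; // acquire: decrement the counter
      return true;
    }
    return false; // counter is zero: the caller must wait
  }

  release(): void {
    this.permits++; // release: increment the counter
  }
}

const sem = new SketchSemaphore(2);
console.log(sem.tryAcquire()); // true  (one permit left)
console.log(sem.tryAcquire()); // true  (zero permits left)
console.log(sem.tryAcquire()); // false (would have to wait)
sem.release();
console.log(sem.tryAcquire()); // true  (a permit was returned)
```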
Types of Semaphores
Binary Semaphore (Mutex):
A binary semaphore can have only two states: locked (0) or unlocked (1). It is used to ensure mutual exclusion (mutex) for critical sections. Only one process can hold the semaphore at a time. Real-life use cases include protecting shared files or database writes where only one operation should occur at any given time.
Counting Semaphore:
A counting semaphore can hold a value greater than one, which allows multiple processes to access a resource concurrently, but with a fixed upper limit. This is useful for rate limiting or managing connection pools.
Real-Life Use Cases
Binary Semaphore:
Use this when a shared resource must be accessed by only one process at a time. For example, in a database system, if two concurrent writes occur on a resource that cannot handle them simultaneously, a binary semaphore can serialize the writes, thus ensuring data integrity.
Counting Semaphore:
Use this to limit the number of concurrent operations. For instance, when dealing with API calls to an external service that has a rate limit, a counting semaphore can ensure that only a fixed number of requests are sent at any one time.
Implementing Semaphores in TypeScript
Below are two basic TypeScript classes implementing a binary semaphore and a counting semaphore without using any external libraries. For production use, consider a package like async-mutex, but we'll build them ourselves here to understand how they work.
// BinarySemaphore.ts
class BinarySemaphore {
  private locked = false;
  private waiters: Array<() => void> = [];

  async acquire(): Promise<void> {
    // If the lock is free, take it immediately.
    if (!this.locked) {
      this.locked = true;
      return;
    }
    // Otherwise, return a promise that resolves when the lock is released.
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
    }).then(() => {
      this.locked = true;
    });
  }

  release(): void {
    // If there are waiting requests, resolve the first one (FIFO);
    // the lock is handed directly to that waiter.
    if (this.waiters.length > 0) {
      const nextResolve = this.waiters.shift();
      if (nextResolve) nextResolve();
    } else {
      // No waiting promises, so mark the lock as free.
      this.locked = false;
    }
  }
}
// CountingSemaphore.ts
class CountingSemaphore {
  private count: number;
  private waiters: Array<() => void> = [];

  constructor(count: number) {
    if (count < 1) {
      throw new Error("Semaphore count must be at least 1");
    }
    this.count = count;
  }

  async acquire(): Promise<void> {
    if (this.count > 0) {
      this.count--;
      return;
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
    });
  }

  release(): void {
    // If there are waiters, hand the permit over immediately (FIFO).
    if (this.waiters.length > 0) {
      const nextResolve = this.waiters.shift();
      if (nextResolve) nextResolve();
    } else {
      // No waiting acquire requests, so increment the available count.
      this.count++;
    }
  }
}
In these implementations, the BinarySemaphore leverages a queue to store waiting promises. When a process requests the lock, if it's available, it's granted immediately. If not, the process is added to the waiting queue, and its promise resolves once the lock is released. Similarly, the CountingSemaphore manages both a counter of available permits and a waiting queue for incoming requests. If a permit is available, the counter is decremented and access is granted right away. Otherwise, the request is queued until another process releases a permit, ensuring efficient and orderly access without unnecessary polling.
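One pattern worth noting: every caller must remember to pair acquire() with release() in a finally block. A small wrapper can enforce that pairing automatically. This is a sketch only; the withPermit and SemaphoreLike names are hypothetical, not part of the classes above:

```typescript
// Sketch: a helper that guarantees release() runs even if the guarded
// function throws. Works with either semaphore class above, or anything
// exposing the same acquire/release shape.
interface SemaphoreLike {
  acquire(): Promise<void>;
  release(): void;
}

async function withPermit<T>(sem: SemaphoreLike, fn: () => Promise<T>): Promise<T> {
  await sem.acquire();
  try {
    return await fn(); // run the guarded operation while holding a permit
  } finally {
    sem.release(); // always give the permit back
  }
}
```

With a helper like this, the examples below could be written as `withPermit(semaphore, () => doWork())` instead of repeating the try/finally pattern at every call site.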
Use Cases
Managing Concurrent API Requests in Node.js
When working with Node.js applications that interact with external APIs, it's crucial to avoid overwhelming those services with too many concurrent requests. A counting semaphore can limit the number of simultaneous API calls, ensuring that you adhere to rate limits and prevent potential downtime or throttling.
Implementation
// apiRequestManager.ts
const semaphore = new CountingSemaphore(3); // Allow a maximum of 3 concurrent API calls.

async function fetchData(apiUrl: string): Promise<void> {
  await semaphore.acquire();
  try {
    console.log(`Fetching from ${apiUrl}...`);
    // Simulate an API call with a delay.
    await new Promise((resolve) => setTimeout(resolve, 1000));
    console.log(`Finished fetching from ${apiUrl}`);
  } finally {
    semaphore.release();
  }
}

// Simulate multiple API requests.
async function runApiRequests() {
  const apiUrls = [
    "https://api.example.com/data1",
    "https://api.example.com/data2",
    "https://api.example.com/data3",
    "https://api.example.com/data4",
    "https://api.example.com/data5",
  ];
  await Promise.all(apiUrls.map((url) => fetchData(url)));
}

runApiRequests();
Output
Fetching from https://api.example.com/data1...
Fetching from https://api.example.com/data2...
Fetching from https://api.example.com/data3...
[1 second later]
Finished fetching from https://api.example.com/data1
Fetching from https://api.example.com/data4...
Finished fetching from https://api.example.com/data2
Fetching from https://api.example.com/data5...
Finished fetching from https://api.example.com/data3
[1 second later]
Finished fetching from https://api.example.com/data4
Finished fetching from https://api.example.com/data5
Explanation
In this example, we create a counting semaphore with a maximum count of 3, meaning only three API requests can run at the same time. The fetchData function acquires the semaphore before making an API call (simulated with a delay) and releases it afterward. This mechanism prevents the system from making more than three concurrent API calls, ensuring external services are not overloaded.
Preventing Race Conditions in Shared Resources
Race conditions occur when multiple processes attempt to modify the same resource concurrently, leading to inconsistent or corrupted data. By using a binary semaphore, you can ensure that only one process accesses the critical section at a time, such as writing to a file or updating a database.
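To make the hazard concrete, here is a minimal, illustrative lost-update race in plain async TypeScript, with no semaphore involved. Each increment reads the shared counter, yields to the event loop (as any real I/O would), and then writes back a stale value:

```typescript
// Sketch of the classic lost-update race: three "concurrent" increments
// on a shared counter, each with an await between its read and its write.
let counter = 0;

async function unsafeIncrement(): Promise<void> {
  const current = counter;                      // read
  await new Promise((r) => setTimeout(r, 10));  // yield, simulating I/O
  counter = current + 1;                        // write back a stale value
}

async function demo(): Promise<void> {
  await Promise.all([unsafeIncrement(), unsafeIncrement(), unsafeIncrement()]);
  console.log(counter); // prints 1, not 3: all three read 0, all wrote 1
}

demo();
```

Serializing each read-modify-write with a binary semaphore, as the following example does for database writes, restores the expected result of 3.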
Implementation
// sharedResourceManager.ts
const mutex = new BinarySemaphore();

async function writeToDatabase(record: string): Promise<void> {
  await mutex.acquire();
  try {
    console.log(`Writing record: ${record}`);
    // Simulate database write delay.
    await new Promise((resolve) => setTimeout(resolve, 500));
    console.log(`Finished writing record: ${record}`);
  } finally {
    mutex.release();
  }
}

// Simulate concurrent database writes.
async function performDatabaseWrites() {
  const records = ["record1", "record2", "record3", "record4"];
  await Promise.all(records.map((record) => writeToDatabase(record)));
}

performDatabaseWrites();
Output
Writing record: record1
[0.5 seconds later]
Finished writing record: record1
Writing record: record2
[0.5 seconds later]
Finished writing record: record2
Writing record: record3
[0.5 seconds later]
Finished writing record: record3
Writing record: record4
[0.5 seconds later]
Finished writing record: record4
Explanation
This example demonstrates using a binary semaphore to serialize access to a critical section. The writeToDatabase function simulates a database write operation. Before the operation starts, the function acquires the binary semaphore to ensure no other write occurs simultaneously. After the operation is complete, the semaphore is released. This prevents concurrent writes that could otherwise lead to race conditions and data corruption.
Building a Task Queue in TypeScript
In scenarios such as background job processing, you may need to process tasks concurrently, but only up to a certain limit. A counting semaphore can be used to create a task queue where only a fixed number of tasks are processed at the same time, ensuring that system resources are not exhausted.
Implementation
// taskQueue.ts
const jobSemaphore = new CountingSemaphore(2); // Allow a maximum of 2 concurrent tasks.

interface Job {
  id: number;
  payload: string;
}

const jobQueue: Job[] = [
  { id: 1, payload: "Job payload 1" },
  { id: 2, payload: "Job payload 2" },
  { id: 3, payload: "Job payload 3" },
  { id: 4, payload: "Job payload 4" },
];

async function processJob(job: Job): Promise<void> {
  await jobSemaphore.acquire();
  try {
    console.log(`Processing job ${job.id}: ${job.payload}`);
    // Simulate job processing time.
    await new Promise((resolve) => setTimeout(resolve, 1500));
    console.log(`Completed job ${job.id}`);
  } finally {
    jobSemaphore.release();
  }
}

async function runJobQueue() {
  const processingJobs = jobQueue.map((job) => processJob(job));
  await Promise.all(processingJobs);
}

runJobQueue();
Output
Processing job 1: Job payload 1
Processing job 2: Job payload 2
[1.5 seconds later]
Completed job 1
Processing job 3: Job payload 3
Completed job 2
Processing job 4: Job payload 4
[1.5 seconds later]
Completed job 3
Completed job 4
Explanation
In this task queue example, a counting semaphore with a maximum count of 2 is used to control the number of concurrently processed jobs. The job queue is represented as an array of tasks. The processJob function processes each job after acquiring the semaphore. This ensures that only two jobs are processed simultaneously, preventing resource exhaustion. Once a job is completed, the semaphore is released, allowing the next job in the queue to begin processing.
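An alternative worth knowing (a hypothetical sketch, not from the examples above): instead of launching every job up front and letting the semaphore gate them, you can run a fixed pool of worker loops that drain a shared queue. The worker count then plays the same role as the semaphore's permit count:

```typescript
// Worker-pool sketch: N worker loops drain a shared queue, giving the
// same concurrency limit as a counting semaphore with N permits.
interface Task {
  id: number;
  run(): Promise<void>;
}

async function runPool(tasks: Task[], workers: number): Promise<void> {
  const queue = [...tasks]; // copy so the input array is untouched

  async function worker(): Promise<void> {
    // Each worker pulls the next task until the queue is empty.
    for (let task = queue.shift(); task !== undefined; task = queue.shift()) {
      await task.run(); // at most `workers` tasks run at once
    }
  }

  await Promise.all(Array.from({ length: workers }, () => worker()));
}
```

The semaphore version is more flexible when acquisitions happen at scattered call sites; the pool version is simpler when all work arrives as one queue.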
Conclusion
Semaphores are essential tools in concurrent programming. They provide a method to control access to shared resources, prevent race conditions, and manage limited system resources effectively. Whether you need to limit API calls, serialize database writes, or control the flow of background tasks, implementing semaphores in your projects can lead to more robust and efficient applications.
By understanding the theory behind semaphores and seeing practical examples in TypeScript, you can start applying these techniques to your own projects, ensuring that even in highly concurrent environments, your application remains stable and predictable.