Introduction
Rate limitation refers to restricting the number of times a specific action can be performed within a certain time frame. For example, an API might have rate limitations restricting user or app requests within a given period. This helps prevent server overload, ensures fair usage, and maintains system stability and security.
Rate limitation is also a challenge for the apps that encounter it, as it requires to “slow down” or pause. Here’s a typical scenario:
Initial Request: When the app initiates communication with the API, it requests specific data or functionality.
API Response: The API processes the request and responds with the requested information or performs the desired action.
Rate-Limitation: If the app has reached the limit, it will usually need to wait until the next designated time frame (like a minute to an hour) before making additional requests. If it is a “soft” rate limitation and timeframes are known and linear, it’s easier to handle. Often, the waiting time climbs and increases in every block, requiring a whole different and custom handling per each API.
Handling Rate Limit Exceedances: If the app exceeds the rate limit, it might receive an error response from the API (such as a “429 Too Many Requests” status code). The app needs to handle this gracefully, possibly by queuing requests, implementing backoff strategies (waiting for progressively more extended periods before retrying), or informing the user about the rate limit being reached.
To effectively operate within rate limitations, apps often incorporate strategies like:
Throttling: Regulating the rate of outgoing requests to align with the API’s rate limit.
Caching: Storing frequently requested data locally to reduce the need for repeated API calls.
Exponential Backoff: Implementing a strategy where the app waits increasingly longer between subsequent retries after hitting a rate limit to reduce server load and prevent immediate retries.
Queue? More in the next section
Using a queue
A queue serves as an excellent “sidekick” or tool for helping services manage rate limitations due to its ability to handle tasks systematically. However, while it offers significant benefits, it’s not a standalone solution for this purpose.
In constructing a robust architecture, the service or app used to interact with an external API subject to rate limitations often handles tasks asynchronously. This service is typically initiated by tasks derived from a queue. When the service encounters a rate limit, it can easily return the job to the main queue, or assign it to a separate queue designated for delayed tasks, and revisit it after a specific waiting period, say X seconds.
This reliance on a queue system is highly advantageous, primarily because of its temporary nature and ordering. However, the queue alone cannot fully address rate limitations; it requires additional features or help from the service itself to effectively handle these constraints.
Challenges may arise when utilizing a queue:
- Tasks re-entering the queue might return earlier than necessary, as their timing isn’t directly controlled by your service.
- Exceeding rate limitations due to frequent calls within restricted timeframes. This may necessitate implementing sleep or wait mechanisms, commonly considered poor practice due to their potential impact on performance and responsiveness.
Here is what it will look like with RabbitMQ:
const amqp = require('amqplib');
const axios = require('axios');
// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
try {
const response = await axios.get(url);
console.log('API Response:', response.data);
} catch (error) {
console.error('API Error:', error.message);
}
}
// Connect to RabbitMQ server
async function connect() {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'rateLimitedQueue';
channel.assertQueue(queue, { durable: true });
// Consume messages from the queue
channel.consume(queue, async msg => {
const { url, delayInSeconds } = JSON.parse(msg.content.toString());
// Simulating rate limitation
await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));
await makeAPICall(url); // Make the API call
channel.ack(msg); // Acknowledge message processing completion
});
} catch (error) {
console.error('RabbitMQ Connection Error:', error.message);
}
}
// Function to send a message to the queue
async function addToQueue(url, delayInSeconds) {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'rateLimitedQueue';
channel.assertQueue(queue, { durable: true });
const message = JSON.stringify({ url, delayInSeconds });
channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
console.log('Task added to the queue');
} catch (error) {
console.error('RabbitMQ Error:', error.message);
}
}
// Usage example
addToQueue('https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds
// Start the consumer
connect();
Or with Kafka
const { Kafka } = require('kafkajs');
const axios = require('axios');
// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
try {
const response = await axios.get(url);
console.log('API Response:', response.data);
} catch (error) {
console.error('API Error:', error.message);
}
}
// Kafka configuration
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // Replace with your Kafka broker address
});
// Create a Kafka producer
const producer = kafka.producer();
// Connect to Kafka and send messages
async function produceToKafka(topic, message) {
await producer.connect();
await producer.send({
topic,
messages: [{ value: message }],
});
await producer.disconnect();
}
// Create a Kafka consumer
const consumer = kafka.consumer({ groupId: 'my-group' });
// Consume messages from Kafka topic
async function consumeFromKafka(topic) {
await consumer.connect();
await consumer.subscribe({ topic });
await consumer.run({
eachMessage: async ({ message }) => {
const { url, delayInSeconds } = JSON.parse(message.value.toString());
// Simulating rate limitation
await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));
await makeAPICall(url); // Make the API call
},
});
}
// Usage example - Sending messages to Kafka topic
async function addToKafka(topic, url, delayInSeconds) {
const message = JSON.stringify({ url, delayInSeconds });
await produceToKafka(topic, message);
console.log('Message added to Kafka topic');
}
// Start consuming messages from Kafka topic
const kafkaTopic = 'rateLimitedTopic';
consumeFromKafka(kafkaTopic);
// Usage example - Adding messages to Kafka topic
addToKafka('rateLimitedTopic', 'https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds
Both approaches are legitimate, yet they necessitate your service to incorporate a ‘sleep’ mechanism.
With Memphis, you can offload the delay from the client to the queue using a simple feature made
just for that purpose and called “Delayed Messages”. Delayed messages allow you to send a received message back to the broker when your consumer application requires extra processing time.
What sets apart Memphis’ implementation is the consumer’s capability to control this delay independently and atomically.
Within the station, the count of unconsumed messages doesn’t impact the consumption of delayed messages. For instance, if a 60-second delay is necessary, it precisely configures the invisibility time for that specific message.
Memphis.dev Delayed Messages
- message is received by the consumer group.
- An event occurs, prompting the consumer group to pause processing the message.
- Assuming the
maxMsgDeliveries
hasn’t hit its limit, the consumer will activatemessage.delay(delayInMilliseconds)
, bypassing the message. Instead of immediately reprocessing the same message, the broker will retain it for the specified duration. - The subsequent message will be consumed.
- Once the requested delayInMilliseconds has passed, the broker will halt the primary message flow and reintroduce the delayed message into circulation.
const { memphis } = require('memphis-dev');
// Function to make API requests, simulating rate limitations
async function makeAPICall(message)
{
try {
const response = await axios.get(message.getDataAsJson()['url']);
console.log('API Response:', response.data);
message.ack();
} catch (error) {
console.error('API Error:', error.message);
console.log("Delaying message for 1 minute");
message.delay(60000);
}
}
(async function () {
let memphisConnection;
try {
memphisConnection = await memphis.connect({
host: '<broker-hostname>',
username: '<application-type username>',
password: '<password>'
});
const consumer = await memphisConnection.consumer({
stationName: '<station-name>',
consumerName: '<consumer-name>',
consumerGroup: ''
});
consumer.setContext({ key: "value" });
consumer.on('message', (message, context) => {
await makeAPICall(url, message);
});
consumer.on('error', (error) => { });
} catch (ex) {
console.log(ex);
if (memphisConnection) memphisConnection.close();
}
})();
Wrapping up
Understanding and adhering to rate limitations is crucial for app developers working with APIs. It involves managing request frequency, handling errors when limits are reached, implementing backoff strategies to prevent overloading the API servers, and utilizing rate limit information provided by the API to optimize app performance, and now you know how to do it with a queue as well!
Head to our blog or [docs(https://docs.memphis.dev/memphis/getting-started/readme) for more examples like that!
Join 4500+ others and sign up for our data engineering newsletter.
Originally published at Memphis.dev By Idan Asulin, Co-Founder & CTO at @Memphis.dev.
Follow Us to get the latest updates!
Github • Twitter • Discord
Top comments (0)