This is part of a series of posts talking about FIFO and techniques for IPC(inter process comunication) today I'm gonna explore IPC between parent and child processes for asynchronous programming, one problem that arises when you fork a process is communication between the parent process and the child process or between child processes, usually this is solved in the standard library of the language or a third party library, node's exec
and spawn
for example handles communication between the process for you using something very similar with what we're gonna do, But how does that work and what can we do if we want to have more control over it, Let's explore some options.
Simple Queue Spawning Multiple Processes Without Error handling
The code
I always like to start with the code, because reading the code will give you a general idea of what's going on, the following is very simple ruby code, it's written in Ruby but the idea can be used in any language, and the point is the idea not the implementation.
read_from_fifo
reads from the fifo file line by line, push_to_fifo
writes a line to the fifo file and process_queue
reads from the input file forks the process passes the url that was read from the fifo file to the child process, the child process makes a http
request and writes the result to an output fifo file, since process_queue
is a recursive function that runs forever, any new line added to the fifo file will be processed.
require 'uri'
require 'net/http'
require 'json'
OUTPUT_FIFO = './output.fifo'
INPUT_FIFO = './input.fifo'
unless File.exist?(OUTPUT_FIFO)
File.mkfifo(OUTPUT_FIFO)
File.chmod(0666, OUTPUT_FIFO)
end
unless File.exist?(INPUT_FIFO)
File.mkfifo(INPUT_FIFO)
File.chmod(0666, INPUT_FIFO)
end
def read_from_fifo(fifo)
rpipe = File.open(fifo, 'r+')
res = ""
while (r = rpipe.read(1)) != "\n"
res << r
end
res
end
def push_to_fifo(endpoint, fifo)
wpipe = File.open(fifo, 'w+')
wpipe.write "#{endpoint}\n"
wpipe.flush
end
def process_queue()
rpipe = File.open(INPUT_FIFO, 'r+')
url = ""
while (r = rpipe.read(1)) != "\n"
url << r
end
fork do
puts "Processing: #{url} PID: #{Process.pid}"
uri = URI(url)
res = Net::HTTP.get_response(uri)
puts JSON.parse(res.body)['quote']
push_to_fifo(JSON.parse(res.body)['quote'], OUTPUT_FIFO)
end
process_queue
end
push_to_fifo('https://api.kanye.rest/', INPUT_FIFO)
push_to_fifo('https://api.kanye.rest/', INPUT_FIFO)
push_to_fifo('https://api.kanye.rest/', INPUT_FIFO)
process_queue()
Above is a terminal running the program, and below is another terminal where I pushed the url to the fifo file, the following is my attempt at drawing what is happening in more detail
Simple Queue Spawning Multiple Processes With Error Handling
One thing that was missing from our last implementation was error handling, if something goes wrong in the process that tries to make the GET request, the message is lost, the process is going to crash or hang forever we're not gonna try again or get any response.
We want to improve the resilience or our application and make it more fault tolerant.
What we can do in this situation is create another queue to process errors, also called a dead letter queue, any time we have a error processing a message we push the message to the dead letter queue to be processed later.
The Code
To do that we just need to push the error to the dead letter queue and to also process the dead letter, to do add we add another process just to process the dead letter queue, both processes, the main that processes the input queue and the process responsible for the dead letter queue run at the same time and wait for messages to get to the queue.
require 'uri'
require 'net/http'
require 'json'
OUTPUT_FIFO = './output.fifo'
INPUT_FIFO = './input.fifo'
DEADLETTER_FIFO = './deadletter.fifo'
unless File.exist?(OUTPUT_FIFO)
File.mkfifo(OUTPUT_FIFO)
File.chmod(0666, OUTPUT_FIFO)
end
unless File.exist?(INPUT_FIFO)
File.mkfifo(INPUT_FIFO)
File.chmod(0666, INPUT_FIFO)
end
unless File.exist?(DEADLETTER_FIFO)
File.mkfifo(DEADLETTER_FIFO)
File.chmod(0666, DEADLETTER_FIFO)
end
def read_from_fifo(fifo)
rpipe = File.open(fifo, 'r+')
res = ""
while (r = rpipe.read(1)) != "\n"
res << r
end
res
end
def push_to_fifo(endpoint, fifo)
wpipe = File.open(fifo, 'w+')
wpipe.write "#{endpoint}\n"
wpipe.flush
end
def process_deadletter()
puts "Waiting for messages in the dead letter queue"
rpipe = File.open(DEADLETTER_FIFO, 'r+')
url = ""
while (r = rpipe.read(1)) != "\n"
url << r
end
fork do
begin
puts "Dead Letter Processing: #{url} PID: #{Process.pid}"
uri = URI(url)
res = Net::HTTP.get_response(uri)
puts JSON.parse(res.body)['quote']
push_to_fifo(JSON.parse(res.body)['quote'], OUTPUT_FIFO)
rescue => e
puts "error: #{e}"
end
end
process_deadletter
end
def process_queue()
puts "Waiting for messages in the process queue"
rpipe = File.open(INPUT_FIFO, 'r+')
url = ""
while (r = rpipe.read(1)) != "\n"
url << r
end
fork do
puts "Processing: #{url} PID: #{Process.pid}"
begin
uri = URI(url)
res = Net::HTTP.get_response(uri)
puts JSON.parse(res.body)['quote']
push_to_fifo(JSON.parse(res.body)['quote'], OUTPUT_FIFO)
rescue => e
puts "Error trying to process #{url} sending message to deadletter queue"
push_to_fifo(url, DEADLETTER_FIFO)
end
end
process_queue
end
fork do
process_deadletter
end
process_queue
If we run the previous code we get the following response
Now we have two processes waiting for messages in the queue,
if we append a message to the input.fifo file with
echo 'https://api.kanye.rest/' >> input.fifo
We get the response
Nothing new here, we pass a valid url and everything worked fine, but if we pass a invalid URL we can see the dead letter queue working
echo 'invalid url' >> input.fifo
The first process fails and send the message to the dead letter queue, the dead letter queue process starts processing the new message in the queue, it also fails since in this case it's a invalid URL
Adding A Delay To The Second Call
In a real situation we would want to add a delay to the second call, if the service that we're hitting is not working now, it's unlikely that it's going to be working some milliseconds later, what we want to do in this case is to add a delay to the process that handles the dead letter queue, we have a lot of different approaches to do that, we can increase the time we do the retry for every retry, we can filter out some errors like the invalid URL case, since it's never going to work in this case.
Next post
In the next post I'm gonna experiment with other ways of setting up the queues and processing the messages.
Top comments (0)