TL;DR
Need to limit how many tasks run at once in your Crystal app? You can implement semaphore-like control using WaitGroup. This article shows how to manage concurrent downloads with just a few lines of code.
Why I wrote this article
When I started working on a problem that required limiting concurrent downloads in Crystal, I didn't know what semaphores were at all. I just needed a way to control how many fibers run at the same time. While searching for a solution, I didn't find any information about using semaphores in Crystal, so I decided to write this article and share what I've learned.
What is a semaphore?
"Think of semaphores as bouncers at a nightclub. There are a dedicated number of people that are allowed in the club at once. If the club is full no one is allowed to enter, but as soon as one person leaves another person might enter."
— Stack Overflow comment https://stackoverflow.com/questions/34519/what-is-a-semaphore/40473#40473
In simpler terms, a semaphore controls access to resources by setting a limit on how many tasks can run concurrently.
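To make the bouncer analogy concrete, here is a minimal sketch of a counting semaphore built on a buffered channel. This is not the approach used in the rest of this article, only an illustration of the concept. The names SimpleSemaphore and peak_concurrency are hypothetical, and the short sleep stands in for real work.

```crystal
# A minimal counting semaphore built on a buffered channel (illustrative sketch).
# `SimpleSemaphore` is a hypothetical name, not a standard library class.
class SimpleSemaphore
  def initialize(permits : Int32)
    # The buffer capacity is the number of "people allowed in the club".
    @slots = Channel(Nil).new(permits)
  end

  # Takes a permit; blocks the calling fiber while all permits are in use.
  def acquire : Nil
    @slots.send(nil)
  end

  # Returns a permit, letting one waiting fiber proceed.
  def release : Nil
    @slots.receive
  end

  # Runs the block while holding a permit and always releases it afterwards.
  def synchronize(&)
    acquire
    begin
      yield
    ensure
      release
    end
  end
end

# Runs `task_count` simulated tasks with at most `limit` inside at once and
# returns the highest number of tasks observed running simultaneously.
def peak_concurrency(limit : Int32, task_count : Int32) : Int32
  semaphore = SimpleSemaphore.new(limit)
  done = Channel(Nil).new
  running = Atomic(Int32).new(0)
  peak = Atomic(Int32).new(0)

  task_count.times do
    spawn do
      semaphore.synchronize do
        now = running.add(1) + 1 # `add` returns the previous value
        peak.max(now)
        sleep 5.milliseconds # stand-in for real work
        running.sub(1)
      end
      done.send(nil)
    end
  end

  task_count.times { done.receive }
  peak.get
end

# The reported peak should never exceed the limit of 3.
puts peak_concurrency(limit: 3, task_count: 10)
```

Sending into a full buffer blocks the fiber, which is exactly the "club is full" situation: nobody else gets in until someone leaves.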
Implementing semaphore-like behavior in Crystal
Crystal doesn't have built-in semaphores, but we can achieve similar control using Atomic counters and WaitGroup.
WaitGroup is a simpler and more efficient alternative to using a Channel(Nil).
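For comparison, here is a rough sketch of the two ways to wait for fibers to finish. Both helper methods (sum_with_channel and sum_with_wait_group) are made up for this illustration, and the example assumes a Crystal version that ships the wait_group module.

```crystal
require "wait_group"

# Variant 1: wait with a Channel(Nil). Each fiber sends a "done" signal and
# the main fiber must receive exactly as many signals as fibers were spawned.
def sum_with_channel(numbers : Array(Int32)) : Int32
  total = Atomic(Int32).new(0)
  done = Channel(Nil).new

  numbers.each do |n|
    spawn do
      total.add(n)
      done.send(nil)
    end
  end

  numbers.size.times { done.receive }
  total.get
end

# Variant 2: wait with WaitGroup. `wg.spawn` does the bookkeeping, and
# `WaitGroup.wait` returns once every fiber spawned through it has finished.
def sum_with_wait_group(numbers : Array(Int32)) : Int32
  total = Atomic(Int32).new(0)

  WaitGroup.wait do |wg|
    numbers.each do |n|
      wg.spawn { total.add(n) }
    end
  end

  total.get
end

puts sum_with_channel([1, 2, 3, 4])    # => 10
puts sum_with_wait_group([1, 2, 3, 4]) # => 10
```

With the channel variant you have to keep the number of receive calls in sync with the number of spawned fibers yourself. WaitGroup removes that manual counting.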
Here's a practical example of concurrent file downloads:
require "http/client"
require "wait_group"

# Only used to synchronize console output, not part of concurrency control logic
mutex = Mutex.new

# Number of concurrent downloads allowed at a time
threads = 4

# List of URLs to be downloaded with random sizes
download_urls = [] of {String, Int32}
15.times do |i|
  size = Random.rand(1_000_000..20_000_000) # Random size between 1MB and 20MB
  url = "http://speedtest.astra.in.ua.prod.hosts.ooklaserver.net:8080/download?size=#{size}"
  download_urls << {url, size}
end

# Atomic counter to track the number of currently active downloads
active_downloads = Atomic(Int32).new(0)

puts "⬇️ Starting downloads with WaitGroup..."

WaitGroup.wait do |wg|
  # Process each download URL with concurrency control
  download_urls.each_with_index do |(url, size), index|
    # Block execution if the number of active downloads reaches the limit
    while active_downloads.get >= threads
      sleep 10.milliseconds
    end

    active_downloads.add(1) # Increase the count of active downloads
    start_time = Time.local
    mutex.synchronize do
      puts "[#{start_time}] 🟢 Starting download #{index + 1} (Size: #{size.humanize_bytes}) | Active downloads: #{active_downloads.get}"
    end

    wg.spawn do
      HTTP::Client.get(url)
    rescue ex
      mutex.synchronize do
        puts "❌ Download failed: #{url} - #{ex.message}"
      end
    ensure
      active_downloads.sub(1) # Decrease the count of active downloads
      end_time = Time.local
      duration = end_time - start_time
      mutex.synchronize do
        puts "[#{end_time}] 🔴 Finished download #{index + 1} (Size: #{size.humanize_bytes}) | Active downloads: #{active_downloads.get} | Duration: #{duration.total_seconds.round(2)}s"
      end
    end
  end
end

puts "✅ All downloads completed!"
How this code works
- Atomic counter tracks how many downloads are currently active.
- WaitGroup ensures the program waits for all downloads to finish.
- The loop pauses briefly (sleep 10.milliseconds) when the maximum allowed concurrent downloads (threads) is reached.
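The three points above can be tried without any network access. The following is a stripped-down sketch of the same pattern in which a short sleep stands in for HTTP::Client.get. The method name run_limited and the peak counter are additions for this illustration and are not part of the original example.

```crystal
require "wait_group"

# Runs `task_count` simulated tasks, never more than `limit` at a time, using
# the same Atomic + WaitGroup + polling pattern. Returns the peak number of
# tasks that were active simultaneously.
def run_limited(limit : Int32, task_count : Int32) : Int32
  active = Atomic(Int32).new(0)
  peak = Atomic(Int32).new(0)

  WaitGroup.wait do |wg|
    task_count.times do
      # Poll until a slot is free. Only this loop ever increments the counter.
      while active.get >= limit
        sleep 1.millisecond
      end

      now = active.add(1) + 1 # `add` returns the previous value
      peak.max(now)

      wg.spawn do
        sleep 5.milliseconds # stand-in for HTTP::Client.get(url)
      ensure
        active.sub(1) # free the slot even if the task raised
      end
    end
  end

  peak.get
end

# The reported peak should never exceed the limit of 4.
puts run_limited(limit: 4, task_count: 15)
```

The check-then-increment step is safe here because a single fiber (the one running the loop) is the only one that raises the counter. The spawned fibers only ever lower it, so the limit cannot be overshot.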
Note: The Mutex used in the example is not part of the concurrency control mechanism. It's only used to prevent overlapping output in the terminal when multiple fibers print simultaneously. You can safely remove it if you don't care about how the logs look. Alternatively, for a more structured approach to logging, you can use Crystal's built-in Log module to print status information.
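As a rough idea of what the Log alternative could look like, here is a small sketch with simulated downloads. The constant DownloadLog, the "downloads" source name, and the method log_simulated_downloads are hypothetical. It assumes the default Log setup, which writes entries at info level and above to standard output.

```crystal
require "log"
require "wait_group"

# A dedicated logger for download messages (hypothetical name and source).
DownloadLog = Log.for("downloads")

# Logs the start and end of `count` simulated downloads. Each message goes
# through the Log backend as one entry, so no Mutex wraps the calls.
def log_simulated_downloads(count : Int32) : Nil
  WaitGroup.wait do |wg|
    count.times do |index|
      wg.spawn do
        DownloadLog.info { "Starting download #{index + 1}" }
        sleep 5.milliseconds # stand-in for the real download
        DownloadLog.info { "Finished download #{index + 1}" }
      end
    end
  end
end

log_simulated_downloads(3)
```

Each entry carries its own timestamp and severity, so the manual timestamp formatting from the puts calls would no longer be needed.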
Example output
Below is a screenshot of the actual output showing how downloads start and complete asynchronously:
As you can see, download 1, which started first, finishes only after download 15, the last one, has already started. This is because downloads run concurrently, not sequentially. Some files are smaller or download faster, allowing other downloads to begin as soon as a slot opens. This demonstrates the efficient use of concurrency, where tasks don't block one another unnecessarily.
Happy Crystalling!
Top comments (4)
Another concern is that for this particular use case, a semaphore might not actually be the best solution.
A set of worker fibers looping through a channel would be easier to implement. I haven't considered performance differences. They ought to be negligible in comparison to the time spent in IO.
Nice! I’ll use this approach in my project instead of the semaphore-like control.
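For readers curious about the worker-fiber approach suggested in this thread, it might look roughly like the sketch below. The method name process_with_workers is hypothetical, integers stand in for download URLs, and a short sleep stands in for the download itself.

```crystal
require "wait_group"

# Processes `jobs` with a fixed number of worker fibers that pull from a
# shared channel. The worker count acts as the concurrency limit.
def process_with_workers(jobs : Array(Int32), workers : Int32) : Int32
  queue = Channel(Int32).new
  total = Atomic(Int32).new(0)

  WaitGroup.wait do |wg|
    workers.times do
      wg.spawn do
        # `receive?` returns nil once the channel is closed and drained.
        while job = queue.receive?
          sleep 1.millisecond # stand-in for a download
          total.add(job)
        end
      end
    end

    jobs.each { |job| queue.send(job) }
    queue.close # lets the workers leave their loops
  end

  total.get
end

puts process_with_workers((1..15).to_a, workers: 4) # => 120
```

No counter or polling loop is needed here: at most four jobs can be in progress because only four fibers ever take work from the channel.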
This is a great writeup, thanks!
I think the mutex in the example is a bit confusing, though. It's another concurrency primitive but unrelated to the semaphore business. That purpose isn't immediately clear. Maybe point that out in a comment? Also the error message in the rescue block isn't protected by the mutex.
Have you considered using Log for printing status information? That might actually be the best way to handle this. The output already almost resembles the default log format.
Thanks! I’ve added a note clarifying the purpose of the Mutex.
Good catch regarding the use of the Log module, but I’ll keep the example as is.