Problem
Let’s say we need to traverse thousands of files in our S3 storage from a Ruby app: for example, a pile of logs that we have to read and process every day. A straightforward approach, opening, reading, and processing every file one by one, will work, but it will take a lot of time. So we need to improve the speed. Here the concurrent-ruby gem is our helper: https://github.com/ruby-concurrency/concurrent-ruby. Why do we need this gem? Because it is simpler to use than raw Threads and it has a lot more features. In this article we will use Concurrent::Promises.future, as it is the most common way to write concurrent code with this gem. Reading a file from S3 is an IO operation, so we can get a huge speed benefit by making the HTTP requests concurrently. Remember that concurrency will not give you speed improvements if every Promise or Thread does heavy calculations: because of the Ruby GIL, only one thread can run Ruby code at a time, so the threads will just block each other until the calculations are finished.
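For reference, here is a minimal sketch of what Concurrent::Promises.future looks like on its own (the URL is just a placeholder, not part of the project we build below):

require 'concurrent'
require 'net/http'

# The block runs on a background thread pool, so slow IO can overlap
# with other work instead of running sequentially.
future = Concurrent::Promises.future { Net::HTTP.get(URI('https://example.com')) }
# ... do something else while the request is in flight ...
body = future.value! # blocks until the block finishes; re-raises the error if it failed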
Solution
Step 1
Let’s start building our small project. First, let’s add needed gems:
gem 'concurrent-ruby'
gem 'aws-sdk-s3', '~> 1'
gem 'dotenv', groups: [:development, :test]
gem 'benchmark'
Summary of added gems:
- concurrent-ruby — our main instrument to write concurrent code as described above
- aws-sdk-s3 — official AWS S3 SDK gem to work with S3 Storage
- dotenv — a great gem that lets you keep your secret keys in a .env file on your local machine. We need this to configure the AWS SDK as described here: https://github.com/aws/aws-sdk-ruby?tab=readme-ov-file#configuration (a sample .env is shown right after this list)
- benchmark — to measure the speed
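For example, a local .env file could look like this. The values are placeholders; the variable names are the standard ones the AWS SDK reads from the environment:

AWS_ACCESS_KEY_ID=your-access-key-id
AWS_SECRET_ACCESS_KEY=your-secret-access-key
AWS_REGION=us-east-1

In a plain Ruby script you would load it before touching the AWS SDK (in Rails, dotenv can load it automatically):

require 'dotenv'
Dotenv.load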
Step 2
To read files from S3, let’s create a new Ruby class — S3Storage:
class S3Storage
  attr_reader :bucket_name

  def initialize(bucket_name)
    @bucket_name = bucket_name
  end

  def get_files(keys)
    keys.each_with_object([]) do |key, events|
      events << bucket.objects(prefix: key).to_a
    end.flatten
  end

  private

  def client
    @_client ||= Aws::S3::Client.new
  end

  def bucket
    @_bucket ||= Aws::S3::Bucket.new(bucket_name, client: client)
  end
end
Here we traverse all files (objects) in the needed folders (keys). This part is needed because we don’t know in advance how many files there are, but don’t worry. With this approach we get an array of ObjectSummary objects. Basically, an ObjectSummary is a pointer to a file: it contains the file’s metadata, but it does not include the file’s content.
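To make the difference concrete, here is a rough sketch (the bucket and folder names are made up): an ObjectSummary already knows the key and size of the file, but the content is only downloaded when you ask for it:

summary = S3Storage.new('bucket-name').get_files(['path/folder-one']).first
summary.key                   # metadata only, no download happens here
summary.size                  # same, the size comes from the listing
summary.object.get.body.read  # issues the actual GetObject request and returns the content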
Now let’s create a method that reads the files from certain folders and then opens the first 100 of them, to measure the code’s performance without concurrency:
def process
  s3_storage = S3Storage.new('bucket-name')
  files = s3_storage.get_files(['path/folder-one', 'path/folder-two'])

  Benchmark.measure do
    files.first(100).each { |file| file.object.get.body.read }
    # here we just open the files; in a real project you can parse and process them the way you need
  end
end
The results are below: user, system, and total CPU time, with the real elapsed time in parentheses (about 7.2 seconds):
0.525316 0.096526 0.621842 ( 7.241752)
So the next step is doing the same but with concurrency:
THREADS_COUNT = 5 # we are testing with 5 threads

def process_with_threads(objects)
  objects.each_slice(THREADS_COUNT) do |logs|
    futures = logs.map do |log|
      Concurrent::Promises.future do
        log.object.get.body.read
        # here we just open the files; in a real project you can parse and process them the way you need
      end
    end
    # we compose the 5 promises into one and then read the value of every one
    Concurrent::Promises.zip(*futures).value! if futures.any?
    # calling the value! method means you are accessing the data in a promise:
    # it blocks until all futures are resolved and re-raises the first error, if any
  end
end
def process_concurrent
  s3_storage = S3Storage.new('bucket-name')
  files = s3_storage.get_files(['path/folder-one', 'path/folder-two'])

  Benchmark.measure do
    process_with_threads(files.first(100))
  end
end
The results are (the real elapsed time is about 2.9 seconds):
0.444163 0.055578 0.499741 ( 2.970004)
As you can see, with this approach you can easily speed up reading the files by using concurrency. I have run the same tests for 1000 files, and the difference is even bigger in that case.
No concurrency: 5.388176 0.716480 6.104656 ( 89.667263)
Concurrency: 3.732760 0.476933 4.209693 ( 24.761635)
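A note that goes beyond the original code: value! re-raises the first exception from any of the zipped futures, so a single failed download aborts the whole batch. If you would rather log failures and keep going, one option is to attach a rescue handler to each future; a hedged sketch (the warn message is just for illustration):

futures = logs.map do |log|
  Concurrent::Promises
    .future { log.object.get.body.read }
    .rescue { |error| warn "failed to read #{log.key}: #{error.message}" }
end
# zip no longer raises here, because every rejected future was converted
# into a fulfilled one by its rescue handler
Concurrent::Promises.zip(*futures).value!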
PS: in my testing I am using small .txt files that contain S3 access logs, so that is another reason why reading those files does not take a lot of time.
So, take advantage of concurrency in Ruby for the tasks it fits best, which means IO (input/output). Examples of such operations are listed below, with a small sketch after the list:
- HTTP requests
- DB requests
- Reading a file
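The same pattern from process_with_threads applies to any of them. For instance, a hedged sketch of fetching several URLs concurrently (the URLs are placeholders):

require 'concurrent'
require 'net/http'

urls = ['https://example.com/a', 'https://example.com/b']
futures = urls.map do |url|
  # each HTTP request runs in its own future, so the waits overlap
  Concurrent::Promises.future { Net::HTTP.get(URI(url)) }
end
bodies = Concurrent::Promises.zip(*futures).value! # array of response bodies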
A copy of my original post: https://medium.com/@zozulyak.nick/concurrent-ruby-async-s3-files-download-85f46f298319
Top comments (3)
Hi! Thanks for the post,
I always thought that parallelizing HTTP requests was a bit risky because of the huge number of unhandled errors you can get from a simple HTTP call.
Let's imagine that you have 100,000 files you want to deal with. How can you benchmark the maximum number of threads you can use without starting to get dead processes or anything like that?
Well, we have been using Promises from concurrent-ruby for 10 years now in our app for different types of requests: S3, API calls, etc. We have never had an issue with memory leaks or anything like that. In my example, if any request fails, the code will raise an exception and all threads will be cleared. As for the number of threads that should be used, I wasn’t able to find any real argument in favour of a particular number. Just keep in mind that the more threads you use, the more memory is used too, so try to be reasonable with the number of threads :)
Thanks a lot for your feedback!!!