<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Alexey</title>
    <description>The latest articles on DEV Community by Alexey (@alexey2257).</description>
    <link>https://dev.to/alexey2257</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1747037%2F7368bcf4-b5db-4007-98aa-c0804d91e585.png</url>
      <title>DEV Community: Alexey</title>
      <link>https://dev.to/alexey2257</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/alexey2257"/>
    <language>en</language>
    <item>
      <title>Building a Multi-Connection Redis Server with Ruby's Async Library [Part 1]</title>
      <dc:creator>Alexey</dc:creator>
      <pubDate>Wed, 26 Feb 2025 12:39:26 +0000</pubDate>
      <link>https://dev.to/alexey2257/building-a-multi-connection-redis-server-with-rubys-async-library-part-1-4o74</link>
      <guid>https://dev.to/alexey2257/building-a-multi-connection-redis-server-with-rubys-async-library-part-1-4o74</guid>
      <description>&lt;p&gt;Recently I've started Codecrafters' &lt;a href="https://app.codecrafters.io/courses/redis/overview" rel="noopener noreferrer"&gt;Build your own Redis&lt;/a&gt; (which I highly recommend!). One of the first steps is to make a TCP server that can accept more than one connection. I noticed that all the provided solutions use threads or IO#Select, and none of them use a fiber-based solution with &lt;a href="https://github.com/socketry/async" rel="noopener noreferrer"&gt;Async&lt;/a&gt;. In this really short article, I'll show how one could achieve the result using Async.&lt;/p&gt;

&lt;p&gt;A brief intro: Ruby Fibers are cooperative concurrency primitives that enable context switching without threads by maintaining their own execution stack. The Async library implements a non-blocking reactor pattern using these Fibers to schedule I/O operations across an event loop. With Ruby 3's introduction of the Fiber scheduler API, Async can now intercept potentially blocking operations and transform them into non-blocking equivalents through event-driven callbacks. The result is natural, synchronous-looking code that executes asynchronously with lower memory overhead compared to thread-based solutions, providing developers with an elegant way to handle concurrent operations without sacrificing readability or performance.&lt;/p&gt;

&lt;h2&gt;
  
  
  A one-connection Redis server
&lt;/h2&gt;

&lt;p&gt;Now, let's move on to the task. In the beginning, we have something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;YourRedisServer&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;initialize&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="vi"&gt;@port&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;port&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;
    &lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;TCPServer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="vi"&gt;@port&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;server&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;accept&lt;/span&gt; &lt;span class="c1"&gt;# blocking&lt;/span&gt;
      &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;gets&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="c1"&gt;# blocking as well&lt;/span&gt;
        &lt;span class="n"&gt;process_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="vg"&gt;$_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;process_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;span class="no"&gt;YourRedisServer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;6379&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;start&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;What happens here? We seem to have an infinite loop; however, it's not actually looping. That's because we have blocking operations. First is &lt;code&gt;server.accept&lt;/code&gt;: the code won't move on unless a connection is established. Next, after a connection is established, we have &lt;code&gt;while client.gets&lt;/code&gt; that will read all input line by line and will wait for new input forever. Thus, we will never reach the start of the loop again, being unable to receive a new connection in parallel with an existing one.&lt;/p&gt;

&lt;h2&gt;
  
  
  Adding Async
&lt;/h2&gt;

&lt;p&gt;First of all, one important thing is that all Async operations should be called within an existing reactor, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="c1"&gt;# This wraps everything with a Reactor&lt;/span&gt;
  &lt;span class="c1"&gt;# some code&lt;/span&gt;
  &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="c1"&gt;# creates a Fiber and immediately moves on&lt;/span&gt;
    &lt;span class="c1"&gt;# some async stuff&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="c1"&gt;# creates a Fiber and immediately moves on&lt;/span&gt;
    &lt;span class="c1"&gt;# some more async stuff&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="c1"&gt;# some more code&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt; &lt;span class="c1"&gt;# will wait for the finish of all async tasks inside&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you look at the &lt;a href="https://socketry.github.io/async/source/Kernel/index.html#Kernel#Sync" rel="noopener noreferrer"&gt;source code&lt;/a&gt; of Sync, you'll see that it calls &lt;code&gt;Fiber.set_scheduler(self)&lt;/code&gt; so that all Async tasks inside will use the same Async scheduler. Let's wrap our code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;
  &lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;TCPServer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="vi"&gt;@port&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;server&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;accept&lt;/span&gt; &lt;span class="c1"&gt;# blocking&lt;/span&gt;
      &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;gets&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="c1"&gt;# blocking as well&lt;/span&gt;
        &lt;span class="n"&gt;process_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="vg"&gt;$_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The only thing left to do is to allow our loop to move on after accepting a connection to asynchronously process messages from the received connection and wait for another one:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;
  &lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;TCPServer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="vi"&gt;@port&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;server&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;accept&lt;/span&gt;
      &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
        &lt;span class="n"&gt;process_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="vg"&gt;$_&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;gets&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;What happens here? After accepting a connection, we create a new Fiber that will process messages from this connection in a non-blocking way, allowing our code to move on in the loop and go back to &lt;code&gt;client = server.accept&lt;/code&gt; to wait for new connections. Thus, each connection will have its own Fiber. When receiving a message, the Fiber scheduler will pass control to the Fiber in which a message is received from the connection, allowing it to process the message.&lt;/p&gt;

&lt;p&gt;When we call blocking operations like &lt;code&gt;server.accept&lt;/code&gt; within an Async block, something interesting happens behind the scenes. The Fiber.scheduler intercepts these blocking I/O calls and transforms them into non-blocking equivalents. Instead of halting execution of the entire program while waiting for a connection, the Fiber.scheduler registers interest in the socket with the event loop and yields control back to the scheduler. When a connection is ready, the event loop notifies the scheduler, which then resumes the appropriate fiber. This means that while one fiber is waiting for a connection on &lt;code&gt;server.accept&lt;/code&gt;, other fibers can continue executing, allowing our server to handle multiple connections concurrently without using threads.&lt;/p&gt;

&lt;p&gt;One should be careful here: if you write, for example, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;server&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;accept&lt;/span&gt;
    &lt;span class="n"&gt;process_input&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="vg"&gt;$LAST_READ_LINE&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;gets&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Without having a blocking operation in the loop, you end up with an infinite number of fibers each waiting for a connection, which is obviously not good.&lt;/p&gt;

&lt;h2&gt;
  
  
  Further improvements
&lt;/h2&gt;

&lt;p&gt;You've probably already noticed the absence of resource cleanup here. This was done intentionally: I will write part 2 of this article to explain this topic further, because although we could simply add ensure blocks, that's not the best approach. We can use Async::IO::Endpoint.tcp instead, and I will explain the differences in the follow-up article.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion: The Benefit of Async
&lt;/h2&gt;

&lt;p&gt;With just a few modifications to our code, we've transformed a basic single-connection server into one that can efficiently handle multiple connections simultaneously.&lt;/p&gt;

&lt;p&gt;The beauty of this approach lies in its simplicity. Instead of managing complex thread pools or wrestling with IO#Select, Async leverages Ruby's Fibers to provide a clean, readable solution to concurrency challenges. Each connection gets its own Fiber, making the code both efficient and easier to reason about.&lt;/p&gt;

&lt;p&gt;If you've been relying on threads for similar tasks, I'd strongly encourage giving Async a try. The library handles the complex event loop mechanics behind the scenes, allowing you to focus on your application logic rather than concurrency implementation details.&lt;/p&gt;

&lt;p&gt;Next time you need to build something that handles multiple connections, consider reaching for Async - it might just make your concurrent programming life considerably easier.&lt;/p&gt;

</description>
      <category>ruby</category>
      <category>async</category>
      <category>redis</category>
      <category>fiber</category>
    </item>
    <item>
      <title>Persistent Redis Connections in Sidekiq with Async::Redis: A Deep Dive.</title>
      <dc:creator>Alexey</dc:creator>
      <pubDate>Thu, 18 Jul 2024 11:27:02 +0000</pubDate>
      <link>https://dev.to/alexey2257/persistent-redis-connections-in-sidekiq-with-asyncredis-a-deep-dive-f3c</link>
      <guid>https://dev.to/alexey2257/persistent-redis-connections-in-sidekiq-with-asyncredis-a-deep-dive-f3c</guid>
      <description>&lt;p&gt;&lt;em&gt;TL;DR: In this post I explain how you can have persistent connections to &lt;strong&gt;Redis&lt;/strong&gt; using &lt;strong&gt;Async::Redis&lt;/strong&gt; client inside &lt;strong&gt;Sidekiq&lt;/strong&gt; and why it doesn't work by default. There are some basic explanations of how &lt;strong&gt;Async&lt;/strong&gt; and &lt;strong&gt;Sidekiq&lt;/strong&gt; work and some source code.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;All example code can be found &lt;a href="https://github.com/Alexey2257/sidekiq_with_async_redis" rel="noopener noreferrer"&gt;here&lt;/a&gt;. Note that we don't actually need Rails, but I used it for examples because Sidekiq is usually used with Rails.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/socketry/async" rel="noopener noreferrer"&gt;Async&lt;/a&gt; is a composable asynchronous I/O framework for Ruby. It allows you to do things concurrently using &lt;a href="https://docs.ruby-lang.org/en/master/Fiber.html" rel="noopener noreferrer"&gt;Fibers&lt;/a&gt;. Since 3.0, Ruby has a fiber scheduler and Ruby core supports it. This means you can have non-blocking I/O without much effort, for example, when using &lt;code&gt;Net::HTTP&lt;/code&gt;. If you perform a blocking operation, such as an HTTP call, inside a fiber, it will immediately yield so that another fiber can become active and do some useful work instead of blocking and waiting for the HTTP call to complete.&lt;/p&gt;

&lt;h2&gt;
  
  
  Using Ruby Async::Redis gem
&lt;/h2&gt;

&lt;p&gt;Besides Ruby's support for the fiber scheduler, for some I/O operations, you might use specific gems, like &lt;a href="https://github.com/socketry/async-redis" rel="noopener noreferrer"&gt;Async::Redis&lt;/a&gt;. You can still use other Ruby gems that use native Ruby I/O with Async and they will give you non-blocking I/O as well, but there are two reasons to prefer Async::Redis:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Its author, Samuel Williams, worked hard to ensure the interface is more consistent and easier to use. This is especially true for support of pipelines, transactions and subscriptions.&lt;/li&gt;
&lt;li&gt;It uses Async::Pool so you don't need to make connection pool by yourself. Async::Pool has persistent connections out of box, and also it's great that connections are created in a lazy way: a new one will be created only if there are no free connections (and if specified connections limit wasn't reached). This can be extremely handy if you use Async inside Sidekiq and can't calculate how many connections you will need. &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The usage is simple:&lt;/p&gt;

&lt;p&gt;First, add gem to Gemfile:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="n"&gt;gem&lt;/span&gt; &lt;span class="s1"&gt;'async-redis'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Then create a client:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="n"&gt;endpoint&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;local_endpoint&lt;/span&gt; &lt;span class="c1"&gt;# localhost:6379&lt;/span&gt;
&lt;span class="vi"&gt;@client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;endpoint&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;limit: &lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;&lt;code&gt;Limit&lt;/code&gt; parameter will be passed to Async::Pool to limit maximum number of acquired connections, you also can check a &lt;a href="https://socketry.github.io/async-pool/guides/getting-started/index#limit-and-concurrency" rel="noopener noreferrer"&gt;short doc&lt;/a&gt; about another parameter you can pass, &lt;code&gt;concurrency&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# source code of Async::Pool&lt;/span&gt;

&lt;span class="k"&gt;module&lt;/span&gt; &lt;span class="nn"&gt;Async&lt;/span&gt;
  &lt;span class="k"&gt;module&lt;/span&gt; &lt;span class="nn"&gt;Pool&lt;/span&gt;
    &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Controller&lt;/span&gt;
      &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nc"&gt;self&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;wrap&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;block&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;block&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;

      &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;initialize&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;constructor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;limit: &lt;/span&gt;&lt;span class="kp"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;concurrency: &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;limit&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="ss"&gt;policy: &lt;/span&gt;&lt;span class="kp"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;&lt;code&gt;Async::Redis.local_endpoint&lt;/code&gt; is just a &lt;code&gt;IO::Endpoint.tcp&lt;/code&gt; with default local Redis host and port, we can find it in the gem's source code:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nc"&gt;self&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;local_endpoint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;port: &lt;/span&gt;&lt;span class="mi"&gt;6379&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;IO&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Endpoint&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;tcp&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'localhost'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Now we can call the client:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="vi"&gt;@client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'value'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="nb"&gt;puts&lt;/span&gt; &lt;span class="vi"&gt;@client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;# =&amp;gt; "value"&lt;/span&gt;
&lt;span class="k"&gt;ensure&lt;/span&gt;
  &lt;span class="vi"&gt;@client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;The resulting code:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# app/lib/redis_example.rb&lt;/span&gt;

&lt;span class="nb"&gt;require&lt;/span&gt; &lt;span class="s1"&gt;'async/redis'&lt;/span&gt;

&lt;span class="n"&gt;endpoint&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;local_endpoint&lt;/span&gt;
&lt;span class="vi"&gt;@client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;endpoint&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="vi"&gt;@client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'value'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="nb"&gt;puts&lt;/span&gt; &lt;span class="vi"&gt;@client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;# =&amp;gt; "value"&lt;/span&gt;
&lt;span class="k"&gt;ensure&lt;/span&gt;
  &lt;span class="vi"&gt;@client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Why do we need a &lt;code&gt;Sync do&lt;/code&gt; block? We'll dive into this a little further.&lt;/p&gt;
&lt;h2&gt;
  
  
  Using Async::Redis gem with Ruby on Rails
&lt;/h2&gt;

&lt;p&gt;This setup is pretty simple, however, most of the time we don't write pure Ruby programs, usually we have Rails. For Rails, it would be handy to have a persistent client that we can call anywhere. Let's create one:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# app/lib/async_redis_client.rb&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;AsyncRedisClient&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nc"&gt;self&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;
    &lt;span class="no"&gt;Thread&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;current&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;thread_variable_get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:redis_client&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; 
      &lt;span class="no"&gt;Thread&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;current&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;thread_variable_set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:redis_client&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;new&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;initialize&lt;/span&gt;
    &lt;span class="vi"&gt;@client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;local_endpoint&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;limit: &lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;with&lt;/span&gt;
    &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="vi"&gt;@client&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;We use &lt;code&gt;Thread.current.thread_variable_get&lt;/code&gt; and &lt;code&gt;Thread.current.thread_variable_set&lt;/code&gt; to make our client thread-safe. If you won't do it, you will get &lt;code&gt;fiber called across threads&lt;/code&gt; error pretty soon.&lt;/p&gt;

&lt;p&gt;Now we have a persistent client that should have persistent connections. We can use it in Sidekiq, right?&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# app/workers/redis_worker.rb&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisWorker&lt;/span&gt;
  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Job&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;
    &lt;span class="no"&gt;AsyncRedisClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
      &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'value'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;No. Remember using &lt;code&gt;Sync do&lt;/code&gt; in the beginning? Without it, we will receive this error:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;No async task available! (RuntimeError)&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;A quick search and we see that Async Redis client needs to be called within an existing &lt;strong&gt;reactor&lt;/strong&gt;. You can achieve it by running code inside &lt;code&gt;Async do ...&lt;/code&gt; or &lt;code&gt;Sync do ...&lt;/code&gt; block. Let's have a look at &lt;code&gt;Sync&lt;/code&gt; source code:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;module&lt;/span&gt; &lt;span class="nn"&gt;Kernel&lt;/span&gt;
  &lt;span class="c1"&gt;# Run the given block of code synchronously, but within a reactor if not already in one.&lt;/span&gt;
  &lt;span class="c1"&gt;#&lt;/span&gt;
  &lt;span class="c1"&gt;# @yields {|task| ...} The block that will execute asynchronously.&lt;/span&gt;
  &lt;span class="c1"&gt;#     @parameter task [Async::Task] The task that is executing the given block.&lt;/span&gt;
  &lt;span class="c1"&gt;#&lt;/span&gt;
  &lt;span class="c1"&gt;# @public Since `stable-v1`.&lt;/span&gt;
  &lt;span class="c1"&gt;# @asynchronous Will block until given block completes executing.&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;Sync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;block&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;current?&lt;/span&gt;
      &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;
      &lt;span class="c1"&gt;# This calls Fiber.set_scheduler(self):&lt;/span&gt;
      &lt;span class="n"&gt;reactor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Reactor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;

      &lt;span class="k"&gt;begin&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;finished: &lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Condition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;block&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;wait&lt;/span&gt;
      &lt;span class="k"&gt;ensure&lt;/span&gt;
        &lt;span class="no"&gt;Fiber&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set_scheduler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kp"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Pretty straightforward, creates reactor if not inside existing one. Typical pure Ruby program with Async looks like this:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;  &lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="c1"&gt;# blabla&lt;/span&gt;
    &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="c1"&gt;# some async stuff&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="c1"&gt;# some more async stuff&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="c1"&gt;# some more blabla&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt; &lt;span class="c1"&gt;# will wait for the finish of all async tasks inside&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;But we are using Sidekiq, so let's just do what it asks, wrap in &lt;code&gt;Sync&lt;/code&gt; block. You could also wrap it in &lt;code&gt;Async&lt;/code&gt;, in this example it doesn't matter a lot because both will create a Reactor and wait for task to be completed before closing Reactor. But If you already have a Reactor (like we will have at the end of the article), using &lt;code&gt;Async&lt;/code&gt; will lead for a Sidekiq job to be completed before our async task completed. But you also can do &lt;code&gt;Async do ... end.wait&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# app/workers/redis_worker_with_sync.rb&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisWorkerWithSync&lt;/span&gt;
  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Job&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;
    &lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="no"&gt;AsyncRedisClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
        &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'value'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;That works! You can also refactor it; for example move &lt;code&gt;Sync&lt;/code&gt; block to Sidekiq middleware or to &lt;code&gt;with&lt;/code&gt; block of our client. &lt;/p&gt;

&lt;p&gt;The only thing left is to check that it works as we want: with persistent connections to Redis. &lt;/p&gt;
&lt;h2&gt;
  
  
  Monitoring Redis connections with Ruby and Async
&lt;/h2&gt;

&lt;p&gt;We could just use &lt;code&gt;redis-cli&lt;/code&gt; for it but we also can write a simple monitoring using &lt;code&gt;Async&lt;/code&gt;. Redis has an &lt;code&gt;info&lt;/code&gt; command that shows &lt;code&gt;total_connections_received&lt;/code&gt;. We can use it to monitor newly received connections after the monitor has started:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# app/lib/redis_monitor.rb&lt;/span&gt;

&lt;span class="nb"&gt;require&lt;/span&gt; &lt;span class="s1"&gt;'async'&lt;/span&gt;
&lt;span class="nb"&gt;require&lt;/span&gt; &lt;span class="s1"&gt;'async/redis'&lt;/span&gt;
&lt;span class="nb"&gt;require&lt;/span&gt; &lt;span class="s1"&gt;'tty-sparkline'&lt;/span&gt;
&lt;span class="nb"&gt;require&lt;/span&gt; &lt;span class="s1"&gt;'tty-screen'&lt;/span&gt;

&lt;span class="n"&gt;endpoint&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;local_endpoint&lt;/span&gt;
&lt;span class="n"&gt;redis&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;Async&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Redis&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;endpoint&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;connection_counts&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;

&lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
&lt;span class="n"&gt;max_width&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;TTY&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Screen&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;width&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt;

&lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;initial_connections_count&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:total_connections_received&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;to_i&lt;/span&gt;

  &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
    &lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;new_connections&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:total_connections_received&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;to_i&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;initial_connections_count&lt;/span&gt;

      &lt;span class="n"&gt;connection_counts&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;new_connections&lt;/span&gt;

      &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;connection_counts&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;size&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;max_width&lt;/span&gt;
        &lt;span class="n"&gt;connection_counts&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;shift&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;

      &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
    &lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="nb"&gt;print&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="se"&gt;\e&lt;/span&gt;&lt;span class="s2"&gt;[H&lt;/span&gt;&lt;span class="se"&gt;\e&lt;/span&gt;&lt;span class="s2"&gt;[2J"&lt;/span&gt; &lt;span class="c1"&gt;# Clear the screen&lt;/span&gt;

      &lt;span class="nb"&gt;puts&lt;/span&gt; &lt;span class="s2"&gt;"Redis Connection Monitoring"&lt;/span&gt;
      &lt;span class="nb"&gt;puts&lt;/span&gt; &lt;span class="s2"&gt;"Total created connections: &lt;/span&gt;&lt;span class="si"&gt;#{&lt;/span&gt;&lt;span class="n"&gt;connection_counts&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;last&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;

      &lt;span class="n"&gt;sparkline&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;TTY&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Sparkline&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;new&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;connection_counts&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;width: &lt;/span&gt;&lt;span class="n"&gt;max_width&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;height: &lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="nb"&gt;puts&lt;/span&gt; &lt;span class="n"&gt;sparkline&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;render&lt;/span&gt;

      &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Here we have two tasks. The first one collects &lt;code&gt;total_connections_received&lt;/code&gt; and writes it to &lt;code&gt;connection_counts&lt;/code&gt; that will be used for our chart. The second task uses the &lt;code&gt;tty-sparkline&lt;/code&gt; gem to output the chart to console.&lt;/p&gt;

&lt;p&gt;Let's also add a Rake task to fill our Sidekiq queue:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# /lib/tasks/job_pusher.rb&lt;/span&gt;

&lt;span class="n"&gt;namespace&lt;/span&gt; &lt;span class="ss"&gt;:jobs&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;desc&lt;/span&gt; &lt;span class="s2"&gt;"Push jobs to the queue continuously"&lt;/span&gt;
  &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="ss"&gt;push: :environment&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="nb"&gt;puts&lt;/span&gt; &lt;span class="s1"&gt;'Pushing job to the queue'&lt;/span&gt;
      &lt;span class="no"&gt;RedisWorkerWithSync&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;perform_async&lt;/span&gt;
      &lt;span class="nb"&gt;sleep&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Now let's run all we need for our experiment:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;bundle exec sidekiq&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;bundle exec rake jobs:push&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;ruby app/lib/redis_monitor.rb&lt;/code&gt; &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;And lets have a look at monitoring result:&lt;/p&gt;


&lt;div class="ltag_asciinema"&gt;
  
&lt;/div&gt;



&lt;p&gt;The connection count starts at 6 (Sidekiq creates and manages its own connection pool to Redis, atually 7th connection is created by Sidekiq as well when it starts processing jobs) and increases constantly, adding one new connection every second (because we push a new job every second). Why does it happen? I mentioned in the beginning that &lt;strong&gt;Async::Redis&lt;/strong&gt; has persistent connections, hasn't it? &lt;/p&gt;

&lt;p&gt;Well, it has. But only &lt;strong&gt;within a reactor&lt;/strong&gt;. Because existing Reactor is definition of a "lifetime" of a Ruby program with Async. After our top Sync or Async block finishes Redis should close all connections, why does this need them anymore?&lt;/p&gt;

&lt;h2&gt;
  
  
  How does Sidekiq process run
&lt;/h2&gt;

&lt;p&gt;Okay, back to our Rails app. In our app, the lifetime should be the whole Sidekiq process. Luckily, Sidekiq has &lt;a href="https://github.com/sidekiq/sidekiq/blob/main/docs/internals.md" rel="noopener noreferrer"&gt;internal documentation&lt;/a&gt; on how it runs. I won't copy the entire documentation here, just the part we are interested in:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;code&gt;Sidekiq::Manager&lt;/code&gt; manages the resources for a given &lt;code&gt;Sidekiq::Capsule&lt;/code&gt;. It creates and starts N &lt;code&gt;Sidekiq::Processor&lt;/code&gt; instances. ... Each &lt;code&gt;Sidekiq::Processor&lt;/code&gt; instance is a separate thread. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;That's exactly what we need, because we need to have a separate reactor for each thread.&lt;/p&gt;

&lt;p&gt;Sidekiq 7.3 source code uses &lt;code&gt;Processor&lt;/code&gt; method &lt;code&gt;run&lt;/code&gt; looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# frozen_string_literal: true&lt;/span&gt;

&lt;span class="k"&gt;module&lt;/span&gt; &lt;span class="nn"&gt;Sidekiq&lt;/span&gt;
  &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Processor&lt;/span&gt;
     &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;
      &lt;span class="c1"&gt;# By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+&lt;/span&gt;
      &lt;span class="c1"&gt;# instead of the global pool in +Sidekiq::Config#redis_pool+.&lt;/span&gt;
      &lt;span class="no"&gt;Thread&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;current&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:sidekiq_capsule&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="vi"&gt;@capsule&lt;/span&gt;

      &lt;span class="n"&gt;process_one&lt;/span&gt; &lt;span class="k"&gt;until&lt;/span&gt; &lt;span class="vi"&gt;@done&lt;/span&gt;
      &lt;span class="vi"&gt;@callback&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;rescue&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Shutdown&lt;/span&gt;
      &lt;span class="vi"&gt;@callback&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;rescue&lt;/span&gt; &lt;span class="no"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;
      &lt;span class="vi"&gt;@callback&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;It uses a loop for processing jobs &lt;code&gt;process_one until @done&lt;/code&gt; and it's easy to wrap:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;process_one&lt;/span&gt; &lt;span class="k"&gt;until&lt;/span&gt; &lt;span class="vi"&gt;@done&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;We can put a file with our monkey-patch in &lt;code&gt;app/lib/sidekiq/processor.rb&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# app/lib/sidekiq/processor.rb&lt;/span&gt;

&lt;span class="c1"&gt;# frozen_string_literal: true&lt;/span&gt;

&lt;span class="k"&gt;module&lt;/span&gt; &lt;span class="nn"&gt;Sidekiq&lt;/span&gt;
  &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Processor&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;
      &lt;span class="c1"&gt;# By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+&lt;/span&gt;
      &lt;span class="c1"&gt;# instead of the global pool in +Sidekiq::Config#redis_pool+.&lt;/span&gt;
      &lt;span class="no"&gt;Thread&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;current&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:sidekiq_capsule&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="vi"&gt;@capsule&lt;/span&gt;

      &lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
        &lt;span class="n"&gt;process_one&lt;/span&gt; &lt;span class="k"&gt;until&lt;/span&gt; &lt;span class="vi"&gt;@done&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
      &lt;span class="vi"&gt;@callback&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;rescue&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Shutdown&lt;/span&gt;
      &lt;span class="vi"&gt;@callback&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;rescue&lt;/span&gt; &lt;span class="no"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;
      &lt;span class="vi"&gt;@callback&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;call&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;And we need to add it to initializer to load our code instead of original:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# config/initializers/sidekiq.rb&lt;/span&gt;

&lt;span class="nb"&gt;require_relative&lt;/span&gt; &lt;span class="s1"&gt;'../../app/lib/sidekiq/processor'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;That's it! It will run in Sync block until we shut down Sidekiq. Now we can remove Sync block from our worker and everything works perfectly with persistent connections.&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisWorker&lt;/span&gt;
  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Job&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;
    &lt;span class="no"&gt;AsyncRedisClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
      &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'key'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'value'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Now when our Sidekiq is wrapped, we can change job pusher to use &lt;code&gt;RedisWorker&lt;/code&gt; instead of &lt;code&gt;RedisWorkerWithSinc&lt;/code&gt; and run it. Let's have a look at the monitor:&lt;/p&gt;


&lt;div class="ltag_asciinema"&gt;
  
&lt;/div&gt;



&lt;p&gt;We see that connections size raises slowly until it reaches 12. That's because we have Sidekiq with default concurrency of 5, this means that we have 5 separate threads and when our job gets into thread that wasn't used before it has to create it's first connection. Let's change our rake task to push more jobs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="n"&gt;namespace&lt;/span&gt; &lt;span class="ss"&gt;:jobs&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="n"&gt;desc&lt;/span&gt; &lt;span class="s2"&gt;"Push jobs to the queue continuously"&lt;/span&gt;
  &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="ss"&gt;push: :environment&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="kp"&gt;loop&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="nb"&gt;puts&lt;/span&gt; &lt;span class="s1"&gt;'Pushing job to the queue'&lt;/span&gt;
      &lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;times&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
        &lt;span class="no"&gt;RedisWorker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;perform_async&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
      &lt;span class="nb"&gt;sleep&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="ltag_asciinema"&gt;
  
&lt;/div&gt;



&lt;p&gt;It reaches 12 instantly and stops at this number as we wanted. &lt;/p&gt;

&lt;p&gt;Now, let's use Async inside Sidekiq. Currently, each woker performs only one Redis operation, that's why we use one same connection from the pool each time. Imagine that each worker needs to do 100 different Redis operations, with Async we could do them concurrently and more effective:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisWorker&lt;/span&gt;
  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Job&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;
    &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;times&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
        &lt;span class="nb"&gt;rand&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;SecureRandom&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;hex&lt;/span&gt;
        &lt;span class="no"&gt;AsyncRedisClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
          &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;end&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="ltag_asciinema"&gt;
  
&lt;/div&gt;



&lt;p&gt;The mathematics is simple: Sidekiq utilizes 7 connection by itself and we have 5 Sidekiq threads, each thread has a connection pool to Redis with limit equal 10. Eventually we reach 57 connections.&lt;/p&gt;

&lt;p&gt;Note that code above will finish Sidekiq job as soon as all tasks will be scheduled, not completed. We can change code to wait for completion, but we'll need much more tasks to utilize all connections in this way:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisWorker&lt;/span&gt;
  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Job&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;
    &lt;span class="n"&gt;tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;times&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
        &lt;span class="nb"&gt;rand&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;SecureRandom&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;hex&lt;/span&gt;
        &lt;span class="no"&gt;AsyncRedisClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
          &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_s&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_s&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;end&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="ss"&gt;:wait&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;And this is what would happen if we rolled back our monkey-patch:&lt;br&gt;
&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="c1"&gt;# config/initializers/sidekiq.rb&lt;/span&gt;
&lt;span class="c1"&gt;# require_relative '../../app/lib/sidekiq/processor'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisWorker&lt;/span&gt;
  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Job&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;
    &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;times&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
        &lt;span class="nb"&gt;rand&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;SecureRandom&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;hex&lt;/span&gt;
        &lt;span class="no"&gt;AsyncRedisClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
          &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;end&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="ltag_asciinema"&gt;
  
&lt;/div&gt;



&lt;p&gt;Finally, we can wrap whole worker in &lt;code&gt;Sync&lt;/code&gt; as before to see what will happen:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ruby"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisWorker&lt;/span&gt;
  &lt;span class="kp"&gt;include&lt;/span&gt; &lt;span class="no"&gt;Sidekiq&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="no"&gt;Job&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;perform&lt;/span&gt;
    &lt;span class="no"&gt;Sync&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;times&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
        &lt;span class="no"&gt;Async&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
          &lt;span class="nb"&gt;rand&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;SecureRandom&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;hex&lt;/span&gt;
          &lt;span class="no"&gt;AsyncRedisClient&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;instance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="o"&gt;|&lt;/span&gt;
            &lt;span class="n"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;rand&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
          &lt;span class="k"&gt;end&lt;/span&gt;
        &lt;span class="k"&gt;end&lt;/span&gt;
      &lt;span class="k"&gt;end&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="ltag_asciinema"&gt;
  
&lt;/div&gt;



&lt;p&gt;A little better because previously each &lt;code&gt;Async&lt;/code&gt; created it's Reactor with new connection pool and now we have &lt;code&gt;Sync&lt;/code&gt; that's owns a Reactor with pool for all Async tasks inside. But still, much worse than with  patched Sidekiq.&lt;/p&gt;

&lt;p&gt;This example is built around Redis, but same would be true for other Async I/O gems that need a Reactor, for example &lt;code&gt;Async::HTTP&lt;/code&gt;. &lt;/p&gt;

&lt;p&gt;Using &lt;code&gt;Async&lt;/code&gt; could be tricky, but I encourage you to try it. If you are not familiar to it (and for some reason read this article till the end) I can recommend you to watch &lt;a href="https://www.youtube.com/watch?v=QeYcKw7nOkg" rel="noopener noreferrer"&gt;this video&lt;/a&gt; from the latest RailsConf 2024.&lt;/p&gt;

</description>
      <category>ruby</category>
      <category>rails</category>
      <category>sidekiq</category>
      <category>redis</category>
    </item>
  </channel>
</rss>
