<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Lukas Klein</title>
    <description>The latest articles on DEV Community by Lukas Klein (@lukasklein).</description>
    <link>https://dev.to/lukasklein</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F395709%2Fb748c95b-6e02-45ea-9a0a-ad77394ebbd8.jpeg</url>
      <title>DEV Community: Lukas Klein</title>
      <link>https://dev.to/lukasklein</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/lukasklein"/>
    <language>en</language>
    <item>
      <title>Counting the queued Celery tasks</title>
      <dc:creator>Lukas Klein</dc:creator>
      <pubDate>Thu, 13 May 2021 14:44:31 +0000</pubDate>
      <link>https://dev.to/lukasklein/counting-the-queued-celery-tasks-4bn</link>
      <guid>https://dev.to/lukasklein/counting-the-queued-celery-tasks-4bn</guid>
      <description>&lt;p&gt;If you are using Redis for your Celery queues, there's a way to count the number of queued tasks grouped by the task without the need for an external tool. Note that this approach assumes that you're using the JSON serializer for your tasks.&lt;/p&gt;

&lt;p&gt;Using a Redis client of your choice (I chose &lt;a href="https://github.com/IBM-Cloud/redli"&gt;redli&lt;/a&gt;, which supports TLS), use &lt;code&gt;LRANGE&lt;/code&gt; to fetch the queued tasks, pipe them through &lt;code&gt;jq&lt;/code&gt; to extract &lt;code&gt;.headers.task&lt;/code&gt;, then &lt;code&gt;sort&lt;/code&gt; them and count the unique names with &lt;code&gt;uniq -c&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; redli &lt;span class="o"&gt;[&lt;/span&gt;your connection parameters] lrange celery 0 30000 | jq &lt;span class="s1"&gt;'.headers.task'&lt;/span&gt; | &lt;span class="nb"&gt;sort&lt;/span&gt; | &lt;span class="nb"&gt;uniq&lt;/span&gt; &lt;span class="nt"&gt;-c&lt;/span&gt;

 533 &lt;span class="s2"&gt;"devices.adapters.lorawan.tasks.run_lora_payload_decoder"&lt;/span&gt;
   2 &lt;span class="s2"&gt;"devices.adapters.particle.tasks.run_particle_payload_decoder"&lt;/span&gt;
  92 &lt;span class="s2"&gt;"devices.tasks.call_device_function"&lt;/span&gt;
8556 &lt;span class="s2"&gt;"devices.tasks.ping_device"&lt;/span&gt;
9682 &lt;span class="s2"&gt;"devices.tasks.process_device_field_rules"&lt;/span&gt;
   5 &lt;span class="s2"&gt;"devices.tasks.send_device_offline_email"&lt;/span&gt;
   2 &lt;span class="s2"&gt;"dzeroos.tasks.call_command"&lt;/span&gt;
   8 &lt;span class="s2"&gt;"dzeroos.tasks.flush_command_queue"&lt;/span&gt;
   1 &lt;span class="s2"&gt;"dzeroos.tasks.publish_device_config"&lt;/span&gt;
   1 &lt;span class="s2"&gt;"dzeroos.tasks.publish_protobuf_config"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
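&lt;p&gt;The same grouping can also be done from Python instead of the shell. Here is a sketch using &lt;code&gt;collections.Counter&lt;/code&gt; (the queue name &lt;code&gt;celery&lt;/code&gt;, the connection details, and redis-py as the client are assumptions):&lt;/p&gt;

```python
import json
from collections import Counter

def count_queued_tasks(raw_messages):
    # With the JSON serializer, each queued Celery message is a JSON
    # document whose headers carry the task name.
    return Counter(json.loads(raw)["headers"]["task"] for raw in raw_messages)

# Usage sketch with redis-py (assumed client and queue name):
#   import redis
#   r = redis.Redis()  # your connection parameters
#   for name, count in count_queued_tasks(r.lrange("celery", 0, 30000)).most_common():
#       print(count, name)
```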



&lt;h2&gt;
  Bonus: statistics on task details
&lt;/h2&gt;

&lt;p&gt;In my case, I needed some insight into one of the parameters of a single task (I was investigating a loop that caused a long queue). This required some more &lt;code&gt;jq&lt;/code&gt; and bash magic and probably won't fit your use case exactly, but I'm pasting it here for reference. Note that &lt;code&gt;base64 -D&lt;/code&gt; is the macOS flag; on Linux, use &lt;code&gt;base64 -d&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; redli &lt;span class="o"&gt;[&lt;/span&gt;your connection parameters] lrange celery 0 1000 | jq &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="s1"&gt;'. | select(.headers.task | contains("taskname")) .body'&lt;/span&gt; | &lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="nb"&gt;read&lt;/span&gt; &lt;span class="nt"&gt;-r&lt;/span&gt; line&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do &lt;/span&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="nv"&gt;$line&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; | &lt;span class="nb"&gt;tr&lt;/span&gt; &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'"'&lt;/span&gt; | &lt;span class="nb"&gt;base64&lt;/span&gt; &lt;span class="nt"&gt;-D&lt;/span&gt; | jq &lt;span class="s1"&gt;'.[0][0]'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;done&lt;/span&gt; | &lt;span class="nb"&gt;sort&lt;/span&gt; | &lt;span class="nb"&gt;uniq&lt;/span&gt; &lt;span class="nt"&gt;-c&lt;/span&gt;

  15 &lt;span class="s2"&gt;"04576f6e-d5d1-45f4-8eef-a17e015335f4"&lt;/span&gt;
   9 &lt;span class="s2"&gt;"05264cc7-ae60-4f4f-9a18-2451e8d83f65"&lt;/span&gt;
  25 &lt;span class="s2"&gt;"4e240129-b84e-4e70-9f85-0e06f7a01875"&lt;/span&gt;
 224 &lt;span class="s2"&gt;"6c6a9aeb-10c7-417f-a928-791399d8adb9"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>django</category>
      <category>celery</category>
      <category>redis</category>
      <category>bash</category>
    </item>
    <item>
      <title>Debouncing a Celery task</title>
      <dc:creator>Lukas Klein</dc:creator>
      <pubDate>Thu, 28 Jan 2021 13:42:12 +0000</pubDate>
      <link>https://dev.to/lukasklein/debouncing-a-celery-task-6pl</link>
      <guid>https://dev.to/lukasklein/debouncing-a-celery-task-6pl</guid>
      <description>&lt;p&gt;&lt;a href="https://docs.celeryproject.org/en/stable/getting-started/introduction.html"&gt;Celery&lt;/a&gt; is a powerful task queue with many great features already built-in. Unfortunately, debouncing tasks is not one of them (or I didn't find it in the docs), but fortunately, it's not that hard to build it yourself.&lt;/p&gt;

&lt;h2&gt;
  What is debouncing
&lt;/h2&gt;

&lt;p&gt;What is debouncing, you may ask? Let's start with a simple example that, while not directly applicable to the Python world, should make the concept easier to understand. Say you have an input element on a webpage. To implement auto-completion, you have to query a server-side API with the user's input. If you did that on every keystroke, you would probably fry your server. Instead, you only want to send the request once the user has stopped typing for, say, 500ms.&lt;/p&gt;

&lt;p&gt;To do this, you could start a timeout after a keystroke that, once expired, queries the server API. Instead of starting a separate timeout after every keystroke, you restart the same timeout over and over again.&lt;br&gt;
This way, the timeout expires and queries the server 500ms after the user has stopped typing.&lt;/p&gt;
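&lt;p&gt;Outside the browser, the same restart-the-timer idea can be sketched in a few lines of plain Python (a toy illustration using &lt;code&gt;threading.Timer&lt;/code&gt;, not the Celery approach described in this post; all names here are made up):&lt;/p&gt;

```python
import threading

class Debouncer:
    """Restart a timer on every call; run fn only once calls have stopped."""

    def __init__(self, wait_seconds, fn):
        self.wait_seconds = wait_seconds
        self.fn = fn
        self._timer = None

    def call(self, *args):
        # Cancel any pending timer and start a fresh one, so fn fires
        # exactly once, wait_seconds after the most recent call.
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(self.wait_seconds, self.fn, args)
        self._timer.start()
```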
&lt;h2&gt;
  Why I needed it
&lt;/h2&gt;

&lt;p&gt;Another real-world application is the so-called Downlinks functionality on &lt;a href="https://datacake.co"&gt;Datacake&lt;/a&gt;. Datacake is an IoT platform that not only lets you receive data from devices ("Uplink") but also send data back to them ("Downlink"). Downlinks on Datacake can be triggered by different events, which could lead to the same Downlink being sent several times within a short period. To avoid this, I implemented debouncing (this time for a Celery task, finally) so that only the last Downlink within a given period is actually sent.&lt;/p&gt;
&lt;h2&gt;
  Ingredients used
&lt;/h2&gt;

&lt;p&gt;Aside from Celery itself, I used Redis to store a temporary counter. Since we were already using Redis as our Celery broker, it was available anyway.&lt;/p&gt;
&lt;h2&gt;
  Just show me the code
&lt;/h2&gt;

&lt;p&gt;Say you have a task like this that you want to debounce:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;celery_app&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;send_downlink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Do some magic
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;First, I renamed the real task to a "private, do not use directly" one:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;celery_app&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_send_downlink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Do some magic
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I then created a new task under the official name. Every time it is called, it increments a Redis counter keyed by a unique identifier built from the task and its arguments (whatever you want to debounce), and then schedules the internal task with a countdown (your debounce time):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;celery_app&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;send_downlink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;redis_con&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;incr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"debounce-downlink-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;_send_downlink&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;apply_async&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;countdown&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, in the internal task, I decrement the counter and check the new value: if it is still greater than 0, the task was called again during the debounce period, so this run returns early. Only the "last call" (counter &amp;lt;= 0) does the actual work:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;celery_app&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;send_downlink&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;redis_con&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;decr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"debounce-downlink-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="s"&gt;"Debounce hit for &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;downlink&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; on &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;device&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="c1"&gt;# Do some magic
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The result is a Celery task that you can call as many times as you want, but that only executes once, after calls have stopped for your chosen debounce timeout.&lt;/p&gt;

</description>
      <category>celery</category>
      <category>python</category>
      <category>django</category>
      <category>redis</category>
    </item>
    <item>
      <title>Migrating a Digital Ocean LoadBalancer to another Kubernetes cluster</title>
      <dc:creator>Lukas Klein</dc:creator>
      <pubDate>Thu, 28 Jan 2021 10:34:46 +0000</pubDate>
      <link>https://dev.to/lukasklein/migrating-a-digital-ocean-loadbalancer-to-another-kubernetes-cluster-44dl</link>
      <guid>https://dev.to/lukasklein/migrating-a-digital-ocean-loadbalancer-to-another-kubernetes-cluster-44dl</guid>
      <description>&lt;p&gt;At &lt;a href="https://datacake.co"&gt;Datacake&lt;/a&gt;, we provide our customers with the option to white-label their IoT platform. To do this, they set their DNS to point to a Load Balancer on our Kubernetes cluster, which is running on Digital Ocean.&lt;/p&gt;

&lt;p&gt;In an ideal world, all customers would use a &lt;code&gt;CNAME&lt;/code&gt; on a subdomain or an &lt;code&gt;ALIAS&lt;/code&gt; on an apex (root) domain. In the real world, not all DNS providers support the &lt;a href="https://blog.dnsimple.com/2011/11/introducing-alias-record/"&gt;introduced-in-2011 ALIAS&lt;/a&gt; record type, so most customers with an apex domain resort to a classic A record.&lt;/p&gt;

&lt;p&gt;Recently, we migrated to a new Kubernetes cluster (for various reasons, we couldn’t use the old one anymore). Unfortunately, Load Balancers on Digital Ocean still don’t support floating IPs in 2021, so we somehow had to move the existing Load Balancer to spare our customers from updating their DNS. According to their support, there’s no way to do this. &lt;strong&gt;Spoiler: there is.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;While going through the resource definition of the old service, I noticed an annotation called &lt;code&gt;kubernetes.digitalocean.com/load-balancer-id&lt;/code&gt;. Its value looked a lot like a UUID and, as it turned out, is exactly what it says it is: the ID of the Load Balancer object on Digital Ocean.&lt;/p&gt;

&lt;p&gt;So I spun up a test Load Balancer, copied its ID, created a new Kubernetes service of type &lt;code&gt;LoadBalancer&lt;/code&gt; with an annotation containing that ID and, voilà, the Load Balancer was quickly populated with our nodes. To keep the old cluster from interfering with the new one, I first created another temporary Load Balancer and pointed the old service’s annotation at its ID; then I created a service on the new cluster carrying the ID of the original Load Balancer. After that, I deleted the old service, which also removed the temporary Load Balancer.&lt;br&gt;
Mission accomplished.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Tl;Dr: You can create a Kubernetes service of type LoadBalancer with an annotation called &lt;code&gt;kubernetes.digitalocean.com/load-balancer-id&lt;/code&gt;, which will tell the system to use an existing Digital Ocean Load Balancer instead of creating a new one.&lt;/p&gt;
&lt;/blockquote&gt;
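&lt;p&gt;For reference, a minimal sketch of such a service (the name, selector, ports, and the Load Balancer ID below are placeholders; the annotation value must be the ID of the existing Load Balancer):&lt;/p&gt;

```yaml
apiVersion: v1
kind: Service
metadata:
  name: my-loadbalancer  # placeholder name
  annotations:
    # ID of the existing Digital Ocean Load Balancer to adopt
    kubernetes.digitalocean.com/load-balancer-id: "00000000-0000-0000-0000-000000000000"
spec:
  type: LoadBalancer
  selector:
    app: my-app  # placeholder selector
  ports:
    - port: 443
      targetPort: 8443
```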

</description>
      <category>kubernetes</category>
      <category>digitalocean</category>
    </item>
  </channel>
</rss>
