<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Scott Sotka</title>
    <description>The latest articles on DEV Community by Scott Sotka (@ssotka).</description>
    <link>https://dev.to/ssotka</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F807590%2F07d37407-966d-4da3-98ed-ab64e8599651.png</url>
      <title>DEV Community: Scott Sotka</title>
      <link>https://dev.to/ssotka</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ssotka"/>
    <language>en</language>
    <item>
      <title>Continued Learnings</title>
      <dc:creator>Scott Sotka</dc:creator>
      <pubDate>Thu, 04 Jul 2024 21:30:31 +0000</pubDate>
      <link>https://dev.to/ssotka/continued-learnings-33mo</link>
      <guid>https://dev.to/ssotka/continued-learnings-33mo</guid>
      <description>&lt;p&gt;After writing my original post &lt;a href="https://dev.to/ssotka/learnings-in-raku-and-pg-concurrency-35a0"&gt;Learnings in Raku and Pg Concurrency&lt;/a&gt; I learned a few more things to make the script work better and become more &lt;em&gt;raku-ish&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Learning 1&lt;/strong&gt;. Try was not needed. In the previous post I showed this snippet of the code doing the main work.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;my $q = @batch.join("\n");
try {
   my @res = $dbh.execute($q).values;
}
CATCH {
   default {
      say "Error: $_";
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this snippet you see paired &lt;em&gt;try/catch&lt;/em&gt; blocks. In other languages I'm familiar with that support try and catch, the two almost always go together. If code in the &lt;em&gt;try&lt;/em&gt; block throws an exception, the exception will be caught in the catch &lt;em&gt;block&lt;/em&gt;. However  in Raku when you have a catch block in a given scope, any exceptions thrown in the scope can be caught and handled by that &lt;em&gt;catch&lt;/em&gt; block.&lt;/p&gt;

&lt;p&gt;A &lt;em&gt;try&lt;/em&gt; can be used alone if you simply want to keep your code from dying because of an exception. In the documentation it is explained as "&lt;em&gt;a try block is a normal block which implicitly turns on the use fatal pragma and includes an implicit CATCH block that drops the exception, which means you can use it to contain them&lt;/em&gt;".&lt;/p&gt;

&lt;p&gt;When I learned this I changed that block to simplify it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;my $q = @batch.join("\n");

    $dbh.execute($q);

    CATCH {
        default {
            say "Error: $_";
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Learning 2&lt;/strong&gt;. DB Indexes. After refreshing the data in our non-prod environment I was working with fresh indexes on the tables I was updating. I mentioned in the previous article that I was able to update 4.5M rows within 30 minutes. This was from my testing on the non-refreshed DB and I had not run a fresh ANALYZE on the tables before the updates. Working with fresh indexes those 4.5M updates finished in 6 minutes.&lt;/p&gt;

&lt;p&gt;So keep your indexes up to date. As we don't do many deletes I am assuming there were very few zombie rows (deleted but not expunged by the database) that had to be dealt with in the tests.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Learning 3&lt;/strong&gt;. Atomicints. The lesson I learned about using an atomicint when concurrent threads each need to update a value, in this case the number of rows that have been processed, still applies. However I found an alternative to this in my case.&lt;/p&gt;

&lt;p&gt;I decided to set up a Supply to be used in the threads to emit the number of rows that thread had just processed. I also set up a tap for that Suppy which would then add those values to the overall number of rows processed. &lt;/p&gt;

&lt;p&gt;At first I worried that this would simply have the same problem and would also require an atomicint. However, the documentation for Supply indicated that "&lt;em&gt;A supply is a thread-safe, asynchronous data stream&lt;/em&gt;". So I decided to give it a try. This probably means that The Supply is using locking behind the scenes, however, it's now hidden from me and doesn't slow down the update threads. I removed the &lt;em&gt;atomicint&lt;/em&gt; type from &lt;strong&gt;$lines-done&lt;/strong&gt; and set up the &lt;em&gt;suppier&lt;/em&gt; and &lt;em&gt;tap&lt;/em&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap: -&amp;gt; $batch {
    $lines-done += $batch;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then in the main thread code I added an emit call.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$supplier.emit(@batch.elems);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Testing did not turn up any problems (which I realize is not proof that it won't at sometime in the future). &lt;/p&gt;

&lt;p&gt;Here is the cleaned up code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;#!/usr/bin/env raku

use v6;
use DB::Pg;
use Terminal::Print &amp;lt;T&amp;gt;;

multi sub MAIN ( :$host = 'host.docker.internal', :$port = 8888, :$script-path, :$batch-size, :$dbname, :$user, :$password ) {
    T.initialize-screen;
    my @columns = ^T.columns;
    my @rows = ^T.rows;

    my $conninfo = join " ",
        ('dbname=' ~ $dbname),
        ('host=' ~ $host),
        ('port=' ~ $port),
        ('user=' ~ $user),
        ('password=' ~ $password);

    # to get the total number of lines in the file shell out to wc -l
    my $lines-total = 0 + qqx{ wc -l $script-path }.words[0];
    my $lines-done = 0; 

    T.print-string( 1, @rows.elems - 8, $*PROGRAM-NAME );
    T.print-string( 1, @rows.elems - 7, "Script-path: $script-path");
    T.print-string( 1, @rows.elems - 6, "Total lines: $lines-total");

    #every second print the progress
    my $start-time = now;
    sub format-elapsed-time($elapsed) {
        my $hours = $elapsed.Int div 3600;
        my $minutes = ($elapsed.Int mod 3600) div 60;
        my $seconds = $elapsed.Int mod 60;
        return $hours.fmt("%02d") ~ ':' ~ $minutes.fmt("%02d") ~ ':' ~ $seconds.fmt("%02d");
    }

    my  $update-line = @rows.elems - 5;
    my $doneline = @rows.elems - 1;
    my $progress = start {
        loop {
            #show elapsed time
            my $elapsed = now - $start-time;
            my $local-lines-done = $lines-done;
            my $local-lines-total = $lines-total;
            my $pct = (($local-lines-done / $local-lines-total) * 100).fmt("%02.2f");
            T.print-string( 1, $update-line,"Progress: $local-lines-done/$local-lines-total {$pct}%  - " ~ format-elapsed-time($elapsed) ~ " elapsed");

            sleep 1;
            last if $local-lines-done == $local-lines-total;
        }
        T.print-string( 1, $doneline, "All Queries Queued. Waiting on Database...");
    }


    my @batch;
    my $dbh = DB::Pg.new(:$conninfo);
    # check the connection
    my $res = $dbh.execute("SELECT 1");
    $dbh.finish;
    #say "Connection: $res";

    my $supplier = Supplier.new;
    my $supply = $supplier.Supply;
    $supply.tap: -&amp;gt; $batch {
        $lines-done += $batch;
    }

    $script-path.IO.lines.rotor($batch-size, :partial).race.map: -&amp;gt; @batch { 
        my $q = @batch.join("\n");

        $dbh.execute($q);

        CATCH {
            default {
                say "Error: $_";
            }
        }

        $supplier.emit(@batch.elems);
    };

    T.shutdown-screen;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>rakulang</category>
      <category>postgres</category>
      <category>programming</category>
      <category>learning</category>
    </item>
    <item>
      <title>Learnings in Raku and Pg Concurrency</title>
      <dc:creator>Scott Sotka</dc:creator>
      <pubDate>Fri, 28 Jun 2024 23:04:31 +0000</pubDate>
      <link>https://dev.to/ssotka/learnings-in-raku-and-pg-concurrency-35a0</link>
      <guid>https://dev.to/ssotka/learnings-in-raku-and-pg-concurrency-35a0</guid>
      <description>&lt;p&gt;I've been making my living with Perl 5 and PostgreSQL since the mid 90's but lately I've been giving Raku a serious look. I've published a couple of Raku modules, but had not used it for work. Then recently  I needed to automate a system to Mask/Fake data in our non-prod environments so I decided to use Raku.&lt;/p&gt;

&lt;p&gt;My method was simple, once I had identified all the tables that contain PII (Personally Identifiable Information) I created SQL scripts with autogenerated Fake data for the fields I needed to mask. This created a number of files containing up to 5 million rows of update statements. After I had the files all I needed to do was run them against our non-prod databases. As you can imagine simply running them via &lt;em&gt;psql&lt;/em&gt; proved to be very slow. I'm sure there were any number of ways to speed this process up that I didn't think of or couldn't afford. I chose to find out if Raku concurrency would help.&lt;/p&gt;

&lt;p&gt;I've never really used concurrency in my job and only toyed with it outside. The main work being done in the script was simply batch up a number of update lines from a file and send them to PostgreSQL to be executed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;my $q = @batch.join("\n");
try {
   my @res = $dbh.execute($q).values;
}
CATCH {
   default {
      say "Error: $_";
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I started by wrapping that in a for loop returning promises and awaiting each promise. &lt;/p&gt;

&lt;p&gt;The start command returns a promise, which means it spawns a thread and either executes with out a problem (promise kept) or fails (promise broken). The await preceding the for loop collects all these promises and basically tells raku not to end the program until all the promises finish executing.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;my @batch;
await for $script-file.IO.lines -&amp;gt; $line {
   push @batch, $line;
   if @batch.elems == $batch-size {
      start {
         &amp;lt;...main work code...&amp;gt;
      }
      @batch = ();
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once in the loop, spawning threads, I worried about the database handle being thread-safe. So I thought I would just write a connection sub that would cache the connections for each thread identified by &lt;strong&gt;$*THREAD.id&lt;/strong&gt;. That way, each thread would get one connection and could re-use it. I wrote the code and it was working, but then I read the documentation and found &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Learning #1&lt;/strong&gt;. DB::Pg is thread-safe. It already does connection caching.&lt;/p&gt;

&lt;p&gt;I also wanted to keep track of the progress for each file as it was processed. So, hey, threads! This promise simply prints the progress every second, showing how many lines had been processed vs. total lines, then that same thing as a percentage along with the elapsed time. I didn't bother with an await for this as it will finish when when the lines are all processed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;my $lines-total = 0 + qqx{ wc -l $script-path }.words[0];
my $lines-done = 0;
my $progress = start {
     loop {
         #show elapsed time
         my $elapsed = now - $start-time;
         my $pct = (($lines-done / $lines-total) * 100).fmt("%02.2f");
         say("Progress: $lines-done/$lines-total {$pct}%  - " ~ format-elapsed-time($elapsed) ~ " elapsed");
         sleep 1;
         last if $lines-done == $lines-total;
     }
     say( "All Queries Queued. Waiting on Database...");
 }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then at the end of the main start block I incremented &lt;strong&gt;$lines-done&lt;/strong&gt; by the &lt;strong&gt;$batch-size&lt;/strong&gt;. It all looked good, and even ran...for a while. After a certain number of iterations process would suddenly end producing only the message &lt;strong&gt;&lt;em&gt;Killed&lt;/em&gt;&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;At this point I turned to the Raku discord and posted my problem there and was kindly helped by user &lt;strong&gt;ab5tract&lt;/strong&gt;. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Learning 2&lt;/strong&gt;. The &lt;strong&gt;$lines-done&lt;/strong&gt; variable was unprotected and being added to by each thread. It was inevitable that two of them would eventually stomp on each other. Which they were. There were two options to handle this problem. I could put a lock around the increment, or I could set &lt;strong&gt;$lines-done&lt;/strong&gt; to be an &lt;strong&gt;&lt;em&gt;atomicint&lt;/em&gt;&lt;/strong&gt;. I chose the latter.&lt;/p&gt;

&lt;p&gt;It was also pointed out to me that the for-loop was probably not the most elegant solution to batching and spawning the threads. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Learning 3&lt;/strong&gt;. &lt;em&gt;Rotor&lt;/em&gt; is a thing. Instead of worrying about doing the batching myself I could let Raku do it. If I combined it with &lt;em&gt;race&lt;/em&gt; the threads would all be done behind the scenes. &lt;/p&gt;

&lt;p&gt;&lt;em&gt;Rotor&lt;/em&gt; lets you specify how many lines to take from the file (&lt;em&gt;:partial&lt;/em&gt; means if we run out send what you have). So it grabs &lt;strong&gt;$batch-size&lt;/strong&gt; lines and sends them to &lt;em&gt;race&lt;/em&gt; which spawns a thread and executes the code in the &lt;em&gt;map&lt;/em&gt; for each element of a list (in this case the batched lines). It's more compact and raku-ish than producing the list and sending it to a for-loop. &lt;em&gt;Race&lt;/em&gt; also doesn't care what order the threads execute, if you need the results of the map in a specific order use &lt;em&gt;hyper&lt;/em&gt; instead.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$script-path.IO.lines.rotor($batch-size, :partial).race.map: -&amp;gt; @batch { 
     my $q = @batch.join("\n");
     try {
         my @res = $dbh.execute($q).values;
     }
     CATCH {
         default {
             say "Error: $_";
         }
     }
     #$dbh.finish;
     atomic-fetch-add($lines-done, @batch.elems);
 };
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is the final form of the main loop. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Learning 4&lt;/strong&gt;. The &lt;strong&gt;$dbh.finish&lt;/strong&gt; call is not needed for &lt;em&gt;updates&lt;/em&gt;, &lt;em&gt;inserts&lt;/em&gt;, and &lt;em&gt;deletes&lt;/em&gt;. In fact, I found that if you do call finish in these threads, you will eventually segfault. In the case of &lt;em&gt;selects&lt;/em&gt; you must either pull all the data selected or make the call to finish.&lt;/p&gt;

&lt;p&gt;At this point, I gussied up the output using the &lt;strong&gt;Terminal::Print&lt;/strong&gt; module.  This kept my progress messages all updating the same terminal line and not scrolling off the page.&lt;/p&gt;

&lt;p&gt;Setting the batch size to 2000 statements gave me pretty good throughput. I have not yet pushed that number beyond 2000. I'll get around to it. Five-million lines of update statements took about 30 minutes. &lt;/p&gt;

&lt;p&gt;Here is the full code for my update script (I may have overdone it with atomicints, but it works):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;#!/usr/bin/env raku

use v6;
use DB::Pg;
use Terminal::Print &amp;lt;T&amp;gt;;

multi sub MAIN ( :$host = 'host.docker.internal', :$port = 8888, :$script-path, :$batch-size, :$dbname, :$user, :$password ) {
    T.initialize-screen;
    my @columns = ^T.columns;
    my @rows = ^T.rows;

    my $conninfo = join " ",
        ('dbname=' ~ $dbname),
        ('host=' ~ $host),
        ('port=' ~ $port),
        ('user=' ~ $user),
        ('password=' ~ $password);

    # to get the total number of lines in the file shell out to wc -l
    my atomicint $lines-total = 0 + qqx{ wc -l $script-path }.words[0];
    my atomicint $lines-done = 0;   # atomic integer to keep track of the number of lines processed hardware atomic operations

    T.print-string( 1, @rows.elems - 8, $*PROGRAM-NAME );
    T.print-string( 1, @rows.elems - 7, "Script-path: $script-path");
    T.print-string( 1, @rows.elems - 6, "Total lines: $lines-total");

    #every second print the progress
    my $start-time = now;
    sub format-elapsed-time($elapsed) {
        my $hours = $elapsed.Int div 3600;
        my $minutes = ($elapsed.Int mod 3600) div 60;
        my $seconds = $elapsed.Int mod 60;
        return $hours.fmt("%02d") ~ ':' ~ $minutes.fmt("%02d") ~ ':' ~ $seconds.fmt("%02d");
    }

    my atomicint $update-line = @rows.elems - 5;
    my atomicint $doneline = @rows.elems - 1;
    my $progress = start {
        loop {
            #show elapsed time
            my $elapsed = now - $start-time;
            my $local-lines-done = atomic-fetch($lines-done);
            my $local-lines-total = atomic-fetch($lines-total);
            my $pct = (($local-lines-done / $local-lines-total) * 100).fmt("%02.2f");
            T.print-string( 1, $update-line,"Progress: $local-lines-done/$local-lines-total {$pct}%  - " ~ format-elapsed-time($elapsed) ~ " elapsed");
            #say("Progress: $local-lines-done/$local-lines-total {$pct}%  - " ~ format-elapsed-time($elapsed) ~ " elapsed");
            sleep 1;
            last if $local-lines-done == $local-lines-total;
        }
        T.print-string( 1, $doneline, "All Queries Queued. Waiting on Database...");
        #say( "All Queries Queued. Waiting on Database...");
    }


    my @batch;
    my @promises;
    my $dbh = DB::Pg.new(:$conninfo);
    # check the connection
    my $res = $dbh.execute("SELECT 1");
    $dbh.finish;
    #say "Connection: $res";

    $script-path.IO.lines.rotor($batch-size, :partial).race.map: -&amp;gt; @batch { 
        my $q = @batch.join("\n");
        try {
            my @res = $dbh.execute($q).values;
        }
        CATCH {
            default {
                say "Error: $_";
            }
        }
        #$dbh.finish;
        atomic-fetch-add($lines-done, @batch.elems);
    };

    T.shutdown-screen;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>learning</category>
      <category>rakulang</category>
      <category>programming</category>
      <category>postgres</category>
    </item>
  </channel>
</rss>
