DEV Community

Scott Sotka

Learnings in Raku and Pg Concurrency

I've been making my living with Perl 5 and PostgreSQL since the mid-90s, but lately I've been giving Raku a serious look. I've published a couple of Raku modules but had not used the language for work. Then recently I needed to automate masking and faking data in our non-prod environments, so I decided to use Raku.

My method was simple. Once I had identified all the tables that contain PII (Personally Identifiable Information), I created SQL scripts with autogenerated fake data for the fields I needed to mask. This produced a number of files containing up to 5 million lines of update statements. With the files in hand, all I needed to do was run them against our non-prod databases. As you can imagine, simply running them via psql proved to be very slow. I'm sure there were any number of ways to speed this process up that I didn't think of or couldn't afford. I chose to find out if Raku concurrency would help.

I've never really used concurrency in my job and have only toyed with it outside of work. The main work done in the script is simply to batch up a number of update lines from a file and send them to PostgreSQL to be executed.

my $q = @batch.join("\n");
try {
   my @res = $dbh.execute($q).values;
   # CATCH has to sit inside the block it guards. Placed after the
   # try block, it never sees the exception that try already swallowed.
   CATCH {
      default {
         say "Error: $_";
      }
   }
}

I started by wrapping that in a for loop that starts a promise for each batch and then awaits the promises.

The start keyword returns a promise. It schedules the block to run on a thread from Raku's thread pool, and the promise is either kept (the block finished without a problem) or broken (it threw an exception). The await collects these promises and tells Raku not to end the program until all of them have finished executing.

my @batch;
my @promises;
for $script-file.IO.lines -> $line {
   push @batch, $line;
   if @batch.elems == $batch-size {
      my @lines = @batch;   # private copy for the task to read; @batch is emptied below
      push @promises, start {
         <...main work code...>
      }
      @batch = ();
   }
}
# (a final partial batch would need the same treatment here)
await @promises;
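
To make the kept/broken distinction concrete, here is a minimal sketch that is separate from the masking script. The three toy tasks and the deliberate failure are assumptions made purely for illustration. Promise.allof waits for every promise to settle without rethrowing a failure, so each status can be inspected afterwards.

```raku
# Three tasks scheduled on the thread pool; the second one fails on purpose.
my @promises = (1..3).map: -> $n {
    start {
        die "task $n failed" if $n == 2;
        $n * 10;
    }
};

# Wait until every promise is either kept or broken.
await Promise.allof(@promises);

for @promises.kv -> $i, $p {
    given $p.status {
        when Kept   { say "promise $i kept: { $p.result }" }
        when Broken { say "promise $i broken: { $p.cause.message }" }
    }
}
```

Awaiting a broken promise directly would rethrow its exception, which is why the sketch goes through Promise.allof before looking at the individual results.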

Once in the loop and spawning threads, I worried about whether the database handle was thread-safe. So I thought I would just write a connection sub that cached one connection per thread, keyed by $*THREAD.id. That way, each thread would get one connection and could reuse it. I wrote the code and it was working, but then I read the documentation and found:

Learning #1. DB::Pg is thread-safe. It already does connection caching.
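
For the curious, a per-thread cache of the kind described above might look roughly like the sketch below. This is not the code I wrote at the time. The names connection-for-thread and fake-connect are hypothetical, and a string stands in for a real database connection so the example runs without a database.

```raku
my %connections;             # thread id => cached "connection"
my $cache-lock = Lock.new;   # guards the hash, which is shared by all threads

# Hypothetical helper: return this thread's cached connection,
# creating it with &connect on first use.
sub connection-for-thread(&connect) {
    my $id = $*THREAD.id;
    $cache-lock.protect: { %connections{$id} //= connect() };
}

# Stand-in for opening a real connection (an assumption for this sketch).
my &fake-connect = { "connection opened by thread { $*THREAD.id }" };

my @tasks = (^20).map: { start { connection-for-thread(&fake-connect) } };
my @seen  = await @tasks;

say "{ @seen.elems } tasks used { %connections.elems } cached connection(s)";
```

With DB::Pg none of this is necessary, since the module handles connection caching itself.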

I also wanted to keep track of the progress for each file as it was processed. So, hey, threads! This promise simply prints the progress every second, showing how many lines have been processed vs. total lines, then the same thing as a percentage, along with the elapsed time. I didn't bother with an await for this, as it will finish when the lines are all processed.

my $lines-total = 0 + qqx{ wc -l $script-path }.words[0];
my $lines-done = 0;
my $progress = start {
     loop {
         #show elapsed time
         my $elapsed = now - $start-time;
         my $pct = (($lines-done / $lines-total) * 100).fmt("%02.2f");
         say("Progress: $lines-done/$lines-total {$pct}%  - " ~ format-elapsed-time($elapsed) ~ " elapsed");
         sleep 1;
         last if $lines-done >= $lines-total;   # >= in case the count overshoots the total
     }
     say( "All Queries Queued. Waiting on Database...");
 }

Then, at the end of the start block, I incremented $lines-done by $batch-size. It all looked good, and even ran... for a while. After a certain number of iterations it would suddenly end with only the message Killed.

At this point I turned to the Raku Discord and posted my problem there, and was kindly helped by user ab5tract.

Learning #2. The $lines-done variable was unprotected and was being added to by every thread. It was inevitable that two of them would eventually stomp on each other, and they did. There were two ways to handle this problem: I could put a lock around the increment, or I could declare $lines-done as an atomicint. I chose the latter.
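
The two options can be compared side by side in a small sketch. The eight workers and the 10,000 increments each are arbitrary numbers chosen for illustration, and the counter names are hypothetical. The unprotected counter is included only to show what can go wrong.

```raku
my $plain          = 0;          # ordinary variable, no protection
my atomicint $atom = 0;          # native integer, updated atomically
my $guarded        = 0;          # ordinary variable, guarded by a lock
my $lock           = Lock.new;

my @workers = (^8).map: {
    start {
        for ^10_000 {
            $plain++;                         # racy read-modify-write
            atomic-fetch-add($atom, 1);       # the atomicint option
            $lock.protect: { $guarded++ };    # the lock option
        }
    }
};
await @workers;

say "plain:     {$plain} (may be less than 80000)";
say "atomicint: {$atom}";
say "locked:    {$guarded}";
```

Both protected counters should end at exactly 80000. The plain one may lose updates whenever two threads read the same old value before either writes back.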

It was also pointed out to me that the for-loop was probably not the most elegant solution to batching and spawning the threads.

Learning #3. rotor is a thing. Instead of doing the batching myself, I could let Raku do it. If I combined it with race, the threading would all be handled behind the scenes.

rotor lets you specify how many lines to take from the file at a time (:partial means that if we run out, it sends what it has). So it grabs $batch-size lines and sends them to race, which spreads the work across worker threads and executes the code in the map for each element of the list (in this case, each batch of lines). It's more compact and Raku-ish than producing the list and sending it to a for loop. race also doesn't care in what order the results come back; if you need the results of the map in the original order, use hyper instead.

$script-path.IO.lines.rotor($batch-size, :partial).race.map: -> @batch {
     my $q = @batch.join("\n");
     try {
         my @res = $dbh.execute($q).values;
         # CATCH must be inside the block it guards
         CATCH {
             default {
                 say "Error: $_";
             }
         }
     }
     #$dbh.finish;
     atomic-fetch-add($lines-done, @batch.elems);
 };

This is the final form of the main loop.
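
To see what rotor, :partial, hyper, and race each contribute, here is a small sketch on a seven-element list instead of a file. The :batch(1) argument is an assumption made so that every rotor group becomes its own unit of work in such a tiny example; the real script uses the defaults.

```raku
my @lines = 'a'..'g';

# With :partial the short final batch is kept; without it, it is dropped.
say @lines.rotor(3, :partial).join('|');   # a b c|d e f|g
say @lines.rotor(3).join('|');             # a b c|d e f

# hyper hands results back in input order ...
my @in-order = @lines.rotor(3, :partial).hyper(:batch(1)).map({ .join(',') });
say @in-order.join('|');                   # a,b,c|d,e,f|g

# ... while race may hand them back in any order, so sort before comparing.
my @any-order = @lines.rotor(3, :partial).race(:batch(1)).map({ .join(',') });
say @any-order.sort.join('|');             # a,b,c|d,e,f|g
```

For the update script the order in which batches reach the database does not matter, which is why race is the natural choice there.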

Learning #4. The $dbh.finish call is not needed for updates, inserts, and deletes. In fact, I found that if you do call finish in these threads, you will eventually segfault. In the case of selects, you must either pull all of the selected data or call finish.

At this point, I gussied up the output using the Terminal::Print module. This kept my progress messages all updating the same terminal line and not scrolling off the page.

Setting the batch size to 2000 statements gave me pretty good throughput. I have not yet pushed that number beyond 2000. I'll get around to it. Five million lines of update statements took about 30 minutes.
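
As a back-of-the-envelope check on those numbers, the sketch below turns them into rates. It treats "about 30 minutes" as exactly 1800 seconds, which is an assumption, so the results are rough estimates and not measurements.

```raku
my $lines      = 5_000_000;
my $batch-size = 2_000;
my $seconds    = 30 * 60;     # "about 30 minutes", taken as exact here

my $batches         = $lines div $batch-size;
my $lines-per-sec   = ($lines / $seconds).round;
my $batches-per-sec = ($batches / $seconds).round(0.01);

say "{$batches} batches in total";               # 2500 batches in total
say "{$lines-per-sec} lines per second";         # 2778 lines per second
say "{$batches-per-sec} batches per second";     # 1.39 batches per second
```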

Here is the full code for my update script (I may have overdone it with atomicints, but it works):

#!/usr/bin/env raku

use v6;
use DB::Pg;
use Terminal::Print <T>;

multi sub MAIN ( :$host = 'host.docker.internal', :$port = 8888, :$script-path, :$batch-size, :$dbname, :$user, :$password ) {
    T.initialize-screen;
    my @columns = ^T.columns;
    my @rows = ^T.rows;

    my $conninfo = join " ",
        ('dbname=' ~ $dbname),
        ('host=' ~ $host),
        ('port=' ~ $port),
        ('user=' ~ $user),
        ('password=' ~ $password);

    # to get the total number of lines in the file shell out to wc -l
    my atomicint $lines-total = 0 + qqx{ wc -l $script-path }.words[0];
    my atomicint $lines-done = 0;   # atomic integer counting the lines processed, updated with hardware atomic operations

    T.print-string( 1, @rows.elems - 8, $*PROGRAM-NAME );
    T.print-string( 1, @rows.elems - 7, "Script-path: $script-path");
    T.print-string( 1, @rows.elems - 6, "Total lines: $lines-total");

    #every second print the progress
    my $start-time = now;
    sub format-elapsed-time($elapsed) {
        my $hours = $elapsed.Int div 3600;
        my $minutes = ($elapsed.Int mod 3600) div 60;
        my $seconds = $elapsed.Int mod 60;
        return $hours.fmt("%02d") ~ ':' ~ $minutes.fmt("%02d") ~ ':' ~ $seconds.fmt("%02d");
    }

    my atomicint $update-line = @rows.elems - 5;
    my atomicint $doneline = @rows.elems - 1;
    my $progress = start {
        loop {
            #show elapsed time
            my $elapsed = now - $start-time;
            my $local-lines-done = atomic-fetch($lines-done);
            my $local-lines-total = atomic-fetch($lines-total);
            my $pct = (($local-lines-done / $local-lines-total) * 100).fmt("%02.2f");
            T.print-string( 1, $update-line,"Progress: $local-lines-done/$local-lines-total {$pct}%  - " ~ format-elapsed-time($elapsed) ~ " elapsed");
            #say("Progress: $local-lines-done/$local-lines-total {$pct}%  - " ~ format-elapsed-time($elapsed) ~ " elapsed");
            sleep 1;
            last if $local-lines-done >= $local-lines-total;   # >= in case the count overshoots the total
        }
        T.print-string( 1, $doneline, "All Queries Queued. Waiting on Database...");
        #say( "All Queries Queued. Waiting on Database...");
    }


    my @batch;
    my @promises;
    my $dbh = DB::Pg.new(:$conninfo);
    # check the connection
    my $res = $dbh.execute("SELECT 1");
    $dbh.finish;
    #say "Connection: $res";

    $script-path.IO.lines.rotor($batch-size, :partial).race.map: -> @batch {
        my $q = @batch.join("\n");
        try {
            my @res = $dbh.execute($q).values;
            # CATCH must be inside the block it guards
            CATCH {
                default {
                    say "Error: $_";
                }
            }
        }
        #$dbh.finish;
        atomic-fetch-add($lines-done, @batch.elems);
    };

    T.shutdown-screen;
}