After writing my original post, "Learnings in Raku and Pg Concurrency", I learned a few more things that make the script work better and feel more Raku-ish.
Learning 1. Try was not needed. In the previous post I showed this snippet of the code doing the main work.
my $q = @batch.join("\n");
try {
    my @res = $dbh.execute($q).values;
}
CATCH {
    default {
        say "Error: $_";
    }
}
In this snippet you see paired try and CATCH blocks. In other languages I'm familiar with that support try and catch, the two almost always go together: if code in the try block throws an exception, the catch block catches it. In Raku, however, a CATCH block handles any exception thrown in the scope that contains it, so it does not need a try.
A try can be used alone if you simply want to keep your code from dying because of an exception. The documentation explains that "a try block is a normal block which implicitly turns on the use fatal pragma and includes an implicit CATCH block that drops the exception, which means you can use it to contain them".
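To make the difference concrete, here is a minimal sketch that is not part of the original script. The sub name `with-catch` and the `@log` array are made up for illustration. It shows a CATCH block doing its job without a try, and a bare try dropping an exception and leaving it in `$!`.

```raku
my @log;   # hypothetical array, used only to record what happened

# CATCH without try: it handles exceptions thrown anywhere in the
# block that contains it (here, the body of the sub).
sub with-catch() {
    die "boom";
    CATCH {
        default { @log.push: "caught: $_" }
    }
}

with-catch();
@log.push: "still running";   # the program did not die

# try without CATCH: the exception is dropped and stored in $!.
my $value = try { die "quiet failure" };
@log.push: "try returned an undefined value" unless $value.defined;
@log.push: "last error: " ~ $!.message;

.say for @log;
```

The first two lines printed should be `caught: boom` and `still running`, which is the behavior the simplified block below relies on.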
When I learned this I changed that block to simplify it.
my $q = @batch.join("\n");
$dbh.execute($q);
CATCH {
    default {
        say "Error: $_";
    }
}
Learning 2. DB Indexes. After refreshing the data in our non-prod environment, I was working with fresh indexes on the tables I was updating. I mentioned in the previous article that I was able to update 4.5M rows within 30 minutes. That figure came from testing on the non-refreshed DB, where I had not run a fresh ANALYZE on the tables before the updates. With fresh indexes, those 4.5M updates finished in 6 minutes.
So keep your indexes up to date. Since we don't do many deletes, I am assuming the tests had to deal with very few zombie rows (rows deleted but not yet expunged by the database, which PostgreSQL calls dead tuples).
Learning 3. Atomicints. The lesson I learned about using an atomicint when concurrent threads each need to update a value, in this case the number of rows that have been processed, still applies. However, I found an alternative that works in my case.
I decided to set up a Supply that the threads use to emit the number of rows each one has just processed. I also set up a tap on that Supply, which adds those values to the overall number of rows processed.
At first I worried that this would simply have the same problem and would also require an atomicint. However, the documentation for Supply says that "A supply is a thread-safe, asynchronous data stream", so I decided to give it a try. This probably means that the Supply uses locking behind the scenes, but that is now hidden from me and doesn't slow down the update threads. I removed the atomicint type from $lines-done and set up the supplier and tap.
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap: -> $batch {
    $lines-done += $batch;
}
Then, in the main code that each thread runs, I added an emit call.
$supplier.emit(@batch.elems);
Testing did not turn up any problems (which I realize is not proof that none will show up at some point in the future).
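The counting pattern can also be tried in isolation, without a database. This is a minimal sketch, not the code from the script: the four workers and their 250 one-line batches are invented stand-ins for the update threads. The explicit `.serialize` call is an addition of this sketch, assuming Rakudo's `Supply.serialize` method; it asks for one-at-a-time delivery to the tap directly, so the plain `+=` does not depend on how `Supplier.Supply` behaves internally.

```raku
my $supplier   = Supplier.new;
my $lines-done = 0;            # a plain Int, no atomicint

# .serialize (an assumption of this sketch, not used in the script)
# ensures the tap block handles one emitted value at a time.
$supplier.Supply.serialize.tap: -> $count {
    $lines-done += $count;
};

# Hypothetical workers standing in for the update threads:
# each one reports 250 batches of a single line.
my @workers = (1..4).map: {
    start { $supplier.emit(1) for ^250 }
};

# emit returns only after the tap has run, so once all workers
# are done every value has been counted.
await @workers;

say "Lines done: $lines-done";
```

If the total ever came out below 1000, that would indicate lost updates, which is the failure the atomicint was originally guarding against.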
Here is the cleaned-up code:
#!/usr/bin/env raku
use v6;
use DB::Pg;
use Terminal::Print <T>;
multi sub MAIN ( :$host = 'host.docker.internal', :$port = 8888, :$script-path, :$batch-size, :$dbname, :$user, :$password ) {
    T.initialize-screen;
    my @columns = ^T.columns;
    my @rows = ^T.rows;
    my $conninfo = join " ",
        ('dbname=' ~ $dbname),
        ('host=' ~ $host),
        ('port=' ~ $port),
        ('user=' ~ $user),
        ('password=' ~ $password);
    # to get the total number of lines in the file shell out to wc -l
    my $lines-total = 0 + qqx{ wc -l $script-path }.words[0];
    my $lines-done = 0;
    T.print-string( 1, @rows.elems - 8, $*PROGRAM-NAME );
    T.print-string( 1, @rows.elems - 7, "Script-path: $script-path");
    T.print-string( 1, @rows.elems - 6, "Total lines: $lines-total");
    # every second print the progress
    my $start-time = now;
    sub format-elapsed-time($elapsed) {
        my $hours = $elapsed.Int div 3600;
        my $minutes = ($elapsed.Int mod 3600) div 60;
        my $seconds = $elapsed.Int mod 60;
        return $hours.fmt("%02d") ~ ':' ~ $minutes.fmt("%02d") ~ ':' ~ $seconds.fmt("%02d");
    }
    my $update-line = @rows.elems - 5;
    my $doneline = @rows.elems - 1;
    my $progress = start {
        loop {
            # show elapsed time
            my $elapsed = now - $start-time;
            my $local-lines-done = $lines-done;
            my $local-lines-total = $lines-total;
            my $pct = (($local-lines-done / $local-lines-total) * 100).fmt("%02.2f");
            T.print-string( 1, $update-line,"Progress: $local-lines-done/$local-lines-total {$pct}% - " ~ format-elapsed-time($elapsed) ~ " elapsed");
            sleep 1;
            last if $local-lines-done == $local-lines-total;
        }
        T.print-string( 1, $doneline, "All Queries Queued. Waiting on Database...");
    }
    my @batch;
    my $dbh = DB::Pg.new(:$conninfo);
    # check the connection
    my $res = $dbh.execute("SELECT 1");
    $dbh.finish;
    #say "Connection: $res";
    my $supplier = Supplier.new;
    my $supply = $supplier.Supply;
    $supply.tap: -> $batch {
        $lines-done += $batch;
    }
    $script-path.IO.lines.rotor($batch-size, :partial).race.map: -> @batch {
        my $q = @batch.join("\n");
        $dbh.execute($q);
        CATCH {
            default {
                say "Error: $_";
            }
        }
        $supplier.emit(@batch.elems);
    };
    T.shutdown-screen;
}