<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Márton Papp</title>
    <description>The latest articles on DEV Community by Márton Papp (@morz).</description>
    <link>https://dev.to/morz</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F20524%2Fd4798235-440e-48f5-b059-7a2361216160.png</url>
      <title>DEV Community: Márton Papp</title>
      <link>https://dev.to/morz</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/morz"/>
    <language>en</language>
    <item>
      <title>Beginners Guide to Caching Inside an Apache Beam Dataflow Streaming Pipeline Using Python</title>
      <dc:creator>Márton Papp</dc:creator>
      <pubDate>Wed, 09 Mar 2022 12:06:05 +0000</pubDate>
      <link>https://dev.to/morz/beginners-guide-to-caching-inside-an-apache-beam-dataflow-streaming-pipeline-using-python-47hi</link>
      <guid>https://dev.to/morz/beginners-guide-to-caching-inside-an-apache-beam-dataflow-streaming-pipeline-using-python-47hi</guid>
      <description>&lt;p&gt;Some days you win. Other days you have to process a continuous stream with thousands of incoming events per second from a PubSub subscription as quickly as possible. Even if your transformations are simple it is very likely that your business logic depends on some external and dynamic data. Let me show you a way how to efficiently cache these values and save you from  hours of debugging by uncovering a bug in the Python library of Apache Beam itself.&lt;/p&gt;

&lt;p&gt;If your configuration comes from an environment variable that changes once a year you can get away with updating your Dataflow every time but keep in mind that is usually takes a lot of time to execute by draining your existing pipelines and starting new ones. &lt;/p&gt;

&lt;p&gt;If it’s more dynamic and can change every minute or so - for example you want to filter messages based on a criteria that a user can configure through UI - instead of using an env variable you will probably fetch a value from an external resource (e.g. a file from a bucket, a database query or an API call, you name it...) that you want to cache for a certain amount of time to prevent hammering your attached resource.&lt;/p&gt;

&lt;h2&gt;
  
  
  The naive implementation
&lt;/h2&gt;

&lt;p&gt;Pipelines in Apache Beam are self-contained and isolated from each other, so they can be safely executed concurrently. But it also means that they can’t rely on shared state. &lt;/p&gt;

&lt;p&gt;Depending on the load the Dataflow runner may start 1 to 10 worker processes and those workers can spawn hundreds or thousands of threads independent from each other. With a naive implementation each thread will use their own cache pool so even with a TTL high as 60 sec you can end up with ~10.000 API calls per minute. While in certain cases it’s better than making a call for every PubSub message it’s still far from ideal.&lt;/p&gt;

&lt;h2&gt;
  
  
  Shared handle in Python
&lt;/h2&gt;

&lt;p&gt;&lt;code&gt;Shared&lt;/code&gt; is a serializable object that can be shared by all threads of each worker process, and an instance of it returns &lt;code&gt;Shared handle&lt;/code&gt; that can be used in your transform functions. This handle encapsulates a weak reference to a singleton object which will be our shared cache in form of a &lt;code&gt;dict&lt;/code&gt; and can be accessed through the handle’s &lt;code&gt;acquire&lt;/code&gt; function.&lt;/p&gt;

&lt;p&gt;Please note, that most built-in types in Python cannot be weak referenced, but a &lt;code&gt;dict&lt;/code&gt; can be easily made to support it through subclassing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;_SideInputContainer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;pass&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Watch out for a bug in Apache Beam versions &amp;lt;= 2.35.0
&lt;/h2&gt;

&lt;p&gt;According to the &lt;a href="https://beam.apache.org/releases/pydoc/2.33.0/apache_beam.utils.shared.html"&gt;documentation&lt;/a&gt; the &lt;code&gt;acquire&lt;/code&gt; function accepts a &lt;code&gt;constructor_fn&lt;/code&gt; and a &lt;code&gt;tag&lt;/code&gt; parameter. The first one defines how you want to load your data on a cache miss and the tag defines an identifier you want use in subsequent calls to access the cached object. &lt;/p&gt;

&lt;p&gt;Whenever the tag changes the cached data will be reloaded, which can ultimately serve as our TTL by writing a function that returns a different identifier every 60 seconds:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;CONFIG_TTL_IN_SECONDS&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;60&lt;/span&gt;
&lt;span class="c1"&gt;# Dummy (but usable!) implementation :)
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_cache_key&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;CONFIG_TTL_IN_SECONDS&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You might think that writing&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;shared_handle&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;acquire&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;load_sideinput&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;get_cache_key&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;will do the job, but due to a bug in versions prior to &lt;a href="https://github.com/apache/beam/commit/c67bed4080734fdff9ae416483ce5591a98441f8"&gt;this commit&lt;/a&gt; the tag parameter will be ignored. The cached object is going to be reloaded even if you provide the same identifier, rendering the whole mechanism useless and our transformation will hit our attached resources every time.&lt;/p&gt;

&lt;h2&gt;
  
  
  The workaround
&lt;/h2&gt;

&lt;p&gt;The forward and backward compatible solution is to store this identifier on the cached &lt;code&gt;dict&lt;/code&gt; itself, manually check if a reload is needed and call &lt;code&gt;acquire&lt;/code&gt; again with a new tag.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;_SideInputContainer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;pass&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_sideinput&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;shared_handle&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_cache_key&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mi"&gt;60&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;load_sideinput&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="n"&gt;sideinput_container&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;_SideInputContainer&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;rawJson&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="c1"&gt;# API call that returns a JSON that you want to cache
&lt;/span&gt;        &lt;span class="n"&gt;sideinput_container&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"cached_response"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rawJson&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;sideinput_container&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"cache_key"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;get_cache_key&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;sideinput_container&lt;/span&gt;

    &lt;span class="n"&gt;sideinput_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;shared_handle&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;acquire&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;load_sideinput&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;current_cache_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;get_cache_key&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;sideinput_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"cache_key"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="n"&gt;current_cache_key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;sideinput_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;shared_handle&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;acquire&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;load_sideinput&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;current_cache_key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;sideinput_data&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As always your likes and feedbacks are very much appreciated!&lt;/p&gt;

</description>
      <category>python</category>
      <category>beam</category>
      <category>dataflow</category>
      <category>cache</category>
    </item>
    <item>
      <title>Push the button - Getting Started With IoT &amp; MQTT</title>
      <dc:creator>Márton Papp</dc:creator>
      <pubDate>Mon, 06 Apr 2020 10:04:48 +0000</pubDate>
      <link>https://dev.to/morz/push-the-button-getting-started-with-iot-mqtt-53p1</link>
      <guid>https://dev.to/morz/push-the-button-getting-started-with-iot-mqtt-53p1</guid>
      <description>&lt;p&gt;Yes, I know what you think: there are already a ton of MQTT related posts and videos circulating around the net. But here's a thing: most of them either failed to demonstrate how to get started with an MQTT broker, or did not show any code about how to get things working, and did not provide a handy initial concept.&lt;/p&gt;

&lt;p&gt;My goal with this series is to teach you how to publish events from a simple IoT button using MQTT, and then receive and react to button presses on two separate devices at the same time - a computer with NodeJS and an another connected Arduino device. All this as simple as possible, even without signing up to a hosted message broker like CloudMQTT. &lt;/p&gt;

&lt;p&gt;Sounds good? Now let's get into it!&lt;/p&gt;

&lt;h2&gt;
  
  
  Setup your own MQTT broker
&lt;/h2&gt;

&lt;p&gt;Assuming you have docker installed, setting up a local MQTT broker is much simpler than you might expect. Just execute the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;-it&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; 1883:1883 &lt;span class="nt"&gt;-p&lt;/span&gt; 9001:9001 eclipse-mosquitto
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;This tells docker to download the latest image of the open-source MQTT broker &lt;a href="https://mosquitto.org/"&gt;Eclipse Mosquitto&lt;/a&gt;, and start a container based on that image.&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--lw-iRI2z--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/c320so6y5r1q5snjzgno.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--lw-iRI2z--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/c320so6y5r1q5snjzgno.png" alt="Running the container in the terminal"&gt;&lt;/a&gt;&lt;br&gt;
BOOM, now you have fully-functioning MQTT server listening on port &lt;code&gt;1883&lt;/code&gt;. Later you can run the container in the background with the &lt;code&gt;--detach&lt;/code&gt; or &lt;code&gt;-d&lt;/code&gt; option, but this way you can see the logs more easily. You can stop the container any time using &lt;code&gt;CTRL+C&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Take a look behind the curtain
&lt;/h2&gt;

&lt;p&gt;Unlike RabbitMQ, Mosquitto doesn't have a built-in management plugin with a UI, but without one it's hard to see what's happening under the hood. Fortunately there are multiple brilliant solutions out there, the best ones are &lt;a href="http://mqttfx.jensd.de/"&gt;MQTT.fx&lt;/a&gt; and &lt;a href="http://mqtt-explorer.com/"&gt;MQTT Explorer&lt;/a&gt;, and the latter will be used throughout this tutorial.&lt;/p&gt;

&lt;p&gt;It's a good idea to figure out your local IP address to use instead of &lt;code&gt;localhost&lt;/code&gt;, because later the connected Arduino devices will need to know the broker IP address and thats obviously not &lt;code&gt;localhost&lt;/code&gt; from their point of view. &lt;/p&gt;

&lt;p&gt;One note here: this is for learning purposes only, later with port forwarding you can expose your MQTT broker to the internet if you want to (maybe from an ever-connected Raspberry PI), but when you done experimenting I suggest you to use a Cloud MQTT provider.&lt;/p&gt;

&lt;p&gt;After figuring out your IP address open up MQTT Explorer and try to connect to the broker. My IP is &lt;code&gt;192.168.0.195&lt;/code&gt;, so for me it looked something like this:&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--9KqyeN8x--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/8h772ri8rvxfen4ra03y.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--9KqyeN8x--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/8h772ri8rvxfen4ra03y.png" alt="MQTT Explorer connect dialog"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After clicking connect you should see the empty broker. Going back to the terminal where the container runs will be logs from the incoming connection from the client.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--cJBXYsWE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/mu7r7gnodklbc6sr20qg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--cJBXYsWE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/mu7r7gnodklbc6sr20qg.png" alt="Successful connection in terminal"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  What's next?
&lt;/h2&gt;

&lt;p&gt;In the next chapter i'll show you how to publish messages to the broker using C/C++ code with &lt;a href="https://m5stack.com/products/stick-c"&gt;M5Stick-C&lt;/a&gt;, an inexpensive ESP32 powered mini IoT device and consuming those messages in NodeJS. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--xh6T1PFR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/axr8rm40ee1kpq2rlgdb.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--xh6T1PFR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/i/axr8rm40ee1kpq2rlgdb.jpg" alt="M5StickC in action"&gt;&lt;/a&gt;&lt;/p&gt;

</description>
      <category>arduino</category>
      <category>iot</category>
      <category>mqtt</category>
      <category>esp32</category>
    </item>
    <item>
      <title>Knex ❤️ PSQL: updating timestamps like a pro</title>
      <dc:creator>Márton Papp</dc:creator>
      <pubDate>Tue, 25 Feb 2020 14:22:51 +0000</pubDate>
      <link>https://dev.to/morz/knex-psql-updating-timestamps-like-a-pro-2fg6</link>
      <guid>https://dev.to/morz/knex-psql-updating-timestamps-like-a-pro-2fg6</guid>
      <description>&lt;p&gt;Knex.js is the most popular SQL query builder around and the go-to solution for most of us working with PostgreSQL. You can find dozens of articles on dev.to about how to get started, so I decided to focus on a more advanced and often overlooked topic on how to keep the &lt;code&gt;updated_at&lt;/code&gt; fields &lt;em&gt;really&lt;/em&gt; updated - automatically.&lt;/p&gt;

&lt;h1&gt;
  
  
  What does table.timestamps() do?
&lt;/h1&gt;

&lt;p&gt;If you read along the documentation, upon creating a new table you will probably write a migration like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;up&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;createTable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;products&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;increments&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;id&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nx"&gt;primary&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;name&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;table.timestamps(false, true)&lt;/code&gt; line adds &lt;code&gt;created_at&lt;/code&gt; and &lt;code&gt;updated_at&lt;/code&gt; columns on the table. Both columns default to being not null and using the current timestamp when &lt;code&gt;true&lt;/code&gt; is passed as the second argument.&lt;br&gt;
While it's sufficent for the &lt;code&gt;created_at&lt;/code&gt; column, the &lt;code&gt;updated_at&lt;/code&gt; will remain unchanged even after an update query executed and it's your responsibility to keep it in sync.&lt;/p&gt;

&lt;p&gt;There is a good reason behind this behaviour: different SQL dialects - like MySQL - handle automatic updating &lt;a href="https://dev.mysql.com/doc/refman/8.0/en/timestamp-initialization.html"&gt;pretty well&lt;/a&gt;, but others, like PostgreSQL don't support it.&lt;/p&gt;
&lt;h1&gt;
  
  
  What is a Trigger Procedure in PSQL?
&lt;/h1&gt;

&lt;p&gt;Think of a trigger procedures like middlewares in &lt;code&gt;expressjs&lt;/code&gt;. You have the opportunity to execute functions that modify the inserted values &lt;em&gt;before&lt;/em&gt; actually committing the update. The &lt;code&gt;NEW&lt;/code&gt; value holds the new database row for INSERT/UPDATE operations, so setting the &lt;code&gt;updated_at&lt;/code&gt; field is really easy:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;BEGIN&lt;/span&gt;
    &lt;span class="k"&gt;NEW&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;updated_at&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;CURRENT_TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;RETURN&lt;/span&gt; &lt;span class="k"&gt;NEW&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;END&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h1&gt;
  
  
  Okay, okay, just give me the code already
&lt;/h1&gt;

&lt;p&gt;First you need to create this trigger function in a migration using &lt;code&gt;knex.raw&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;up&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;raw&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`
    CREATE OR REPLACE FUNCTION update_timestamp() RETURNS TRIGGER
    LANGUAGE plpgsql
    AS
    $$
    BEGIN
        NEW.updated_at = CURRENT_TIMESTAMP;
        RETURN NEW;
    END;
    $$;
  `&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;

&lt;span class="nx"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;down&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;raw&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`
    DROP FUNCTION IF EXISTS update_timestamp() CASCADE;
  `&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;To make sure everything went fine execute the following query:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;routine_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;routine_definition&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;information_schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;routines&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;routine_type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'FUNCTION'&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;specific_schema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'public'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="c1"&gt;------------------+---------------------------------------------+&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="k"&gt;routine_name&lt;/span&gt;     &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;routine_definition&lt;/span&gt;                          &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="c1"&gt;------------------+---------------------------------------------|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;update_timestamp&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;                                             &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt;                  &lt;span class="o"&gt;|&lt;/span&gt;     &lt;span class="k"&gt;BEGIN&lt;/span&gt;                                   &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt;                  &lt;span class="o"&gt;|&lt;/span&gt;         &lt;span class="k"&gt;NEW&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;updated_at&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;CURRENT_TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt;                  &lt;span class="o"&gt;|&lt;/span&gt;         &lt;span class="k"&gt;RETURN&lt;/span&gt; &lt;span class="k"&gt;NEW&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;                         &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt;                  &lt;span class="o"&gt;|&lt;/span&gt;     &lt;span class="k"&gt;END&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;                                    &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt;                  &lt;span class="o"&gt;|&lt;/span&gt;                                             &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="c1"&gt;------------------+---------------------------------------------+&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h1&gt;
  
  
  But how to use the function?
&lt;/h1&gt;

&lt;p&gt;This function alone does nothing, we also need to tell the database engine where and when to use it. The best place for this the upcoming migrations where you create a new table - going back to my first example the code will be this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;tableName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;products&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="nx"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;up&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;createTable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;increments&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;id&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nx"&gt;primary&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;name&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nx"&gt;table&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;});&lt;/span&gt;

  &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;raw&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`
    CREATE TRIGGER update_timestamp
    BEFORE UPDATE
    ON &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;
    FOR EACH ROW
    EXECUTE PROCEDURE update_timestamp();
  `&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;

&lt;span class="nx"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;down&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;knex&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;dropTable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;If you run the &lt;code&gt;\d products&lt;/code&gt; command, at the bottom of the table you will see that the trigger function will be executed on each row update on this table.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="err"&gt;\&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt;
&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="c1"&gt;------------+--------------------------+--------------------------------------------------------+&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="k"&gt;Column&lt;/span&gt;     &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="k"&gt;Type&lt;/span&gt;                     &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;Modifiers&lt;/span&gt;                                              &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt;&lt;span class="c1"&gt;------------+--------------------------+--------------------------------------------------------|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;         &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="nb"&gt;integer&lt;/span&gt;                  &lt;span class="o"&gt;|&lt;/span&gt;  &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="n"&gt;nextval&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'products_id_seq'&lt;/span&gt;&lt;span class="p"&gt;::&lt;/span&gt;&lt;span class="n"&gt;regclass&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;       &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="nb"&gt;character&lt;/span&gt; &lt;span class="nb"&gt;varying&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;   &lt;span class="o"&gt;|&lt;/span&gt;                                                        &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;created_at&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nb"&gt;time&lt;/span&gt; &lt;span class="k"&gt;zone&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;  &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;                                &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;updated_at&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nb"&gt;time&lt;/span&gt; &lt;span class="k"&gt;zone&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt;  &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;                                &lt;span class="o"&gt;|&lt;/span&gt;
&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="c1"&gt;------------+--------------------------+--------------------------------------------------------+&lt;/span&gt;
&lt;span class="n"&gt;Indexes&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nv"&gt;"products_pkey"&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;btree&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Triggers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;update_timestamp_on_products&lt;/span&gt; &lt;span class="k"&gt;BEFORE&lt;/span&gt; &lt;span class="k"&gt;UPDATE&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="k"&gt;FOR&lt;/span&gt; &lt;span class="k"&gt;EACH&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt; &lt;span class="k"&gt;EXECUTE&lt;/span&gt; &lt;span class="k"&gt;PROCEDURE&lt;/span&gt; &lt;span class="n"&gt;update_timestamp&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;As always, your likes and feedbacks are much appreciated!&lt;/p&gt;

</description>
      <category>node</category>
      <category>knex</category>
      <category>psql</category>
      <category>postgres</category>
    </item>
    <item>
      <title>Pipeline API 🔥 - the best way to handle stream errors that nobody tells you about</title>
      <dc:creator>Márton Papp</dc:creator>
      <pubDate>Tue, 18 Feb 2020 14:54:57 +0000</pubDate>
      <link>https://dev.to/morz/pipeline-api-the-best-way-to-handle-stream-errors-that-nobody-tells-you-about-122o</link>
      <guid>https://dev.to/morz/pipeline-api-the-best-way-to-handle-stream-errors-that-nobody-tells-you-about-122o</guid>
      <description>&lt;p&gt;... except the documentation on the millionth page, without any context, buried deeper than a superfluous dependency in your &lt;code&gt;node_modules&lt;/code&gt; directory.&lt;/p&gt;

&lt;h2&gt;
  
  
  A little background
&lt;/h2&gt;

&lt;p&gt;Streams are cruel and unpredictable, but usually you can copy-paste top rated answers from Stackoverflow for years without having full comprehension over them - a very important skill that most of us mastered during our career.&lt;/p&gt;

&lt;p&gt;But one day you will be asked to transform and upload huge amounts of data from a database table to Google Storage and you will probably write something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="c1"&gt;/// this is bad, please do not do this!&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nx"&gt;streamFromDbToGcloudBucket&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;incomingStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;file&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;...&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;incomingStream&lt;/span&gt;
      &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;pipe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;createWriteStream&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
      &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;error&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;reject&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="p"&gt;})&lt;/span&gt;
      &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;finish&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;resolve&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
      &lt;span class="p"&gt;});&lt;/span&gt;
  &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Wrapped in a promise, piping incoming stream to a gCloud file pretty neat, huh? After months in production things started to go south as we got inactivity alerts that sometimes the files are not uploaded hourly as expected.&lt;/p&gt;

&lt;h2&gt;
  
  
  The ugly
&lt;/h2&gt;

&lt;p&gt;During debugging I stumbled upon the following lines in the storage lib from Google:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;fs&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;createReadStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;pathString&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;error&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;pipe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;newFile&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;createWriteStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;options&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
  &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;on&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;error&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;What?&lt;/strong&gt; You need multiple &lt;code&gt;.on('error', callback)&lt;/code&gt;'s in the same chain? Am I stupid for not knowing this? As it turns out you need to subscribe to error handlers on every stream, because pipe does not propagate errors like you would expect. It also means that you need to repeat this for every pipe you use.&lt;/p&gt;

&lt;h2&gt;
  
  
  Pipeline to the rescue
&lt;/h2&gt;

&lt;p&gt;Fortunately Node 10 introduced the Pipeline API to alleviate such problems. Instead of using &lt;code&gt;pipe&lt;/code&gt;, you can use &lt;code&gt;pipeline(...streams, callback)&lt;/code&gt;. It does pretty much the same, except that callback will be called when the pipeline is fully done, or an error occurred &lt;strong&gt;at some point.&lt;/strong&gt; Let's see how it works:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;pipeline&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;require&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;stream&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="nx"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nx"&gt;readableStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nx"&gt;writableStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;Pipeline failed.&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;err&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;Pipeline succeeded.&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  One more thing
&lt;/h2&gt;

&lt;p&gt;You might notice that it's not wrapped in a promise. The good news is that pipeline is promisify-able (&lt;em&gt;is this even a word?&lt;/em&gt;) as well, so you can write this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;pipeline&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;util&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;promisify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nx"&gt;readableStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nx"&gt;writableStream&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;... and wrap it in a try-catch block.&lt;/p&gt;

&lt;p&gt;In any case, I hope you the above useful, and as my first article ever, your likes and feedbacks are much appreciated!&lt;/p&gt;

</description>
      <category>node</category>
      <category>javascript</category>
      <category>stream</category>
    </item>
  </channel>
</rss>
