<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: matthieucham</title>
    <description>The latest articles on DEV Community by matthieucham (@matthieucham).</description>
    <link>https://dev.to/matthieucham</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F560117%2Fd229a777-2969-47d8-a2c5-07fcff76bb8f.jpg</url>
      <title>DEV Community: matthieucham</title>
      <link>https://dev.to/matthieucham</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/matthieucham"/>
    <language>en</language>
    <item>
      <title>How to pass an Array of Structs in Bigquery's parameterized queries</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Tue, 15 Oct 2024 06:57:39 +0000</pubDate>
      <link>https://dev.to/stack-labs/how-to-pass-an-array-of-structs-in-bigquerys-parameterized-queries-39nm</link>
      <guid>https://dev.to/stack-labs/how-to-pass-an-array-of-structs-in-bigquerys-parameterized-queries-39nm</guid>
<description>&lt;p&gt;In Google's BigQuery, &lt;a href="https://cloud.google.com/bigquery/docs/parameterized-queries" rel="noopener noreferrer"&gt;SQL queries can be parameterized&lt;/a&gt;. If you're not familiar with this concept, it basically means that you can write SQL queries as parameterized templates like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;mydataset&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mytable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;columnA&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;columnB&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;VALUES&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;valueA&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;valueB&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And pass the values separately. This has numerous benefits:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The query is more readable than when it's built by string concatenation&lt;/li&gt;
&lt;li&gt;The code is more robust and production-ready&lt;/li&gt;
&lt;li&gt;It's a great protection against SQL injection attacks &lt;a href="https://xkcd.com/327" rel="noopener noreferrer"&gt;(mandatory XKCD)&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Passing query parameters from a Python script appears straightforward... at first sight. For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.cloud.bigquery&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;ScalarQueryParameter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;ArrayQueryParameter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;StructQueryParameter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;
INSERT INTO mydataset.mytable(columnA, columnB)
    VALUES (@valueA, @valueB)
&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;query_parameters&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="nc"&gt;ScalarQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;valueA&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STRING&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;A&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; 
        &lt;span class="nc"&gt;ScalarQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;valueB&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STRING&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;B&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;])&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The example above inserts simple ("Scalar") values into columns A and B. But you can also pass more complex parameters, as sketched below:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Arrays (ArrayQueryParameter)&lt;/li&gt;
&lt;li&gt;Structs (StructQueryParameter)&lt;/li&gt;
&lt;/ul&gt;
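
&lt;p&gt;For instance, here is a minimal sketch of both kinds (the parameter names and values are purely illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# An array of scalars: every element shares the declared type
ArrayQueryParameter("tags", "STRING", ["blue", "green"])

# A struct: each field is itself a query parameter
StructQueryParameter(
    "country",
    ScalarQueryParameter("name", "STRING", "France"),
    ScalarQueryParameter("capital_city", "STRING", "Paris"),
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;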

&lt;p&gt;Problems arise when you want to insert arrays of structs: there are many gotchas, almost no documentation, and very few resources on the subject. The goal of this article is to fill this gap.&lt;/p&gt;

&lt;h1&gt;
  
  
  How to persist an array of structs in BigQuery using parameterized queries
&lt;/h1&gt;

&lt;p&gt;Let's define the following object that we want to store in our destination table&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dataclasses&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;dataclass&lt;/span&gt;

&lt;span class="nd"&gt;@dataclass&lt;/span&gt;
&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Country&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;capital_city&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;

&lt;span class="nd"&gt;@dataclass&lt;/span&gt;
&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Continent&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;countries&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Country&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;by invoking this parameterized query&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;UPDATE&lt;/span&gt; &lt;span class="n"&gt;continents&lt;/span&gt; &lt;span class="n"&gt;SET&lt;/span&gt; &lt;span class="n"&gt;countries&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nd"&gt;@countries&lt;/span&gt; &lt;span class="n"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Oceania&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A first try, following the &lt;a href="https://cloud.google.com/bigquery/docs/samples/bigquery-query-params-arrays" rel="noopener noreferrer"&gt;shallow documentation&lt;/a&gt;, would be&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
    &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query_parameters&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="nc"&gt;ArrayQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;countries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;RECORD&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
             &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;New Zealand&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;capital_city&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Wellington&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
             &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Fiji&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;capital_city&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Suva&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;...]&lt;/span&gt;
&lt;span class="p"&gt;]))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;which would fail miserably&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;AttributeError: 'dict' object has no attribute 'to_api_repr'&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Gotcha n°1: ArrayQueryParameter's values must be instances of StructQueryParameter
&lt;/h2&gt;

&lt;p&gt;It turns out that the third argument of the constructor - &lt;code&gt;values&lt;/code&gt; - must be a collection of &lt;code&gt;StructQueryParameter&lt;/code&gt; instances, not the raw values themselves. So let's build them:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
&lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query_parameters&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
    &lt;span class="nc"&gt;ArrayQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;countries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;RECORD&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
    &lt;span class="nc"&gt;StructQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;countries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;ScalarQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STRING&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ct&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; 
        &lt;span class="nc"&gt;ScalarQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;capital_city&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STRING&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ct&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;capital_city&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;ct&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;countries&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="p"&gt;]))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This time it works... Until you try to set an empty array&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
    &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;query_parameters&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="nc"&gt;ArrayQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;countries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;RECORD&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[])&lt;/span&gt;
&lt;span class="p"&gt;]))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;ValueError: Missing detailed struct item type info for an empty array, please provide a StructQueryParameterType instance.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Gotcha n°2: Provide the full structure type as second argument
&lt;/h2&gt;

&lt;p&gt;The error message is pretty clear: "RECORD" is not enough for BigQuery to know what to do with your empty array. It needs the fully detailed structure. So be it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query_parameters&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
    &lt;span class="nc"&gt;ArrayQueryParameter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;countries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;StructQueryParameterType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="nc"&gt;ScalarQueryParameterType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STRING&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="nc"&gt;ScalarQueryParameterType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;STRING&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;capital_city&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="p"&gt;[])&lt;/span&gt;
&lt;span class="p"&gt;]))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(Notice how the order of the arguments of the &lt;code&gt;...ParameterType&lt;/code&gt; constructor is the reverse of the &lt;code&gt;...Parameter&lt;/code&gt; constructor's. Just another trap on the road...)&lt;/p&gt;

&lt;p&gt;And now it works for empty arrays too, yay!&lt;/p&gt;

&lt;p&gt;One last gotcha to be aware of: &lt;strong&gt;every subfield of a StructQueryParameterType must have a name&lt;/strong&gt;, even though the second parameter (&lt;code&gt;name&lt;/code&gt;) is optional in the constructor. It's actually mandatory for subfields, otherwise you'll get a new kind of error:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Empty struct field name&lt;/p&gt;
&lt;/blockquote&gt;
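
&lt;p&gt;To recap, here is a minimal end-to-end sketch assembling the three gotchas. It reuses the names from the examples above and is meant as an illustration rather than production code:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from google.cloud.bigquery import (
    ArrayQueryParameter,
    Client,
    QueryJobConfig,
    ScalarQueryParameter,
    ScalarQueryParameterType,
    StructQueryParameter,
    StructQueryParameterType,
)

# Gotcha n°3: every subfield of the struct type must be named
COUNTRY_TYPE = StructQueryParameterType(
    ScalarQueryParameterType("STRING", "name"),
    ScalarQueryParameterType("STRING", "capital_city"),
)


def countries_param(countries):
    """Build the array parameter for a list of Country objects."""
    if not countries:
        # Gotcha n°2: an empty array needs the fully detailed struct type
        return ArrayQueryParameter("countries", COUNTRY_TYPE, [])
    # Gotcha n°1: values must be StructQueryParameter instances, not dicts
    return ArrayQueryParameter(
        "countries",
        "RECORD",
        [
            StructQueryParameter(
                "countries",
                ScalarQueryParameter("name", "STRING", ct.name),
                ScalarQueryParameter("capital_city", "STRING", ct.capital_city),
            )
            for ct in countries
        ],
    )


client = Client()
# "countries" is the list[Country] to store, e.g. a Continent's countries
client.query(
    'UPDATE continents SET countries=@countries WHERE name="Oceania"',
    job_config=QueryJobConfig(query_parameters=[countries_param(countries)]),
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;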

&lt;p&gt;I think that's all we need to know to use arrays of records in query parameters. I hope this helps!&lt;/p&gt;




&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;




&lt;p&gt;Photo by &lt;a href="https://unsplash.com/fr/@dnevozhai?utm_content=creditCopyText&amp;amp;utm_medium=referral&amp;amp;utm_source=unsplash" rel="noopener noreferrer"&gt;Denys Nevozhai&lt;/a&gt; on &lt;a href="https://unsplash.com/fr/photos/photographie-en-contre-plongee-de-linterieur-du-batiment-JsdvKIcvAGo?utm_content=creditCopyText&amp;amp;utm_medium=referral&amp;amp;utm_source=unsplash" rel="noopener noreferrer"&gt;Unsplash&lt;/a&gt;&lt;/p&gt;

</description>
      <category>bigquery</category>
      <category>python</category>
      <category>googlecloud</category>
      <category>sql</category>
    </item>
    <item>
      <title>Automatically Update BigQuery View Schema Changes</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Tue, 30 Jul 2024 15:29:59 +0000</pubDate>
      <link>https://dev.to/stack-labs/automatically-update-bigquery-view-schema-changes-27j8</link>
      <guid>https://dev.to/stack-labs/automatically-update-bigquery-view-schema-changes-27j8</guid>
<description>&lt;p&gt;SQL views are virtual tables that simplify data access and improve security. They offer tailored perspectives on the data while protecting sensitive information, and data analysts widely use them to streamline modeling.&lt;/p&gt;

&lt;p&gt;As such, views are a crucial feature of Google Cloud's fully managed data warehouse, BigQuery. However, they have certain &lt;a href="https://cloud.google.com/bigquery/docs/views-intro" rel="noopener noreferrer"&gt;limitations&lt;/a&gt;. One of these limitations can be particularly troublesome for data analysts and end-users:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;The schemas of the underlying tables are stored with the view when the view is created. If columns are added, deleted, or modified after the view is created, the view isn't automatically updated and the reported schema will remain inaccurate until the view SQL definition is changed or the view is recreated. Even though the reported schema may be inaccurate, all submitted queries produce accurate results.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;To see this limitation in action, create a source table with two columns&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;OR&lt;/span&gt; &lt;span class="k"&gt;REPLACE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;`demo_devto.source_table`&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;A&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;B&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="nv"&gt;"a"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;"b"&lt;/span&gt; 
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then create a view above it&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;OR&lt;/span&gt; &lt;span class="k"&gt;REPLACE&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="nv"&gt;`demo_devto.expo_view`&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nv"&gt;`demo_devto.source_table`&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As expected, the schema of the view shows the two columns A and B&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi6nor5lksypwnjjpn9i1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi6nor5lksypwnjjpn9i1.png" alt="Image description" width="800" height="223"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now add a column to the source table&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;`demo_devto.source_table`&lt;/span&gt;
  &lt;span class="k"&gt;ADD&lt;/span&gt; &lt;span class="k"&gt;COLUMN&lt;/span&gt; &lt;span class="k"&gt;C&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The new column is reflected in the source table's schema&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fw6vykfljuqqfjqs549ku.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fw6vykfljuqqfjqs549ku.png" alt="Image description" width="800" height="223"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;But not in the view's schema&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8n68ga6nquqf50vw5vsw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8n68ga6nquqf50vw5vsw.png" alt="Image description" width="800" height="223"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Still, querying the view correctly returns the 3 columns&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7qg9ojd430hk2tdj294e.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7qg9ojd430hk2tdj294e.png" alt="Image description" width="800" height="223"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;This article outlines a method to circumvent this limitation and maintain the view's schema in alignment with the underlying table's schema as closely as possible.&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  A fully serverless event-driven architecture to synchronize schemas
&lt;/h2&gt;

&lt;p&gt;This solution makes use of a log sink to capture audit logs from BigQuery, a PubSub topic where relevant log entries are directed, a PubSub subscription, and a Cloud Run service to process them.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Feq51bmh4xc7w8obrea9d.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Feq51bmh4xc7w8obrea9d.png" alt="Image description" width="800" height="423"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let's review each step and dive into the details&lt;/p&gt;

&lt;h3&gt;
  
  
  1. BigQuery audit logs
&lt;/h3&gt;

&lt;p&gt;All Google Cloud services generate logs which are viewable in Cloud Logging. BigQuery is no exception, and its audit logs offer all the information we need. See their structure &lt;a href="https://cloud.google.com/bigquery/docs/reference/auditlogs" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  2. Cloud Logging log sink
&lt;/h3&gt;

&lt;p&gt;A log sink is a location where logs are collected and stored. Google Cloud Logging log sinks collect logs within a scope: project, folder, or organization. So to capture table update logs for a whole organization, a log sink at the organization level is needed. To monitor a single project, a sink at the project level is enough.&lt;/p&gt;

&lt;p&gt;A log sink must declare a filter. This is very important to limit costs - which depend on the volume of captured logs - and to process relevant events only. Here we are using the following filter to capture events about schema changes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;resource.type="bigquery_resource"
AND protoPayload.serviceName="bigquery.googleapis.com"
AND protoPayload.methodName="tableservice.update"
AND protoPayload.authenticationInfo.principalEmail !~ &amp;lt;regex identifying the service account used by the cloud run service who process logs&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;The filter on principalEmail excludes the updates to exposition views made by the Cloud Run service itself, since our focus lies solely on source table update events.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Finally, we need to give the sink a destination, where the log entries that pass the filter are routed. &lt;a href="https://cloud.google.com/logging/docs/export/aggregated_sinks#supported-destinations" rel="noopener noreferrer"&gt;Several kinds of destinations are possible.&lt;/a&gt; Because our architecture is event-driven, the selected destination is a PubSub topic. Each log entry is then encoded as JSON.&lt;/p&gt;

&lt;p&gt;Here is how to provision such a sink with Terraform, at project level:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight hcl"&gt;&lt;code&gt;&lt;span class="nx"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_logging_project_sink"&lt;/span&gt; &lt;span class="s2"&gt;"demo"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;provider&lt;/span&gt;               &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;project&lt;/span&gt;                &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"my-project"&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;                   &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"logsink-demo"&lt;/span&gt;
  &lt;span class="nx"&gt;destination&lt;/span&gt;            &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"pubsub.googleapis.com/${google_pubsub_topic.demo.id}"&lt;/span&gt;
  &lt;span class="nx"&gt;filter&lt;/span&gt;                 &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;EOT&lt;/span&gt;&lt;span class="sh"&gt;
    resource.type="bigquery_resource"
    AND protoPayload.serviceName="bigquery.googleapis.com"
    AND protoPayload.methodName="tableservice.update"
    AND protoPayload.authenticationInfo.principalEmail !~ "^sa-demo@myproject.iam.gserviceaccount.com$"
&lt;/span&gt;&lt;span class="no"&gt;  EOT
&lt;/span&gt;  &lt;span class="nx"&gt;unique_writer_identity&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nx"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_topic_iam_member"&lt;/span&gt; &lt;span class="s2"&gt;"demo"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;provider&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;topic&lt;/span&gt;    &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_pubsub_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;demo&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;
  &lt;span class="nx"&gt;role&lt;/span&gt;     &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"roles/pubsub.publisher"&lt;/span&gt;
  &lt;span class="nx"&gt;member&lt;/span&gt;   &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_logging_project_sink&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;demo&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;writer_identity&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  3. PubSub topic and subscription
&lt;/h3&gt;

&lt;p&gt;The PubSub topic is the destination of the log events that pass the log sink filter.&lt;/p&gt;

&lt;p&gt;To consume these events, a subscription in push mode sends them to an HTTPS endpoint.&lt;/p&gt;

&lt;p&gt;Here is an example of how these resources can be provisioned with Terraform:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight hcl"&gt;&lt;code&gt;&lt;span class="nx"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_topic"&lt;/span&gt; &lt;span class="s2"&gt;"demo"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;provider&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;     &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"topic-demo"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nx"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_subscription"&lt;/span&gt; &lt;span class="s2"&gt;"demo"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;provider&lt;/span&gt;             &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;                 &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"sub-demo"&lt;/span&gt;
  &lt;span class="nx"&gt;topic&lt;/span&gt;                &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_pubsub_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;demo&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;
  &lt;span class="nx"&gt;ack_deadline_seconds&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;600&lt;/span&gt;

  &lt;span class="nx"&gt;push_config&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;push_endpoint&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;URL&lt;/span&gt; &lt;span class="nx"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;the&lt;/span&gt; &lt;span class="nx"&gt;cloud&lt;/span&gt; &lt;span class="nx"&gt;run&lt;/span&gt; &lt;span class="nx"&gt;endpoint&lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="nx"&gt;oidc_token&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;service_account_email&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_service_account&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;email&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="nx"&gt;expiration_policy&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;ttl&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;""&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  4, 5 and 6: Events processing
&lt;/h3&gt;

&lt;p&gt;The processing of log events is performed by a Cloud Run service in this system, but it could also be done by a Cloud Function, for example.&lt;/p&gt;
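
&lt;p&gt;Before any decoding, the service has to unwrap the PubSub push envelope. Here is a minimal sketch of the HTTP handler, assuming Flask (the framework and route are illustrative choices, not imposed by the architecture):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from flask import Flask, request

app = Flask(__name__)


@app.route("/", methods=["POST"])
def receive_event():
    envelope = request.get_json()
    # The PubsubMessage itself, with its base64-encoded "data" field
    message = envelope["message"]
    # ... decode and process `message` as shown below ...
    return ("", 204)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;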

&lt;p&gt;In Python, the decoding of incoming events can be done like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;base64&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;

&lt;span class="n"&gt;bq_log&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;base64&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;b64decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]).&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;By parsing the bq_log object, we can retrieve the updated table id:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;re&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.cloud.bigquery&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;TableReference&lt;/span&gt;

&lt;span class="n"&gt;RESOURCENAME_PATTERN&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;re&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;compile&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;^projects/(?P&amp;lt;project&amp;gt;[^/]+)/datasets/(?P&amp;lt;dataset&amp;gt;[^/]+)/tables/(?P&amp;lt;table&amp;gt;[^/]+)$&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;resource_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bq_log&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;protoPayload&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{}).&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;resourceName&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;""&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="nf"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;match&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;RESOURCENAME_PATTERN&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;match&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;resource_name&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;TableReference&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_api_repr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;k&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;match&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;group&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;k&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;k&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;project&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dataset&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;table&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]}&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The next step is to identify the views which rely on this source table. Here, associations between source tables and exposition views are registered in a Firestore database, but other designs are possible. For example, you could query the INFORMATION_SCHEMA.VIEWS metadata views and identify the affected views by parsing the content of the VIEW_DEFINITION column:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;VIEW_DEFINITION&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="nv"&gt;`demo_devto.INFORMATION_SCHEMA.VIEWS`&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
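
&lt;p&gt;A rough sketch of that alternative (the helper name is hypothetical, and the substring match on VIEW_DEFINITION is deliberately naive):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;def find_affected_views(client, dataset, table_ref):
    """List the views of a dataset whose definition mentions the updated table."""
    rows = client.query(
        f"SELECT TABLE_NAME, VIEW_DEFINITION "
        f"FROM `{dataset}.INFORMATION_SCHEMA.VIEWS`"
    ).result()
    return [
        row.TABLE_NAME
        for row in rows
        if table_ref.table_id in row.VIEW_DEFINITION
    ]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;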



&lt;p&gt;Finally, synchronize all affected views. BigQuery views seem not to support updating the "schema" field via the &lt;code&gt;update_table()&lt;/code&gt; method when columns are added. The recommended way is therefore to re-create the views with &lt;a href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#create_view_statement" rel="noopener noreferrer"&gt;SQL DDL statements&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;OR&lt;/span&gt; &lt;span class="k"&gt;REPLACE&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
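
&lt;p&gt;In Python, re-creating a view from its own stored definition is enough to refresh the reported schema. A minimal sketch (the helper name is illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;def refresh_view_schema(client, view_id):
    """Re-create a view in place so BigQuery refreshes its stored schema."""
    view = client.get_table(view_id)  # e.g. "my-project.demo_devto.expo_view"
    ddl = f"CREATE OR REPLACE VIEW `{view_id}` AS {view.view_query}"
    client.query(ddl).result()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;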



&lt;p&gt;With all steps pieced together, any schema update on a source table automatically triggers the re-creation of the exposition views, keeping the schemas synchronized after a short delay!&lt;/p&gt;




&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;




&lt;p&gt;Cover picture by &lt;a href="https://unsplash.com/fr/@migueldelmar?utm_content=creditCopyText&amp;amp;utm_medium=referral&amp;amp;utm_source=unsplash" rel="noopener noreferrer"&gt;Miguel Delmar&lt;/a&gt; on &lt;a href="https://unsplash.com/fr/photos/plan-deau-bleu-W4qWlYbYqI4?utm_content=creditCopyText&amp;amp;utm_medium=referral&amp;amp;utm_source=unsplash" rel="noopener noreferrer"&gt;Unsplash&lt;/a&gt;&lt;/p&gt;

</description>
      <category>bigquery</category>
      <category>eventdriven</category>
      <category>googlecloud</category>
      <category>dataengineering</category>
    </item>
    <item>
<title>What exactly is exactly-once delivery?</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Fri, 10 Feb 2023 08:15:50 +0000</pubDate>
      <link>https://dev.to/stack-labs/pubsub-exactly-once-delivery--1dd3</link>
      <guid>https://dev.to/stack-labs/pubsub-exactly-once-delivery--1dd3</guid>
<description>&lt;p&gt;Google Cloud PubSub is a serverless implementation of a publish-subscribe messaging service. It's built around the concepts of topics (where messages are published to) and subscriptions (where messages are consumed from).&lt;/p&gt;

&lt;p&gt;There are 3 types of subscriptions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Push&lt;/strong&gt;: message consumption is initiated by PubSub&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Pull&lt;/strong&gt;: message consumption is initiated by the subscriber&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;BigQuery&lt;/strong&gt;: special mode where the subscriber is a BigQuery agent which stores messages in a BigQuery table.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;PubSub now offers an &lt;strong&gt;Exactly-once delivery&lt;/strong&gt; option for &lt;strong&gt;Pull&lt;/strong&gt; subscriptions; this option has been GA since December 2022 &lt;a href="https://cloud.google.com/pubsub/docs/exactly-once-delivery" rel="noopener noreferrer"&gt;(doc)&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Let's see what that means in detail.&lt;/p&gt;

&lt;h2&gt;
  
  
  What is a delivery?
&lt;/h2&gt;

&lt;p&gt;In PubSub context, a delivery is a process which encompasses the following items:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;sending a message to a consumer&lt;/li&gt;
&lt;li&gt;receiving an acknowledgement (ack) from the consumer before the ack delay of the sent message times out.&lt;/li&gt;
&lt;li&gt;Alternatively, the consumer can send a "nack" (negative acknowledgment) instead of an ack. It tells the sender that the message could not be processed and must be sent again.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;When the ack of the message is received, the message is considered delivered by PubSub.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The following diagram illustrates the delivery process in Pub/Sub:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm4nauaxo0qiswtdikvef.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm4nauaxo0qiswtdikvef.png" alt="Image description" width="800" height="262"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The publisher sends the message to the topic&lt;/li&gt;
&lt;li&gt;The subscriber pulls the message from the subscription. The message ack delay starts here&lt;/li&gt;
&lt;li&gt;When the processing is done, the subscriber acknowledges the message&lt;/li&gt;
&lt;li&gt;The message is marked as delivered.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Any failure occurring during this flow - a networking issue, a VM crash - can potentially lead to new delivery attempts, resulting in duplicate outputs if multiple attempts eventually succeed for the same message.&lt;/p&gt;

&lt;h2&gt;
  
  
  Exactly-once delivery
&lt;/h2&gt;

&lt;p&gt;The usual guarantee offered by PubSub is &lt;strong&gt;at least once delivery&lt;/strong&gt;. It means that in case of such a failure, PubSub will attempt to deliver the message again, until it's successfully acked (or the subscription retention limit is reached).&lt;/p&gt;

&lt;p&gt;The exactly-once delivery option ensures that PubSub will not resend messages&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;while the ack (or nack) is not received and the delay has not expired&lt;/li&gt;
&lt;li&gt;once the ack is received&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This guarantee is made possible by the use of &lt;strong&gt;persistent storage&lt;/strong&gt; by PubSub agents: contrary to the default mode, where message statuses are stored in transient memory, Exactly-once uses a regional persistent storage service. Hence, this guarantee is enforced at the regional level.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcc0886m5ped0gskmoa0q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcc0886m5ped0gskmoa0q.png" alt="Image description" width="800" height="262"&gt;&lt;/a&gt;&lt;/p&gt;
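
&lt;p&gt;On the subscriber side, the Python client library exposes this mode through &lt;code&gt;ack_with_response()&lt;/code&gt;, which returns a future confirming whether the ack was accepted. A minimal sketch (project and subscription names are placeholders):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-sub")


def callback(message):
    # ... process the message here ...
    try:
        # Only meaningful on subscriptions with exactly-once delivery enabled
        message.ack_with_response().result()
    except sub_exceptions.AcknowledgeError:
        # The ack was not accepted (e.g. expired deadline):
        # expect the message to be redelivered
        pass


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;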

&lt;h2&gt;
  
  
  Consequences
&lt;/h2&gt;

&lt;p&gt;The Exactly-once mode ensures that, under the conditions detailed above, the message will only be delivered once. However, &lt;strong&gt;this doesn't mean that no message will ever be sent multiple times&lt;/strong&gt;. How come?&lt;/p&gt;

&lt;p&gt;Indeed, one has to distinguish between duplicates and legitimate redeliveries. For example, if the consumer takes so long to process a message that the ack delay expires, or if the process crashes and no ack is sent back to PubSub, then PubSub has no way to know that the message was already processed. The message will thus be sent again in response to the next Pull request.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;As a consequence, if "exactly-once" message processing is of primary importance for you and you absolutely want to avoid any duplicate, you have to take extra care at every layer of your system&lt;/strong&gt;. You need to pick ack delays according to the maximum time the processing can take (including retries and such). You also need to &lt;a href="https://cloud.google.com/blog/products/data-analytics/handling-duplicate-data-in-streaming-pipeline-using-pubsub-dataflow" rel="noopener noreferrer"&gt;mitigate all risks of duplicates at each stage of your workflow&lt;/a&gt;. As you can see, &lt;strong&gt;Exactly-once solves the central step of the pipeline below&lt;/strong&gt;, not the others, which still have to be taken care of:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcmaevdh6apqyhob3guih.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcmaevdh6apqyhob3guih.png" alt="Image description" width="800" height="250"&gt;&lt;/a&gt;&lt;/p&gt;
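
&lt;p&gt;For instance, a common consumer-side mitigation is to make the processing idempotent with a deduplication key. A hedged sketch, where the store and the business logic are assumptions:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;processed_ids = set()  # in real life: a persistent store (Firestore, Redis...)


def handle(message):
    if message.message_id in processed_ids:
        message.ack()  # already processed: just ack the redelivery
        return
    process(message.data)  # your business logic (assumed to exist)
    processed_ids.add(message.message_id)
    message.ack()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;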

&lt;p&gt;PubSub's Exactly-once option is definitely a good step forward in helping to reach this goal, but it doesn't solve everything. End-to-end exactly-once delivery remains a challenge for any event-based system!&lt;/p&gt;




&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;




&lt;p&gt;Cover picture by &lt;a href="https://unsplash.com/@brettjordan?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Brett Jordan&lt;/a&gt; on &lt;a href="https://unsplash.com/fr/photos/phUtWl8RyFE?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Unsplash&lt;/a&gt;&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>pubsub</category>
      <category>programming</category>
    </item>
    <item>
      <title>How to mock a decorator in Python</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Thu, 26 Jan 2023 15:58:31 +0000</pubDate>
      <link>https://dev.to/stack-labs/how-to-mock-a-decorator-in-python-55jc</link>
      <guid>https://dev.to/stack-labs/how-to-mock-a-decorator-in-python-55jc</guid>
      <description>&lt;p&gt;&lt;strong&gt;Programming is the art of adding bugs to an empty text file.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Mastering the &lt;code&gt;unittest.mock&lt;/code&gt; capabilities of Python to prevent these bugs is another art of its own. And one of the trickiest moves of them all is mocking a decorator so that it doesn't get in the way of the function you want to test.&lt;/p&gt;

&lt;p&gt;Let's consider the following function to test:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;### module api.service
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;utils.decorators&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;decorator_in_the_way&lt;/span&gt;

&lt;span class="nd"&gt;@decorator_in_the_way&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;function_to_be_tested&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# do something
&lt;/span&gt;    &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The problem here is to write a unit test for &lt;code&gt;function_to_be_tested&lt;/code&gt; without invoking the decorator (which would make your test fail).&lt;/p&gt;

&lt;p&gt;The catch: it's not possible to &lt;code&gt;@patch&lt;/code&gt; the decorator above the test function, because it's too late: in Python, functions are decorated at module-loading time. So this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;### file service_test.py
&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;unittest.mock&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;patch&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;.service&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;function_to_be_tested&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;mock_decorator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;...&lt;/span&gt;


&lt;span class="nd"&gt;@patch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;api.service.decorator_in_the_way&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mock_decorator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;test_function_to_be_tested&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;function_to_be_tested&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;... simply does not work. The patched decorator will be ignored, as the function to be tested has already been decorated with the original one.&lt;/p&gt;

&lt;p&gt;Fortunately, there is a workaround. The idea is to patch the decorator &lt;strong&gt;before&lt;/strong&gt; the module to test is loaded. Of course the requirement is that the decorator and the function to test &lt;strong&gt;don't belong to the same module&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The following example will work:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;### file service_test.py
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;unittest.mock&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;patch&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;functools&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;wraps&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;mock_decorator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Decorate by doing nothing.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;decorator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="nd"&gt;@wraps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;decorated_function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;f&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;decorated_function&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;decorator&lt;/span&gt;

&lt;span class="c1"&gt;# PATCH THE DECORATOR HERE
&lt;/span&gt;&lt;span class="nf"&gt;patch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;utils.decorators.decorator_in_the_way&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mock_decorator&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# THEN LOAD THE SERVICE MODULE
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;.service&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;function_to_be_tested&lt;/span&gt;

&lt;span class="c1"&gt;# ALL TESTS OF THE TEST SESSION WILL USE THE PATCHED DECORATOR 
&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;test_function_to_be_tested&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;function_to_be_tested&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="c1"&gt;# uses the mock decorator
&lt;/span&gt;    &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="bp"&gt;...&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
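
&lt;p&gt;One caveat: the &lt;code&gt;patch(...).start()&lt;/code&gt; call is never stopped, so the patched attribute stays in place for the whole test session and can leak into other test modules. If needed, stop it in a module-level teardown (a minimal sketch, assuming pytest conventions; note that modules already imported keep the no-op decoration):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from unittest import mock


def teardown_module():
    # stop every patcher started with .start() in this module
    mock.patch.stopall()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;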



&lt;p&gt;Hope this helps!&lt;/p&gt;

&lt;p&gt;I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;join an enthusiastic Data Engineering or Cloud Developer team&lt;/a&gt;, please contact us.&lt;/p&gt;

</description>
      <category>gratitude</category>
    </item>
    <item>
      <title>13 tricks for the new Bigquery Storage Write API in Python</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Wed, 16 Nov 2022 09:35:56 +0000</pubDate>
      <link>https://dev.to/stack-labs/13-tricks-for-the-new-bigquery-storage-write-api-in-python-296e</link>
      <guid>https://dev.to/stack-labs/13-tricks-for-the-new-bigquery-storage-write-api-in-python-296e</guid>
      <description>&lt;p&gt;In order to stream data into a BigQuery table programmatically, Google is promoting a new API: The &lt;a href="https://cloud.google.com/bigquery/docs/write-api" rel="noopener noreferrer"&gt;Storage Write API&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As a result, the usual API and its infamous &lt;strong&gt;tabledata.insertAll&lt;/strong&gt; method are now called the "legacy streaming API", which does not sound very appealing when starting a new project.&lt;/p&gt;

&lt;p&gt;Indeed, as stated in the &lt;a href="https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery" rel="noopener noreferrer"&gt;official Google Cloud documentation&lt;/a&gt;:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;For new projects, we recommend using the BigQuery Storage Write API instead of the tabledata.insertAll method.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Moreover, the new API is advertised with a lower price and new features such as the possibility of &lt;em&gt;exactly-once delivery&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;Exciting, isn't it?&lt;/p&gt;

&lt;p&gt;Well, it is, but the Python client wrapping the new API is very bare-metal and its usage does not feel pythonic at all. As a consequence, integrating this API is much more difficult than it usually is with other Google Cloud clients, which is quite surprising. Having recently completed the integration of this new product, I can speak from experience: I faced an unusually high number of challenges integrating the Storage Write API into a Python application for the most common use case: writing data rows into a BigQuery table.&lt;/p&gt;

&lt;p&gt;This article aims to list these issues and help future developers to overcome them.&lt;/p&gt;

&lt;h2&gt;
  
  
  Describe the target schema with Protobuf
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://developers.google.com/protocol-buffers" rel="noopener noreferrer"&gt;Protocol Buffers aka Protobuf&lt;/a&gt; is the portable exchange format widely used amongst Google Cloud API. It is usually hidden in the implementation when client libraries are used. With the new streaming API however, you will have to dive into it.&lt;/p&gt;

&lt;p&gt;Protobuf relies on a schema descriptor. It describes how exchanged data are structured. The descriptor is written as a &lt;code&gt;.proto&lt;/code&gt; text file where all fields, their type and their cardinality are listed.&lt;/p&gt;

&lt;p&gt;The first thing to do when integrating the Storage Write API is to write the proto file. &lt;strong&gt;The message description must match the schema of the target BigQuery table&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;same field names (case insensitive)&lt;/li&gt;
&lt;li&gt;same structure (nested types are supported)&lt;/li&gt;
&lt;li&gt;compatible field types (see the type mapping table &lt;a href="https://cloud.google.com/bigquery/docs/write-api#data_type_conversions" rel="noopener noreferrer"&gt;here&lt;/a&gt;)&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Trick #1: use proto2 syntax
&lt;/h3&gt;

&lt;p&gt;Protobuf now exists in two flavours: proto2 and proto3, the newer one.&lt;br&gt;
proto2 works well with BigQuery, whereas there are still some issues with proto3, which is fairly recent. Moreover, all examples provided by GCP currently use proto2. So for now I recommend sticking to proto2.&lt;/p&gt;
&lt;h3&gt;
  
  
  Trick #2: all your fields are optional
&lt;/h3&gt;

&lt;p&gt;In proto2 syntax you can declare a field as optional or required. This possibility was removed from proto3 (optional is implicit). In proto2, Google now recommends declaring all your fields as optional, even if they are REQUIRED in the BigQuery schema. However, you will still see some &lt;code&gt;required&lt;/code&gt; fields in GCP examples like &lt;a href="https://github.com/googleapis/python-bigquery-storage/blob/main/samples/snippets/customer_record.proto" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;
&lt;h3&gt;
  
  
  Trick #3: auto-generate the proto file
&lt;/h3&gt;

&lt;p&gt;Writing a &lt;code&gt;.proto&lt;/code&gt; descriptor can be very tedious if the target schema has many columns with deeply nested structures: don't forget that the descriptor has to match the target schema &lt;strong&gt;exactly&lt;/strong&gt;!&lt;/p&gt;

&lt;p&gt;You can ease the pain by autogenerating some of the proto file from the BigQuery schema. First, download the target schema with &lt;code&gt;bq&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;bq show &lt;span class="nt"&gt;--schema&lt;/span&gt; &lt;span class="nt"&gt;--format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;prettyjson dataset_name:project_name:target_table_name &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; schema_target_table.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then use some scripting to convert the downloaded schema file into a proto. &lt;a href="https://github.com/matthiasa4/GCPUtils/tree/master/GenerateProtoBufferFile" rel="noopener noreferrer"&gt;ParseToProtoBuffer.py&lt;/a&gt;, courtesy of &lt;a href="https://github.com/matthiasa4" rel="noopener noreferrer"&gt;matthiasa4&lt;/a&gt;, is useful for inspiration, as is the sketch below.&lt;/p&gt;
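
&lt;p&gt;Here is a minimal sketch of such a script. It only handles flat schemas with a few scalar types (nested RECORD fields are left out), and the type mapping is an assumption to adjust to your own tables:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json

# Assumed mapping from BigQuery scalar types to proto2 types;
# see the official type conversion table for the full list.
TYPE_MAP = {
    "STRING": "string",
    "INTEGER": "int64",
    "TIMESTAMP": "int64",  # epoch in microseconds, see trick #4
    "FLOAT": "double",
    "BOOLEAN": "bool",
    "BYTES": "bytes",
}

with open("schema_target_table.json") as schema_file:
    schema = json.load(schema_file)

proto_lines = ['syntax = "proto2";', "", "message Mymessage {"]
for index, field in enumerate(schema, start=1):
    label = "repeated" if field.get("mode") == "REPEATED" else "optional"
    proto_lines.append(
        f'  {label} {TYPE_MAP[field["type"]]} {field["name"].lower()} = {index};'
    )
proto_lines.append("}")
print("\n".join(proto_lines))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;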

&lt;h3&gt;
  
  
  Trick #4: BigQuery TIMESTAMPs are protobuf int64
&lt;/h3&gt;

&lt;p&gt;Even though protobuf provides a timestamp data type, the best way to send a timestamp value to BigQuery is to declare an &lt;code&gt;int64&lt;/code&gt; field.&lt;br&gt;
Set the field value to the epoch timestamp &lt;strong&gt;in microseconds&lt;/strong&gt; and it will be automatically converted into a BigQuery TIMESTAMP.&lt;/p&gt;
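
&lt;p&gt;For instance, assuming a message with an &lt;code&gt;int64&lt;/code&gt; field named &lt;code&gt;created_at&lt;/code&gt; (a hypothetical name) mapped to a TIMESTAMP column:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import datetime

# mymsg is an instance of your generated message class
now = datetime.datetime.now(datetime.timezone.utc)
# epoch in MICROseconds, not milliseconds nor seconds
mymsg.created_at = int(now.timestamp() * 1_000_000)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;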
&lt;h2&gt;
  
  
  Generate Python Protobuf objects
&lt;/h2&gt;

&lt;p&gt;The next step is to generate Python code from the &lt;code&gt;.proto&lt;/code&gt; file.&lt;/p&gt;
&lt;h3&gt;
  
  
  Trick #5: install or upgrade protoc
&lt;/h3&gt;

&lt;p&gt;Ensure you have installed the latest version of &lt;a href="https://grpc.io/docs/protoc-installation/" rel="noopener noreferrer"&gt;protoc&lt;/a&gt;, the Protobuf compiler.&lt;/p&gt;

&lt;p&gt;The aim of this tool is to generate code artefacts from proto files, in several target languages. Of course we pick Python. Invoke protoc like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;protoc &lt;span class="nt"&gt;-I&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;.&lt;/span&gt; &lt;span class="nt"&gt;--python_out&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;.&lt;/span&gt;  schema_target_table.proto
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The outcome is a file named &lt;code&gt;schema_target_table_pb2.py&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The content of the generated file is surprising: it appears to be lacking a lot of definitions! The reason is that the missing parts are dynamically inserted at runtime by the Protobuf Python library. As a consequence:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;your IDE will be mad at you&lt;/li&gt;
&lt;li&gt;Pylint will insult you&lt;/li&gt;
&lt;li&gt;you have to take a guess about the missing definition names&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Come on Google, are you serious?&lt;/p&gt;

&lt;h3&gt;
  
  
  Trick #6: make _pb2.py files pass Pylint
&lt;/h3&gt;

&lt;p&gt;Simply put the following line on top of each _pb2 file, and Pylint will leave you alone:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# pylint: skip-file
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Trick #7: Import the missing classes
&lt;/h3&gt;

&lt;p&gt;The generated classes have the following format:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The same name as the message&lt;/li&gt;
&lt;li&gt;In case of a nested type, it will be accessible as a class variable of the parent type / class&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let's illustrate. The following proto file&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight protobuf"&gt;&lt;code&gt;&lt;span class="na"&gt;syntax&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"proto2"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;Mymessage&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;optional&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;myfield&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

  &lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;Mysubmessage&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;optional&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;mysubfield&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="k"&gt;optional&lt;/span&gt; &lt;span class="n"&gt;Mysubmessage&lt;/span&gt; &lt;span class="na"&gt;mycomplexfield&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="k"&gt;repeated&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;will generate Python classes which can be imported like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;.schema_target_table_pb2.py&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Mymessage&lt;/span&gt;


&lt;span class="n"&gt;submessage_instance&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Mymessage&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Mysubmessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Needless to say, your IDE will turn red because of these imports. You will have to tame Pylint too:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# pylint: disable=no-name-in-module
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Set Protobuf object fields
&lt;/h2&gt;

&lt;p&gt;Filling proto fields up is very counterintuitive. It's a good thing that Google provides &lt;a href="https://developers.google.com/protocol-buffers/docs/pythontutorial" rel="noopener noreferrer"&gt;exhaustive documentation&lt;/a&gt; about it.&lt;/p&gt;

&lt;p&gt;Here is a straight-to-the-point TL;DR:&lt;/p&gt;

&lt;h3&gt;
  
  
  Trick #8: Simple (scalar) type fields can be directly assigned
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;mymsg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Mymessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;mymsg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;myfield&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;toto&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Trick #9: Use CopyFrom for nested type fields
&lt;/h3&gt;

&lt;p&gt;Yes, &lt;code&gt;CopyFrom()&lt;/code&gt;, a Python method name in CamelCase starting with an uppercase letter. Come on, Google!&lt;/p&gt;

&lt;p&gt;Anyway, you cannot assign a complex field directly:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;mymsg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Mymessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;mymsg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;myfield&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;toto&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;mysubmsg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Mymessage&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Mysubmessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;mymsg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mycomplexfield&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;CopyFrom&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mysubmsg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
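
&lt;p&gt;Alternatively, you can write through the nested field directly: accessing &lt;code&gt;mymsg.mycomplexfield&lt;/code&gt; lazily creates the submessage, so this also works:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;mymsg = Mymessage()
# no direct assignment of the submessage, but setting its fields is allowed
mymsg.mycomplexfield.mysubfield = "tata"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;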



&lt;h3&gt;
  
  
  Trick #10: Use append for repeated fields
&lt;/h3&gt;

&lt;p&gt;You mustn't instantiate an empty list. Append to the field as if it already existed:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;mymsg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Mymessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;mymsg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;titi&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
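
&lt;p&gt;If you have several values at once, repeated fields also support &lt;code&gt;extend&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;mymsg.collection.extend(["titi", "tata"])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;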



&lt;h2&gt;
  
  
  Store Protobuf object into Bigquery
&lt;/h2&gt;

&lt;p&gt;The next step is to store the Protobuf objects into BigQuery. Here again, there are some tricks to know:&lt;/p&gt;

&lt;h3&gt;
  
  
  Trick #11: Be a dataEditor
&lt;/h3&gt;

&lt;p&gt;The user or service account performing the storage must have &lt;code&gt;bigquery.tables.updateData&lt;/code&gt; permission on the target table.&lt;/p&gt;

&lt;p&gt;You get this permission from the &lt;code&gt;bigquery.dataEditor&lt;/code&gt; role.&lt;/p&gt;

&lt;h3&gt;
  
  
  Trick #12: Don't set a package name in the proto file
&lt;/h3&gt;

&lt;p&gt;In many proto file samples a &lt;code&gt;package&lt;/code&gt; directive is set:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;package foo.bar;

message Mymessage{...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is to avoid name clashes. But packages are not really useful in Python (generated classes are identified by their file path), and moreover &lt;strong&gt;package names are not supported by BigQuery in nested message types when storing&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;So, just don't set a package.&lt;/p&gt;

&lt;h3&gt;
  
  
  Trick #13: abstract the storage in a manager.
&lt;/h3&gt;

&lt;p&gt;The Protobuf object is ready to be inserted at last! Adapt the &lt;a href="https://github.com/googleapis/python-bigquery-storage/blob/main/samples/snippets/append_rows_proto2.py" rel="noopener noreferrer"&gt;snippet given by Google&lt;/a&gt; to your own code to perform the storage. As you can see, it's not really a one-liner: more than 20 lines are necessary just to set up the destination stream. Besides, each append operation requires an AppendRowsRequest to be created, which is tedious too.&lt;/p&gt;

&lt;p&gt;It's a good idea to wrap all these tasks in a practical Manager class for your application to use. Here is an example implementation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Wrapper around BigQuery call.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;__future__&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;annotations&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Any&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Iterable&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.cloud&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;bigquery_storage&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.cloud.bigquery_storage_v1&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;exceptions&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;bqstorage_exceptions&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.cloud.bigquery_storage_v1&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;writer&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.protobuf&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;descriptor_pb2&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.protobuf.descriptor&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Descriptor&lt;/span&gt;



&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;DefaultStreamManager&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;  &lt;span class="c1"&gt;# pragma: no cover
&lt;/span&gt;    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Manage access to the _default stream write streams.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;table_path&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;message_protobuf_descriptor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Descriptor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bigquery_storage_write_client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;bigquery_storage&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BigQueryWriteClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Init.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stream_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;table_path&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/_default&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;message_protobuf_descriptor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message_protobuf_descriptor&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bigquery_storage_write_client&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_init_stream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Init the underlying stream manager.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="c1"&gt;# Create a template with fields needed for the first request.
&lt;/span&gt;        &lt;span class="n"&gt;request_template&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;AppendRowsRequest&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="c1"&gt;# The initial request must contain the stream name.
&lt;/span&gt;        &lt;span class="n"&gt;request_template&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write_stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stream_name&lt;/span&gt;
        &lt;span class="c1"&gt;# So that BigQuery knows how to parse the serialized_rows, generate a
&lt;/span&gt;        &lt;span class="c1"&gt;# protocol buffer representation of our message descriptor.
&lt;/span&gt;        &lt;span class="n"&gt;proto_schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ProtoSchema&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;proto_descriptor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;descriptor_pb2&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;DescriptorProto&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# pylint: disable=no-member
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;message_protobuf_descriptor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;CopyToProto&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;proto_descriptor&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;proto_schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;proto_descriptor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;proto_descriptor&lt;/span&gt;
        &lt;span class="n"&gt;proto_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AppendRowsRequest&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ProtoData&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;proto_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;writer_schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;proto_schema&lt;/span&gt;
        &lt;span class="n"&gt;request_template&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;proto_rows&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;proto_data&lt;/span&gt;
        &lt;span class="c1"&gt;# Create an AppendRowsStream using the request template created above.
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;writer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;AppendRowsStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write_client&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;request_template&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;send_appendrowsrequest&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AppendRowsRequest&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;writer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AppendRowsFuture&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Send request to the stream manager. Init the stream manager if needed.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_init_stream&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="n"&gt;bqstorage_exceptions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StreamClosedError&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# the stream needs to be reinitialized
&lt;/span&gt;            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
            &lt;span class="k"&gt;raise&lt;/span&gt;

    &lt;span class="c1"&gt;# Use as a context manager
&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__enter__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;DefaultStreamManager&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Enter the context manager. Return the stream name.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_init_stream&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__exit__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exc_type&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exc_value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;traceback&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Exit the context manager : close the stream.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Shutdown background threads and close the streaming connection.
&lt;/span&gt;            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;append_rows_stream&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigqueryWriteManager&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Encapsulation for bigquery client.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;project_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;dataset_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;table_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bigquery_storage_write_client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;bigquery_storage&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BigQueryWriteClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;pb2_descriptor&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Descriptor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;):&lt;/span&gt;  &lt;span class="c1"&gt;# pragma: no cover
&lt;/span&gt;        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Create a BigQueryManager.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bigquery_storage_write_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bigquery_storage_write_client&lt;/span&gt;

        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;table_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bigquery_storage_write_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;table_path&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;project_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dataset_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;table_id&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;pb2_descriptor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pb2_descriptor&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;write_rows&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pb_rows&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Iterable&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Any&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Write data rows.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DefaultStreamManager&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;table_path&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;pb2_descriptor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bigquery_storage_write_client&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;target_stream_manager&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;proto_rows&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ProtoRows&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="c1"&gt;# Create a batch of row data by appending proto2 serialized bytes to the
&lt;/span&gt;            &lt;span class="c1"&gt;# serialized_rows repeated field.
&lt;/span&gt;            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;row&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;pb_rows&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;proto_rows&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;serialized_rows&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;SerializeToString&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
            &lt;span class="c1"&gt;# Create an append row request containing the rows
&lt;/span&gt;            &lt;span class="n"&gt;request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;AppendRowsRequest&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;proto_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AppendRowsRequest&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ProtoData&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;proto_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;proto_rows&lt;/span&gt;
            &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;proto_rows&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;proto_data&lt;/span&gt;

            &lt;span class="n"&gt;future&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;target_stream_manager&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send_appendrowsrequest&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

            &lt;span class="c1"&gt;# Wait for the append row requests to finish.
&lt;/span&gt;            &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
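
&lt;p&gt;A hypothetical usage, with placeholder project, dataset and table ids, and the &lt;code&gt;Mymessage&lt;/code&gt; class generated earlier:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from google.cloud import bigquery_storage
from schema_target_table_pb2 import Mymessage  # pylint: disable=no-name-in-module

manager = BigqueryWriteManager(
    project_id="my-project",
    dataset_id="mydataset",
    table_id="mytable",
    bigquery_storage_write_client=bigquery_storage.BigQueryWriteClient(),
    pb2_descriptor=Mymessage.DESCRIPTOR,
)

row = Mymessage()
row.myfield = "toto"
manager.write_rows([row])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;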



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;This API is promising, but so much more difficult to integrate in a Python app than usual! Hopefully, Google will publish a more high-level client library in the future.&lt;/p&gt;

&lt;p&gt;If that doesn't happen, I hope I have at least spared you some headaches with this API.&lt;/p&gt;

&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;

&lt;p&gt;Photo by &lt;a href="https://unsplash.com/@emilybernal?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Emily Bernal&lt;/a&gt; on &lt;a href="https://unsplash.com/s/photos/thirteen?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Unsplash&lt;/a&gt;&lt;/p&gt;

</description>
      <category>python</category>
      <category>googlecloud</category>
      <category>programming</category>
      <category>api</category>
    </item>
    <item>
      <title>BigQuery transactions over multiple queries, with sessions</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Mon, 09 May 2022 15:05:01 +0000</pubDate>
      <link>https://dev.to/stack-labs/bigquery-transactions-over-multiple-queries-with-sessions-2ll5</link>
      <guid>https://dev.to/stack-labs/bigquery-transactions-over-multiple-queries-with-sessions-2ll5</guid>
<description>&lt;p&gt;BigQuery has supported transactions since last year (presented at Google Cloud Next '21): it is now possible to perform mutating operations over one or several tables and then commit or roll back the result atomically, by wrapping the script between&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;BEGIN&lt;/span&gt; &lt;span class="n"&gt;TRANSACTION&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;COMMIT&lt;/span&gt; &lt;span class="n"&gt;TRANSACTION&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;or&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;ROLLBACK&lt;/span&gt; &lt;span class="n"&gt;TRANSACTION&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Easy enough! It's all explained &lt;a href="https://cloud.google.com/bigquery/docs/reference/standard-sql/transactions" rel="noopener noreferrer"&gt;in the official documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Yet transactions come with a limitation:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;A transaction cannot span multiple scripts.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;While this is not an issue most of the time, it can be a problem when the scripts enclosed in the transaction become too complex or have too many query parameters, or break any other &lt;a href="https://cloud.google.com/bigquery/quotas#jobs" rel="noopener noreferrer"&gt;quota of BigQuery jobs&lt;/a&gt;. This can happen when query scripts are auto-generated from a request payload, for example.&lt;/p&gt;

&lt;p&gt;There is a way around it, with BigQuery sessions. Let's see how it works.&lt;/p&gt;

&lt;h2&gt;
  
  
  BigQuery Sessions
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://cloud.google.com/bigquery/docs/sessions-intro" rel="noopener noreferrer"&gt;Sessions&lt;/a&gt; are a way to link jobs and persist transient data, like temporary tables, between them.&lt;/p&gt;

&lt;p&gt;One common use case for sessions is exactly what we want:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Create multi-statement transactions over multiple queries. Within a session, you can begin a transaction, make changes, and view the temporary result before deciding to commit or rollback. You can do this over several queries in the session. If you do not use a session, a multi-statement transaction needs to be completed in a single query.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;The idea is to stack the transaction queries inside the same session, beginning with &lt;code&gt;BEGIN TRANSACTION;&lt;/code&gt; and ending with &lt;code&gt;COMMIT TRANSACTION;&lt;/code&gt;.&lt;br&gt;
In between, you can put as many queries as necessary and the whole session will behave atomically.&lt;/p&gt;

&lt;p&gt;A session is closed automatically after 24 hours of inactivity. However, when mixed with transactions, it can happen that the targeted table gets "locked" in the session and becomes unusable until the session ends. That's why I recommend forcing the session to end at the end of the script, by invoking the following query:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CALL&lt;/span&gt; &lt;span class="n"&gt;BQ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ABORT_SESSION&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Python implementation
&lt;/h2&gt;

&lt;p&gt;We are dealing with a session that we need to open and then always close at the end of the processing: a context manager is the natural fit for this.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;ContextManager wrapping a bigquery session.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.cloud&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;bigquery&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigquerySession&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;ContextManager wrapping a bigquerySession.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bqclient&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bqlocation&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;EU&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Construct instance.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_bigquery_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bqclient&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_location&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bqlocation&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_session_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__enter__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Initiate a Bigquery session and return the session_id.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_bigquery_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SELECT 1;&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# a query can't fail
&lt;/span&gt;            &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;create_session&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_session_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;session_info&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;session_id&lt;/span&gt;
        &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# wait job completion
&lt;/span&gt;        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_session_id&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__exit__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exc_type&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exc_value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;traceback&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Abort the opened session.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_session_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# abort the session in any case to have a clean state at the end
&lt;/span&gt;            &lt;span class="c1"&gt;# (sometimes in case of script failure, the table is locked in
&lt;/span&gt;            &lt;span class="c1"&gt;# the session)
&lt;/span&gt;            &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_bigquery_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;CALL BQ.ABORT_SESSION();&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;create_session&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;connection_properties&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
                        &lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ConnectionProperty&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                            &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;session_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_session_id&lt;/span&gt;
                        &lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="p"&gt;],&lt;/span&gt;
                &lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="n"&gt;location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It then becomes really easy to use this context manager to stack jobs into a single session, and thus to create a multi-statement, multi-script BigQuery transaction:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;BigquerySession&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bigquery_client&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;BIGQUERY_LOCATION&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;session_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="c1"&gt;# open transaction
&lt;/span&gt;    &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bigquery_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;BEGIN TRANSACTION;&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;create_session&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;connection_properties&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
                &lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ConnectionProperty&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;session_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;session_id&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;BIGQUERY_LOCATION&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="c1"&gt;# stack queries
&lt;/span&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;queryscript&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;scripts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bigquery_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;queryscript&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;create_session&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;connection_properties&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
                    &lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ConnectionProperty&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                        &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;session_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;session_id&lt;/span&gt;
                    &lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;BIGQUERY_LOCATION&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="c1"&gt;# end transaction
&lt;/span&gt;    &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;bigquery_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;COMMIT TRANSACTION;&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;job_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;QueryJobConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;create_session&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;connection_properties&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
                &lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ConnectionProperty&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;session_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;session_id&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;BIGQUERY_LOCATION&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;result&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice how all jobs are run with the same session_id (i.e. within the same session) and in the same location (this is a requirement for sessions).&lt;/p&gt;
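
&lt;p&gt;Since every statement in the session shares the same boilerplate job configuration, it can be factored out into a small helper. Here is a minimal sketch, assuming the &lt;code&gt;BigquerySession&lt;/code&gt; context manager defined above (the &lt;code&gt;_run_in_session&lt;/code&gt; helper is illustrative, not part of the client library):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from google.cloud import bigquery

bigquery_client = bigquery.Client()
BIGQUERY_LOCATION = "EU"  # illustrative; all jobs of a session share one location


def _run_in_session(client, query, session_id, location):
    """Run one statement inside an existing BigQuery session and wait for it."""
    return client.query(
        query,
        job_config=bigquery.QueryJobConfig(
            create_session=False,
            connection_properties=[
                bigquery.query.ConnectionProperty(key="session_id", value=session_id)
            ],
        ),
        location=location,
    ).result()


with BigquerySession(bigquery_client, BIGQUERY_LOCATION) as session_id:
    _run_in_session(bigquery_client, "BEGIN TRANSACTION;", session_id, BIGQUERY_LOCATION)
    for queryscript in scripts:  # the SQL scripts to stack in the transaction
        _run_in_session(bigquery_client, queryscript, session_id, BIGQUERY_LOCATION)
    _run_in_session(bigquery_client, "COMMIT TRANSACTION;", session_id, BIGQUERY_LOCATION)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;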

&lt;p&gt;Hope this helps!&lt;/p&gt;




&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;




&lt;p&gt;Photo by Caroline Selfors on Unsplash&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>python</category>
      <category>database</category>
      <category>bigdata</category>
    </item>
    <item>
      <title>How to overcome Cloud Run's 32MB request limit</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Wed, 06 Apr 2022 10:27:36 +0000</pubDate>
      <link>https://dev.to/stack-labs/how-to-overcome-cloud-runs-32mb-request-limit-190j</link>
      <guid>https://dev.to/stack-labs/how-to-overcome-cloud-runs-32mb-request-limit-190j</guid>
      <description>&lt;p&gt;&lt;a href="https://cloud.google.com/run" rel="noopener noreferrer"&gt;Cloud Run&lt;/a&gt; is an awesome serverless product provided by Google Cloud which is often a perfect fit to run containerized web services. It offers many advantages such as autoscaling, rolling updates, autorestart, scale to 0 to name just a few. All of it without the hassle of provisioning and managing any cluster !&lt;/p&gt;

&lt;p&gt;You would definitely pick this product to host, say, a Python Flask Rest API with the following design:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fglk53v5nv9xlmox1ewl5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fglk53v5nv9xlmox1ewl5.png" alt="Image description" width="800" height="140"&gt;&lt;/a&gt;&lt;br&gt;
1- Upload a data file by HTTP POST to a REST endpoint&lt;br&gt;
2- Process the file&lt;br&gt;
3- Insert the data into BigQuery using the client lib&lt;/p&gt;

&lt;p&gt;Which is perfectly fine... unless you want to be able to handle data files bigger than 32 MB!&lt;/p&gt;

&lt;p&gt;Indeed, Cloud Run won't let you upload such a big file. Instead, you'll get an error message:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;413: Request entity too large&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Congratulations, you've just hit the hard &lt;a href="https://cloud.google.com/run/quotas#cloud_run_limits" rel="noopener noreferrer"&gt;size limit of Cloud Run inbound requests&lt;/a&gt;.&lt;br&gt;
But don't worry: you can keep using Cloud Run for your service if you apply the improved design below.&lt;/p&gt;
&lt;h2&gt;
  
  
  Improved design, with Cloud Storage, Signed Url and PubSub notifications
&lt;/h2&gt;

&lt;p&gt;To work around the limitation, you can design a solution based upon &lt;a href="https://cloud.google.com/storage/docs/access-control/signed-urls" rel="noopener noreferrer"&gt;Cloud Storage signed urls&lt;/a&gt;:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsj4l2lg839dxyviad8g2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsj4l2lg839dxyviad8g2.png" alt="Image description" width="800" height="366"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This time, the file is not directly uploaded to the REST endpoint, but uploaded to cloud storage instead, thus bypassing the 32 MB limitation.&lt;/p&gt;

&lt;p&gt;The downside of this process is that the client has to make two requests instead of one. Hence, the whole new sequence goes like this:&lt;/p&gt;

&lt;p&gt;1- the client requests a signed url to upload to&lt;br&gt;
2- the webservice, using the Cloud Storage client, generates a signed url and returns it to the client&lt;br&gt;
3- the client uploads the file to the Cloud Storage bucket directly (HTTP PUT to the signed url)&lt;br&gt;
4- at the end of the file upload, the notification &lt;code&gt;OBJECT_FINALIZE&lt;/code&gt; is sent to PubSub&lt;br&gt;
5- the notification is then pushed back to the webservice on Cloud Run through a subscription&lt;br&gt;
6- the webservice reacts to the notification by downloading the file&lt;br&gt;
7- the webservice can then process the file, in the exact same way it did it in the original design&lt;br&gt;
8- likewise, data are inserted into BigQuery&lt;/p&gt;

&lt;p&gt;This design is entirely serverless and scales neatly, without any single point of failure. Now, let's see in more detail how to implement it.&lt;/p&gt;
&lt;h3&gt;
  
  
  Make a signed url from Cloud Run
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Gotcha!&lt;/strong&gt; The Cloud Run service is required to have the role &lt;code&gt;roles/iam.serviceAccountTokenCreator&lt;/code&gt; in order to be able to generate a signed url. This is not really documented, and if you don't grant it, you get an HTTP 403 error without much more information.&lt;/p&gt;

&lt;p&gt;This Python code, courtesy of &lt;a href="https://blog.bavard.ai/how-to-generate-signed-urls-using-python-in-google-cloud-run-835ddad5366" rel="noopener noreferrer"&gt;this blog post by Evan Peterson&lt;/a&gt;, shows how to produce signed urls with the Cloud Run webservice's default service account, without requiring the private key file locally (which is a big no-no, for security reasons!)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;timedelta&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;auth&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.auth.transport&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;google.cloud.storage&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Client&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;make_signed_upload_url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;bucket&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;exp&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;application/octet-stream&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;min_size&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;max_size&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;1e6&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    Compute a GCS signed upload URL without needing a private key file.
    Can only be called when a service account is used as the application
    default credentials, and when that service account has the proper IAM
    roles, like `roles/storage.objectCreator` for the bucket, and
    `roles/iam.serviceAccountTokenCreator`.
    Source: https://stackoverflow.com/a/64245028

    Parameters
    ----------
    bucket : str
        Name of the GCS bucket the signed URL will reference.
    blob : str
        Name of the GCS blob (in `bucket`) the signed URL will reference.
    exp : timedelta, optional
        Time from now when the signed url will expire.
    content_type : str, optional
        The required mime type of the data that is uploaded to the generated
        signed url.
    min_size : int, optional
        The minimum size the uploaded file can be, in bytes (inclusive).
        If the file is smaller than this, GCS will return a 400 code on upload.
    max_size : int, optional
        The maximum size the uploaded file can be, in bytes (inclusive).
        If the file is larger than this, GCS will return a 400 code on upload.
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;exp&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;exp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;hours&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;credentials&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;project_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;auth&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;default&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;credentials&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;token&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Perform a refresh request to populate the access token of the
&lt;/span&gt;        &lt;span class="c1"&gt;# current credentials.
&lt;/span&gt;        &lt;span class="n"&gt;credentials&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;refresh&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Request&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
    &lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;bucket&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_bucket&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bucket&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;blob&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bucket&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;generate_signed_url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;version&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;v4&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;expiration&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;exp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;service_account_email&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;credentials&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;service_account_email&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;access_token&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;credentials&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;method&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PUT&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;content_type&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;headers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;X-Goog-Content-Length-Range&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;min_size&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;,&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;max_size&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
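
&lt;p&gt;To wire this into the webservice (step 2 of the improved sequence), a minimal Flask sketch could look like the following; the &lt;code&gt;/upload-url&lt;/code&gt; route and the size limit are illustrative, and the bucket name matches the Terraform below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import uuid

from flask import Flask, jsonify

app = Flask(__name__)
UPLOAD_BUCKET = "upload-big-files"


@app.route("/upload-url", methods=["GET"])
def get_upload_url():
    # a unique blob name, so that concurrent clients never collide
    blob_name = f"incoming/{uuid.uuid4()}"
    url = make_signed_upload_url(
        UPLOAD_BUCKET,
        blob_name,
        content_type="application/octet-stream",
        max_size=int(5e8),  # accept files up to ~500 MB
    )
    return jsonify({"upload_url": url, "blob": blob_name})
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;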



&lt;h3&gt;
  
  
  Terraform
&lt;/h3&gt;

&lt;p&gt;There is no robust way to do cloud without infrastructure as code, and &lt;a href="https://www.terraform.io/" rel="noopener noreferrer"&gt;Terraform&lt;/a&gt; is the perfect tool to manage your Cloud resources.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fom2n7915d1enl0751fmx.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fom2n7915d1enl0751fmx.jpeg" alt="Image description" width="400" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here are the Terraform fragments for deploying this design:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight terraform"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Resources to handle big data files (&amp;gt;32 Mb)&lt;/span&gt;
&lt;span class="c1"&gt;# These files are uploaded to a special bucket with notifications&lt;/span&gt;

&lt;span class="k"&gt;provider&lt;/span&gt; &lt;span class="s2"&gt;"google-beta"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;project&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;GCP&lt;/span&gt; &lt;span class="nx"&gt;project&lt;/span&gt; &lt;span class="nx"&gt;name&lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;data&lt;/span&gt; &lt;span class="s2"&gt;"google_project"&lt;/span&gt; &lt;span class="s2"&gt;"default"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_storage_bucket"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_bucket"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;project&lt;/span&gt;  &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;your&lt;/span&gt; &lt;span class="nx"&gt;GCP&lt;/span&gt; &lt;span class="nx"&gt;project&lt;/span&gt; &lt;span class="nx"&gt;name&lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;     &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"upload-big-files"&lt;/span&gt;
  &lt;span class="nx"&gt;location&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"EU"&lt;/span&gt;

  &lt;span class="nx"&gt;cors&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;origin&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"*"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nx"&gt;method&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"*"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nx"&gt;response_header&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
      &lt;span class="s2"&gt;"Content-Type"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="s2"&gt;"Access-Control-Allow-Origin"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="s2"&gt;"X-Goog-Content-Length-Range"&lt;/span&gt;
    &lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="nx"&gt;max_age_seconds&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3600&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_service_account"&lt;/span&gt; &lt;span class="s2"&gt;"default"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt;     &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;account_id&lt;/span&gt;   &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"sa-webservice"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_storage_bucket_iam_member"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_admin"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;bucket&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_storage_bucket&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_bucket&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;name&lt;/span&gt;
  &lt;span class="nx"&gt;role&lt;/span&gt;   &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"roles/storage.admin"&lt;/span&gt;
  &lt;span class="nx"&gt;member&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"serviceAccount:&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;google_service_account&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;email&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# required to generate a signed url&lt;/span&gt;
&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_service_account_iam_member"&lt;/span&gt; &lt;span class="s2"&gt;"tokencreator"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt;           &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;service_account_id&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_service_account&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;name&lt;/span&gt;
  &lt;span class="nx"&gt;role&lt;/span&gt;               &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"roles/iam.serviceAccountTokenCreator"&lt;/span&gt;
  &lt;span class="nx"&gt;member&lt;/span&gt;             &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"serviceAccount:&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;google_service_account&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;email&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# upload topic for notifications&lt;/span&gt;
&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_topic"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_topic"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;     &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"topic-bigframes"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# upload deadletter topic for failed notifications&lt;/span&gt;
&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_topic"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_topic_deadletter"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;     &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"topic-bigframesdeadletter"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# add frame upload notifications on the bucket&lt;/span&gt;
&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_storage_notification"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_notification"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt;       &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;bucket&lt;/span&gt;         &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_storage_bucket&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_bucket&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;name&lt;/span&gt;
  &lt;span class="nx"&gt;payload_format&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"JSON_API_V1"&lt;/span&gt;
  &lt;span class="nx"&gt;topic&lt;/span&gt;          &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_pubsub_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;
  &lt;span class="nx"&gt;event_types&lt;/span&gt;    &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"OBJECT_FINALIZE"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
  &lt;span class="nx"&gt;depends_on&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;google_pubsub_topic_iam_binding&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_binding&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# required for storage notifications&lt;/span&gt;
&lt;span class="c1"&gt;# seriously, Google, this should be by default !&lt;/span&gt;
&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_topic_iam_binding"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_binding"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nx"&gt;topic&lt;/span&gt;   &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_pubsub_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;
  &lt;span class="nx"&gt;role&lt;/span&gt;    &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"roles/pubsub.publisher"&lt;/span&gt;
  &lt;span class="nx"&gt;members&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"serviceAccount:service-&lt;/span&gt;&lt;span class="k"&gt;${data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;google_project&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;number&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;@gs-project-accounts.iam.gserviceaccount.com"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# frame upload main sub&lt;/span&gt;
&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_subscription"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_sub"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;     &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"sub-bigframes"&lt;/span&gt;
  &lt;span class="nx"&gt;topic&lt;/span&gt;    &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_pubsub_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;

  &lt;span class="nx"&gt;push_config&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;push_endpoint&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;URL&lt;/span&gt; &lt;span class="nx"&gt;where&lt;/span&gt; &lt;span class="nx"&gt;pushed&lt;/span&gt; &lt;span class="nx"&gt;notification&lt;/span&gt; &lt;span class="nx"&gt;are&lt;/span&gt; &lt;span class="nx"&gt;POST-ed&lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="nx"&gt;dead_letter_policy&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;dead_letter_topic&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_pubsub_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_topic_deadletter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# frame upload deadletter subscription&lt;/span&gt;
&lt;span class="k"&gt;resource&lt;/span&gt; &lt;span class="s2"&gt;"google_pubsub_subscription"&lt;/span&gt; &lt;span class="s2"&gt;"bigframes_sub_deadletter"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;provider&lt;/span&gt;             &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google-beta&lt;/span&gt;
  &lt;span class="nx"&gt;name&lt;/span&gt;                 &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"sub-bigframesdeadletter"&lt;/span&gt;
  &lt;span class="nx"&gt;topic&lt;/span&gt;                &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;google_pubsub_topic&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;bigframes_topic_deadletter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;
  &lt;span class="nx"&gt;ack_deadline_seconds&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;600&lt;/span&gt;

  &lt;span class="nx"&gt;push_config&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;push_endpoint&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;URL&lt;/span&gt; &lt;span class="nx"&gt;where&lt;/span&gt; &lt;span class="nx"&gt;pushed&lt;/span&gt; &lt;span class="nx"&gt;notification&lt;/span&gt; &lt;span class="nx"&gt;are&lt;/span&gt; &lt;span class="nx"&gt;POST-ed&lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Just &lt;code&gt;terraform apply&lt;/code&gt; it!&lt;/p&gt;
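
&lt;p&gt;Note that the &lt;code&gt;push_endpoint&lt;/code&gt; placeholders above point at the Cloud Run webservice. On that side, the notification handler (steps 5 to 7 of the sequence) can be sketched as follows, assuming Flask; the route and the processing are illustrative:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import base64
import json

from flask import Flask, request
from google.cloud.storage import Client

app = Flask(__name__)
storage_client = Client()


@app.route("/notifications", methods=["POST"])
def handle_upload_notification():
    # PubSub push requests wrap the storage notification in a base64 payload
    envelope = request.get_json()
    payload = json.loads(base64.b64decode(envelope["message"]["data"]))
    # the JSON_API_V1 payload carries the bucket and object names
    blob = storage_client.bucket(payload["bucket"]).blob(payload["name"])
    data = blob.download_as_bytes()
    # ... process `data` and insert into BigQuery, as in the original design ...
    return "", 204  # ack the message so that PubSub does not retry
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;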

&lt;h3&gt;
  
  
  How to upload
&lt;/h3&gt;

&lt;p&gt;One final gotcha: to upload to Cloud Storage with the signed url, you must set an additional header in the &lt;code&gt;PUT&lt;/code&gt; request:&lt;br&gt;
&lt;code&gt;X-Goog-Content-Length-Range: &amp;lt;min size&amp;gt;,&amp;lt;max size&amp;gt;&lt;/code&gt;&lt;br&gt;
where &lt;code&gt;min size&lt;/code&gt; and &lt;code&gt;max size&lt;/code&gt; match &lt;code&gt;min_size&lt;/code&gt; and &lt;code&gt;max_size&lt;/code&gt; of the &lt;code&gt;make_signed_upload_url()&lt;/code&gt; method above.&lt;/p&gt;
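
&lt;p&gt;Client side, the whole sequence then boils down to two HTTP calls. A minimal sketch with the &lt;code&gt;requests&lt;/code&gt; library, assuming the illustrative &lt;code&gt;/upload-url&lt;/code&gt; endpoint sketched earlier:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import requests

SERVICE_URL = "https://my-service-xyz.a.run.app"  # illustrative Cloud Run URL

# 1- request a signed url to upload to
resp = requests.get(f"{SERVICE_URL}/upload-url")
resp.raise_for_status()
upload_url = resp.json()["upload_url"]

# 3- PUT the file directly to Cloud Storage; the size-range header and the
# content type must match the values used when signing the url
with open("big_data_file.csv", "rb") as f:
    upload = requests.put(
        upload_url,
        data=f,
        headers={
            "Content-Type": "application/octet-stream",
            "X-Goog-Content-Length-Range": f"1,{int(5e8)}",
        },
    )
upload.raise_for_status()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;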

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Have you experienced this design? How would you improve it? Please let me know in the comments.&lt;/p&gt;

&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;




&lt;p&gt;Design schemas made with &lt;a href="https://excalidraw.com/" rel="noopener noreferrer"&gt;Excalidraw&lt;/a&gt; and the GCP Icons library by &lt;a class="mentioned-user" href="https://dev.to/clementbosc"&gt;@clementbosc&lt;/a&gt; &lt;br&gt;
Cover photo by &lt;a href="https://unsplash.com/@joel_herzog?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;joel herzog&lt;/a&gt; on &lt;a href="https://unsplash.com/s/photos/overcome?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Unsplash&lt;/a&gt;&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>terraform</category>
      <category>python</category>
    </item>
    <item>
      <title>Brace yourself: Using BigQuery as an operational backend</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Tue, 11 Jan 2022 10:38:34 +0000</pubDate>
      <link>https://dev.to/stack-labs/brace-yourself-using-bigquery-as-an-operational-backend-3e1i</link>
      <guid>https://dev.to/stack-labs/brace-yourself-using-bigquery-as-an-operational-backend-3e1i</guid>
      <description>&lt;h2&gt;
  
  
BigQuery as a backend? WTF?
&lt;/h2&gt;

&lt;p&gt;BigQuery has become a major player in the field of Data analytics solutions. It provides an &lt;a href="https://cloud.google.com/bigquery/docs/introduction" rel="noopener noreferrer"&gt;ever-growing list of powerful features&lt;/a&gt; in an easy, performant and cost-effective way. However, BigQuery is definitely &lt;a href="https://techdifferences.com/difference-between-oltp-and-olap.html" rel="noopener noreferrer"&gt;OLAP, while the sensible option for an application backend is OLTP.&lt;/a&gt;&lt;br&gt;
Therefore, using BigQuery as a backend may seem weird, and... it is indeed! Just to scratch the surface of the many problems that would arise:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;BigQuery is not optimized for writing, but for performing complex queries.&lt;/li&gt;
&lt;li&gt;Single-line inserts are discouraged.&lt;/li&gt;
&lt;li&gt;BigQuery does not enforce primary keys, foreign keys or constraints&lt;/li&gt;
&lt;li&gt;BigQuery does not perform well with normalized schemas; on the contrary, it encourages denormalization&lt;/li&gt;
&lt;li&gt;...&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;So, if you think about BigQuery to be the storage backend of a full-blown application, you should definitely think again.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;There can be some situation though, where this design has some interest...&lt;/p&gt;

&lt;p&gt;Think of a huge analytics data platform revolving around BigQuery. Now imagine a corner case of the platform where some configuration has to be stored in a relational fashion, and this configuration data has an impact on how data is accessed. In order to update the configuration, an API is exposed. There are now 2 options to store the configuration values:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;The normal way: set up a transactional database like PostgreSQL or MySQL, maybe through &lt;a href="https://cloud.google.com/sql" rel="noopener noreferrer"&gt;CloudSQL since we're dealing with the Google Cloud Platform&lt;/a&gt;. Use it as the application's storage backend. Set up redundancy and a backup strategy. Sort out IAM permissions. Query configuration data from BigQuery via &lt;a href="https://cloud.google.com/bigquery/docs/cloud-sql-federated-queries" rel="noopener noreferrer"&gt;federated queries&lt;/a&gt;. All of this will of course cost you some extra dollars.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The hacky way: store the configuration in a BigQuery dataset somewhere, and benefit from the near-free hosting and redundancy provided by this serverless database. Use the BigQuery client API to integrate with the application. Query it like any other dataset.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhoq0u45cow960p02tch5.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhoq0u45cow960p02tch5.jpeg" alt="Image description" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Don't try this at home! This stunt is being performed by professionals&lt;/em&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
Enforcing uniqueness constraints on a BigQuery table
&lt;/h2&gt;

&lt;p&gt;All warnings having been given, let's proceed with the implementation. It's not really difficult because Google provides BigQuery client libraries for many languages. It is also possible to use the &lt;a href="https://cloud.google.com/bigquery/docs/reference/rest" rel="noopener noreferrer"&gt;REST Api&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The problem is, even if you agreed to cut corners by not provisioning a proper OLTP database, you may still need to enforce some constraints.&lt;/p&gt;

&lt;p&gt;Imagine that your configuration table consists of the following schema:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;| Id (STRING) | Attribute (STRING) | Value (STRING) |
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And you want to enforce that &lt;code&gt;Id&lt;/code&gt; values are unique. Normally, this would be a simple &lt;code&gt;UNIQUE(Id)&lt;/code&gt; statement in OLTP databases. But such a statement doesn't exist in BigQuery.&lt;/p&gt;

&lt;p&gt;Luckily, there is a new feature of BigQuery to the rescue: &lt;a href="https://cloud.google.com/bigquery/docs/reference/standard-sql/transactions" rel="noopener noreferrer"&gt;Transactions&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  BigQuery Transactions
&lt;/h3&gt;

&lt;p&gt;The multi-statement transactions feature is Pre-GA at the time of writing. It enables wrapping standard SQL scripts into atomic transactions.&lt;/p&gt;

&lt;p&gt;So, to compensate for the absence of a traditional UNIQUE constraint, we can implement the following sequence when saving or modifying an entry in the table:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;1) Open a transaction&lt;/li&gt;
&lt;li&gt;2) Search if the Id to save already exists, raise an error if found&lt;/li&gt;
&lt;li&gt;3) Insert the new entry&lt;/li&gt;
&lt;li&gt;4) Commit the transaction&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here is an implementation of this script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- 1
BEGIN TRANSACTION;
-- 2
SELECT * FROM (
    SELECT COUNT(1) AS conflict
    FROM `configds.configtable`
    WHERE Id=@input_id
) WHERE
IF (conflict=0, TRUE, ERROR("Id already exists));
-- 3
INSERT INTO `configds.configtable`
VALUES(@input_id,@input_attribute,@input_value);
-- 4
COMMIT TRANSACTION;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;At step 2, &lt;code&gt;ERROR()&lt;/code&gt; will automatically roll back the transaction, so that step 3 will not occur. Named parameters are used here to protect against SQL injection.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Call this script from the application backend whenever a config entry is saved, and it will protect the table against concurrent inserts from the application's clients.&lt;/p&gt;
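
&lt;p&gt;For instance, from Python, the whole script can be submitted as a single job with named query parameters. A minimal sketch (the table comes from the schema above; the values are illustrative):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from google.cloud import bigquery

client = bigquery.Client()

INSERT_SCRIPT = """
BEGIN TRANSACTION;
SELECT * FROM (
    SELECT COUNT(1) AS conflict
    FROM `configds.configtable`
    WHERE Id=@input_id
) WHERE
IF (conflict=0, TRUE, ERROR("Id already exists"));
INSERT INTO `configds.configtable`
VALUES(@input_id, @input_attribute, @input_value);
COMMIT TRANSACTION;
"""

job = client.query(
    INSERT_SCRIPT,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("input_id", "STRING", "config-001"),
            bigquery.ScalarQueryParameter("input_attribute", "STRING", "retention"),
            bigquery.ScalarQueryParameter("input_value", "STRING", "30 days"),
        ]
    ),
)
job.result()  # raises if the Id already existed and the transaction rolled back
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;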

&lt;h3&gt;
  
  
  Limitations
&lt;/h3&gt;

&lt;p&gt;As mentioned earlier, this way of implementing constraints is not to be generalized. Here are the most prominent limitations:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Transactions are only Pre-GA at the moment&lt;/li&gt;
&lt;li&gt;Uniqueness is enforced at the application level, but not at the database level. Nothing prevents another BigQuery client, like the BigQuery console itself, from inserting rows regardless of the uniqueness of Id. Only the application is safe&lt;/li&gt;
&lt;li&gt;Performance is very poor: it takes several seconds to run the script.&lt;/li&gt;
&lt;li&gt;Not supported by ORMs: you have to write plain SQL queries and be careful about SQL injection&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;The new BigQuery Multi-statement Transactions feature enables the usage of BigQuery as a somewhat-workable application backend, which can come in handy if used with high caution. Still, carefully consider the trade-offs vs a traditional OLTP database, and be prepared to defend your choices if you follow this path!&lt;/p&gt;

&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;




&lt;p&gt;Cover photo by &lt;a href="https://unsplash.com/@domithenemeth?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Domi Nemeth&lt;/a&gt; on &lt;a href="https://unsplash.com/s/photos/dangerous-jobs?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Unsplash&lt;/a&gt;&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>database</category>
      <category>sql</category>
    </item>
    <item>
      <title>Orchestrate Dataflow pipelines easily with GCP Workflows</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Thu, 11 Mar 2021 09:15:20 +0000</pubDate>
      <link>https://dev.to/stack-labs/orchestrate-dataflow-pipelines-easily-with-gcp-workflows-1i8k</link>
      <guid>https://dev.to/stack-labs/orchestrate-dataflow-pipelines-easily-with-gcp-workflows-1i8k</guid>
      <description>&lt;p&gt;Dataflow pipelines rarely are on their own. Most of the time, they are part of a more global process. For example : one pipeline collects events from the source into BigTable, then a second pipeline computes aggregated data from BigTable and store them into BigQuery.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxifla0irjdcqriubteps.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxifla0irjdcqriubteps.png" alt="Alt Text" width="800" height="85"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Of course, each pipeline could be scheduled independently with Cloud Scheduler.&lt;br&gt;
But if these pipelines need to be linked somehow, such as launching the second pipeline when the first is done, then &lt;strong&gt;orchestration&lt;/strong&gt; is required.&lt;/p&gt;

&lt;p&gt;Until recently, GCP had one tool in the box for this kind of purpose: &lt;a href="https://cloud.google.com/composer" rel="noopener noreferrer"&gt;Cloud Composer&lt;/a&gt;, a (slightly) managed Apache Airflow. Despite its rich and numerous functionalities and its broad community, this service had several caveats for the kind of simple orchestration I was after:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;it's not fully integrated: you need to manage costly resources such as a GKE cluster and a Cloud SQL instance&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;it pushes Python in your codebase, there is no other choice&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;any change in the setup (like, adding an environment variable) is painfully slow to propagate&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;the wide variety of operators in the ecosystem can lead to a poor separation of concerns between orchestration and business processes&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;And I won't even talk about the Airflow UI... (I've heard that some people like it)&lt;/p&gt;

&lt;p&gt;Because of these issues, orchestrating with Composer is overly difficult. Yet, as is often the case with the GCP platform, if you face too many difficulties when doing something that should be simple enough, you're probably not doing it right. This proved true once again: Cloud Composer wasn't the right product for my need...&lt;/p&gt;
&lt;h2&gt;
  
  
Enter GCP Workflows!
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://cloud.google.com/workflows" rel="noopener noreferrer"&gt;Workflows&lt;/a&gt; is a new service : it has been promoted out of bêta very recently. And luckily, it already offers most of the needed functionality to do the orchestration of GCP services' jobs, and doing it simply:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;it &lt;strong&gt;is&lt;/strong&gt; fully managed and serverless, which means you don't pay when you don't use it&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;it does only one job and does it well: orchestrating HTTP calls&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;all is configured in YAML files, &lt;a href="https://cloud.google.com/workflows/docs/reference/syntax" rel="noopener noreferrer"&gt;whose syntax is short and easy to learn&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;the UI is neatly integrated and feels more "part of GCP" than Composer (although there are still quite a few display bugs at the moment)&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;With this new product it becomes really easy to write a Workflow which chains multiple Dataflow jobs like in the diagram above. &lt;/p&gt;
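
&lt;p&gt;Once such a workflow is deployed, its executions can be scheduled with Cloud Scheduler or triggered programmatically, for instance from Python. A minimal sketch, assuming the google-cloud-workflows client library (project, region and workflow names are placeholders):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from google.cloud.workflows import executions_v1

execution_client = executions_v1.ExecutionsClient()
# placeholder project / region / workflow names
parent = execution_client.workflow_path("my-project", "europe-west1", "dataflow-chain")
execution = execution_client.create_execution(request={"parent": parent})
print(f"Started execution: {execution.name}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;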
&lt;h2&gt;
  
  
  A sample workflow for Dataflow jobs
&lt;/h2&gt;

&lt;p&gt;Workflow files are written in YAML. The syntax is simple and straightforward:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;main&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;steps&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;init&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;assign&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;region&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;europe-west1"&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;topic&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;myTopic"&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;firstPipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;LaunchDataflow&lt;/span&gt;
        &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${project}&lt;/span&gt;
          &lt;span class="na"&gt;region&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${region}&lt;/span&gt;
          &lt;span class="na"&gt;template&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;first"&lt;/span&gt;
        &lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;firstJobId&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;waitFirstDone&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;DataflowWaitUntilStatus&lt;/span&gt;
        &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${project}&lt;/span&gt;
          &lt;span class="na"&gt;region&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${region}&lt;/span&gt;
          &lt;span class="na"&gt;jobId&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${firstJobId}&lt;/span&gt;
          &lt;span class="na"&gt;status&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;JOB_STATE_DONE"&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;secondPipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;LaunchDataflow&lt;/span&gt;
        &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${project}&lt;/span&gt;
          &lt;span class="na"&gt;region&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${region}&lt;/span&gt;
          &lt;span class="na"&gt;template&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;second"&lt;/span&gt;
        &lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;secondJobId&lt;/span&gt;    
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;waitSecondDone&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;DataflowWaitUntilStatus&lt;/span&gt;
        &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${project}&lt;/span&gt;
          &lt;span class="na"&gt;region&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${region}&lt;/span&gt;
          &lt;span class="na"&gt;jobId&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${secondJobId}&lt;/span&gt;
          &lt;span class="na"&gt;status&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;JOB_STATE_DONE"&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;publish&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;googleapis.pubsub.v1.projects.topics.publish&lt;/span&gt;
        &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;topic&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${"projects/" + project + "/topics/" + topic}&lt;/span&gt;
          &lt;span class="na"&gt;body&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;messages&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
              &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${base64.encode(text.encode("{\"message\":\"workflow done\"}"))}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's break it down. The sample workflow has the following steps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;init&lt;/strong&gt;: preprocessing stage, where workflow variables are initialized.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;firstPipeline&lt;/strong&gt;: Launch the first dataflow job&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;waitFirstDone&lt;/strong&gt;: Wait until the first dataflow job is completed&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;secondPipeline&lt;/strong&gt;: Launch the second dataflow job&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;waitSecondDone&lt;/strong&gt;: Wait until the second dataflow job is completed&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;publish&lt;/strong&gt;: Push a sample PubSub notification at the end of the workflow&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;As you have noticed, &lt;strong&gt;firstPipeline&lt;/strong&gt; and &lt;strong&gt;secondPipeline&lt;/strong&gt; call a custom routine, a &lt;em&gt;subworkflow&lt;/em&gt;, which is defined in the same file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;LaunchDataflow&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;region&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;template&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
  &lt;span class="na"&gt;steps&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;launch&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;http.post&lt;/span&gt;
        &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;url&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${"https://dataflow.googleapis.com/v1b3/projects/"+project+"/locations/"+region+"/flexTemplates:launch"}&lt;/span&gt;
          &lt;span class="na"&gt;auth&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;OAuth2&lt;/span&gt;
          &lt;span class="na"&gt;body&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;launchParameter&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
              &lt;span class="na"&gt;jobName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${"workflow-" + template }&lt;/span&gt;
              &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="na"&gt;numWorkers&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
                &lt;span class="na"&gt;maxWorkers&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;8&lt;/span&gt;
              &lt;span class="na"&gt;containerSpecGcsPath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${template}&lt;/span&gt;
        &lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;dataflowResponse&lt;/span&gt;
        &lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;jobCreated&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;jobCreated&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;return&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${dataflowResponse.body.job.id}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This subworkflow calls the &lt;a href="https://cloud.google.com/dataflow/docs/reference/rest" rel="noopener noreferrer"&gt;Dataflow REST API&lt;/a&gt; to launch a job (here, a flex template). With Workflows you can easily call any Google Cloud service's API or any external HTTP endpoint.&lt;/p&gt;

&lt;p&gt;Similarly, &lt;strong&gt;waitFirstDone&lt;/strong&gt; and &lt;strong&gt;waitSecondDone&lt;/strong&gt; call another subworkflow:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;DataflowWaitUntilStatus&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;region&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;jobId&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;status&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
  &lt;span class="na"&gt;steps&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;init&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;assign&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;currentStatus&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;failureStatuses&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;JOB_STATE_FAILED"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;JOB_STATE_CANCELLED"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;JOB_STATE_UPDATED"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;JOB_STATE_DRAINED"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;check_condition&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;switch&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;condition&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${currentStatus in failureStatuses}&lt;/span&gt;
            &lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;exit_fail&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;condition&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${currentStatus != status}&lt;/span&gt;
            &lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;iterate&lt;/span&gt;
        &lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;exit_success&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;iterate&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;steps&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;sleep30s&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
              &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;sys.sleep&lt;/span&gt;
              &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="na"&gt;seconds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;30&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;getJob&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
              &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;http.get&lt;/span&gt;
              &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="na"&gt;url&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${"https://dataflow.googleapis.com/v1b3/projects/"+project+"/locations/"+region+"/jobs/"+jobId}&lt;/span&gt;
                &lt;span class="na"&gt;auth&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                  &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;OAuth2&lt;/span&gt;
              &lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;getJobResponse&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;getStatus&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
              &lt;span class="na"&gt;assign&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;currentStatus&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${getJobResponse.body.currentState}&lt;/span&gt;
          &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;log&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
              &lt;span class="na"&gt;call&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;sys.log&lt;/span&gt;
              &lt;span class="na"&gt;args&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="na"&gt;text&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${"Current job status="+currentStatus}&lt;/span&gt;
                &lt;span class="na"&gt;severity&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;INFO"&lt;/span&gt;
        &lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;check_condition&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;exit_success&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;return&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${currentStatus}&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;exit_fail&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;raise&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${"Job in unexpected terminal status "+currentStatus}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This subworkflow also calls the Dataflow REST API, this time in a polling loop until the job reaches a terminal status. If the job ends in an unexpected state, an exception is raised and the workflow stops, marked as failed. Otherwise, it proceeds to the next stage.&lt;/p&gt;

&lt;p&gt;Finally, deploy this workflow, for example via the UI or with gcloud:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#! /bin/bash&lt;/span&gt;
&lt;span class="nv"&gt;localDir&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;dirname&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="nv"&gt;$0&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="si"&gt;)&lt;/span&gt;

&lt;span class="nv"&gt;WORKFLOW&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"sample"&lt;/span&gt;
&lt;span class="nv"&gt;DESCRIPTION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"Sample workflow"&lt;/span&gt;
&lt;span class="nv"&gt;SOURCE&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"sample.yaml"&lt;/span&gt;
&lt;span class="nv"&gt;PROJECT&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"my-gcp-project"&lt;/span&gt;
&lt;span class="nv"&gt;REGION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"europe-west4"&lt;/span&gt;
&lt;span class="nv"&gt;SERVICE_ACCOUNT&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"sa-workflows@my-gcp-project.iam.gserviceaccount.com"&lt;/span&gt;

gcloud beta workflows deploy &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;WORKFLOW&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; &lt;span class="nt"&gt;--location&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;REGION&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; &lt;span class="nt"&gt;--service-account&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;SERVICE_ACCOUNT&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; &lt;span class="nt"&gt;--source&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;localDir&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;/&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;SOURCE&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; &lt;span class="nt"&gt;--description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;DESCRIPTION&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Breaking news:&lt;/em&gt; &lt;a href="https://github.com/hashicorp/terraform-provider-google/blob/master/CHANGELOG.md#3590-march-08-2021" rel="noopener noreferrer"&gt;Workflows resources are now available in Terraform&lt;/a&gt;, for you IaC freaks!&lt;/p&gt;

&lt;p&gt;Once deployed, the workflow can be launched, for example from Cloud Scheduler, by POSTing to this endpoint: &lt;code&gt;https://workflowexecutions.googleapis.com/v1/projects/${PROJECT}/locations/${REGION}/workflows/${WORKFLOW}/executions&lt;/code&gt;&lt;/p&gt;
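
&lt;p&gt;For illustration, here is a minimal Java sketch of such a trigger. It is a sketch only: the project, region and workflow names are the sample values from the deploy script above, and the access token is assumed to come from &lt;code&gt;gcloud auth print-access-token&lt;/code&gt; or a service account:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: trigger an execution of the deployed workflow via the REST API.
// Pass a valid OAuth2 access token as the first argument, e.g. the output
// of `gcloud auth print-access-token`.
public class TriggerWorkflow {
    public static void main(String[] args) throws Exception {
        String project = "my-gcp-project";   // sample values from the deploy script
        String region = "europe-west4";
        String workflow = "sample";
        String url = "https://workflowexecutions.googleapis.com/v1/projects/" + project
                + "/locations/" + region + "/workflows/" + workflow + "/executions";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Authorization", "Bearer " + args[0])
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{}"))  // empty execution body
                .build();
        HttpResponse&amp;lt;String&amp;gt; response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;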

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Thanks to Workflows, with just a relatively small YAML file we were able to chain two Dataflow jobs the easy way: serverlessly.&lt;/p&gt;




&lt;p&gt;Thanks for reading! I’m Matthieu, data engineer at Stack Labs.&lt;br&gt;
If you want to discover the &lt;a href="https://cloud.stack-labs.com/cloud-data-platform" rel="noopener noreferrer"&gt;Stack Labs Data Platform&lt;/a&gt; or join an enthusiastic &lt;a href="https://www.welcometothejungle.com/fr/companies/stack-labs" rel="noopener noreferrer"&gt;Data Engineering team&lt;/a&gt;, please contact us.&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>workflows</category>
      <category>dataflow</category>
      <category>composer</category>
    </item>
    <item>
      <title>Tricky Dataflow ep.2 : Import documents from MongoDB views</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Wed, 17 Feb 2021 16:46:58 +0000</pubDate>
      <link>https://dev.to/stack-labs/tricky-dataflow-ep-2-import-documents-from-mongodb-views-lpf</link>
      <guid>https://dev.to/stack-labs/tricky-dataflow-ep-2-import-documents-from-mongodb-views-lpf</guid>
      <description>&lt;p&gt;&lt;em&gt;This is the second episode of my Tricky Dataflow series, in which I present some of the trickiest issues I faced while implementing pipelines with Google Cloud Dataflow, and how I overcame them.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;&lt;a href="https://dev.to/stack-labs/tricky-dataflow-ep-1-auto-create-bigquery-tables-in-pipelines-n2k"&gt;The last episode dealt with some BigQuery issues&lt;/a&gt;. This time, let's talk about a completely different flavour of database: MongoDB&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;MongoDB is now fairly widespread in the DB world, and arguably the best-known NoSQL database on the market. So, as one would expect, the Beam SDK behind Dataflow has a &lt;a href="https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/mongodb/MongoDbIO.html" rel="noopener noreferrer"&gt;MongoDB connector ready to ease the usage of MongoDB as a data source&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;It offers the ability to read from and write to MongoDB collections, so I (the naïve me, not so familiar with MongoDB at the time) thought that was all I needed to implement this simple kind of pipeline:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Ff17vt7aflctpjqabxwzf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Ff17vt7aflctpjqabxwzf.png" alt="Alt Text" width="661" height="211"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;But of course - otherwise there would be no point writing a blog post - things did not run as smoothly as I expected.&lt;/p&gt;

&lt;h2&gt;
  
  
  So you want to query a view, huh?
&lt;/h2&gt;

&lt;p&gt;In the first version of the pipeline, which I did as a warmup, I read documents directly from the collection with &lt;code&gt;MongoDbIO.read().withUri(...).withDatabase(...).withCollection(...)&lt;/code&gt; and faced no real issue. There was one subtle point though, whose importance I did not realize at the time:&lt;br&gt;
Because the source MongoDB instance was hosted on Atlas, &lt;a href="https://jira.mongodb.org/browse/SERVER-27344" rel="noopener noreferrer"&gt;MongoDbIO was not allowed to run the default splitVector() command&lt;/a&gt;, and therefore &lt;a href="https://docs.mongodb.com/manual/reference/operator/aggregation/bucketAuto/" rel="noopener noreferrer"&gt;it was mandatory to add the &lt;code&gt;withBucketAuto(true)&lt;/code&gt; clause to download the collection&lt;/a&gt;.&lt;/p&gt;
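
&lt;p&gt;For illustration, a minimal sketch of that warmup read (the URI, database and collection names are placeholders, not the actual pipeline code; imports come from &lt;code&gt;org.apache.beam.sdk.io.mongodb&lt;/code&gt; and &lt;code&gt;org.bson&lt;/code&gt;):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;// Warmup version: read a collection directly. On Atlas, withBucketAuto(true)
// stands in for the forbidden splitVector() command.
PCollection&amp;lt;Document&amp;gt; docs = pipeline.apply("Read from MongoDB",
    MongoDbIO.read()
        .withUri("mongodb+srv://...")       // placeholder connection string
        .withDatabase("mydatabase")         // placeholder
        .withCollection("mycollection")     // placeholder
        .withBucketAuto(true));
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;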

&lt;p&gt;I was not expecting the difficulties that came when I naïvely tried to use the view name in place of the collection:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;[WARNING] org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.mongodb.MongoCommandException: Command failed with error 166 (CommandNotSupportedOnView): 'Namespace [myview] is a view, not a collection' on server [***]&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;So apparently MongoDB knows about my view and understands that I'd like to query it, but no, it won't let me retrieve documents from it. It turns out there was no simple way to just get documents from the view. There surely is a good explanation for this, but I couldn't find it. &lt;strong&gt;So frustrating...&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fngui72kk0dc5bhvve468.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fngui72kk0dc5bhvve468.jpg" alt="Alt Text" width="800" height="533"&gt;&lt;/a&gt;&lt;br&gt;
&lt;em&gt;You know that feeling... (Photo Wikipedia / Nlan86)&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Actually, a view in MongoDB is not as straightforward as a regular view in the SQL world: a MongoDB view is the result of collection documents processed by an &lt;strong&gt;aggregation pipeline&lt;/strong&gt;. And MongoDbIO is able to perform aggregation queries on the collection it reads, thanks to an &lt;a href="https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/mongodb/AggregationQuery.html" rel="noopener noreferrer"&gt;AggregationQuery&lt;/a&gt; that may be passed to &lt;code&gt;.withQueryFn()&lt;/code&gt;. The solution started to take shape:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;read from the collection&lt;/li&gt;
&lt;li&gt;retrieve the aggregation definition from the view options&lt;/li&gt;
&lt;li&gt;pass the aggregation pipeline to &lt;code&gt;withQueryFn&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;MongoDB will process the documents through the provided pipeline, yielding the same documents as the view&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;em&gt;Let's follow the plan!&lt;/em&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Retrieve the view's aggregation pipeline
&lt;/h3&gt;

&lt;p&gt;To get the pipeline, we need to use the mongo-java-driver directly and list the collection infos with it. It's pretty verbose:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BsonDocument&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;retrieveViewPipeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Options&lt;/span&gt; &lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Strings&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isNullOrEmpty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getView&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;debug&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"No view in options"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ArrayList&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;com&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mongodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MongoClientOptions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Builder&lt;/span&gt; &lt;span class="n"&gt;optionsBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;com&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mongodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MongoClientOptions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Builder&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;optionsBuilder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxConnectionIdleTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;60000&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;MongoClient&lt;/span&gt; &lt;span class="n"&gt;mongoClient&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;MongoClient&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;MongoClientURI&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"mongodb+srv://"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMongoDBUri&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                &lt;span class="n"&gt;optionsBuilder&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

        &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Document&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;viewPipeline&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Document&lt;/span&gt; &lt;span class="n"&gt;collecInfosDoc&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;mongoClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDatabase&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDatabase&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;listCollections&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collecInfosDoc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"name"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;equalsIgnoreCase&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getView&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;viewPipeline&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;collecInfosDoc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"options"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Document&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;getList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"pipeline"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Document&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;checkArgument&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;viewPipeline&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;format&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"%s view not found"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getView&lt;/span&gt;&lt;span class="o"&gt;()));&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;viewPipeline&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;((&lt;/span&gt;&lt;span class="n"&gt;doc&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;doc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toBsonDocument&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BsonDocument&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="nc"&gt;MongoClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDefaultCodecRegistry&lt;/span&gt;&lt;span class="o"&gt;())).&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Pass the pipeline to MongoDbIO
&lt;/h3&gt;

&lt;p&gt;As mentioned, MongoDbIO has a method to handle aggregations: &lt;code&gt;withQueryFn&lt;/code&gt;. However, this method actually &lt;a href="https://github.com/apache/beam/blob/master/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/AggregationQuery.java" rel="noopener noreferrer"&gt;has a little bug in the current version (2.27)&lt;/a&gt; when the pipeline has multiple steps:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fw03tuoojhg36kqlqvavb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fw03tuoojhg36kqlqvavb.png" alt="Alt Text" width="800" height="315"&gt;&lt;/a&gt;&lt;br&gt;
&lt;em&gt;Line 71: Harsh time for the last stage of the pipeline :( (screenshot from Github)&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Of course, there is a simple workaround for this: just append a useless item to the pipeline list, which will be replaced by the &lt;code&gt;bucket()&lt;/code&gt; stage:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;viewPipeline&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;viewPipeline&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;add&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BsonDocument&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There you go, with the source connector configured like this, you can now retrieve the view documents:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;PCollectionTuple&lt;/span&gt; &lt;span class="n"&gt;mongoDocs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
    &lt;span class="n"&gt;pipeline&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Read from MongoDB"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;MongoDbIO&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;read&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withUri&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"mongodb+srv://"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMongoDBUri&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;         
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withDatabase&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDatabase&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;                        
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withCollection&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCollection&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withBucketAuto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; 
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withQueryFn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="nc"&gt;AggregationQuery&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withMongoDbPipeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;viewPipeline&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
    &lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  But wait! Does it work on HUGE collections?
&lt;/h2&gt;

&lt;p&gt;Finally! You can now retrieve documents from your testing dataset, and you feel ready to try your shiny new pipeline on your real, huge MongoDB view. And then...&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;com.mongodb.MongoCommandException:&lt;br&gt;
Command failed with error 16819 (Location16819): ‘Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.’&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;... it turns out you're not finished yet. At least the error message is pretty clear: while processing the aggregation pipeline on the MongoDB instance, the memory (RAM) limit was exceeded. Sadly this limit is not configurable. The only workaround is to allow MongoDB to spill to disk, which you force by setting the parameter &lt;code&gt;allowDiskUse: true&lt;/code&gt; alongside the aggregation pipeline.&lt;br&gt;
This parameter is easily accessible through the mongo-java-driver thanks to &lt;code&gt;AggregateIterable.allowDiskUse()&lt;/code&gt;. The problem is that, sadly, this method is not exposed in MongoDbIO yet. &lt;a href="https://issues.apache.org/jira/browse/BEAM-7256" rel="noopener noreferrer"&gt;There is a feature request for it&lt;/a&gt; but it's not on the roadmap at the moment.&lt;/p&gt;

&lt;p&gt;Unfortunately, &lt;code&gt;allowDiskUse()&lt;/code&gt; is needed in two places in the MongoDB Beam connector, and it's not possible to override them:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt; &lt;code&gt;MongoDbIO.buildAutoBuckets&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;AggregateIterable&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Document&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;buckets&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;mongoCollection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;aggregates&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;allowDiskUse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ul&gt;
&lt;li&gt; &lt;code&gt;AggregationQuery.apply()&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mongoDbPipeline&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;allowDiskUse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;iterator&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;So for now, the only way to edit these classes is to fork or duplicate them. Not perfect, but at least you can do some cleanup in your pipeline dependencies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;    &lt;span class="c"&gt;&amp;lt;!-- MongoDB connector --&amp;gt;&lt;/span&gt;
    &lt;span class="c"&gt;&amp;lt;!-- Because of limitations, a fork of this lib is used --&amp;gt;&lt;/span&gt;
    &lt;span class="c"&gt;&amp;lt;!--&amp;lt;dependency&amp;gt;
      &amp;lt;groupId&amp;gt;org.apache.beam&amp;lt;/groupId&amp;gt;
      &amp;lt;artifactId&amp;gt;beam-sdks-java-io-mongodb&amp;lt;/artifactId&amp;gt;
      &amp;lt;version&amp;gt;${beam.version}&amp;lt;/version&amp;gt;
    &amp;lt;/dependency&amp;gt;--&amp;gt;&lt;/span&gt;
    &lt;span class="c"&gt;&amp;lt;!-- The fork needs the Mongo-java driver --&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
      &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.mongodb&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
      &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;mongo-java-driver&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
      &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;3.12.7&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;All you need is mongo-java-driver&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;This long story has a happy ending: thanks to &lt;code&gt;allowDiskUse&lt;/code&gt; and disk spilling, your custom MongoDbIO connector can now query MongoDB views of any size!&lt;/p&gt;

&lt;p&gt;&lt;em&gt;That's it for this second episode. Stay tuned for the next one, in which I'll present GCP Workflows, a convenient way to orchestrate your Dataflow pipelines.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>dataflow</category>
      <category>mongodb</category>
      <category>java</category>
    </item>
    <item>
      <title>Tricky Dataflow ep.1 : Auto create BigQuery tables in pipelines</title>
      <dc:creator>matthieucham</dc:creator>
      <pubDate>Wed, 03 Feb 2021 09:35:30 +0000</pubDate>
      <link>https://dev.to/stack-labs/tricky-dataflow-ep-1-auto-create-bigquery-tables-in-pipelines-n2k</link>
      <guid>https://dev.to/stack-labs/tricky-dataflow-ep-1-auto-create-bigquery-tables-in-pipelines-n2k</guid>
      <description>&lt;p&gt;&lt;em&gt;GCP's Dataflow is a really powerful weapon when you need to manipulate massive amounts of data in a highly parallel and flexible fashion. Dataflow pipelines surely are the number one asset in every GCP data engineer toolbox.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;However, learning to use Apache Beam, which is the open source framework behind Dataflow, is no bed of roses: &lt;a href="https://beam.apache.org/documentation/" rel="noopener noreferrer"&gt;The official documentation is sparse&lt;/a&gt;, &lt;a href="https://github.com/GoogleCloudPlatform/DataflowTemplates" rel="noopener noreferrer"&gt;GCP-provided templates don't work out-of-the-box&lt;/a&gt;, and &lt;a href="https://beam.apache.org/releases/javadoc/2.27.0/" rel="noopener noreferrer"&gt;the Javadoc is, well, a javadoc&lt;/a&gt;.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;In this series, I would like to present you some of the trickiest issues Dataflow and Beam had in store for me, and how I overcame them.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Let's start with a bit of BigQueryIO frustration...&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  How to write data to dynamically generated BigQuery tables?
&lt;/h2&gt;

&lt;p&gt;Beam provides the ability to load data into BigQuery using dynamic destinations, where the target table spec is derived dynamically from incoming elements. We would like to use this feature to achieve the following design:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fxor3c3rql3jkuwn64qgk.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fxor3c3rql3jkuwn64qgk.png" alt="Alt Text" width="751" height="251"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Events coming from several Kafka topics are handled by a single Dataflow pipeline then serialized to several BigQuery tables: events from topic A go to table A, and so on...&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Luckily, there is a solution to this exact problem offered in &lt;a href="https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html" rel="noopener noreferrer"&gt;Beam Javadoc&lt;/a&gt;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;A common use case is to dynamically generate BigQuery table names based on the current value. To support this, BigQueryIO.Write.to(SerializableFunction) accepts a function mapping the current element to a tablespec. For example, here's code that outputs quotes of different stocks to different tables:&lt;br&gt;
&lt;/p&gt;
&lt;/blockquote&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;PCollection&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Quote&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;quotes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;...;&lt;/span&gt;

 &lt;span class="n"&gt;quotes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQueryIO&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;write&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withFormatFunction&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;quote&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;TableRow&lt;/span&gt;&lt;span class="o"&gt;()...)&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;((&lt;/span&gt;&lt;span class="nc"&gt;ValueInSingleWindow&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Quote&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;quote&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
             &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;quote&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSymbol&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
             &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;TableDestination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                 &lt;span class="s"&gt;"my-project:my_dataset.quotes_"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// Table spec&lt;/span&gt;
                 &lt;span class="s"&gt;"Quotes of stock "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="c1"&gt;// Table description&lt;/span&gt;
               &lt;span class="o"&gt;);&lt;/span&gt;
           &lt;span class="o"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Unfortunately, when we implemented this stage in our pipeline, with a CreateDisposition of &lt;code&gt;BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED&lt;/code&gt;, we soon found out that &lt;strong&gt;only the first dynamic table of the pipeline output had been created&lt;/strong&gt;. The other tables were missing, and as a result the data loading failed. &lt;a href="https://issues.apache.org/jira/browse/BEAM-3772" rel="noopener noreferrer"&gt;This is a known bug in Beam, which has been around for more than 2 years&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Facing this issue, we had no other choice than to develop a workaround.&lt;/p&gt;

&lt;h2&gt;
  
  
  Custom table-creation stage
&lt;/h2&gt;

&lt;p&gt;Apache Beam is an extensible framework, and as such it is possible to develop a new transformation stage, aka a PTransform. Therefore, we developed a PTransform whose single goal is to check whether the target table of each incoming element exists, and to create it if not.&lt;/p&gt;

&lt;p&gt;For this we have to bypass BigQueryIO and use the BigQuery Java client directly. The client should be instantiated during the &lt;em&gt;setup&lt;/em&gt; or &lt;em&gt;startBundle&lt;/em&gt; phase of the PTransform:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.google.cloud.bigquery.*&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="o"&gt;...&lt;/span&gt;

&lt;span class="nd"&gt;@Setup&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;setup&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;bigquery&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BigQueryOptions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDefaultInstance&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getService&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the client, we can easily call API methods to create tables. But not so fast: we can't check the target table for every single element; that would not be sustainable. Instead, we have to group elements into batches and check once per batch. Within a streaming pipeline, grouping means windowing. A simple strategy with fixed windows is enough for this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Window&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;into&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;FixedWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;standardSeconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;15&lt;/span&gt;&lt;span class="n"&gt;l&lt;/span&gt;&lt;span class="o"&gt;))))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Fixed windows of 15 seconds' width&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Then we just have to inspect the content of each window and group its elements by target table name:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Compute target table name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;WithKeys&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;GetTargetTableName&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;outputTableSpec&lt;/span&gt;&lt;span class="o"&gt;))).&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;GroupByKey&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now it's possible to check the table for each group. But there is no need to check the same table name repeatedly every 15 seconds: let's add some caching, for example with Guava Cache. That way we minimize costly API calls:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Create target table if needed"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;ParDo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;CreateIfNeeded&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To see the details of &lt;code&gt;CreateIfNeeded&lt;/code&gt; and the rest of the implementation, check out &lt;a href="https://gist.github.com/matthieucham/85459eff5fdea8d115be520e2dd5ccc1" rel="noopener noreferrer"&gt;this Gist&lt;/a&gt;&lt;/p&gt;
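
&lt;p&gt;For illustration only, here is a hypothetical sketch of what such a caching DoFn could look like. The dataset name and schema are made up, and the Gist above has the real implementation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.*;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch of a table-creating DoFn memoized with a Guava cache.
class CreateIfNeeded extends DoFn&amp;lt;KV&amp;lt;String, Iterable&amp;lt;TableRow&amp;gt;&amp;gt;, KV&amp;lt;String, Iterable&amp;lt;TableRow&amp;gt;&amp;gt;&amp;gt; {

    private transient BigQuery bigquery;
    private transient Cache&amp;lt;String, Boolean&amp;gt; checkedTables;

    @Setup
    public void setup() {
        bigquery = BigQueryOptions.getDefaultInstance().getService();
        // Remember already-checked tables so we don't hit the API every 15 seconds
        checkedTables = CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.HOURS)
                .build();
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
        String tableName = context.element().getKey();
        checkedTables.get(tableName, () -&amp;gt; {
            TableId tableId = TableId.of("my_dataset", tableName); // hypothetical dataset
            if (bigquery.getTable(tableId) == null) {
                // Hypothetical one-column schema; the real one depends on the events
                Schema schema = Schema.of(Field.of("payload", StandardSQLTypeName.STRING));
                bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)));
            }
            return Boolean.TRUE;
        });
        context.output(context.element());
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;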

&lt;p&gt;Finally, we have the satisfaction of watching our nice stage deployed, just before BigQueryIO.Write, which can now safely load data into sure-to-exist tables:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Foa64fpi316ja53kcdjo2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Foa64fpi316ja53kcdjo2.png" alt="Alt Text" width="506" height="598"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;See you soon for the next tricky Dataflow trick!&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>dataflow</category>
      <category>bigquery</category>
      <category>bigdata</category>
    </item>
  </channel>
</rss>
