<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Jonathan Bhaskar</title>
    <description>The latest articles on DEV Community by Jonathan Bhaskar (@jonathanbhaskar).</description>
    <link>https://dev.to/jonathanbhaskar</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2776696%2Feec85537-e6f1-41bc-823f-c1730d9f2aad.jpg</url>
      <title>DEV Community: Jonathan Bhaskar</title>
      <link>https://dev.to/jonathanbhaskar</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/jonathanbhaskar"/>
    <language>en</language>
    <item>
      <title>From Code to Configuration: Why Data Teams Should Follow DevOps' Path to Declarative Pipelines (+ Real Examples with dagster-odp)</title>
      <dc:creator>Jonathan Bhaskar</dc:creator>
      <pubDate>Wed, 05 Feb 2025 04:45:33 +0000</pubDate>
      <link>https://dev.to/jonathanbhaskar/from-code-to-configuration-why-data-teams-should-follow-devops-path-to-declarative-pipelines--2ja5</link>
      <guid>https://dev.to/jonathanbhaskar/from-code-to-configuration-why-data-teams-should-follow-devops-path-to-declarative-pipelines--2ja5</guid>
      <description>&lt;div class="ltag__link"&gt;
  &lt;a href="/jonathanbhaskar" class="ltag__link__link"&gt;
    &lt;div class="ltag__link__pic"&gt;
      &lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2776696%2Feec85537-e6f1-41bc-823f-c1730d9f2aad.jpg" alt="jonathanbhaskar"&gt;
    &lt;/div&gt;
  &lt;/a&gt;
  &lt;a href="https://dev.to/jonathanbhaskar/declarative-data-pipelines-moving-from-code-to-configuration-1i13" class="ltag__link__link"&gt;
    &lt;div class="ltag__link__content"&gt;
      &lt;h2&gt;Declarative Data Pipelines: Moving from Code to Configuration&lt;/h2&gt;
      &lt;h3&gt;Jonathan Bhaskar ・ Feb 5&lt;/h3&gt;
      &lt;div class="ltag__link__taglist"&gt;
        &lt;span class="ltag__link__tag"&gt;#dataengineering&lt;/span&gt;
        &lt;span class="ltag__link__tag"&gt;#python&lt;/span&gt;
        &lt;span class="ltag__link__tag"&gt;#opensource&lt;/span&gt;
      &lt;/div&gt;
    &lt;/div&gt;
  &lt;/a&gt;
&lt;/div&gt;


</description>
      <category>dataengineering</category>
      <category>python</category>
      <category>opensource</category>
    </item>
    <item>
      <title>Declarative Data Pipelines: Moving from Code to Configuration</title>
      <dc:creator>Jonathan Bhaskar</dc:creator>
      <pubDate>Wed, 05 Feb 2025 04:41:32 +0000</pubDate>
      <link>https://dev.to/jonathanbhaskar/declarative-data-pipelines-moving-from-code-to-configuration-1i13</link>
      <guid>https://dev.to/jonathanbhaskar/declarative-data-pipelines-moving-from-code-to-configuration-1i13</guid>
      <description>&lt;p&gt;In data teams today, writing Python code to create data pipelines has become second nature, especially for analytics workloads. Creating new DAGs, tasks, and operators in Airflow - now the industry standard for data orchestration - is just part of our daily routine. But what if there's a simpler, more accessible approach to building these pipelines?&lt;/p&gt;

&lt;p&gt;Before we explore this alternative, let's examine how another domain - DevOps - evolved its approach to building and shipping software, and see what lessons we can apply to data orchestration.&lt;/p&gt;

&lt;h2&gt;
  
  
  Evolution of Infrastructure Management
&lt;/h2&gt;

&lt;p&gt;Let's examine a simple but common task: deploying a web application with a database. The way teams handled this task evolved dramatically over time.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fopmyz5rwqu3v69pjylhk.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fopmyz5rwqu3v69pjylhk.png" alt="Timeline showing evolution from Shell scripts (2000) to Infrastructure as code (2016)" width="682" height="195"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Shell scripts (2000s)
&lt;/h3&gt;

&lt;p&gt;In the early 2000s, deploying applications meant writing detailed shell scripts that listed every command needed. These scripts were brittle, hard to maintain, and required deep system knowledge:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;

&lt;span class="c"&gt;# Install dependencies&lt;/span&gt;
apt-get update
apt-get &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-y&lt;/span&gt; nginx mysql-server

&lt;span class="c"&gt;# Configure MySQL&lt;/span&gt;
mysql &lt;span class="nt"&gt;-u&lt;/span&gt; root &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="s2"&gt;"CREATE DATABASE myapp;"&lt;/span&gt;
mysql &lt;span class="nt"&gt;-u&lt;/span&gt; root &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="s2"&gt;"CREATE USER 'myapp'@'localhost';"&lt;/span&gt;
mysql &lt;span class="nt"&gt;-u&lt;/span&gt; root &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="s2"&gt;"GRANT ALL PRIVILEGES ON myapp.*;"&lt;/span&gt;

&lt;span class="c"&gt;# Deploy application&lt;/span&gt;
&lt;span class="nb"&gt;cd&lt;/span&gt; /var/www/html
&lt;span class="nb"&gt;tar&lt;/span&gt; &lt;span class="nt"&gt;-xzf&lt;/span&gt; myapp.tar.gz
&lt;span class="nb"&gt;chown&lt;/span&gt; &lt;span class="nt"&gt;-R&lt;/span&gt; www-data:www-data /var/www/html

&lt;span class="c"&gt;# Start services&lt;/span&gt;
service mysql start
service nginx start
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Configuration Management (2010s)
&lt;/h3&gt;

&lt;p&gt;By 2010, tools like Puppet introduced a paradigm shift. Instead of listing commands, teams defined their desired system state in a declarative format. The tool would figure out how to achieve that state:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight puppet"&gt;&lt;code&gt;&lt;span class="n"&gt;package&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'nginx'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'mysql-server'&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
  &lt;span class="py"&gt;ensure&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;installed&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;service&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="s1"&gt;'nginx'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
  &lt;span class="py"&gt;ensure&lt;/span&gt;  &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;running&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="py"&gt;enable&lt;/span&gt;  &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="kp"&gt;require&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Package&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'nginx'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;service&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="s1"&gt;'mysql'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
  &lt;span class="py"&gt;ensure&lt;/span&gt;  &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;running&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="py"&gt;enable&lt;/span&gt;  &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="kp"&gt;require&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Package&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'mysql-server'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;exec&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="s1"&gt;'create-db'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
  &lt;span class="py"&gt;command&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="s1"&gt;'mysql -u root -e "CREATE DATABASE myapp;"'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="py"&gt;unless&lt;/span&gt;  &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="s1"&gt;'mysql -u root -e "SHOW DATABASES;" | grep myapp'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="kp"&gt;require&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Service&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'mysql'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;file&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="s1"&gt;'/var/www/html/myapp'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
  &lt;span class="py"&gt;ensure&lt;/span&gt;  &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;directory&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="py"&gt;source&lt;/span&gt;  &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="s1"&gt;'puppet:///modules/myapp/files'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="py"&gt;recurse&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="py"&gt;owner&lt;/span&gt;   &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="s1"&gt;'www-data'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="py"&gt;group&lt;/span&gt;   &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="s1"&gt;'www-data'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Infrastructure as Code (2016+)
&lt;/h3&gt;

&lt;p&gt;Cloud platforms like AWS took this declarative approach even further. With CloudFormation, engineers simply specified the resources they needed, and AWS handled all implementation details:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;AWSTemplateFormatVersion&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2010-09-09'&lt;/span&gt;
&lt;span class="na"&gt;Resources&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;WebServer&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::EC2::Instance&lt;/span&gt;
    &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;InstanceType&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;t2.micro&lt;/span&gt;
      &lt;span class="na"&gt;ImageId&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ami-0c55b159cbfafe1f0&lt;/span&gt;
      &lt;span class="na"&gt;UserData&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; 
        &lt;span class="na"&gt;Fn::Base64&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Sub&lt;/span&gt; &lt;span class="pi"&gt;|&lt;/span&gt;
          &lt;span class="s"&gt;#!/bin/bash&lt;/span&gt;
          &lt;span class="s"&gt;yum update -y&lt;/span&gt;
          &lt;span class="s"&gt;yum install -y nginx&lt;/span&gt;

  &lt;span class="na"&gt;Database&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::RDS::DBInstance&lt;/span&gt;
    &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Engine&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;mysql&lt;/span&gt;
      &lt;span class="na"&gt;DBName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;myapp&lt;/span&gt;
      &lt;span class="na"&gt;MasterUsername&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;admin&lt;/span&gt;
      &lt;span class="na"&gt;MasterUserPassword&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;password&lt;/span&gt;
      &lt;span class="na"&gt;DBInstanceClass&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;db.t2.micro&lt;/span&gt;

  &lt;span class="na"&gt;SecurityGroup&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::EC2::SecurityGroup&lt;/span&gt;
    &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;GroupDescription&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Allow web traffic&lt;/span&gt;
      &lt;span class="na"&gt;SecurityGroupIngress&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;IpProtocol&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;tcp&lt;/span&gt;
          &lt;span class="na"&gt;FromPort&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;80&lt;/span&gt;
          &lt;span class="na"&gt;ToPort&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;80&lt;/span&gt;
          &lt;span class="na"&gt;CidrIp&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;0.0.0.0/0&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The difference over the years is that, instead of actually writing the steps that needed to be executed (imperative), the engineers moved on to specifying what they wanted and let the system figure out how it was done (declarative).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhivofw137xt8p1zmk9ds.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhivofw137xt8p1zmk9ds.png" alt="Flow chart comparing Shell script vs Cloud Formation approaches" width="675" height="223"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Evolution of Infrastructure Teams
&lt;/h3&gt;

&lt;p&gt;This shift from imperative to declarative approaches fundamentally changed how infrastructure teams operated. Let's look at what this evolution meant in practice.&lt;/p&gt;

&lt;p&gt;In the early days, deploying infrastructure involved a lot of back-and-forth between developers and system administrators:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgdobjakddsfsaebreo9f.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgdobjakddsfsaebreo9f.png" alt="Diagram showing complex Developer-Sysadmin interactions in 2000" width="598" height="495"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;With declarative infrastructure, the interaction changed dramatically:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F51bbx4pystdx8a19skpv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F51bbx4pystdx8a19skpv.png" alt="Diagram showing simplified Developer-Infrastructure Engineer interactions in 2016" width="531" height="406"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The key difference was that teams now had a common language - YAML configurations - that both developers and infrastructure engineers could understand and work with effectively. This shift to configuration-driven workflows revolutionized how infrastructure teams operated.&lt;/p&gt;

&lt;h2&gt;
  
  
  Current state of data pipelines
&lt;/h2&gt;

&lt;p&gt;Data teams today face similar challenges to what infrastructure teams encountered in the 2000s. To understand these parallels, let's examine how data pipelines are typically built, using a common scenario: moving data from S3 to GCS, then loading it into BigQuery - a pattern you might use when you have transactional database backups in S3 but your analytics stack runs on BigQuery.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;transfer_s3_to_gcs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;s3_bucket&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;gcs_bucket&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;gcs_object_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;aws_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;aws_default&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;google_cloud_default&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Initialize hooks
&lt;/span&gt;    &lt;span class="n"&gt;s3_hook&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;S3Hook&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;aws_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;aws_conn_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;gcs_hook&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GCSHook&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Create a temporary file
&lt;/span&gt;    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;tempfile&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;NamedTemporaryFile&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;temp_file&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Download file from S3
&lt;/span&gt;        &lt;span class="n"&gt;s3_hook&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;download_file&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;s3_bucket&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;local_path&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;temp_file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Upload file to GCS
&lt;/span&gt;        &lt;span class="n"&gt;gcs_hook&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;upload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;gcs_bucket&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;object_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;gcs_object_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;temp_file&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Create DAG
&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3_to_gcs_transfer&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Transfer files from S3 to GCS&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;@daily&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;catchup&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Define the transfer task
&lt;/span&gt;&lt;span class="n"&gt;transfer_task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;transfer_s3_to_gcs&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;transfer_s3_to_gcs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;op_kwargs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3_bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-s3-bucket-name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3_key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;path/to/your/file.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcs_bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-gcs-bucket-name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcs_object_name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;path/to/destination/file.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Load from GCS to BigQuery
&lt;/span&gt;&lt;span class="n"&gt;load_to_bq&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GCSToBigQueryOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;load_to_bigquery&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;bucket&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-gcs-bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;source_objects&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;path/to/destination/file.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;destination_project_dataset_table&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-project:dataset.table&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Set task dependencies (if you add more tasks)
&lt;/span&gt;&lt;span class="n"&gt;transfer_task&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;load_to_bq&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When comparing this approach to what we saw with the DevOps workflows, the code is imperative in nature. We tell the orchestrator how to perform each step in the data pipeline, instead of what needs to be done and letting it figure out the how.&lt;/p&gt;

&lt;h3&gt;
  
  
  Data team interactions
&lt;/h3&gt;

&lt;p&gt;Unsurprisingly, the data team workflows within companies mirror the interactions between developers and sysadmins in the 2000s. When a data scientist or analyst needs a new pipeline, they have to coordinate with data engineers who are often juggling multiple priorities:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9ksmfynh9w1wrykm89z2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9ksmfynh9w1wrykm89z2.png" alt="Sequence diagram showing data team communications" width="621" height="491"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Reusability
&lt;/h3&gt;

&lt;p&gt;Another significant downside of the current imperative approach in Airflow is the difficulty of code reuse. Looking at Airflow's abstraction layers, we can see why:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsk1437ci8ru8620btv1j.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsk1437ci8ru8620btv1j.png" alt="Airflow architecture diagram showing layers and components" width="569" height="477"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;While Airflow provides hooks and operators that can be reused across DAGs, the tasks themselves must be written directly in the DAG file along with the business logic. Even if we need to create a new DAG that shares most of the same tasks but adds just one additional step, we have to rewrite all the task definitions in the new DAG file.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk91trtqeq965clxeshtc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk91trtqeq965clxeshtc.png" alt="Modified Airflow diagram showing code duplication issue" width="569" height="477"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The tasks have to be defined in this DAG again, even though the &lt;code&gt;transfer_s3_to_gcs&lt;/code&gt; task and the &lt;code&gt;load_to_bigquery&lt;/code&gt; task are the same as the previous DAG.&lt;/p&gt;

&lt;h3&gt;
  
  
  Tight coupling
&lt;/h3&gt;

&lt;p&gt;The real challenge becomes apparent when we need to update implementation details. Consider a scenario where we need to make our transfer process more scalable. The current approach, which might work for smaller files, will fail for file sizes exceeding the Airflow worker's memory. Even worse, it could stall or crash the worker, disrupting other executing DAGs.&lt;/p&gt;

&lt;p&gt;Instead, to ensure the workload scales independently, we create an instance and use the instance's &lt;code&gt;STARTUP_SCRIPT&lt;/code&gt; to transfer the files.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3_to_gcs_transfer&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;@daily&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;catchup&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="c1"&gt;# Create and start instance
&lt;/span&gt;    &lt;span class="c1"&gt;# Use the STARTUP_SCRIPT to transfer files in the instance
&lt;/span&gt;    &lt;span class="n"&gt;create_instance&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ComputeEngineInsertInstanceOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;create_transfer_instance&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;project_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-project-id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
        &lt;span class="n"&gt;zone&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;us-central1-a&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;transfer-instance-{{ ds_nodash }}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;machineType&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;n1-standard-2&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;items&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
                    &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;startup-script&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;STARTUP_SCRIPT&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
                    &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3_bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-s3-bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
                    &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3_key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;path/to/your/file&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
                    &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcs_bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-gcs-bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
                    &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcs_path&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;destination-path&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Delete instance
&lt;/span&gt;    &lt;span class="n"&gt;delete_instance&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ComputeEngineDeleteInstanceOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;delete_transfer_instance&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;project_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-project-id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;zone&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;us-central1-a&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;resource_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;transfer-instance-{{ ds_nodash }}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;trigger_rule&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;all_done&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Load from GCS to BigQuery
&lt;/span&gt;    &lt;span class="n"&gt;load_to_bq&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GCSToBigQueryOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;load_to_bigquery&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bucket&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;your-gcs-bucket&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;source_objects&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;path/to/destination/file.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="n"&gt;destination_project_dataset_table&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;project:dataset.table&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;create_instance&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;delete_instance&lt;/span&gt;
    &lt;span class="n"&gt;create_instance&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;load_to_bq&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When we need to update the DAGs to use this scalable approach, there isn't a clean way to do it. Every DAG using the transfer logic needs to be updated, tested, and redeployed individually. The refactoring effort multiplies with the number of files that need updating.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fydnvuvywnciw8v7ltxjb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fydnvuvywnciw8v7ltxjb.png" alt="Diagram illustrating pipeline update propagation challenges" width="800" height="262"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This limitation stems from Airflow's fundamental design pattern where business logic (the nodes and structure of the DAG) is tightly coupled with implementation logic (the actual task code). This coupling not only makes maintenance difficult but also creates a barrier between data engineers and other team members who could potentially define and modify pipelines themselves.&lt;/p&gt;

&lt;h2&gt;
  
  
  Declarative data pipelines
&lt;/h2&gt;

&lt;p&gt;A declarative data platform could address these challenges by completely separating the technical implementation details (how to move data from S3 to GCS) from the pipeline business logic (which pipeline needs to move data from S3 to GCS). This separation would allow data engineers to focus on building robust, reusable components while enabling analysts and scientists to define pipelines without deep technical knowledge.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4f7fge02e4zm2qallnui.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4f7fge02e4zm2qallnui.png" alt="Architecture diagram of declarative pipeline approach" width="571" height="377"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Config-based workflows
&lt;/h3&gt;

&lt;p&gt;In summary, we can identify three key requirements for building an effective declarative data platform:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Separation of Concerns&lt;/strong&gt;: Pipeline definitions (what) should be completely separate from task implementations (how)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Reusability&lt;/strong&gt;: Tasks should be reusable across different pipelines without code duplication&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Simplified Interface&lt;/strong&gt;: Teams should be able to define pipelines using a simple, declarative syntax&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Here's how such a platform might work. Instead of writing Python code, you'd define your pipeline in YAML:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;tasks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;daily_transfer&lt;/span&gt;
    &lt;span class="na"&gt;task&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;s3_to_gcs&lt;/span&gt;
    &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;s3_bucket&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;your-s3-bucket-name,&lt;/span&gt;
      &lt;span class="na"&gt;s3_key&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;path/to/your/file.csv,&lt;/span&gt;
      &lt;span class="na"&gt;gcs_bucket&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;your-gcs-bucket-name,&lt;/span&gt;
      &lt;span class="na"&gt;gcs_object_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;path/to/destination/file.csv&lt;/span&gt;

  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;daily_load_to_bq&lt;/span&gt;
    &lt;span class="na"&gt;task&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;gcs_to_bq&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;s3_to_gcs&lt;/span&gt;
    &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;gcs_bucket&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;your-gcs-bucket-name,&lt;/span&gt;
      &lt;span class="na"&gt;gcs_object_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;path/to/destination/file.csv,&lt;/span&gt;
      &lt;span class="na"&gt;bq_dataset&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;your-bq-dataset-name,&lt;/span&gt;
      &lt;span class="na"&gt;bq_table&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;your-bq-table-name&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The actual implementation of these tasks would be written in Python, but crucially, they would be independent of any specific pipeline. This separation allows data engineers to optimize and maintain task implementations without affecting pipeline definitions.&lt;/p&gt;

&lt;h3&gt;
  
  
  Benefits
&lt;/h3&gt;

&lt;p&gt;This approach also offers other key advantages.&lt;/p&gt;

&lt;h4&gt;
  
  
  Reduced cognitive load
&lt;/h4&gt;

&lt;p&gt;Declarative systems mirror how humans naturally think about problem-solving. When designing a new pipeline, we typically think top-down about what tasks need to be included. However, imperative systems like Airflow require bottom-up thinking, where you must detail every implementation step before building the pipeline.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2k6z6vcnhd2dm7d7axwl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2k6z6vcnhd2dm7d7axwl.png" alt="Comparison of top-down vs bottom-up pipeline design approaches" width="664" height="243"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Improved collaboration
&lt;/h4&gt;

&lt;p&gt;Since creating and updating tasks are decoupled from business logic, teams can work more efficiently:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Data analysts can create and modify pipelines without deep technical knowledge&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data engineers can optimize task implementations without understanding pipeline-specific logic&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Changes can be reviewed and deployed independently&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fu7xtflqz38or9gcio2cu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fu7xtflqz38or9gcio2cu.png" alt="Workflow diagram between Data Scientists and Engineers" width="800" height="288"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  UI and no-code tools
&lt;/h3&gt;

&lt;p&gt;While this article proposes a YAML-based approach, visual workflow tools like AWS Glue offer similar declarative functionality. However, these UI-based tools come with some non-obvious limitations:&lt;/p&gt;

&lt;h4&gt;
  
  
  Limited DevOps Integration
&lt;/h4&gt;

&lt;p&gt;Core software engineering practices like code review, testing, and automated deployments become difficult or impossible with UI-based tools.&lt;/p&gt;

&lt;h4&gt;
  
  
  Reduced Flexibility
&lt;/h4&gt;

&lt;p&gt;While UI tools excel at common use cases, they often struggle with custom requirements. Teams frequently end up modifying their requirements to fit the tool's capabilities rather than the other way around.&lt;/p&gt;

&lt;h3&gt;
  
  
  AI &amp;amp; LLMs
&lt;/h3&gt;

&lt;p&gt;The rise of Large Language Models (LLMs) presents interesting opportunities for declarative data platforms. LLMs excel at breaking down high-level instructions into logical steps but can struggle with generating efficient, maintainable implementation code.&lt;/p&gt;

&lt;p&gt;A declarative platform could leverage these strengths by using LLMs to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Help generate pipeline configurations from natural language descriptions&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Suggest task combinations for common data patterns&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Assist with pipeline documentation and metadata&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This aligns well with LLMs' capabilities while avoiding their limitations around generating complex implementation code.&lt;/p&gt;

&lt;h2&gt;
  
  
  Implementation
&lt;/h2&gt;

&lt;p&gt;With these challenges in mind, I built &lt;a href="https://github.com/runodp/dagster-odp" rel="noopener noreferrer"&gt;dagster-odp&lt;/a&gt;, as a proof-of-concept for bringing declarative pipelines to Dagster. Let's examine how this implementation addresses the problems we've discussed, starting with why Dagster was chosen as the foundation.&lt;/p&gt;

&lt;h3&gt;
  
  
  Why not Airflow
&lt;/h3&gt;

&lt;p&gt;Two fundamental aspects of Airflow's design made it unsuitable for our needs:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Task definition model&lt;/strong&gt;: In Airflow, tasks can only exist within DAGs - they aren't independent, reusable components. This conflicts with our core requirement of having tasks be first-class citizens that can be reused across different pipelines. The tight coupling between tasks and DAGs makes it impossible to build a truly modular system.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Dynamic DAG Creation&lt;/strong&gt;: Our platform needs to create DAGs dynamically from YAML configurations. However, Airflow expects DAGs to be defined in Python files. Creating DAGs dynamically would mean generating Python code at runtime and writing this code to DAG files - an approach that's both complex and prone to errors.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Why Dagster?
&lt;/h3&gt;

&lt;p&gt;Dagster's architecture aligns perfectly with our declarative approach, offering two key constructs that make it ideal for our platform:&lt;/p&gt;

&lt;h4&gt;
  
  
  Asset-Based Architecture
&lt;/h4&gt;

&lt;p&gt;Instead of focusing on tasks like Airflow, Dagster uses assets as its primary abstraction. An asset represents a concrete piece of data - like a table, file, or model. Creating a new version of assets through a pipeline run is called asset materialization.&lt;/p&gt;

&lt;p&gt;This approach means the code is directly linked to the data object it creates, enabling rich metadata when a new version of an asset is created. For instance, when running a pipeline in Dagster, your code can push detailed information about the data asset it produced to the UI. The metadata could include the asset's size, schema, quality metrics, and lineage. This tight coupling between code and data makes it easier to build observable, maintainable data pipelines.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvwnssr6v83v4ys1osngz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvwnssr6v83v4ys1osngz.png" alt="Dagster UI vs Airflow UI comparison screenshot" width="800" height="333"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Dynamic Definition Support
&lt;/h4&gt;

&lt;p&gt;Dagster's component model is built around Python decorators rather than class inheritance, making it ideal for dynamic creation. For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt; 
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;dynamic_asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; 
    &lt;span class="c1"&gt;# Asset logic here 
&lt;/span&gt;    &lt;span class="k"&gt;pass&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This design means we can programmatically create assets from YAML configurations without generating code files - a critical requirement for our platform.&lt;/p&gt;

&lt;h4&gt;
  
  
  Additional Dagster features
&lt;/h4&gt;

&lt;p&gt;Beyond its core architecture, Dagster provides several features that make it an ideal foundation for our declarative platform:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Local Development&lt;/strong&gt;: Dagster prioritizes developer experience by allowing local execution without complex setup. This means you can develop and test pipelines on your local machine with the same code that will run in production, leading to faster iteration cycles and simpler debugging.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;DBT Integration&lt;/strong&gt;: Dagster's DBT integration automatically discovers DBT models in your repository. This deep integration means you can treat individual DBT models as Dagster assets, run specific model selections, and maintain a complete picture of your data lineage across both DBT and non-DBT transformations.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Partitions&lt;/strong&gt;: Dagster's partitioning system lets you process data in logical chunks, whether time-based (like daily runs) or categorical (like per-country processing). This is particularly powerful for backfills and incremental processing, as you can easily rerun specific partitions without touching others.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Asset checks&lt;/strong&gt;: Data quality monitoring is built into Dagster's core abstractions through asset checks. Rather than treating quality checks as an afterthought, Dagster allows you to define checks that run automatically after asset materialization, enabling you to catch data issues before they propagate downstream.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  dagster-odp
&lt;/h2&gt;

&lt;p&gt;To demonstrate how &lt;a href="https://github.com/runodp/dagster-odp" rel="noopener noreferrer"&gt;dagster-odp&lt;/a&gt; brings these concepts together, we'll implement the same S3 to BigQuery pipeline we discussed earlier, but using a declarative approach. The complete implementation consists of three main components: resource configuration, task definition, and workflow configuration.&lt;/p&gt;

&lt;h3&gt;
  
  
  Resource definition
&lt;/h3&gt;

&lt;p&gt;First, we define our resource configuration. Resources in &lt;code&gt;dagster-odp&lt;/code&gt; represent reusable connections and services, similar to Airflow hooks. While &lt;code&gt;dagster-odp&lt;/code&gt; ships with pre-built GCP resources, we'll also create a custom S3 resource:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;resources&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;resource_kind&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;gcs&lt;/span&gt;
    &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-gcp-project&lt;/span&gt;

  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;resource_kind&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;s3&lt;/span&gt;

  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;resource_kind&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;bigquery&lt;/span&gt;
    &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;project&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-gcp-project&lt;/span&gt;
      &lt;span class="na"&gt;location&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;us-east1&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The S3 resource implementation shows how to create a custom resource:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@odp_resource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;S3Resource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ConfigurableResource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;A resource that provides access to Amazon S3.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Any&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;aws_access_key_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;EnvVar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;AWS_ACCESS_KEY_ID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;get_value&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
        &lt;span class="n"&gt;aws_secret_access_key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;EnvVar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;AWS_SECRET_ACCESS_KEY&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;get_value&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We use the &lt;code&gt;odp_resource&lt;/code&gt; decorator to define the resource class. This allows it to be discovered by the framework and used in the YAML file. We define one method, &lt;code&gt;get_client&lt;/code&gt;, which we will use in our task definition.&lt;/p&gt;

&lt;h3&gt;
  
  
  Task definition
&lt;/h3&gt;

&lt;p&gt;With our resources defined, we can create the S3 to GCS transfer task by implementing a BaseTask subclass.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dagster_odp.tasks.manager&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseTask&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;odp_task&lt;/span&gt;

&lt;span class="nd"&gt;@odp_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3_file_to_gcs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;required_resources&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;gcs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;S3ToGCSTransfer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseTask&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    A task that transfers a file from Amazon S3 to GCP Storage
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;s3_bucket&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;destination_uri&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;s3_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_resources&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;get_client&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;gcs_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_resources&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;gcs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;


        &lt;span class="c1"&gt;# Download file from S3
&lt;/span&gt;        &lt;span class="n"&gt;s3_response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;s3_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_object&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Bucket&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;s3_bucket&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                            &lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;file_content&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;s3_response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Body&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;read&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="n"&gt;bucket_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;destination_uri&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;:].&lt;/span&gt;&lt;span class="nf"&gt;split&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;blob_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;destination_uri&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;:].&lt;/span&gt;&lt;span class="nf"&gt;split&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;:])&lt;/span&gt;

        &lt;span class="c1"&gt;# Upload to GCS
&lt;/span&gt;        &lt;span class="n"&gt;bucket&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gcs_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;bucket&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;blob&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bucket&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;blob_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;blob&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;upload_from_string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;file_content&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3_bucket&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;s3_bucket&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3_key&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;destination_uri&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;destination_uri&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;size&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;file_content&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The task implementation returns metadata that will be visible in the Dagster UI after each run.&lt;/p&gt;

&lt;h3&gt;
  
  
  Workflow definition
&lt;/h3&gt;

&lt;p&gt;Finally, we tie everything together in our workflow configuration. This YAML file defines our assets, their dependencies, and scheduling:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;assets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;asset_key&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;raw_data_gcs&lt;/span&gt;
    &lt;span class="na"&gt;task_type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;s3_file_to_gcs&lt;/span&gt;
    &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;s3_bucket&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-source-bucket&lt;/span&gt;
      &lt;span class="na"&gt;s3_key&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;data/daily_export.csv&lt;/span&gt;
      &lt;span class="na"&gt;destination_file_uri&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;gs://my-gcs-bucket/data/daily_export.csv&lt;/span&gt;

  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;asset_key&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;raw_data_bq&lt;/span&gt;
    &lt;span class="na"&gt;task_type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;gcs_file_to_bq&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;raw_data_gcs&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;source_file_uri&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;{{raw_data_gcs.destination_file_uri}}"&lt;/span&gt;
      &lt;span class="na"&gt;destination_table_id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my-project.my_dataset.daily_data"&lt;/span&gt;
      &lt;span class="na"&gt;job_config_params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;source_format&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;CSV&lt;/span&gt;
        &lt;span class="na"&gt;write_disposition&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;WRITE_TRUNCATE&lt;/span&gt;
        &lt;span class="na"&gt;autodetect&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;

&lt;span class="na"&gt;jobs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;job_id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;daily_data_transfer&lt;/span&gt;
    &lt;span class="na"&gt;description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Transfer&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;from&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;S3&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;to&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;BigQuery&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;daily"&lt;/span&gt;
    &lt;span class="na"&gt;asset_selection&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;raw_data_gcs&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;raw_data_bq&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;triggers&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;trigger_id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;daily_schedule&lt;/span&gt;
        &lt;span class="na"&gt;trigger_type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;schedule&lt;/span&gt;
        &lt;span class="na"&gt;params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;schedule_kind&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;cron&lt;/span&gt;
          &lt;span class="na"&gt;schedule_params&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;cron_schedule&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;@daily"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;These components are converted by &lt;code&gt;dagster-odp&lt;/code&gt; into Dagster constructs like assets, jobs and schedules.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm5wxn36oee9ea3w53jpb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm5wxn36oee9ea3w53jpb.png" alt="dagster-odp system conversion flow diagram" width="459" height="285"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  dagster-odp Features
&lt;/h3&gt;

&lt;p&gt;The example above demonstrates some key features of the framework:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Asset dependencies through the &lt;code&gt;depends_on&lt;/code&gt; field&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Metadata sharing between assets using handlebars syntax (&lt;code&gt;{{raw_data_gcs.destination_file_uri}}&lt;/code&gt;)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Built-in scheduling with cron support&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Pre-built tasks for common operations like &lt;code&gt;gcs_file_to_bq&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The framework also provides several other capabilities:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Integration with DLT for data ingestion&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Soda integration for data quality monitoring&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;DBT support with dependency management and variable passing&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For a hands-on introduction to &lt;code&gt;dagster-odp&lt;/code&gt;, check out the &lt;a href="https://runodp.github.io/dagster-odp/getting-started/overview/" rel="noopener noreferrer"&gt;Getting Started guide&lt;/a&gt;. The guide walks through setting up your first project and building a simple pipeline using the concepts we've discussed.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Move Towards Declarative Pipelines
&lt;/h2&gt;

&lt;p&gt;&lt;code&gt;dagster-odp&lt;/code&gt; isn't the only declarative data platform. Several platforms are exploring similar approaches, recognizing the need to make data pipelines more maintainable and accessible:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Data aware scheduling&lt;/strong&gt;: Airflow's data aware scheduling (v2.4+) introduces a concept similar to Dagster's assets, allowing DAGs to be dynamically chained together based on datasets. However, since DAGs must still be defined in Python files, creating these jobs dynamically from YAML remains challenging.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;DLT+&lt;/strong&gt;: dlthub, an open source data ingestion tool, recently launched DLT+, offering end-to-end workflow definitions via YAML files. It's built specifically for data scientists, focusing on single-machine computation for both ingestion and processing tasks. To learn more, &lt;a href="https://dlthub-community.slack.com/" rel="noopener noreferrer"&gt;get in touch&lt;/a&gt; with them.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Dagster&lt;/strong&gt;: The Dagster team is working on their own declarative feature that will be deeply integrated into their orchestration product. To learn more or collaborate, jump into the &lt;a href="https://dagster.slack.com/join/shared_invite/zt-1xbsn8up1-PGqm317FBlwnzPeCbRhaHQ" rel="noopener noreferrer"&gt;Dagster Slack&lt;/a&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;The evolution from imperative to declarative approaches in data engineering mirrors what we've seen in other domains like DevOps. As data pipelines become more complex, the need for maintainable, configurable, and collaborative solutions grows stronger. A declarative framework addresses these challenges by separating pipeline definitions from implementation details, enabling teams to work more efficiently while maintaining the flexibility needed for complex data workflows.&lt;/p&gt;

&lt;p&gt;Whether you're building new pipelines or maintaining existing ones, considering a declarative approach could help make your data infrastructure more maintainable and your teams more productive.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>python</category>
      <category>opensource</category>
    </item>
  </channel>
</rss>
