<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Yegor Bryzhan</title>
    <description>The latest articles on DEV Community by Yegor Bryzhan (@ybryzy).</description>
    <link>https://dev.to/ybryzy</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3639909%2Fd1dc9ff7-c50e-40db-87d6-a000c6cf00dd.png</url>
      <title>DEV Community: Yegor Bryzhan</title>
      <link>https://dev.to/ybryzy</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ybryzy"/>
    <language>en</language>
    <item>
      <title>Trying Out Dagster for Data Orchestration</title>
      <dc:creator>Yegor Bryzhan</dc:creator>
      <pubDate>Mon, 01 Dec 2025 23:33:05 +0000</pubDate>
      <link>https://dev.to/ybryzy/trying-out-dagster-for-data-orchestration-1ih5</link>
      <guid>https://dev.to/ybryzy/trying-out-dagster-for-data-orchestration-1ih5</guid>
      <description>&lt;p&gt;I've been hearing a lot of good things about Dagster recently, so I wanted to give it a try.&lt;br&gt;
There are quite a few orchestrators in the data world, but the biggest ones are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Apache Airflow&lt;/li&gt;
&lt;li&gt;Prefect&lt;/li&gt;
&lt;li&gt;Dagster&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I have professional experience with both Prefect and Airflow, so I wanted to see what Dagster brings to the table (Spoiler: quite a lot), so I've built a simple pipeline that extracts and aggregates weather data from &lt;code&gt;https://open-meteo.com/&lt;/code&gt; to test how Dagster's approach compares to other tools in the industry.&lt;/p&gt;

&lt;p&gt;Full pipeline at: &lt;a href="https://github.com/bryzy-y/weather_etl" rel="noopener noreferrer"&gt;https://github.com/bryzy-y/weather_etl&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Intro
&lt;/h2&gt;

&lt;p&gt;Well, I can't jump right into the pipeline before giving a brief intro and highlighting the most obvious differentiating factor that Dagster has - &lt;strong&gt;Assets&lt;/strong&gt;. The way you model your pipeline is very different in Dagster because you are not thinking about &lt;code&gt;flows&lt;/code&gt; or &lt;code&gt;operators/dags&lt;/code&gt;, but instead the actual data entities that are most often the results of these operations. Meaning all of your procedural steps like extracting the data from some system and then loading it into S3 for example, will be modeled as a single &lt;code&gt;asset&lt;/code&gt; rather than separately. This puts Dagster in a more declarative camp, with &lt;code&gt;assets&lt;/code&gt; being at the forefront of things, versus task based approaches that are more procedural, like in &lt;code&gt;Prefect&lt;/code&gt; and &lt;code&gt;Airflow&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;One remark I want to make is that Dagster is flexible enough to support the &lt;code&gt;non-asset&lt;/code&gt; way of doing things like I just described via something called an &lt;code&gt;op&lt;/code&gt;, but it's a recommended practice to use &lt;code&gt;assets&lt;/code&gt; instead, unless the complexity at hand requires the usage of an &lt;code&gt;op&lt;/code&gt;. I will come back to it later.&lt;/p&gt;
&lt;h2&gt;
  
  
  Data Pipeline Architecture
&lt;/h2&gt;

&lt;p&gt;After a short overview, let's look at &lt;code&gt;weather_etl&lt;/code&gt; project structure. It has two datasets - &lt;code&gt;actual weather&lt;/code&gt; and &lt;code&gt;forecast&lt;/code&gt;. Both of these are sourced, including the history, from the API and are stored in S3 as parquet files. The transformation step is done with &lt;code&gt;polars&lt;/code&gt;, an awesome &lt;code&gt;pandas&lt;/code&gt; alternative. It's rather simple and it's there to illustrate a concept, because at that stage you can use a data warehouse to do your data modeling needs.&lt;br&gt;
This is what the resulting Dagster asset graph looks like:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F50a0s585maomfh10g4xr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F50a0s585maomfh10g4xr.png" alt="Assets" width="800" height="637"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Assets
&lt;/h2&gt;

&lt;p&gt;Defining assets in Dagster is easy: just slap an &lt;code&gt;asset&lt;/code&gt; decorator on top of the function.&lt;/p&gt;

&lt;p&gt;The pipeline is organized into two layers: &lt;strong&gt;raw&lt;/strong&gt; and &lt;strong&gt;staged&lt;/strong&gt;. The raw layer contains assets that fetch data from the Open-Meteo API and store it as-is in S3, while the staged layer transforms this raw data into normalized, analytics-ready tables.&lt;/p&gt;
&lt;h3&gt;
  
  
  Raw Assets
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;historical_forecast&lt;/code&gt;&lt;/strong&gt; - This partitioned asset fetches historical weather forecast data month by month, starting from January 2023. It uses the Historical Forecast API endpoint and is perfect for backfilling. The monthly partitioning means I can process years of historical data independently, making it resilient to failures and easy to reprocess specific time periods.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;monthly_partition&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;MonthlyPartitionsDefinition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="c1"&gt;# Backfill the data from Jan 2023 onwards
&lt;/span&gt;    &lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2023&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tzinfo&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;key_prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;raw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;retry_policy&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;RetryPolicy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;max_retries&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;delay&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;group_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_etl&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;owner&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ybryz&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;api-url&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;HISTORICAL_API_URL&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;endpoint&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/forecast&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;kinds&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;partitions_def&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;monthly_partition&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;historical_forecast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;AssetExecutionContext&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;weather_api_client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;WeatherApiClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;S3Resource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;code&gt;hourly_forecast&lt;/code&gt;&lt;/strong&gt; - This asset runs daily at 6 AM UTC and fetches the 7-day weather forecast for all configured cities (NYC, Philadelphia, Chicago, and DC). It's scheduled with &lt;code&gt;AutomationCondition.on_cron()&lt;/code&gt; to ensure fresh forecast data is available every morning. The data includes temperature, precipitation, wind speed, snowfall, and visibility metrics.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;key_prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;raw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;retry_policy&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;RetryPolicy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;max_retries&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;delay&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;group_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_etl&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;owner&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ybryz&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;api-url&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;API_URL&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;endpoint&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/forecast&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;kinds&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;automation_condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AutomationCondition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on_cron&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0 6 * * *&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;  &lt;span class="c1"&gt;# Daily at 6 AM UTC
&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;weather_api_client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;WeatherApiClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;S3Resource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;code&gt;historical_actual_weather&lt;/code&gt;&lt;/strong&gt; - Similar to &lt;code&gt;historical_forecast&lt;/code&gt;, but this fetches observed weather data (what actually happened) from the Archive API. It's also monthly partitioned and accounts for the Archive API's 5-day delay, meaning it can only fetch data from 5 days ago or earlier. This is useful for comparing forecasts against reality.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;actual_weather&lt;/code&gt;&lt;/strong&gt; - This asset runs every 5 days and fetches recent observed weather data. Since the Archive API has a 5-day delay, this asset pulls data from 10 days ago to 5 days ago, ensuring we always have the most recent available observations. It complements the historical data by keeping the dataset current.&lt;/p&gt;

&lt;p&gt;All raw assets follow the same pattern: they call the &lt;code&gt;WeatherApiClient&lt;/code&gt; resource to fetch data, convert it to a Polars DataFrame, add an &lt;code&gt;extracted_at&lt;/code&gt; timestamp for lineage tracking, and upload it to S3 as Parquet files organized by month. Each asset includes retry policies (up to 3 retries with 10-second delays) to handle transient API failures.&lt;/p&gt;

&lt;h3&gt;
  
  
  Staged Assets
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;&lt;code&gt;hourly_forecast_table&lt;/code&gt;&lt;/strong&gt; - This transformation asset depends on both &lt;code&gt;historical_forecast&lt;/code&gt; and &lt;code&gt;hourly_forecast&lt;/code&gt;. It reads all the raw forecast parquet files from S3, normalizes the nested JSON structure by exploding the hourly arrays, maps coordinates to city codes, and deduplicates the data based on city and timestamp. The result is a clean, denormalized table ready for analysis. It uses &lt;code&gt;AutomationCondition.eager()&lt;/code&gt;, meaning it automatically runs whenever new upstream data arrives.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;key_prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;staged&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;group_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_etl&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;kinds&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;polars&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;deps&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;historical_forecast&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;hourly_forecast&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;automation_condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AutomationCondition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eager&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast_table&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;code&gt;actual_weather_table&lt;/code&gt;&lt;/strong&gt; - The transformation counterpart for observed weather data. It depends on &lt;code&gt;historical_actual_weather&lt;/code&gt; and &lt;code&gt;actual_weather&lt;/code&gt;, and applies the same normalization logic through a shared &lt;code&gt;transform_weather_data()&lt;/code&gt; function.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;key_prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;staged&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;group_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_etl&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;kinds&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;polars&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;deps&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;historical_actual_weather&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;actual_weather&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;automation_condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AutomationCondition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eager&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;actual_weather_table&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="bp"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Both staged assets include rich metadata like table schemas and row counts, making it easy to monitor data quality and spot anomalies through the Dagster UI.&lt;/p&gt;

&lt;p&gt;Now, I want to go over Dagster's features that are used consistently throughout the project.&lt;/p&gt;

&lt;h2&gt;
  
  
  Resources and Dependency Injection
&lt;/h2&gt;

&lt;p&gt;One of the things I really appreciate about Dagster is how cleanly it handles resources through dependency injection. Instead of passing configuration around or relying on global state, you define resources that can be injected into your assets as needed.&lt;/p&gt;

&lt;p&gt;In my pipeline, I created a &lt;code&gt;WeatherApiClient&lt;/code&gt; resource that wraps the Open-Meteo API:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;WeatherApiClient&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ConfigurableResource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;A simple client for the Open-Meteo weather API.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="n"&gt;base_url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;API_URL&lt;/span&gt;
    &lt;span class="n"&gt;_client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;httpx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PrivateAttr&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="nd"&gt;@contextmanager&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;yield_for_execution&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;InitResourceContext&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Yields an HTTPX client configured for the weather API.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;httpx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;base_url&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;base_url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;
        &lt;span class="k"&gt;finally&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cities&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;City&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Fetch hourly weather forecast data for specified cities.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;params&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ForecastParams&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cities&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;cities&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;hourly&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;WeatherVars&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;default&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;forecast&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_query_params&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;raise_for_status&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This resource extends &lt;code&gt;ConfigurableResource&lt;/code&gt; and implements &lt;code&gt;yield_for_execution&lt;/code&gt;, which manages the lifecycle of the HTTP client - creating it when needed and properly closing it when done. The resource exposes clean methods like &lt;code&gt;hourly_forecast()&lt;/code&gt;, &lt;code&gt;historical_forecast()&lt;/code&gt;, and &lt;code&gt;actual_weather()&lt;/code&gt; that handle all the API interaction details.&lt;/p&gt;

&lt;p&gt;In my assets, I simply declare the resource as a parameter:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;key_prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;raw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;retry_policy&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;RetryPolicy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;max_retries&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;delay&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;group_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_etl&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;automation_condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AutomationCondition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on_cron&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0 6 * * *&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;weather_api_client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;WeatherApiClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;S3Resource&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Fetches hourly forecast data from the weather API for the next 7 days.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;now&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;timestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;%Y%m%dT%H%M%S&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;lake_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;forecast_path&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;replace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;day&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hourly_forecast_&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.parquet&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_api_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;hourly_forecast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CITIES&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;values&lt;/span&gt;&lt;span class="p"&gt;()))&lt;/span&gt;

    &lt;span class="c1"&gt;# Add metadata fields
&lt;/span&gt;    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pl&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;from_dicts&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;with_columns&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pl&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;extracted_at&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="c1"&gt;# Upload data to S3
&lt;/span&gt;    &lt;span class="nf"&gt;polars_to_s3&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lake_path&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_client&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;MaterializeResult&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;cities&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CITIES&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;())),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3_path&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;lake_path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Dagster automatically injects these resources when the asset runs. No need to create global objects or pass them by hand, just do things declaratively. This makes testing easier too, since you can inject mock resources during tests.&lt;/p&gt;

&lt;h2&gt;
  
  
  Partitions
&lt;/h2&gt;

&lt;p&gt;Partitions are another killer feature in Dagster that I found incredibly useful. They let you split your data pipeline into logical chunks - typically by time - and manage them independently. This is crucial for backfilling historical data or re-processing specific time periods without touching everything else.&lt;/p&gt;

&lt;p&gt;I defined a monthly partition that starts from January 2023:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;monthly_partition&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;MonthlyPartitionsDefinition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2023&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tzinfo&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then I applied this partition definition to my historical data assets:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;key_prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;raw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;partitions_def&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;monthly_partition&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;historical_forecast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;AssetExecutionContext&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;weather_api_client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;WeatherApiClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;S3Resource&lt;/span&gt;
&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Fetches historical forecast data from the weather API.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;now&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;start_month&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strptime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partition_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;monthly_partition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;date&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;end_month&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;start_month&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;days&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;)).&lt;/span&gt;&lt;span class="nf"&gt;replace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;day&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;days&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;date&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;end_month&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;end_month&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;date&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_api_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;historical_forecast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CITIES&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;values&lt;/span&gt;&lt;span class="p"&gt;()),&lt;/span&gt; &lt;span class="n"&gt;start_month&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;end_month&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# ... process and save data
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Everything happens through the &lt;code&gt;context.partition_key&lt;/code&gt; - when this asset runs, Dagster knows which partition (which month) it's processing. This means I can:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Backfill all historical data from 2023 by running each month independently&lt;/li&gt;
&lt;li&gt;Re-process just October 2025 if something went wrong with that month&lt;/li&gt;
&lt;li&gt;Monitor which partitions have succeeded or failed in the UI&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The same pattern applies to my &lt;code&gt;historical_actual_weather&lt;/code&gt; asset, which fetches observed weather data (as opposed to forecasts) from the archive API.&lt;/p&gt;

&lt;h2&gt;
  
  
  Automation Conditions
&lt;/h2&gt;

&lt;p&gt;In addition to traditional scheduling, Dagster also uses &lt;code&gt;AutomationCondition&lt;/code&gt; to determine when assets should materialize. This is more flexible than simple schedules because it can account for dependencies and other conditions.&lt;/p&gt;

&lt;p&gt;Let's start with simple stuff. For my &lt;code&gt;hourly_forecast&lt;/code&gt; asset, I want fresh forecast data every morning:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;automation_condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AutomationCondition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on_cron&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0 6 * * *&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# Daily at 6 AM UTC
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For the &lt;code&gt;actual_weather&lt;/code&gt; asset that fetches observed weather (which has a 5-day API delay), I only need to run it every 5 days:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;automation_condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AutomationCondition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;on_cron&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0 3 */5 * *&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# Every 5 days at 3 AM UTC
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;But here's where it gets really interesting. For my transformation assets like &lt;code&gt;hourly_forecast_table&lt;/code&gt;, I use an eager automation condition:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;key_prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;staged&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;deps&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;historical_forecast&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;hourly_forecast&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;automation_condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AutomationCondition&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eager&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast_table&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Hourly forecast table normalized from raw JSON data.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;forecast_path&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;**&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;*.parquet&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pl&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;missing_columns&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;insert&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;pipe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;transform_weather_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# ... transform and save
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;eager()&lt;/code&gt; condition means this asset will automatically run whenever any of its upstream dependencies (&lt;code&gt;historical_forecast&lt;/code&gt; or &lt;code&gt;hourly_forecast&lt;/code&gt;) materialize new data.&lt;/p&gt;

&lt;h2&gt;
  
  
  Metadata
&lt;/h2&gt;

&lt;p&gt;Dagster has first-class support for metadata. Every asset can return metadata about what it produced:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;MaterializeResult&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;cities&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CITIES&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;())),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3_path&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;lake_path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;start_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;text&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="p"&gt;)),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;end_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;text&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;end&lt;/span&gt;&lt;span class="p"&gt;)),&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This metadata shows up in the Dagster UI, so you can quickly see:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What cities were processed&lt;/li&gt;
&lt;li&gt;Where the data was written in S3&lt;/li&gt;
&lt;li&gt;What date range was covered&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For the staged assets, I include schema information and row counts:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;columns&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;TableColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dtype&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="nf"&gt;for &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dtype&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;()]&lt;/span&gt;

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;MaterializeResult&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;column_schema&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;TableSchema&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;row_count&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;height&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can also attach metadata to the asset definition itself:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@asset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;owner&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ybryz&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;api-url&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MetadataValue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ARCHIVE_API_URL&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;endpoint&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/archive&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This documentation lives with your code and appears in the UI, making it easy for team members to understand what each asset does and where the data comes from.&lt;/p&gt;

&lt;h2&gt;
  
  
  Ops
&lt;/h2&gt;

&lt;p&gt;Remember at the beginning I mentioned that Dagster supports both assets and ops? Most of the time, you'll use assets because they represent data artifacts. But sometimes you need to perform an action that doesn't produce a data asset - that's where ops come in.&lt;/p&gt;

&lt;p&gt;In my pipeline, I use an op to send weather forecast updates to Discord:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@dg.op&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast_to_discord&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;OpExecutionContext&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;DiscordWebhook&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Send hourly weather forecast to a Discord channel via webhook.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;city_code&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PHIL&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;forecast_date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;today&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c1"&gt;# Read from staged S3 table
&lt;/span&gt;    &lt;span class="n"&gt;table_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;staged_path&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hourly_forecast_table.parquet&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pl&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;table_path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Filter for specific city and date
&lt;/span&gt;    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pl&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;city_code&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;city_code&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;
        &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pl&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;time&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;dt&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;date&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;forecast_date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;sort&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;time&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Build Discord message with forecast data
&lt;/span&gt;    &lt;span class="c1"&gt;# ... format the message ...
&lt;/span&gt;
    &lt;span class="c1"&gt;# Send to Discord webhook
&lt;/span&gt;    &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;content&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;username&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Weather Bot&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;httpx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Client&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;post&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;raise_for_status&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This op doesn't produce a data asset - it's a side effect (sending a notification). Ops are wrapped in jobs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@dg.job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;RunConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;ops&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hourly_forecast_to_discord&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;DiscordWebhook&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;EnvVar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;DISCORD_WEBHOOK_URL&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))}&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast_to_discord_job&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Send a message to a Discord channel via webhook.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="nf"&gt;hourly_forecast_to_discord&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I can trigger these jobs manually in the UI or I can attach them to events like down below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="nd"&gt;@dg.asset_sensor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;asset_key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;AssetKey&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;staged&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;hourly_forecast_table&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]),&lt;/span&gt;
    &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;hourly_forecast_to_discord_job&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;default_status&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DefaultSensorStatus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;RUNNING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;hourly_forecast_to_discord_sensor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;SensorEvaluationContext&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Sensor to trigger Discord report when hourly forecast table is updated.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;dg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;RunRequest&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;run_key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Sensor &lt;code&gt;senses&lt;/code&gt; any materializations of hourly_forecast_table and triggers the job to send a Discord message. Pretty cool, huh?&lt;/p&gt;

&lt;h2&gt;
  
  
  Closing thoughts
&lt;/h2&gt;

&lt;p&gt;There are a few other things that I didn't cover, like using env variables in your project, how definitions work, and how to deploy the pipeline. On the last part, I used Dagster Cloud to deploy, but the software is open-source so you can host it yourself locally or in the cloud. Nevertheless, check out the full project at - &lt;a href="https://github.com/bryzy-y/weather_etl" rel="noopener noreferrer"&gt;https://github.com/bryzy-y/weather_etl&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Overall, I found Dagster really pleasant to use - especially partitions, resources, and how observable your full pipeline is. My biggest gripe with &lt;code&gt;Airflow&lt;/code&gt; and &lt;code&gt;Prefect&lt;/code&gt; is that they lack observability out of the box. You can, of course, build it out yourself, but Dagster really spoils you with all of its metadata available right from the get-go. I think most of the time &lt;code&gt;assets&lt;/code&gt; is the correct paradigm to work with. Both &lt;code&gt;Prefect&lt;/code&gt; and &lt;code&gt;Airflow&lt;/code&gt; also rolled out their own versions of data assets by the way. However, there will always be cases where it doesn't really fit the problem and you care more about tasks than your data artifacts. In that case, &lt;code&gt;Airflow&lt;/code&gt; or &lt;code&gt;Prefect&lt;/code&gt; will be a better choice.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>python</category>
      <category>programming</category>
    </item>
  </channel>
</rss>
