I've been hearing a lot of good things about Dagster recently, so I wanted to give it a try.
There are quite a few orchestrators in the data world, but the biggest ones are:
- Apache Airflow
- Prefect
- Dagster
I have professional experience with both Prefect and Airflow, and I wanted to see what Dagster brings to the table (spoiler: quite a lot). To find out, I built a simple pipeline that extracts and aggregates weather data from https://open-meteo.com/ and used it to compare Dagster's approach with the other tools in the industry.
Full pipeline at: https://github.com/bryzy-y/weather_etl
Intro
Well, I can't jump right into the pipeline without first giving a brief intro and highlighting Dagster's most obvious differentiating factor - Assets. You model your pipeline very differently in Dagster: instead of thinking in flows or operators/DAGs, you think about the actual data entities that those operations produce. Procedural steps like extracting data from some system and then loading it into S3 are modeled as a single asset rather than as separate tasks. This puts Dagster in a more declarative camp, with assets at the forefront, versus the more procedural, task-based approaches of Prefect and Airflow.
One remark I want to make: Dagster is flexible enough to support the task-style, non-asset way of doing things via something called an op, but the recommended practice is to use assets unless the complexity at hand genuinely calls for an op. I'll come back to this later.
Data Pipeline Architecture
With that short overview done, let's look at the weather_etl project structure. It has two datasets - actual weather and forecast. Both are sourced from the API, history included, and stored in S3 as parquet files. The transformation step is done with polars, an awesome pandas alternative. It's deliberately simple and mostly there to illustrate a concept, because at that stage you would normally hand your data modeling over to a data warehouse.
This is what the resulting Dagster asset graph looks like:
Assets
Defining assets in Dagster is easy: just slap an asset decorator on top of the function.
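To show the shape before the real ones, here is the smallest possible example (a toy asset, not part of the repo):
import dagster as dg
# A toy asset: the function body produces the data, and the decorator
# registers it as a node in the asset graph.
@dg.asset
def my_first_asset() -> list[int]:
    return [1, 2, 3]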
The pipeline is organized into two layers: raw and staged. The raw layer contains assets that fetch data from the Open-Meteo API and store it as-is in S3, while the staged layer transforms this raw data into normalized, analytics-ready tables.
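To make the two layers concrete, the resulting S3 layout looks roughly like this (bucket name and exact key structure are my guesses, not copied from the repo):
s3://weather-lake/raw/forecast/2023-01/hourly_forecast_20230101T060000.parquet
s3://weather-lake/raw/actual_weather/2023-01/actual_weather_20230106T030000.parquet
s3://weather-lake/staged/hourly_forecast_table.parquet
s3://weather-lake/staged/actual_weather_table.parquet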
Raw Assets
historical_forecast - This partitioned asset fetches historical weather forecast data month by month, starting from January 2023. It uses the Historical Forecast API endpoint and is perfect for backfilling. The monthly partitioning means I can process years of historical data independently, making it resilient to failures and easy to reprocess specific time periods.
monthly_partition = dg.MonthlyPartitionsDefinition(
    # Backfill the data from Jan 2023 onwards
    start_date=datetime(2023, 1, 1, tzinfo=UTC),
)
@asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(HISTORICAL_API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    partitions_def=monthly_partition,
)
def historical_forecast(context: AssetExecutionContext, weather_api_client: WeatherApiClient, s3: S3Resource):
    ...
hourly_forecast - This asset runs daily at 6 AM UTC and fetches the 7-day weather forecast for all configured cities (NYC, Philadelphia, Chicago, and DC). It's scheduled with AutomationCondition.on_cron() to ensure fresh forecast data is available every morning. The data includes temperature, precipitation, wind speed, snowfall, and visibility metrics.
@asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    automation_condition=dg.AutomationCondition.on_cron("0 6 * * *"),  # Daily at 6 AM UTC
)
def hourly_forecast(weather_api_client: WeatherApiClient, s3: S3Resource):
    ...
historical_actual_weather - Similar to historical_forecast, but this fetches observed weather data (what actually happened) from the Archive API. It's also monthly partitioned and accounts for the Archive API's 5-day delay, meaning it can only fetch data from 5 days ago or earlier. This is useful for comparing forecasts against reality.
actual_weather - This asset runs every 5 days and fetches recent observed weather data. Since the Archive API has a 5-day delay, this asset pulls data from 10 days ago to 5 days ago, ensuring we always have the most recent available observations. It complements the historical data by keeping the dataset current.
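In code, that sliding window boils down to something like this (a sketch of the logic described above; the repo's exact variable names may differ):
from datetime import UTC, datetime, timedelta
# The Archive API lags roughly 5 days behind, so each run covers the 5-day
# slice of observations that has just become available.
today = datetime.now(UTC).date()
end = today - timedelta(days=5)     # newest data the Archive API can serve
start = today - timedelta(days=10)  # where the previous 5-day run left off
data = weather_api_client.actual_weather(list(CITIES.values()), start, end)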
All raw assets follow the same pattern: they call the WeatherApiClient resource to fetch data, convert it to a Polars DataFrame, add an extracted_at timestamp for lineage tracking, and upload it to S3 as Parquet files organized by month. Each asset includes retry policies (up to 3 retries with 10-second delays) to handle transient API failures.
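The upload helper itself isn't shown in this post, but it is presumably something along these lines (my sketch, assuming a boto3 client and a path object that exposes bucket and key attributes):
import io
import polars as pl
def polars_to_s3(df: pl.DataFrame, lake_path, s3_client) -> None:
    """Serialize a DataFrame to Parquet in memory and upload it to S3 (sketch only)."""
    buffer = io.BytesIO()
    df.write_parquet(buffer)  # write Parquet bytes into the in-memory buffer
    buffer.seek(0)
    s3_client.put_object(Bucket=lake_path.bucket, Key=lake_path.key, Body=buffer.getvalue())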
Staged Assets
hourly_forecast_table - This transformation asset depends on both historical_forecast and hourly_forecast. It reads all the raw forecast parquet files from S3, normalizes the nested JSON structure by exploding the hourly arrays, maps coordinates to city codes, and deduplicates the data based on city and timestamp. The result is a clean, denormalized table ready for analysis. It uses AutomationCondition.eager(), meaning it automatically runs whenever new upstream data arrives.
@asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_forecast, hourly_forecast],
    automation_condition=dg.AutomationCondition.eager(),
)
def hourly_forecast_table():
    ...
actual_weather_table - The transformation counterpart for observed weather data. It depends on historical_actual_weather and actual_weather, and applies the same normalization logic through a shared transform_weather_data() function.
@asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_actual_weather, actual_weather],
    automation_condition=dg.AutomationCondition.eager(),
)
def actual_weather_table():
    ...
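The shared transform_weather_data() function is where the interesting work happens. It roughly amounts to something like this (a simplified sketch, not the repo's exact implementation; COORD_TO_CITY and the column names are assumptions):
import polars as pl
# Hypothetical mapping from rounded API coordinates to short city codes.
COORD_TO_CITY = {
    (40.71, -74.01): "NYC",
    (39.95, -75.17): "PHIL",
    (41.88, -87.63): "CHI",
    (38.91, -77.04): "DC",
}
def transform_weather_data(df: pl.DataFrame) -> pl.DataFrame:
    return (
        df.unnest("hourly")  # lift the nested hourly struct into top-level list columns
        # one row per hour instead of one row per API response
        .explode("time", "temperature_2m", "precipitation", "wind_speed_10m")
        .with_columns(
            pl.col("time").str.to_datetime(),
            pl.struct(["latitude", "longitude"])
            .map_elements(
                lambda c: COORD_TO_CITY.get((round(c["latitude"], 2), round(c["longitude"], 2))),
                return_dtype=pl.Utf8,
            )
            .alias("city_code"),
        )
        .sort("extracted_at", descending=True)  # newest extraction first
        .unique(subset=["city_code", "time"], keep="first")  # dedupe per city and hour
    )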
Both staged assets include rich metadata like table schemas and row counts, making it easy to monitor data quality and spot anomalies through the Dagster UI.
Now, I want to go over Dagster's features that are used consistently throughout the project.
Resources and Dependency Injection
One of the things I really appreciate about Dagster is how cleanly it handles resources through dependency injection. Instead of passing configuration around or relying on global state, you define resources that can be injected into your assets as needed.
In my pipeline, I created a WeatherApiClient resource that wraps the Open-Meteo API:
class WeatherApiClient(dg.ConfigurableResource):
    """A simple client for the Open-Meteo weather API."""
    base_url: str = API_URL
    _client: httpx.Client = PrivateAttr()
    @contextmanager
    def yield_for_execution(self, context: dg.InitResourceContext):
        """Yields an HTTPX client configured for the weather API."""
        try:
            self._client = httpx.Client(base_url=self.base_url)
            yield self
        finally:
            self._client.close()
    def hourly_forecast(self, cities: list[City]) -> list[dict]:
        """Fetch hourly weather forecast data for specified cities."""
        params = ForecastParams(cities=cities, hourly=WeatherVars.default())
        response = self._client.get("forecast", params=params.to_query_params())
        response.raise_for_status()
        return response.json()
This resource extends ConfigurableResource and implements yield_for_execution, which manages the lifecycle of the HTTP client - creating it when needed and properly closing it when done. The resource exposes clean methods like hourly_forecast(), historical_forecast(), and actual_weather() that handle all the API interaction details.
In my assets, I simply declare the resource as a parameter:
@asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    automation_condition=dg.AutomationCondition.on_cron("0 6 * * *"),
)
def hourly_forecast(weather_api_client: WeatherApiClient, s3: S3Resource):
    """Fetches hourly forecast data from the weather API for the next 7 days."""
    now = datetime.now(UTC)
    timestamp = now.strftime("%Y%m%dT%H%M%S")
    lake_path = forecast_path(now.replace(day=1)) / f"hourly_forecast_{timestamp}.parquet"
    data = weather_api_client.hourly_forecast(list(CITIES.values()))
    # Add metadata fields
    df = pl.from_dicts(data)
    df = df.with_columns(pl.lit(timestamp).alias("extracted_at"))
    # Upload data to S3
    polars_to_s3(df, lake_path, s3.get_client())
    return dg.MaterializeResult(
        metadata={
            "cities": MetadataValue.json(list(CITIES.keys())),
            "s3_path": MetadataValue.url(lake_path.uri),
        }
    )
Dagster automatically injects these resources when the asset runs. There's no need to create global objects or pass them around by hand; you just declare what you need. This also makes testing easier, since you can inject mock resources during tests.
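For example, here is a self-contained sketch (a toy resource, not the weather_etl one) of materializing an asset in a test with a stand-in resource:
import dagster as dg
# A toy resource standing in for something like WeatherApiClient.
class NumberFetcher(dg.ConfigurableResource):
    value: int = 42
    def fetch(self) -> int:
        return self.value
@dg.asset
def doubled_number(fetcher: NumberFetcher) -> int:
    return fetcher.fetch() * 2
def test_doubled_number():
    # Inject a test configuration of the resource instead of the real one.
    result = dg.materialize([doubled_number], resources={"fetcher": NumberFetcher(value=10)})
    assert result.success
    assert result.output_for_node("doubled_number") == 20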
Partitions
Partitions are another killer feature in Dagster that I found incredibly useful. They let you split your data pipeline into logical chunks - typically by time - and manage them independently. This is crucial for backfilling historical data or re-processing specific time periods without touching everything else.
I defined a monthly partition that starts from January 2023:
monthly_partition = dg.MonthlyPartitionsDefinition(
    start_date=datetime(2023, 1, 1, tzinfo=UTC),
)
Then I applied this partition definition to my historical data assets:
@asset(
    key_prefix=["raw"],
    partitions_def=monthly_partition,
)
def historical_forecast(
    context: AssetExecutionContext,
    weather_api_client: WeatherApiClient,
    s3: S3Resource,
):
    """Fetches historical forecast data from the weather API."""
    now = datetime.now(UTC)
    start_month = datetime.strptime(context.partition_key, monthly_partition.fmt).date()
    end_month = (start_month + timedelta(days=32)).replace(day=1) - timedelta(days=1)
    if now.date() < end_month:
        end_month = now.date()
    data = weather_api_client.historical_forecast(list(CITIES.values()), start_month, end_month)
    # ... process and save data
Everything happens through context.partition_key - when this asset runs, Dagster knows which partition (which month) it's processing. This means I can:
- Backfill all historical data from 2023 by running each month independently
- Re-process just October 2025 if something went wrong with that month
- Monitor which partitions have succeeded or failed in the UI
The same pattern applies to my historical_actual_weather asset, which fetches observed weather data (as opposed to forecasts) from the archive API.
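As an aside, the month-bounds arithmetic above can also be written with Dagster's built-in time window helper; something like this should cover the same strptime/timedelta math (an untested sketch - the clamp to today's date for the current month would still be needed):
@asset(partitions_def=monthly_partition)
def historical_forecast(context: AssetExecutionContext, weather_api_client: WeatherApiClient, s3: S3Resource):
    # partition_time_window holds the [start, end) datetimes of the current monthly partition
    window = context.partition_time_window
    start_month = window.start.date()
    end_month = window.end.date() - timedelta(days=1)  # inclusive last day of the month
    ...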
Automation Conditions
In addition to traditional schedules, Dagster uses AutomationCondition to determine when assets should materialize. This is more flexible than a plain schedule because it can account for dependencies and other conditions.
Let's start with simple stuff. For my hourly_forecast asset, I want fresh forecast data every morning:
automation_condition=dg.AutomationCondition.on_cron("0 6 * * *") # Daily at 6 AM UTC
For the actual_weather asset that fetches observed weather (which has a 5-day API delay), I only need to run it every 5 days:
automation_condition=dg.AutomationCondition.on_cron("0 3 */5 * *") # Every 5 days at 3 AM UTC
But here's where it gets really interesting. For my transformation assets like hourly_forecast_table, I use an eager automation condition:
@asset(
    key_prefix=["staged"],
    deps=[historical_forecast, hourly_forecast],
    automation_condition=dg.AutomationCondition.eager(),
)
def hourly_forecast_table():
    """Hourly forecast table normalized from raw JSON data."""
    path = forecast_path(date=None) / "**" / "*.parquet"
    df = pl.read_parquet(path.uri, missing_columns="insert").pipe(transform_weather_data)
    # ... transform and save
The eager() condition means this asset will automatically run whenever any of its upstream dependencies (historical_forecast or hourly_forecast) materialize new data.
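These conditions also compose with &, | and ~, so you can express richer policies than a single built-in; for instance (a hypothetical combination, not one used in this project):
# Run eagerly on upstream updates, but don't enqueue another run while one is still in progress.
automation_condition = (
    dg.AutomationCondition.eager()
    & ~dg.AutomationCondition.in_progress()
)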
Metadata
Dagster has first-class support for metadata. Every asset can return metadata about what it produced:
return dg.MaterializeResult(
    metadata={
        "cities": MetadataValue.json(list(CITIES.keys())),
        "s3_path": MetadataValue.url(lake_path.uri),
        "start_date": MetadataValue.text(str(start)),
        "end_date": MetadataValue.text(str(end)),
    }
)
This metadata shows up in the Dagster UI, so you can quickly see:
- What cities were processed
- Where the data was written in S3
- What date range was covered
For the staged assets, I include schema information and row counts:
columns = [dg.TableColumn(name=col, type=str(dtype)) for (col, dtype) in df.schema.items()]
return dg.MaterializeResult(
    metadata={
        "column_schema": dg.TableSchema(columns=columns),
        "row_count": MetadataValue.int(df.height),
    }
)
You can also attach metadata to the asset definition itself:
@asset(
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(ARCHIVE_API_URL),
        "endpoint": "/archive",
    },
)
This documentation lives with your code and appears in the UI, making it easy for team members to understand what each asset does and where the data comes from.
Ops
Remember at the beginning I mentioned that Dagster supports both assets and ops? Most of the time, you'll use assets because they represent data artifacts. But sometimes you need to perform an action that doesn't produce a data asset - that's where ops come in.
In my pipeline, I use an op to send weather forecast updates to Discord:
@dg.op
def hourly_forecast_to_discord(
    context: OpExecutionContext,
    config: DiscordWebhook,
) -> None:
    """Send hourly weather forecast to a Discord channel via webhook."""
    city_code = "PHIL"
    forecast_date = date.today()
    # Read from staged S3 table
    table_path = staged_path / "hourly_forecast_table.parquet"
    df = pl.read_parquet(table_path.uri)
    # Filter for specific city and date
    df = df.filter(
        (pl.col("city_code") == city_code) &
        (pl.col("time").dt.date() == forecast_date)
    ).sort("time")
    # Build Discord message with forecast data
    # ... format the message ...
    # Send to Discord webhook
    payload = {"content": message, "username": "Weather Bot"}
    with httpx.Client() as client:
        response = client.post(config.url, json=payload)
        response.raise_for_status()
This op doesn't produce a data asset - it's a side effect (sending a notification). Ops are wrapped in jobs:
@dg.job(
    config=dg.RunConfig(
        ops={"hourly_forecast_to_discord": DiscordWebhook(url=dg.EnvVar("DISCORD_WEBHOOK_URL"))}
    )
)
def hourly_forecast_to_discord_job() -> None:
    """Send a message to a Discord channel via webhook."""
    hourly_forecast_to_discord()
I can trigger these jobs manually in the UI, or I can attach them to events, as shown below:
@dg.asset_sensor(
    asset_key=dg.AssetKey(["staged", "hourly_forecast_table"]),
    job=hourly_forecast_to_discord_job,
    default_status=dg.DefaultSensorStatus.RUNNING,
)
def hourly_forecast_to_discord_sensor(context: dg.SensorEvaluationContext):
    """Sensor to trigger Discord report when hourly forecast table is updated."""
    return dg.RunRequest(run_key=context.cursor)
The sensor picks up any materialization of hourly_forecast_table and triggers the job to send a Discord message. Pretty cool, huh?
Closing thoughts
There are a few other things I didn't cover, like using environment variables in your project, how definitions work, and how to deploy the pipeline. For deployment, I used Dagster Cloud, but the software is open-source, so you can also host it yourself, locally or in the cloud. Either way, check out the full project at https://github.com/bryzy-y/weather_etl.
Overall, I found Dagster really pleasant to use - especially partitions, resources, and how observable the whole pipeline is. My biggest gripe with Airflow and Prefect is that they lack observability out of the box. You can, of course, build it yourself, but Dagster really spoils you with all of its metadata available right from the get-go. I think assets are the right paradigm to work with most of the time - both Prefect and Airflow have rolled out their own versions of data assets, by the way. Still, there will always be cases where the asset model doesn't quite fit and you care more about the tasks than the data artifacts they produce. In those cases, Airflow or Prefect may be the better choice.
