Most telemetry pipelines look clean in architecture diagrams.
Devices publish data.
A collector receives it.
A backend stores it.
Dashboards read from it.
Simple.
Until you have to decide what to do with imperfect data.
- records arrive incomplete
- payloads vary by topic
- timestamps are missing or inconsistent
- some data is usable, but not trustworthy
At that point, the problem is no longer ingestion.
It’s data correctness at the system boundary.
That’s what I built this project to handle.
What This Project Actually Is
This service acts as a data-quality boundary between raw MQTT ingestion and downstream systems.
It does not subscribe to MQTT directly.
Instead, it receives batched telemetry records and is responsible for turning them into structured, queryable system state.
Typical topics look like:
`sites`, `inverters`, `strings`, `weather`, `grid`
For each record, it decides:
- Is it structurally valid?
- What fields are missing?
- How should it be normalized?
- Should it be accepted or rejected?
- What aggregate state should be updated?
The goal is not just to store data.
The goal is to ensure that everything stored is explicitly understood.
At a high level, the flow looks like this:
MQTT Collector / Buffer
↓
FastAPI Ingest Core
↓
Validation
↓
Normalization
↓
MongoDB
├── normalized_records
├── invalid_records
└── topic_aggregates
This separation is intentional.
The upstream system handles delivery reliability.
This service handles data correctness.
Mixing the two usually makes both harder to reason about.
That boundary matters.
Because raw device or telemetry data is rarely clean enough to use directly.
Why I Didn't Want "Just a Raw Ingest Endpoint"
A very common approach looks like this:
POST /ingest
→ accept payload
→ store raw document
→ defer cleanup downstream
This works early on.
But over time, it pushes data quality problems into every downstream system.
Each consumer ends up re-implementing:
- validation
- normalization
- fallback logic
- schema assumptions
That duplication is where systems start drifting apart.
What the Service Does
The FastAPI service exposes a few core endpoints:
- POST /auth
- POST /ingest
- GET /topics/summary
- GET /invalid/recent
- GET /health
Only /health and /auth are public. Everything else is JWT-protected.
That was intentional.
Once a service becomes the data-quality boundary, it should control:
- who can write
- what gets accepted
- how that data is shaped
This is about protecting the integrity of the data entering the system.
Design Decisions
1. Keeping Auth Out of the Ingest Path
The first decision was simple:
Ingestion should be authenticated, but not complicated.
So I added a small /auth endpoint that validates a configured client ID + secret and issues a JWT.
That token is then required for ingestion and read endpoints.
This gave me a few useful properties:
- Collector services can authenticate once and reuse a token
- Ingest logic stays separate from credential validation
- Downstream protected endpoints remain simple
- The service can reject unauthenticated writes before touching data
This isn’t about building a full auth system.
It’s about ensuring that ingestion is an explicit contract between services, not an open write surface.
Note: This service is designed to sit behind a buffered MQTT ingestion layer.
The buffer handles retries, batching, and offline durability, while this service
focuses purely on validation, normalization, and storage.
You can think of it as the next step after this layer:
https://dev.to/kaustubhalandkar/designing-an-offline-resilient-mqtt-buffer-with-sqlite-dj4
2. Strict Validation Without Throwing Away Useful Data
This was probably the most important design decision in the whole project.
A telemetry record can be "bad" in multiple ways:
- Unknown topic
- Missing payload entirely
- Incomplete payload
- Missing timestamp
- Partially missing expected fields
But not all of those failures should be treated the same.
So I split the logic into two layers:
Hard validation failures — make the record invalid and route it to invalid_records:
- Unsupported topic
- Missing payload
Soft data quality gaps — still allow the record to be stored as valid, but with visibility:
- Missing expected payload keys
- Fallback timestamp usage
- Null-filled required fields
That distinction ended up being extremely useful.
In real telemetry systems, partial data is often still operationally valuable.
Throwing it away completely is often worse than storing it with clear quality metadata.
The goal wasn’t strict correctness.
It was controlled acceptance.
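The two-layer split above can be sketched as a single validation pass that returns hard errors and soft warnings separately. The topic names come from the article; the expected payload keys are hypothetical placeholders, since the real per-topic schemas live in the service:

```python
SUPPORTED_TOPICS = {"sites", "inverters", "strings", "weather", "grid"}

# Hypothetical expected keys for one topic, for illustration only.
EXPECTED_KEYS = {"inverters": {"power_w", "voltage_v", "temp_c"}}


def validate(record: dict) -> tuple[list[str], list[str]]:
    """Return (hard_errors, soft_warnings) for one telemetry record."""
    errors, warnings = [], []
    topic = record.get("topic")
    if topic not in SUPPORTED_TOPICS:
        errors.append("unsupported_topic")  # hard: route to invalid_records
    payload = record.get("payload")
    if not isinstance(payload, dict) or not payload:
        errors.append("missing_payload")  # hard: nothing usable to store
        return errors, warnings
    for key in EXPECTED_KEYS.get(topic, set()):
        if payload.get(key) is None:
            warnings.append(f"missing_field:{key}")  # soft: accept with metadata
    if "timestamp" not in record:
        warnings.append("fallback_timestamp")  # soft: a fallback will be used
    return errors, warnings
```

A record with hard errors goes to `invalid_records`; a record with only warnings is stored as valid, with the warnings preserved as quality metadata.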
3. Normalizing Records Into a Consistent Shape
One of the biggest problems with raw telemetry is that even "valid" data is often inconsistent.
So I normalize each accepted record before storage. That includes:
- Standardizing timestamps
- Filling missing required payload keys with `null`
- Tracking missing fields explicitly
- Shaping records into a consistent internal model
This means downstream consumers don’t have to guess whether a field was absent, renamed, or formatted differently by source.
The ingest core makes that decision once.
This shifts complexity away from every consumer and into a single, well-defined boundary.
Which is exactly where it belongs.
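A minimal sketch of that shaping step, assuming hypothetical expected keys for one topic (the real internal model has more to it, but the core move is the same: explicit nulls plus an explicit missing-fields list):

```python
from datetime import datetime, timezone

# Hypothetical expected keys for one topic; illustrative only.
EXPECTED_KEYS = {"weather": ["irradiance", "ambient_temp"]}


def normalize(record: dict, received_at: datetime) -> dict:
    """Shape an accepted record into a consistent internal model."""
    topic = record["topic"]
    payload = dict(record.get("payload") or {})
    missing = [k for k in EXPECTED_KEYS.get(topic, []) if k not in payload]
    for k in missing:
        payload[k] = None  # explicit null, not silent absence
    return {
        "topic": topic,
        "payload": payload,
        "missing_fields": missing,  # missingness is preserved, not hidden
        "received_at": received_at.isoformat(),
    }
```

Downstream code can now distinguish "the field was absent" from "the field was null" without guessing.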
4. Timestamps Needed Clear Rules
At first, timestamp handling felt like a detail. It wasn't.
Records may arrive with:
- a top-level `timestamp` field
- a timestamp inside the payload (e.g. `payload.ts`)
- a missing timestamp entirely
- malformed values
- naive timestamps with no timezone
So I made the normalization path explicit:
- Use `timestamp` if valid
- Otherwise try a timestamp inside the payload (e.g. `payload.ts`)
- Otherwise fall back to `received_at`
- Treat naive timestamps as UTC
Time flows into everything later — ordering, summaries, freshness, debugging.
A bad timestamp policy quietly poisons all of it.
So this rule now lives in one place instead of being reinterpreted across services.
I treated timestamp normalization as a core responsibility.
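The fallback chain above is small enough to show in full; this sketch assumes ISO 8601 strings, which is an assumption, not a detail from the original:

```python
from datetime import datetime, timezone


def resolve_timestamp(record: dict, received_at: datetime) -> datetime:
    """Fallback chain: record timestamp -> payload.ts -> received_at."""
    candidates = (
        record.get("timestamp"),
        (record.get("payload") or {}).get("ts"),
    )
    for raw in candidates:
        if raw is None:
            continue
        try:
            ts = datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            continue  # malformed value: keep falling through
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)  # naive timestamps are UTC
        return ts
    return received_at  # last resort: when the record arrived
```

Because this lives in one function at the boundary, no downstream consumer ever has to re-decide what a naive or missing timestamp means.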
5. Flexible Storage, Strict Ingest Rules
This service stores three different kinds of data:
| Collection | Purpose |
|---|---|
| `normalized_records` | Accepted records after validation + normalization |
| `invalid_records` | Rejected records with explicit error codes |
| `topic_aggregates` | Per-topic summary state: count, missing field count, last event and received timestamps |
That combination made MongoDB a natural fit.
The ingest rules are strict.
But the shape of incoming data varies across topics.
I wanted:
- Flexible document storage
- Simple inserts
- Topic-based querying
- Aggregate updates without over-modeling too early
So MongoDB became the persistence layer, while the FastAPI service became the place where structure is enforced.
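The routing decision itself is simple once validation has spoken. A sketch with in-memory lists standing in for the two record collections (the real service uses pymongo inserts; the function name and fields here follow the shapes described in this post, not the actual code):

```python
# In-memory stand-ins for the Mongo collections.
normalized_records: list[dict] = []
invalid_records: list[dict] = []


def route(record: dict, errors: list[str], received_at: str) -> str:
    """Send a record to the right collection based on hard validation errors."""
    if errors:  # hard validation failures
        invalid_records.append({
            "topic": record.get("topic"),
            "payload": record.get("payload"),
            "received_at": received_at,
            "errors": errors,  # explicit error codes, queryable later
        })
        return "rejected"
    normalized_records.append(record)  # already validated + normalized
    return "accepted"
```

The point of the sketch is the shape of the rejected document: topic, payload, received time, and error list all survive, which is what makes `GET /invalid/recent` possible.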
6. Invalid Data Should Be Queryable, Not Just Logged
A lot of ingestion systems do this:
bad input → log error → drop it
That sounds fine until someone asks:
- What's failing most often?
- Which topic is malformed?
- Are devices sending incomplete data?
- Are we rejecting too aggressively?
- Did a deployment break payload shape expectations?
If invalid records only exist in logs, answering those questions becomes annoying very quickly.
So invalid records are stored intentionally with:
- Topic
- Payload
- Received time
- Error list
That gives the system a memory of failure instead of just a momentary complaint.
And operationally, that's much more useful.
Invalid data is not noise.
It’s feedback from the system.
7. Raw Counts Weren’t Enough — So I Added Topic-Level Aggregates
Raw ingestion tells you volume.
It doesn’t tell you whether your system is healthy.
So I built topic-level aggregate updates that track:
- Record count per topic
- Missing field count
- Latest event time
- Latest received time
That gives the system a lightweight operational view without needing a full analytics layer yet.
It's not a dashboard product.
But it creates the kind of summary surface area you actually need once ingestion is running continuously.
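The aggregate update is essentially an upsert with increments and maximums. A pure-Python sketch of the same logic (in Mongo this would be one `update_one(..., upsert=True)` with `$inc` and `$max` operators; the field names here are illustrative):

```python
def update_aggregate(aggregates: dict, topic: str, missing: int,
                     event_time: str, received_at: str) -> None:
    """Maintain per-topic summary state, mirroring a Mongo upsert."""
    agg = aggregates.setdefault(topic, {
        "count": 0,
        "missing_field_count": 0,
        "last_event_at": event_time,
        "last_received_at": received_at,
    })
    agg["count"] += 1                      # like $inc
    agg["missing_field_count"] += missing  # like $inc
    # ISO 8601 strings compare correctly lexicographically, so max() works
    agg["last_event_at"] = max(agg["last_event_at"], event_time)        # like $max
    agg["last_received_at"] = max(agg["last_received_at"], received_at)  # like $max
```

Using `$max`-style updates matters because records can arrive out of order: a late record must not move the "latest event" backwards.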
Where the Actual Friction Was
The API layer itself was straightforward.
The hard part was deciding how the system should behave under imperfect input.
The code isn't huge. But the subtle parts were much more interesting than the "API" part.
Defining What "Valid Enough" Means Is Harder Than It Sounds
One of the trickiest design questions was: when should a record be rejected vs accepted with missing fields?
That's not a purely technical decision. It's a system behavior decision.
- Reject too aggressively → you lose useful operational data
- Accept too loosely → you pollute downstream trust
So I ended up treating structural integrity as mandatory, and field completeness as observable but tolerable.
That balance felt much more realistic than pretending telemetry is always complete.
Topic Schemas Are Useful — But They Create Ownership
Each topic has its own expected payload shape:
| Topic | Scope |
|---|---|
| `sites` | Plant-level metrics |
| `inverters` | Inverter telemetry |
| `strings` | String-level electrical details |
| `weather` | Irradiance and atmospheric fields |
| `grid` | Power and grid metrics |
That means the ingest core now owns a kind of schema contract. Which is good.
But it also means adding a new topic is a deliberate system change, not just "new data showing up."
And honestly, I think that's the right trade-off.
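One way to make that ownership concrete is a single schema table in the ingest core. The topic names are from this post; every payload key below is a hypothetical placeholder, since the real field lists aren't shown here:

```python
# Hypothetical expected payload keys per topic; the real contract lives in
# the ingest core, and these field names are illustrative only.
TOPIC_SCHEMAS: dict[str, set[str]] = {
    "sites":     {"site_id", "total_power_w"},
    "inverters": {"inverter_id", "power_w", "temp_c"},
    "strings":   {"string_id", "voltage_v", "current_a"},
    "weather":   {"irradiance", "ambient_temp"},
    "grid":      {"frequency_hz", "export_power_w"},
}


def is_known_topic(topic: str) -> bool:
    # Adding a topic means adding a schema entry here: a deliberate,
    # reviewable system change, not just "new data showing up".
    return topic in TOPIC_SCHEMAS
```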
"Store Everything" Sounds Simple Until Queryability Matters
Raw ingestion and useful ingestion are not the same thing.
If you just store incoming payloads blindly, you'll probably feel productive for a while. But later you'll want to ask:
- Which records were incomplete?
- Which topic is most degraded?
- What's the latest valid data per category?
- How many records are structurally invalid?
Those questions only become answerable if the ingest layer stores intent, not just bytes.
That's why normalization, invalid storage, and aggregate tracking ended up mattering more than the endpoint itself.
Error Handling Is Part of the Data Model
One thing this project reinforced for me:
Bad data is still data.
An invalid telemetry record is not "nothing." It's a signal — of schema drift, upstream bugs, partial device failure, rollout mistakes, or simply incomplete operational conditions.
That's why I increasingly think of error handling as part of the data model, not just exception handling.
Once I started thinking that way, the service design got a lot cleaner.
What Shifted in How I Think
Before building this, I thought of ingestion as a transport problem.
Now I think of it as a trust boundary problem.
That boundary decides:
- What gets accepted
- What gets normalized
- What gets rejected
- What becomes queryable system state
- What quality guarantees downstream code can rely on
That's a much more important role than "just receive and store."
One thing that became clear:
In practice, keeping partially correct data is often better than forcing everything to look clean.
Real systems don't always give you perfect input. Sometimes the right move is not to reject everything imperfect. Sometimes it's to:
- Accept what is structurally usable
- Preserve missingness explicitly
- Separate invalid data cleanly
- Make the trade-offs visible
That felt like the right design for this kind of ingestion boundary.
Final Takeaway
I didn’t build an ingestion API.
I built a boundary that decides what data becomes part of the system.
A FastAPI service that turns raw telemetry into something downstream systems can reason about.
Not by overengineering it. Just by being very explicit about authentication, validation, normalization, invalid record handling, and operational visibility.
And honestly, this is where systems tend to either stay clean…
or become harder to reason about over time.
If You've Built Something Similar
I'd genuinely be curious:
- Do you reject incomplete telemetry, or store it with quality metadata?
- Do you treat invalid records as operational artifacts or just log noise?
- Where do you define your "data trust boundary"?
That seems to be where the real system design decisions start showing up.