DEV Community

Rizwan Saleem
Rizwan Saleem

Posted on

Designing a Time-Series Event Sourcing System for IoT Glasshouse Monitoring

Designing a Time-Series Event Sourcing System for IoT Glasshouse Monitoring

Designing a Time-Series Event Sourcing System for IoT Glasshouse Monitoring

In this tutorial, you’ll learn how to design a scalable, maintainable time-series event sourcing system tailored to monitoring IoT-enabled glasshouses (greenhouses, conservatories, or urban farm domes). The goal is to capture every sensor reading and user action as events, query them efficiently, and support retroactive analytics, offline processing, and robust disaster recovery. We’ll walk through architecture, data models, storage strategies, event schemas, aggregation pipelines, and operational practices. A small sample implementation in TypeScript/Node.js with PostgreSQL and Apache Kafka is included to illustrate the concepts end-to-end.

Why time-series event sourcing for IoT glasshouses?

  • High volume: multiple sensors (temperature, humidity, soil moisture, CO2, light) producing frequent readings.
  • Irregular writes: sensors may burst data during weather events; network hiccups may delay deliveries.
  • Auditability: you want a complete history of sensor values and control actions (valve open/close, heater on/off).
  • Retroactive analytics: you may need to recompute metrics with new aggregation rules or correct misreadings without re-ingesting raw data.
  • Reproducibility: deterministic event streams enable reproducible experiments and fault isolation.

Overview of the architecture

  • Ingest layer: devices publish sensor readings and actuator commands as events to a message bus.
  • Event store: an append-only log of all events, stored with high write throughput and immutability guarantees.
  • Read models: derived views for dashboards, alerts, and analytics, built via streaming or batch processors.
  • Command layer: a thin layer that issues desired state changes as command events, validated against current state.
  • Processing stack: a mix of real-time stream processing and offline batch pipelines for heavy analytics.
  • Storage strategy: a hybrid approach using optimized time-series storage for raw data and relational/columnar databases for aggregations and metadata.
  • Observability and DevOps: tracing, schema evolution, schema registry, and robust deployment strategies to avoid breaking changes.

1) Data model and event schemas

  • Core event types
    • SensorReading
    • sensorId: string
    • deviceId: string
    • timestamp: ISO 8601 timestamp
    • reading: object with key-value pairs (e.g., { temperature: 22.4, humidity: 58.2 })
    • quality: enum (good, suspected, bad)
    • ActuatorCommand
    • actuatorId: string
    • deviceId: string
    • timestamp: ISO 8601
    • command: string (e.g., "set_temperature_target", "open_window")
    • value: any (target value or mode)
    • correlationId: string (tracing across systems)
    • ActuatorEvent
    • actuatorId, deviceId, timestamp
    • state: string (e.g., "on", "off", "open", "closed")
    • reason: string (optional)
    • StateSnapshot
    • deviceId: string
    • timestamp: ISO 8601
    • state: object (latest known control state, e.g., heater: on, fan: off)
  • Metadata and lineage

    • DeviceMetadata
    • deviceId, deviceType, location, model, firmwareVersion
    • SchemaVersion
    • version: number
    • migratedAt: timestamp
    • EventAudit
    • eventId, source, ingestionTime, validity
  • Why an event-centric model?

    • You can replay events to rebuild state or compute alternative aggregates.
    • It naturally handles late arrivals and out-of-order data with proper reconciliation.
    • It simplifies auditing and compliance since every action is immutable.

2) Ingestion and transport

  • Protocols
    • MQTT for edge devices (low power, unreliable networks)
    • HTTP/REST or WebSockets for gateways and dashboards
    • Kafka or pulsar as the central event bus for reliability and scalability
  • Idempotency and dedupe
    • Use per-event unique identifiers (eventId) and idempotent processors
    • At least-once delivery semantics with deduplication in the consumer or store
  • Schema evolution discipline
    • Use a schema registry (confluent schema registry or similar) to version event schemas
    • Validate events at ingress; reject or route failures to a dead-letter queue

3) Storage architecture

  • Event store (append-only)
    • Choose a storage system that supports high write throughput and efficient time-based partitioning
    • Options: Apache Kafka topics with compacted keys, or a purpose-built append-only store (e.g., AWS Kinesis, Google Pub/Sub with a durable sink)
  • Read-optimized time-series store
    • Use a columnar time-series database (e.g., TimescaleDB, InfluxDB) for high-volume sensor data
    • Store raw SensorReading events as time-series with tags: deviceId, sensorId, location
  • Metadata and relational data
    • PostgreSQL or CockroachDB for device metadata, user accounts, and configuration
    • Use hypertables (TimescaleDB) or partitioned tables to scale
  • Aggregations and dashboards
    • Materialized views or event-driven batch jobs to maintain:
    • Rolling averages, min/max, quantiles per device and sensor
    • Environmental indices (e.g., dew point, heat index)
    • Alerts based on thresholds and trends
  • Data retention and tiering
    • Raw data: keep 30-90 days in detail, downsample to hourly for longer retention
    • Aggregated data: keep per-minute/hour/day aggregations for several years

4) Event processing and read models

  • Real-time stream processing
    • Use a stream processor (Kafka Streams, Flink, or Spark Structured Streaming)
    • Tasks:
    • Filter invalid readings and apply quality gates
    • Compute per-device state windows (e.g., heat index)
    • Emit derived events (e.g., TemperatureAlert if temperature exceeds threshold)
  • Batching and offline analytics
    • Periodic ETL jobs (Spark or dbt) to compute longer-term metrics
    • Rebuild read models when schemas evolve or new aggregations are requested
  • Read models examples
    • LatestState view: for dashboards to show current heater/vent states
    • DailySummary: min/max/avg per sensor per day
    • AnomalyStream: events flagged as anomalies with severity
    • LocationHeatMap: per-location aggregates for visualization

5) Consistency, correctness, and replayability

  • Event sourcing pattern specifics
    • Store the entire sequence of events per device
    • Rehydrate device state by replaying events in order
    • Use snapshots to optimize startup time (periodic StateSnapshot for device state)
  • Time synchronization
    • Ensure all timestamps are in UTC and use precise clock sources on devices
    • Consider clock drift handling in edge devices and compensating at ingestion
  • Recovery strategies
    • Backups for event store, periodic snapshots
    • Play back events from a known checkpoint in case of failure
    • Maintain a dead-letter path for malformed events to avoid data loss

6) Security and compliance

  • Access control
    • Role-based access control for dashboards, ingestion endpoints, and admin portals
  • Data integrity
    • Signed events or HMAC to detect tampering
  • Privacy
    • Anonymize or pseudonymize data where applicable (e.g., patient or sensitive farm data)
  • Compliance
    • Audit trails for data access and modifications
    • Data retention policies aligned with regulations

7) Implementation sketch: a minimal end-to-end example
Stack choices (example)

  • Edge: MQTT to gateway
  • Ingestion: Kafka cluster with topics per event type
  • Storage:
    • TimescaleDB for raw sensor time-series
    • PostgreSQL for device metadata
  • Processing: Kafka Streams for real-time dashboards; Spark for batch jobs
  • API: Express/Fastify REST endpoints for dashboards; GraphQL surface for clients

Project layout (TypeScript/Node.js)

  • packages/
    • ingest-service/
    • publishes SensorReading and ActuatorCommand to Kafka
    • processor-service/
    • Kafka Streams app processing SensorReading to produce TemperatureAlerts and DailySummaries
    • read-models/
    • API server serving latest state and analytics from PostgreSQL/TimescaleDB
    • domain/
    • event definitions and serializers/deserializers
  • infra/
    • docker-compose.yml to run local dev stack (Kafka, Zookeeper, TimescaleDB, PostgreSQL)
  • scripts/
    • migrate-schemas.sh
    • rebuild-read-models.sh

Key code patterns

  • Event interface (TypeScript)

    • interface SensorReadingEvent { eventId: string; deviceId: string; timestamp: string; sensorId: string; value: number; unit: string; quality?: string; }
  • Ingestion publisher (producer)

    • const producer = new Kafka.Producer({ clientId: 'ingest', brokers: ['kafka:9092'] });
    • function publishSensorReading(e: SensorReadingEvent) { producer.send({ topic: 'sensor-reading', messages: [{ key: e.deviceId, value: JSON.stringify(e) }] }); }
  • Real-time processor (Kafka Streams-like)

    • Consume sensor-reading topic, group by deviceId, compute per-minute averages, emit to an alerts topic if thresholds breached
    • Example pseudocode:
    • stream.filter(validReading)
    • .groupByKey()
    • .windowedBy(TimeWindows.ofMinutes(1))
    • .aggregate({ sum: 0, count: 0 }, (acc, r) => { acc.sum += r.value; acc.count += 1; return acc; })
    • .map((agg) => { const avg = agg.sum / agg.count; return { deviceId: key, timestamp: now, avg, type: 'per-minute-avg' }; })
    • .to('sensor-aggregates')
  • Read model API (Express)

    • GET /devices/:id/latest reads from TimescaleDB for latest sensor values
    • GET /devices/:id/daily-summary?date=YYYY-MM-DD joins raw data with precomputed aggregates
    • GET /alerts returns active alerts from the alert table

8) Operational practices

  • Schema evolution
    • Tag each event with a schemaVersion; implement migrations that can be replayed
    • Do not break backward compatibility; add new fields as optional
  • Testing strategy
    • Unit tests for event serializers/deserializers
    • Integration tests that simulate out-of-order events, late arrivals, and missing data
    • End-to-end tests that ingest synthetic data and validate read models
  • Observability
    • Metrics: ingestion latency, event drop rate, processing lag, read-model refresh times
    • Tracing: propagate correlation IDs through all services
    • Dashboard: status page showing device health, weather patterns, and alert counts
  • Deployment
    • Use blue/green or canary releases for critical services
    • Immutable infrastructure for service containers
    • Reliable rollback plans in case of schema or code regressions

9) Practical pitfalls and how to avoid them

  • High cardinality sensors
    • Avoid creating excessive shard keys; coarsen sensor dimensions when possible
    • Use hierarchical keys (deviceId.location.sensorType) to enable efficient partition pruning
  • Late-arriving data and out-of-order events
    • Implement watermarking in the stream processor
    • Accept a bounded lateness window for aggregations
  • Data retention costs
    • Downsample aggressively after a retention threshold
    • Archive raw data to cold storage when possible
  • Operational complexity
    • Start with a minimal viable product (MVP) and incrementally add features
    • Use managed services for Kafka/TimescaleDB if operational overhead is high

Illustrative example: a simple per-device daily average read

  • Goal: compute daily average temperature for each device
  • Flow:
    • Ingest SensorReading events with sensorId = "temp"
    • Stream processor buffers values per device per day
    • On midnight UTC, emit DailyAverage(datestr, deviceId, avgTemp)
  • Pseudocode (stream processor)
    • key = deviceId
    • window = day
    • aggregate: sum and count for value where sensorId == "temperature"
    • on window close: write DailyAverage event or update read-model database
  • Benefits
    • Efficient long-term analytics without reprocessing raw data
    • Clear separation of raw ingestion and derived analytics

When to extend or adapt

  • If you have extremely tight latency requirements, lean towards in-memory read models and micro-batching optimized for your access patterns.
  • For edge scenarios with intermittent connectivity, implement local buffering on devices and a robust reconciliation protocol on reconnect.
  • If you require strong consistency guarantees across multiple devices, consider a distributed transaction approach or compensating actions, though it adds complexity.

Closing thoughts
A time-series event sourcing architecture for IoT glasshouses provides a robust foundation for auditability, scalable ingestion, and flexible analytics. By treating every reading and action as an immutable event, you gain replayability, easier retroactive analytics, and a clear path to evolving the system without breaking existing behavior. Start with a constrained MVP focusing on a few sensors and a single device, then scale horizontally by adding more devices, more sensors, and additional read models.

Would you like a starter repository with a minimal Docker setup and a working example of a SensorReading ingestion pipeline in Node.js to experiment locally? If yes, tell me your preferred stack details (e.g., Kafka vs. alternative, TimescaleDB vs. InfluxDB, and whether you want a REST or GraphQL API), and I’ll tailor a ready-to-run template.

-

Rizwan Saleem | https://rizwansaleem.co

Top comments (0)