## The Problem That Wouldn't Go Away

Early in a project I was working on, a customer filed a billing dispute. Simple enough, except when we went to check which subscription plan they were on at the time of the charge, we couldn't tell. Our database only stored the current state. The plan had been upgraded twice since then. The old values were gone.

We weren't doing anything unusual. We had `created_at` and `updated_at` columns like every other project. But those tell you when a row changed, not what it looked like before it changed.

That moment stuck with me, and I kept running into variations of it:
"What was the price of this product on Black Friday last year?" "What were this employee's permissions when that action was taken?" "What did this contract say before it was amended?"
The answer was always the same: we don't know.
## Why The Standard Fixes Are Painful

Most teams handle this one of three ways, and none of them are great.

**The audit log table.** A separate table that records every change as an event. It works for compliance but is horrible to query: reconstructing state at a point in time means replaying a sequence of events, which is complex and slow.

**Soft deletes with versioning.** Adding `is_current`, `version`, and `superseded_at` columns to your main table. It gets messy fast, pollutes your queries, and every developer on the team implements it slightly differently.

**Just accepting the loss.** The most common approach. You tell yourself you'll fix it later. You never do.

What all three approaches lack is a clean, queryable model for time itself.
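To make the first pain point concrete, here is a minimal sketch of audit-log replay. The event shape is invented for illustration; the point is that answering any point-in-time question means touching every event that came before it:

```python
from datetime import datetime, timezone

# Hypothetical event log for one subscription (field names are illustrative).
events = [
    {"at": datetime(2024, 1, 1, tzinfo=timezone.utc), "set": {"plan": "free"}},
    {"at": datetime(2024, 6, 1, tzinfo=timezone.utc), "set": {"plan": "pro"}},
    {"at": datetime(2024, 9, 1, tzinfo=timezone.utc), "set": {"plan": "enterprise"}},
]

def state_at(events, as_of):
    """Reconstruct state by replaying every event up to `as_of`."""
    state = {}
    for event in sorted(events, key=lambda e: e["at"]):
        if event["at"] <= as_of:
            state.update(event["set"])
    return state

# "What plan was active on July 1st?" walks the whole log to answer.
print(state_at(events, datetime(2024, 7, 1, tzinfo=timezone.utc)))
# {'plan': 'pro'}
```

Three events are fine; three million are not, which is why event-sourced systems end up maintaining separate read-model projections just to answer these questions.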
## Bitemporal Modeling: The Right Mental Model

There's a well-established concept in database theory called bitemporal modeling. It tracks two independent time axes for every record:
- **Valid time** — when the fact was true in the real world
- **Transaction time** — when you recorded it in the database
Most audit systems only track transaction time. chrono-temporal tracks both, which means you can answer questions like "what did we know on January 1st about what was true last July?" — and that matters enormously for compliance, retroactive corrections, and debugging.

The core idea is simple: instead of mutating records in place, you store versioned snapshots with time ranges:
| entity_type | entity_id | valid_from | valid_to | data |
|---|---|---|---|---|
| user | user_001 | 2024-01-01 | 2024-06-01 | `{"plan":"free"}` |
| user | user_001 | 2024-06-01 | NULL | `{"plan":"pro"}` |
- `valid_to = NULL` → current record
- Updates = close current record + insert new version
- Old data is never destroyed
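The update discipline above can be sketched in a few lines of plain Python, with dicts standing in for rows. This is an illustration of the pattern, not the library's internals:

```python
from datetime import datetime, timezone

# One open record for user_001 (dicts standing in for table rows).
records = [
    {"entity_id": "user_001",
     "valid_from": datetime(2024, 1, 1, tzinfo=timezone.utc),
     "valid_to": None,
     "data": {"plan": "free"}},
]

def update(records, entity_id, new_data, now):
    """Never destroy data: close the open record, append a new version."""
    for rec in records:
        if rec["entity_id"] == entity_id and rec["valid_to"] is None:
            rec["valid_to"] = now  # close the current version at `now`
    records.append({"entity_id": entity_id, "valid_from": now,
                    "valid_to": None, "data": new_data})

update(records, "user_001", {"plan": "pro"},
       datetime(2024, 6, 1, tzinfo=timezone.utc))
# Both versions survive: the old one now has a closed time range,
# and the new one is the single open (valid_to = None) record.
```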
## Building chrono-temporal
chrono-temporal is a temporal data layer for PostgreSQL that lets you query your data at any point in time, track changes, and compute diffs.
I decided to build this as a reusable Python library rather than solving it again from scratch on every project. The goals were:
- Drop into any existing PostgreSQL project with no architectural changes
- Clean async Python API built on SQLAlchemy 2.0
- Support time-travel queries, full history, and diffing out of the box
- Ship as a proper PyPI package
```bash
pip install chrono-temporal
```
## The Core Model
Everything is stored in a single `temporal_records` table:
```python
from sqlalchemy import Column, DateTime, Integer, JSON, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class TemporalRecord(Base):
    __tablename__ = "temporal_records"

    id = Column(Integer, primary_key=True)
    entity_type = Column(String(100), nullable=False)  # e.g. "user", "product"
    entity_id = Column(String(255), nullable=False)    # your entity's ID
    valid_from = Column(DateTime(timezone=True), nullable=False)
    valid_to = Column(DateTime(timezone=True), nullable=True)  # NULL = currently active
    data = Column(JSON, nullable=False)                # the full payload
    notes = Column(Text, nullable=True)
```
The `entity_type` + `entity_id` combination identifies your entity. The `data` field is a JSON payload — fully flexible, it works with any entity in your system without schema changes.
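The library's actual SQL isn't shown here, but a point-in-time lookup over these two columns presumably reduces to a half-open interval check. A minimal sketch of that predicate, written as my assumption rather than the library's code:

```python
from datetime import datetime, timezone

def active_at(valid_from, valid_to, as_of):
    """Half-open interval [valid_from, valid_to); None valid_to = still open."""
    return valid_from <= as_of and (valid_to is None or as_of < valid_to)

jan1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
jun1 = datetime(2024, 6, 1, tzinfo=timezone.utc)

active_at(jan1, jun1, datetime(2024, 3, 1, tzinfo=timezone.utc))  # True
active_at(jan1, jun1, jun1)  # False: the boundary belongs to the next version
active_at(jun1, None, jun1)  # True: the new open record starts at the boundary
```

The half-open convention matters: at the exact instant of an update, exactly one version matches, so boundary queries never return zero or two records.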
## The Service API

The `TemporalService` class wraps all the temporal query logic:
```python
from datetime import datetime, timezone

from chrono_temporal import TemporalService, TemporalRecordCreate

svc = TemporalService(session)

# Create a versioned record
await svc.create(TemporalRecordCreate(
    entity_type="user",
    entity_id="user_001",
    valid_from=datetime(2024, 1, 1, tzinfo=timezone.utc),
    data={"name": "Daniel", "plan": "free", "email": "daniel@example.com"},
))

# Time-travel query — what did this look like in March 2024?
records = await svc.get_at_point_in_time(
    "user", "user_001",
    datetime(2024, 3, 1, tzinfo=timezone.utc),
)

# Full history — every version ever
history = await svc.get_history("user", "user_001")

# Diff — what changed between two dates?
diff = await svc.get_diff(
    "user", "user_001",
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2025, 7, 1, tzinfo=timezone.utc),
)
print(diff["changed"])    # {"plan": {"from": "free", "to": "pro"}}
print(diff["unchanged"])  # ["name", "email"]
```
## A Real Example — Product Price History

To show how this composes with real business logic, here's a complete price-history service built on top of chrono-temporal:
```python
import uuid
from datetime import datetime, timezone
from typing import Optional

from chrono_temporal import TemporalService, TemporalRecordCreate

ENTITY_TYPE = "product"

class PriceHistoryService:
    def __init__(self, db):
        self.temporal = TemporalService(db)

    async def create_product(self, name, category, price, currency="USD"):
        """Add a new product with its initial price."""
        record = await self.temporal.create(
            TemporalRecordCreate(
                entity_type=ENTITY_TYPE,
                entity_id=f"prod_{uuid.uuid4().hex[:8]}",
                valid_from=datetime.now(timezone.utc),
                data={
                    "name": name,
                    "category": category,
                    "price": price,
                    "currency": currency,
                },
            )
        )
        return record

    async def update_price(self, product_id, new_price,
                           effective_from: Optional[datetime] = None, reason=None):
        """Update price — closes the current record and creates a new version."""
        effective_from = effective_from or datetime.now(timezone.utc)
        current = (await self.temporal.get_current(ENTITY_TYPE, product_id))[0]
        await self.temporal.close_record(current.id, effective_from)
        return await self.temporal.create(
            TemporalRecordCreate(
                entity_type=ENTITY_TYPE,
                entity_id=product_id,
                valid_from=effective_from,
                data={**current.data, "price": new_price},
                notes=reason or f"Price changed from {current.data['price']} to {new_price}",
            )
        )

    async def get_price_at(self, product_id, as_of):
        """What was the price on a specific date?"""
        records = await self.temporal.get_at_point_in_time(
            ENTITY_TYPE, product_id, as_of
        )
        return records[0] if records else None

    async def get_lowest_price_ever(self, product_id):
        """The lowest price this product has ever had."""
        records = await self.temporal.get_history(ENTITY_TYPE, product_id)
        return min(records, key=lambda r: r.data["price"]) if records else None
```
## The REST API Layer

The library also ships with a FastAPI layer if you want a ready-made service rather than embedding the library directly:
```bash
git clone https://github.com/Daniel7303/chrono-temporal
docker-compose up --build
```
That gives you a running API at http://localhost:8000 with Swagger docs, API key authentication, and all endpoints ready to use:
| Method | Endpoint | Description |
|---|---|---|
| POST | `/api/v1/temporal/` | Create a record |
| GET | `/api/v1/temporal/entity/{type}/{id}/current` | Current state |
| GET | `/api/v1/temporal/entity/{type}/{id}/history` | Full history |
| GET | `/api/v1/temporal/entity/{type}/{id}/as-of` | Time-travel query |
| GET | `/api/v1/temporal/entity/{type}/{id}/diff` | Diff two dates |
| PATCH | `/api/v1/temporal/{id}/close` | Close a record |
## Where This Is Useful
The pattern applies anywhere your data has a meaningful history:
- **Subscription billing** — what plan was active during a disputed charge?
- **E-commerce pricing** — full price history, lowest/highest ever, Black Friday comparisons
- **HR systems** — salary history, role changes, department transfers
- **Contracts and legal** — every amendment, what the terms said at signing
- **Inventory** — stock levels at any point in time
- **Compliance** — immutable audit trail, regulatorily defensible
- **Feature flags** — what flags were active when this bug occurred?
Basically anywhere you've ever said "we should have kept the old value" — this is the systematic fix.
## What's Different From Other Approaches
- **vs. event sourcing** — Event sourcing stores events and derives state by replaying them. chrono-temporal stores snapshots directly, which makes point-in-time queries trivial and doesn't require a replay engine.
- **vs. temporal tables (PostgreSQL extension)** — `temporal_tables` handles transaction time automatically at the database level, but querying it from Python is verbose. chrono-temporal gives you a clean Python API and handles both valid time and transaction time explicitly.
- **vs. Dolt / other temporal databases** — Those require replacing your database entirely. chrono-temporal works on your existing PostgreSQL instance with one new table.
- **vs. rolling your own** — You could build this yourself. Most teams do, once, inconsistently, with subtle bugs in the boundary conditions. chrono-temporal has a full test suite covering edge cases like exact boundary queries, overlapping records, and empty-state handling.
## Current State and Roadmap
The library is 19 days old and on PyPI. The roadmap includes:
- Django ORM support
- Timeline summary endpoint
- Hosted cloud version for teams that don't want to manage the infrastructure
```bash
pip install chrono-temporal
```
- GitHub: github.com/Daniel7303/chrono-temporal-api-framework
- Demo: https://chrono-temporal-ap.onrender.com/docs#/
- PyPI: https://pypi.org/project/chrono-temporal/
If you're dealing with any of the use cases above I'd genuinely love to hear how it fits or doesn't fit your situation. And if you find a bug or edge case the test suite doesn't cover, open an issue.
Built with Python 3.11, SQLAlchemy 2.0, asyncpg, Pydantic 2, FastAPI, PostgreSQL.