A deal gets created, a contact gets created, an account gets created… and then the audit trail tells three different stories depending on whether you’re looking at webhooks, API logs, or what the CRM UI eventually shows.
When I first tried to audit our CRM workflow, I assumed the raw event feed would behave like a clean ledger: one event per state change, perfectly ordered, perfectly delivered. That assumption didn’t survive first contact with reality.
So I started treating the audit trail like a noisy sensor: it’s reporting on a real underlying process, but it drops readings, repeats readings, and sometimes reports them late.
The system I ended up building does four things:
- Normalize heterogeneous records into one event shape.
- Sessionize events by deal_id, including a first-step “creation burst” when Contact+Deal+Account are created together.
- Compress each session into a canonical state path.
- Extract a transition graph whose edges carry both counts and observed durations.
The hard part isn’t drawing the graph. It’s deciding what the graph even means when you only partially observe the underlying process.
Key insight: deal_id is the spine
The most stabilizing decision in this audit pipeline is also the simplest: I treat deal_id as the primary key for process reconstruction.
Not because deals are the only entity that matters, but because they’re the one entity that reliably ties the story together across:
- Event streams from webhooks
- Event streams from API logs
- Periodic database snapshots
Everything else—contact IDs, account IDs, UI timestamps—can be missing, duplicated, reordered, or delayed. But if I can anchor events to a deal, I can reconstruct a timeline that’s good enough to reason about.
How it works end-to-end (timeline → path → graph)
The pipeline has three representations of the same underlying thing:
- Event timeline (raw, noisy)
- Compressed state path (canonical labels, sessionized)
- Transition graph (adjacency matrix with time‑weighted edges)
Exactly one diagram, because one is all you need if it’s the right one.
flowchart TD
subgraph ingestion ["Multi-source ingestion"]
webhookStream[Webhooks] --> normalize[Normalize]
apiLogStream[API logs] --> normalize
pgSnapshot[DB snapshots] --> normalize
end
normalize --> sessionize[Sessionize by deal_id]
sessionize --> compress[Compress state path]
compress --> transitions[Extract transitions]
transitions --> adjacency[Time-weighted adjacency]
adjacency --> analytics[Heatmaps · CDFs · shortcuts]
The rest of this post walks through each step with the concrete shapes I use, the wrong turn I took early, and the two ways I handle missing states without lying to myself.
Preprocessing: event normalization and canonical state labels
Webhooks and API logs don’t share a schema. So the first job is to normalize them into a single event record.
The normalized shape I need is driven by what I do later (sessionization and transition extraction). At minimum:
- deal_id (primary key)
- timestamp
- source (webhook vs API log vs snapshot)
- entity_type (deal/contact/account)
- event_type (created/updated/etc.)
- canonical_state (derived label)
In my system, the raw payload formats and the canonical label mapping rules are environment-specific, and I’m not going to paste them here. What I can do is make the contract explicit, and make it impossible to accidentally treat a stub as a working normalizer.
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
@dataclass(frozen=True)
class NormalizedEvent:
deal_id: str
timestamp: datetime
source: str # e.g., "webhook", "api_log", "snapshot"
entity_type: str # e.g., "deal", "contact", "account"
event_type: str # e.g., "created", "updated"
canonical_state: str
raw: Dict[str, Any]
def normalize_event(raw: Dict[str, Any], *, source: str) -> Optional[NormalizedEvent]:
"""Normalize a raw webhook/log/snapshot record into a common event shape.
This function is intentionally an interface example.
Why: the exact field extraction rules depend on your CRM payload schemas
and your canonical state vocabulary.
Safety: raising here prevents a silent 'return None' footgun that can
corrupt downstream counts.
"""
raise NotImplementedError(
"Implement payload parsing + canonical state mapping for your sources."
)
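For orientation only, here is a rough sketch of what one concrete webhook normalizer could look like. The payload fields (dealId, occurredAt, properties.dealstage, objectType, changeType) and the stage-to-state table are invented placeholders, not a real CRM schema.
HYPOTHETICAL_STAGE_TO_STATE = {
    "appointment_scheduled": "qualification",
    "contract_sent": "contract_sent",
    "closed_won": "closed_won",
}

def normalize_webhook_example(raw: Dict[str, Any]) -> Optional[NormalizedEvent]:
    """Illustrative webhook normalizer for an assumed payload shape.

    Events that can't be anchored to a deal are returned as None; the caller
    should count those drops rather than silently swallowing them.
    """
    deal_id = raw.get("dealId")
    occurred_at = raw.get("occurredAt")  # assumed to be an ISO-8601 string
    if not deal_id or not occurred_at:
        return None
    stage = (raw.get("properties") or {}).get("dealstage", "")
    return NormalizedEvent(
        deal_id=str(deal_id),
        timestamp=datetime.fromisoformat(occurred_at),
        source="webhook",
        entity_type=raw.get("objectType", "deal"),
        event_type=raw.get("changeType", "updated"),
        canonical_state=HYPOTHETICAL_STAGE_TO_STATE.get(stage, f"unmapped:{stage}"),
        raw=raw,
    )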
Canonical state labels aren’t just a prettier name for a stage. They’re a projection of heterogeneous events into a state-machine vocabulary you control.
If you don’t control the vocabulary, you can’t compare runs.
Sessionization: grouping events into process runs
A single deal_id can have multiple “runs” worth of activity depending on how your business operates (re-opened deals, retries, manual edits). That’s why sessionization exists.
Sessionization does two jobs:
- Split a long event list into contiguous sessions based on time gaps.
- Merge closely related entity creation events (Contact+Deal+Account) into a single “creation burst” so the initial state path isn’t fragmented.
Unlike normalization, sessionization can be demonstrated generically because it only depends on timestamps and a couple of tunable time windows.
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from typing import Dict, Iterable, List
# NormalizedEvent is defined in the normalization step above.
@dataclass(frozen=True)
class Session:
deal_id: str
events: List[NormalizedEvent]
def sessionize(
events: Iterable[NormalizedEvent],
*,
gap: timedelta,
creation_burst_window: timedelta,
) -> List[Session]:
"""Group normalized events into sessions per deal_id.
Rules:
- Events are sorted by timestamp.
- A new session starts when the gap between consecutive events exceeds `gap`.
- A 'creation burst' is a short window at the start of a session; if multiple
creation events across entity types land in that window, they are kept
together as the single start of the session (not split into separate sessions).
Note: creation-burst handling here is deliberately conservative: it does not
rewrite states; it only prevents the session boundary logic from fragmenting
the initial bundle.
"""
by_deal: Dict[str, List[NormalizedEvent]] = {}
for e in events:
by_deal.setdefault(e.deal_id, []).append(e)
out: List[Session] = []
for deal_id, evs in by_deal.items():
evs_sorted = sorted(evs, key=lambda x: x.timestamp)
if not evs_sorted:
continue
current: List[NormalizedEvent] = [evs_sorted[0]]
session_start = evs_sorted[0].timestamp
for e in evs_sorted[1:]:
prev = current[-1]
dt = e.timestamp - prev.timestamp
# If we're still inside the initial creation-burst window, never split.
in_creation_burst = (prev.timestamp - session_start) <= creation_burst_window
if (dt > gap) and (not in_creation_burst):
out.append(Session(deal_id=deal_id, events=current))
current = [e]
session_start = e.timestamp
else:
current.append(e)
out.append(Session(deal_id=deal_id, events=current))
return out
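As a usage sketch (the six-hour gap and five-minute burst window are illustrative values, not recommendations, and normalized_events is assumed to be the output of the normalization step):
sessions = sessionize(
    normalized_events,                            # iterable of NormalizedEvent
    gap=timedelta(hours=6),                       # a quiet gap this long starts a new session
    creation_burst_window=timedelta(minutes=5),   # keep Contact+Deal+Account creation together
)
for s in sessions:
    print(s.deal_id, len(s.events), s.events[0].canonical_state)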
Why this matters: transition extraction assumes you have a coherent sequence. If you let a creation bundle appear as three independent starts, your graph will over-count early-stage transitions and under-count the “true” first state.
Transition extraction: adjacency with time‑weighted edges
Once I have a session, I compress it into a state path:
- remove redundant repeats (same canonical state repeated by multiple updates)
- keep the first timestamp for each state occurrence
Then I extract transitions: (state_i -> state_{i+1}).
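A minimal sketch of that compression step, assuming exactly the two rules above (collapse consecutive repeats, keep the first timestamp of each run); the function name is just what I'll reuse in later snippets:
from datetime import datetime
from typing import List, Tuple

def compress_state_path(events: List[NormalizedEvent]) -> List[Tuple[str, datetime]]:
    """Collapse a session's events into [(canonical_state, first_seen_at), ...].

    Consecutive events with the same canonical_state are merged into one run,
    and only the timestamp of the first event in each run is kept.
    Assumes the events are already timestamp-sorted (sessionize guarantees this).
    """
    path: List[Tuple[str, datetime]] = []
    for e in events:
        if not path or path[-1][0] != e.canonical_state:
            path.append((e.canonical_state, e.timestamp))
    return path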
“Time‑weighted edges” means each edge carries both:
- a count (how often did this transition occur)
- a list of observed durations (how long did it take)
I store durations rather than pre-aggregating them, because averages can hide multi-modal behavior.
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
State = str
Edge = Tuple[State, State]
@dataclass
class EdgeStats:
count: int
durations: List[timedelta]
def build_adjacency_matrix(
state_path: List[Tuple[State, datetime]],
) -> Dict[Edge, EdgeStats]:
"""Build an adjacency map from a timestamped state path.
Input: [(state, entered_at), ...] in chronological order.
Output: {(from_state, to_state): EdgeStats(count, durations)}
The 'time-weight' stored here is the raw observed duration per transition.
"""
adjacency: Dict[Edge, EdgeStats] = {}
for (s1, t1), (s2, t2) in zip(state_path, state_path[1:]):
edge = (s1, s2)
dt = t2 - t1
if edge not in adjacency:
adjacency[edge] = EdgeStats(count=0, durations=[])
adjacency[edge].count += 1
adjacency[edge].durations.append(dt)
return adjacency
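Chaining compression and extraction per session, then merging into one global map for the analytics below (the merge is a detail I'm sketching here, not something build_adjacency_matrix does for you):
# Merge per-session adjacency maps into one global map.
global_adjacency: Dict[Edge, EdgeStats] = {}
for session in sessions:
    path = compress_state_path(session.events)
    for edge, stats in build_adjacency_matrix(path).items():
        agg = global_adjacency.setdefault(edge, EdgeStats(count=0, durations=[]))
        agg.count += stats.count
        agg.durations.extend(stats.durations)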
Partial observability: why naive counts lie
Webhooks don’t always arrive. API logs don’t always include everything. That means you can observe:
A -> C in your timeline
…even though the real process was:
A -> B -> C
If you naively count A -> C, you will:
- inflate shortcut edges
- deflate the missing state’s incoming/outgoing edges
- misidentify bottlenecks (because time spent in B disappears)
One mitigation I use is to augment event streams with periodic database snapshots of current entity state. When a snapshot implies a state that never appeared in the event stream, I insert an imputed event tagged with its source so it can’t be confused with an observed transition.
That doesn’t magically give you truth. It reduces a systematic bias: “missing delivery looks like a shortcut.”
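A sketch of that imputation, assuming snapshots have already been normalized into the same NormalizedEvent shape with source="snapshot" (the helper name is mine, for illustration):
from typing import List

def impute_from_snapshot(
    session_events: List[NormalizedEvent],
    snapshot_event: NormalizedEvent,
) -> List[NormalizedEvent]:
    """Insert a snapshot-implied state the event stream never produced.

    The imputed event keeps source="snapshot", so downstream analytics can
    always distinguish observed transitions from inferred ones.
    """
    observed_states = {e.canonical_state for e in session_events}
    if snapshot_event.canonical_state in observed_states:
        return session_events
    return sorted(session_events + [snapshot_event], key=lambda e: e.timestamp)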
EM over missing states (conceptual only)
Below is conceptual pseudocode—not runnable code. A real EM implementation needs your concrete state space, constraints, and safety checks (convergence criteria, caps on path explosion, and validation that probabilities stay well-formed).
PSEUDO-CODE — for conceptual illustration only (DO NOT RUN)
Goal:
Estimate transition probabilities while accounting for possibly-missing intermediate states.
initialize T from observed adjacent transitions (or uniformly)
repeat until convergence:
expected_counts = 0
for each observed path:
for each adjacent observed pair (A, C):
if missing intermediates are possible between A and C:
infer a distribution over candidate hidden sequences A -> ... -> C using current T
add fractional expected counts along each candidate sequence
else:
expected_counts[A,C] += 1
normalize each row of expected_counts into T
return T
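For concreteness, here is a toy Python sketch of that loop. The restriction to at most one hidden state between an observed pair, the fixed miss_prob, the uniform initialization, and the fixed iteration count are all simplifying assumptions for illustration, not claims about what a production implementation should do.
from collections import defaultdict
from typing import Dict, Sequence, Tuple

def em_transition_estimates(
    paths: Sequence[Sequence[str]],   # observed state paths, e.g. [["A", "C"], ["A", "B", "C"]]
    states: Sequence[str],            # full canonical state vocabulary
    miss_prob: float = 0.3,           # assumed probability that an intermediate event was dropped
    iterations: int = 20,
) -> Dict[Tuple[str, str], float]:
    """Toy EM: estimate transition probabilities allowing one hidden intermediate."""
    # Initialize T uniformly over the state vocabulary.
    T: Dict[Tuple[str, str], float] = {
        (a, b): 1.0 / len(states) for a in states for b in states
    }
    for _ in range(iterations):
        expected: Dict[Tuple[str, str], float] = defaultdict(float)
        for path in paths:
            for a, c in zip(path, path[1:]):
                # Hypothesis: the transition really was a -> c.
                weights: Dict[Tuple[str, ...], float] = {
                    ("direct",): (1.0 - miss_prob) * T[(a, c)]
                }
                # Hypotheses: a hidden b was traversed but its event was dropped.
                for b in states:
                    if b in (a, c):
                        continue
                    weights[("via", b)] = miss_prob * T[(a, b)] * T[(b, c)]
                total = sum(weights.values()) or 1.0
                # E-step: spread fractional counts across the hypotheses.
                for hyp, w in weights.items():
                    frac = w / total
                    if hyp == ("direct",):
                        expected[(a, c)] += frac
                    else:
                        b = hyp[1]
                        expected[(a, b)] += frac
                        expected[(b, c)] += frac
        # M-step: row-normalize expected counts into new transition probabilities.
        for a in states:
            row_sum = sum(expected[(a, b)] for b in states)
            if row_sum > 0:
                for b in states:
                    T[(a, b)] = expected[(a, b)] / row_sum
    return T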
The point of EM here isn’t magic—it’s honesty. Instead of declaring “it was A→C,” you spread probability mass across plausible intermediates.
Entropy-aware tie-breaker for ambiguous transitions
Sometimes I don’t want the complexity of EM, or I need a deterministic path for visualization.
In that case:
- If multiple next states are plausible, I pick the one with the lowest uncertainty only if that uncertainty is below a configured tolerance.
- Otherwise, I split the transition into multiple edges with fractional counts.
That second branch is the whole point: preserve ambiguity instead of forcing a single edge.
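Here is one way to implement that rule; measuring uncertainty as the entropy of the normalized candidate distribution (in bits) and the 0.5-bit default tolerance are my reading of it, not a universal choice:
import math
from typing import Dict, List, Tuple

def resolve_ambiguous_transition(
    from_state: str,
    candidates: Dict[str, float],    # plausible next_state -> plausibility weight
    entropy_tolerance: float = 0.5,  # bits; illustrative default
) -> List[Tuple[str, str, float]]:
    """Return (from_state, to_state, fractional_count) edges for one ambiguous step."""
    total = sum(candidates.values())
    if total <= 0:
        return []
    probs = {s: w / total for s, w in candidates.items() if w > 0}
    entropy = -sum(p * math.log2(p) for p in probs.values())
    if entropy <= entropy_tolerance:
        # Low uncertainty: commit to the single most likely next state.
        best = max(probs, key=probs.get)
        return [(from_state, best, 1.0)]
    # High uncertainty: keep every candidate as a fractional edge.
    return [(from_state, s, p) for s, p in probs.items()]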
Analytic outputs: heatmaps, CDFs, and shortcut detection
Once you have an adjacency matrix with per-edge durations, three outputs are straightforward.
Bottleneck heatmaps
Heatmaps are a visualization layer over the adjacency matrix.
- X axis: from_state
- Y axis: to_state
- cell value: count, median duration, or another aggregation
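A minimal sketch of the matrix behind that heatmap, built from the merged adjacency map and left as plain lists so any renderer can consume it (median seconds per cell is one of the aggregations mentioned above):
from statistics import median
from typing import Dict, List, Tuple

def heatmap_matrix(
    adjacency: Dict[Edge, EdgeStats],
    value: str = "count",            # "count" or "median_seconds"
) -> Tuple[List[str], List[List[float]]]:
    """Dense grid over canonical states: rows are from_state, columns are to_state."""
    states = sorted({s for edge in adjacency for s in edge})
    index = {s: i for i, s in enumerate(states)}
    grid = [[0.0] * len(states) for _ in states]
    for (src, dst), stats in adjacency.items():
        if value == "count":
            cell = float(stats.count)
        else:
            cell = median(d.total_seconds() for d in stats.durations)
        grid[index[src]][index[dst]] = cell
    return states, grid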
Mean-time-to-transition CDFs
Averages are brittle. CDFs tell you what fraction of transitions complete within a given time.
Given EdgeStats.durations, you can compute an empirical CDF per edge and compare edges to find “slow tails” that don’t show up in mean/median.
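An empirical CDF per edge is just sorted durations and ranks; a sketch:
from typing import List, Tuple

def edge_cdf(stats: EdgeStats) -> List[Tuple[float, float]]:
    """Return [(seconds, fraction_of_transitions_completed_within)] for one edge."""
    xs = sorted(d.total_seconds() for d in stats.durations)
    n = len(xs)
    return [(x, (i + 1) / n) for i, x in enumerate(xs)]

# Usage sketch: what fraction of a hypothetical A -> C edge completed within an hour?
# points = edge_cdf(global_adjacency[("A", "C")])
# within_hour = max((frac for secs, frac in points if secs <= 3600), default=0.0)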
Anomalous shortcut detection
A “shortcut” is an edge that appears in the graph but violates the expected ordering implied by your canonical state model.
Method:
- define allowed transitions (your expected adjacency)
- flag observed edges outside that set
- rank by count and/or time impact
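The method above reduces to a set difference plus a ranking; a sketch, where allowed is whatever expected adjacency you maintain for your canonical state model:
from statistics import median
from typing import Dict, List, Set, Tuple

def detect_shortcuts(
    adjacency: Dict[Edge, EdgeStats],
    allowed: Set[Edge],
) -> List[Tuple[Edge, int, float]]:
    """Flag observed edges outside the expected adjacency, ranked by count.

    Returns (edge, count, median_seconds) so frequency can be weighed against
    the time the shortcut appears to save (often a sign of missing events,
    not a genuinely faster path).
    """
    flagged: List[Tuple[Edge, int, float]] = []
    for edge, stats in adjacency.items():
        if edge in allowed:
            continue
        med = median(d.total_seconds() for d in stats.durations)
        flagged.append((edge, stats.count, med))
    return sorted(flagged, key=lambda item: item[1], reverse=True)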
What went wrong first: I trusted arrival order
My first implementation treated the event stream as inherently ordered: whatever arrived first must have happened first.
That was the wrong architecture.
Once I mixed sources (webhooks + logs), arrival order became a meaningless artifact of network timing. The fix was to make timestamp ordering a first-class requirement:
- parse timestamps
- sort within deal_id
- only then sessionize
The symptom that finally forced the change was simple: I kept chasing “phantom shortcuts” that disappeared the moment I stopped trusting arrival order.
Closing
Once I anchored process reconstruction on deal_id, normalized the vocabulary, sessionized creation bursts, and let edges carry both counts and observed durations, the audit trail stopped being a pile of contradictory stories and started behaving like a model I could interrogate—without pretending the missing pieces weren’t missing.