Most RevOps engineering projects start the same way.
Someone on the revenue team identifies a pain point, deals going dark, follow-ups falling through, CRM data rotting quietly in the background. They bring it to engineering. Engineering scopes it. Somebody says "we could automate that." Everyone agrees.
Three months later, there is a sprawling Node service that handles CRM webhooks, enriches contact data, scores leads, triggers email sequences, updates deal stages, sends Slack alerts and logs everything to a data warehouse. It works, until it does not. One upstream API changes. One webhook payload shifts format. One enrichment service goes down for 40 minutes. And now the entire pipeline is silent, nobody knows why and half the team is afraid to touch the code.
This is what happens when you build CRM automation as a monolith. Not because the engineers were sloppy. Because the problem seduces you into building too much in one place.
This post is about building signal-to-action workflows the other way, modular, composable, debuggable and designed to survive the inevitable moment when one piece breaks without taking everything else down with it.
First: What a signal-to-action workflow actually is
Before the architecture, the definition, because "signal-to-action" gets used loosely and it matters to be precise.
A signal is any event that changes the probability a deal will advance or decay if acted on or ignored. CRM events, call transcripts, website behavior, intent data, stakeholder changes, all of these are signals. They are inputs that carry information about where a deal or account is right now.
An action is anything that happens in response: a follow-up email goes out, a CRM field updates, a Slack message fires, a task gets created, a sequence triggers. Actions are the outputs.
The workflow is the intelligence layer in between, the thing that takes a signal, understands its context, decides what action is warranted and makes sure that action executes reliably.
The problem with monolithic sales workflow automation is that all three of these concerns, signal ingestion, decision logic and action execution, get tangled together in a single codebase. When you need to change how actions execute, you touch the same code that handles signal ingestion. When one enrichment API fails, it blocks action execution entirely. The whole thing becomes fragile in proportion to how tightly the pieces are coupled.
The alternative is a pipeline architecture: discrete, independently deployable services connected by an event queue. Each service does one thing. Each failure is contained. Each piece can be tested, upgraded and replaced without touching the others.
Here is how to build it.
The architecture: Four layers, one queue
[Signal Sources]
↓
[Ingestion Layer] ← normalizes all incoming signals
↓
[Event Queue] ← the backbone; decouples everything
↓
[Enrichment Layer] ← adds context before decisions are made
↓
[Decision Layer] ← determines what action is required
↓
[Execution Layer] ← carries out the action across the GTM stack
↓
[Observability] ← logs, traces, failure alerts
Each layer is a separate service. They communicate exclusively through the queue. No layer knows about the internal implementation of any other. This is the design decision that keeps the system maintainable as it grows.
Layer 1: Signal ingestion - normalize everything at the boundary
Your signal sources will be inconsistent. Your CRM fires webhooks in one format. Your call intelligence platform fires them in another. Your intent data provider sends batch files on a schedule. Your website sends behavioral events via a tracking endpoint.
If you let each of these formats bleed into your core workflow, you will write format-specific handling code everywhere. Instead, write a thin adapter for each source that normalizes into one canonical schema at the point of entry.
# Canonical signal schema
@dataclass
class Signal:
signal_id: str
signal_type: str # "crm_event" | "call_complete" | "intent" | "behavioral"
source: str # "hubspot" | "gong" | "bombora" | "website"
account_id: str
deal_id: Optional[str]
contact_id: Optional[str]
payload: dict # normalized, source-agnostic
urgency: str # "immediate" | "high" | "standard" | "low"
confidence: float # 0.0 - 1.0
fired_at: datetime
received_at: datetime
# HubSpot CRM webhook → normalized signal
class HubSpotAdapter:
def normalize(self, raw: dict) -> Signal:
return Signal(
signal_id=f"hs_{raw['objectId']}_{raw['propertyName']}",
signal_type="crm_event",
source="hubspot",
account_id=raw.get("companyId"),
deal_id=raw.get("objectId"),
contact_id=raw.get("contactId"),
payload={
"event": raw["propertyName"],
"old_value": raw.get("propertyValue", {}).get("from"),
"new_value": raw.get("propertyValue", {}).get("to"),
},
urgency=self._classify_urgency(raw),
confidence=0.95,
fired_at=datetime.fromtimestamp(raw["occurredAt"] / 1000),
received_at=datetime.utcnow()
)
def _classify_urgency(self, raw: dict) -> str:
high_urgency_events = {"dealstage", "closedate", "hs_deal_stage_probability"}
return "high" if raw.get("propertyName") in high_urgency_events else "standard"
Once normalized, every signal gets pushed to the queue in the same format. Downstream layers never need to know where the signal came from or what format the source used. The adapter layer absorbs all that complexity at the edge.
Layer 2: The event queue - the backbone of the whole system
This is the piece that makes everything else composable. Your queue (SQS, RabbitMQ, Kafka, Redis Streams, choose based on your scale and operational preference) sits between every layer. Nothing calls another layer directly. Everything publishes to the queue and subscribes from it.
class EventQueue:
def publish(self, topic: str, event: dict, priority: str = "standard"):
message = {
"event_id": str(uuid4()),
"topic": topic,
"priority": priority,
"payload": event,
"published_at": datetime.utcnow().isoformat()
}
self.backend.push(topic, message)
def subscribe(self, topic: str, handler: Callable, batch_size: int = 10):
while True:
messages = self.backend.poll(topic, batch_size)
for msg in messages:
try:
handler(msg["payload"])
self.backend.ack(msg)
except Exception as e:
self.backend.nack(msg)
self.logger.error(f"Handler failed: {e}", extra={"msg": msg})
The queue gives you three things that a monolith cannot:
Isolation: If the enrichment layer goes down for 20 minutes, signals pile up in the queue and process when it recovers. Nothing is lost. Nothing downstream is blocked by the enrichment failure.
Independent scaling: If your intent data processor needs to handle 10x more events than your call-complete processor, you scale the relevant consumer independently. You do not scale the entire pipeline.
Auditability: Every event that touches the queue is logged with a timestamp. When something goes wrong, you have a full event trail to trace through. This is what makes RevOps engineering debuggable, not guesswork.
Layer 3: Enrichment - context before decisions
Enrichment is a separate consumer that pulls signals from the queue, adds context and republishes enriched signals to a different topic. It should not make decisions. It should not trigger actions. It should only make signals smarter.
class EnrichmentService:
def enrich(self, signal: dict) -> dict:
deal_id = signal.get("deal_id")
account_id = signal.get("account_id")
enriched = {**signal}
# Pull deal context from CRM
if deal_id:
deal = self.crm.get_deal(deal_id)
enriched["deal_context"] = {
"stage": deal.stage,
"days_in_stage": deal.days_in_stage,
"last_activity_days_ago": deal.last_activity_days_ago,
"close_date": deal.close_date.isoformat(),
"open_tasks": [t.description for t in deal.open_tasks],
"competitors_mentioned": deal.competitors_mentioned
}
# Pull account firmographics
if account_id:
account = self.crm.get_account(account_id)
enriched["account_context"] = {
"employee_count": account.employee_count,
"industry": account.industry,
"recent_signals": self.intent_store.get_recent(account_id, days=30)
}
# Re-score urgency with context
enriched["urgency"] = self._rescore_urgency(enriched)
return enriched
def _rescore_urgency(self, enriched: dict) -> str:
deal = enriched.get("deal_context", {})
days_to_close = self._days_until(deal.get("close_date"))
idle_days = deal.get("last_activity_days_ago", 0)
# Promote urgency if the deal is close to close date and idle
if days_to_close and days_to_close < 21 and idle_days > 10:
return "immediate"
return enriched.get("urgency", "standard")
The enrichment layer is where signals go from being raw events to being context-aware inputs. A CRM event that says "deal stage changed" becomes a context-aware signal that says "deal stage changed, this deal has been idle 12 days, close date is in 18 days, a competitor was mentioned in the last call." That context is what the decision layer needs to make a good call.
Keep enrichment idempotent. The same signal enriched twice should produce the same output. This makes retrying safe, which matters when external APIs are flaky.
Layer 4: The decision layer - rules first, LLM second
This is where the GTM automation intelligence lives. The decision layer consumes enriched signals and produces action plans. It does not execute actions itself. It decides what should happen and publishes that decision to the queue.
Run a two-pass decision process:
class DecisionEngine:
def decide(self, enriched_signal: dict) -> ActionPlan:
# Pass 1: Rules engine — fast, deterministic, zero-LLM
rule_result = self.rules_engine.evaluate(enriched_signal)
if rule_result.is_conclusive:
return rule_result.action_plan
# Pass 2: LLM reasoning — for complex, ambiguous cases
return self.llm_decider.reason(enriched_signal)
class RulesEngine:
def evaluate(self, signal: dict) -> RuleResult:
deal = signal.get("deal_context", {})
idle_days = deal.get("last_activity_days_ago", 0)
days_to_close = self._days_until(deal.get("close_date"))
competitors = deal.get("competitors_mentioned", [])
actions = []
# Rule: Idle deal approaching close date
if idle_days > 10 and days_to_close and days_to_close < 21:
actions.append(Action(
type="draft_followup",
priority="immediate",
context={"reason": "idle_near_close", "idle_days": idle_days}
))
# Rule: Competitor mentioned, no competitive task exists
if competitors and not self._has_competitive_task(deal):
actions.append(Action(
type="create_task",
priority="high",
context={"task": f"Address competitive position vs {competitors[0]}"}
))
return RuleResult(
is_conclusive=len(actions) > 0,
action_plan=ActionPlan(actions=actions) if actions else None
)
Rules handle the clear-cut cases fast. Reserve the LLM call for genuinely ambiguous situations, multi-stakeholder complexity, conflicting signals, deals with long histories that require synthesis. This keeps latency low for the 60–70% of signals that have obvious responses and uses inference budget only where it adds real value.
Layer 5: Execution - adapters again, same principle
The execution layer consumes action plans and carries them out against your GTM stack. Apply the same adapter pattern you used for ingestion:
class ExecutionRouter:
adapters = {
"draft_followup": EmailDraftAdapter,
"update_crm": CRMWriteAdapter,
"create_task": TaskAdapter,
"send_alert": SlackAdapter,
"enroll_sequence": SequenceAdapter
}
def execute(self, action: Action):
adapter_class = self.adapters.get(action.type)
if not adapter_class:
raise UnknownActionType(action.type)
adapter = adapter_class(self.config)
result = adapter.execute(action)
self.audit_log.record(action, result)
return result
Each adapter handles one action type against one external system. When Salesforce changes their API, you update one adapter. The rest of the pipeline does not care.
Build a pending queue on top of this for actions that require human approval before executing, follow-up emails especially. Reps want to know what is going out under their name. An approval layer with a 4-hour auto-execute TTL gives them visibility without creating a bottleneck.
What makes this hold up over time
The teams that build sales workflow automation that actually survives contact with production share a few habits:
They treat observability as a first-class concern, not an afterthought: Every event that enters the queue, every enrichment call, every decision, every execution attempt, logged with a correlation ID that traces the full journey from signal to action. When something breaks, you trace the ID, not the guesswork.
They design for partial failure: The enrichment service will go down. An external API will timeout. The LLM will occasionally return malformed JSON. Build for it explicitly, dead letter queues, retry with exponential backoff, graceful degradation to rule-based decisions when the LLM is unavailable.
They resist the urge to merge layers: The moment someone says "we could just call the enrichment function directly from the ingestion handler, it would be simpler", that is the moment the monolith starts growing back. The queue is the contract. Maintain it.
They version their schemas: Signal schemas change. Deal context fields get added. Action types evolve. Version from the start so old events can still be processed by newer consumers without breaking.
The architecture is the product
One more thing worth saying directly: the reason this architecture matters is not purely technical.
Revenue teams measure pipeline quality, deal velocity and forecast accuracy. Engineering teams measure uptime, latency and deployment frequency. A modular signal-to-action pipeline is the architecture that lets both teams win, because it is observable enough for engineering to trust and fast enough for revenue to rely on.
The 15-minute window between a signal firing and an action executing is not a performance target. It is a revenue metric. Build the system that hits it consistently and the upstream teams will notice.
Top comments (0)