DEV Community

Cover image for Open-Source Signal-to-Action Pipeline Starter: Webhook Enrich Trigger Log
SpurIQ Engineering
SpurIQ Engineering

Posted on

Open-Source Signal-to-Action Pipeline Starter: Webhook Enrich Trigger Log

A real code skeleton for building your own sales automation pipeline. Because every RevOps team deserves to understand what happens between a signal and an action.

I built my first GTM automation code pipeline in a weekend out of frustration.

Our intent platform was surfacing great signals. Pricing page visits. Content downloads. Intent surges. The data was there. But the gap between "signal appears in dashboard" and "rep actually does something about it" was averaging 4 days. Four days. In a world where buying windows close in hours.

The tools in our stack didn't talk to each other in the way we needed. The intent platform could fire a webhook. The CRM could receive data. The sequencing tool could trigger outreach. But nothing connected them into a single pipeline that said: "signal fires → context gets pulled → action gets triggered → everything gets logged."

So I built one. It was ugly. It worked. And it taught me more about where revenue execution actually breaks down than any dashboard ever did.

This article shares the skeleton of that pipeline. It's not production-ready. It's a starter that you can fork, extend and adapt to your own stack. Think of it as the RevOps open source equivalent of a "hello world" for signal-to-action automation.

The Architecture: Four Stages

The pipeline has four stages. Each one does one job. Together they close the loop between a signal existing and an action happening.

Signal (Webhook) → Enrich (Context) → Trigger (Action) → Log (CRM)
Enter fullscreen mode Exit fullscreen mode

Stage 1: Webhook receiver: Catches incoming signals from your intent platform, website analytics, CRM, or any tool that can fire a webhook.

Stage 2: Enrichment: Takes the raw signal and adds context: who is this person, what company are they from, are they in our ICP, what's the deal history.

Stage 3: Trigger: Based on the enriched signal decides what action to take and executes it: create a task, send a Slack alert, queue an email, update a CRM field.

Stage 4: Log: Writes everything back to the CRM so the action is tracked, the signal is recorded and nothing falls through the cracks.
Let's build each one.

Stage 1: Webhook Receiver

This is your front door. Every signal enters here. Your intent platform fires a CRM webhook trigger when an account shows activity. Your website analytics fires one when a target account visits the pricing page. Your CRM fires one when a deal stage changes or goes idle.

Here's a minimal receiver in Python (Flask):

# signal_receiver.py
from flask import Flask, request, jsonify
from datetime import datetime
import json

app = Flask(__name__)

SIGNAL_QUEUE = []

@app.route('/webhook/signal', methods=['POST'])
def receive_signal():
    """
    Receives incoming signals from any source.
    Expects JSON with at minimum: source, signal_type and payload.
    """
    data = request.get_json()

    if not data or 'signal_type' not in data:
        return jsonify({'error': 'Missing signal_type'}), 400

    signal = {
        'id': f"sig_{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}",
        'received_at': datetime.utcnow().isoformat(),
        'source': data.get('source', 'unknown'),
        'signal_type': data.get('signal_type'),
        'payload': data.get('payload', {}),
        'status': 'received'
    }

    SIGNAL_QUEUE.append(signal)
    print(f"[RECEIVED] {signal['signal_type']} from {signal['source']}")

    # Pass to enrichment
    enriched = enrich_signal(signal)
    if enriched:
        action_result = trigger_action(enriched)
        log_to_crm(enriched, action_result)

    return jsonify({'status': 'processed', 'signal_id': signal['id']}), 200


if __name__ == '__main__':
    app.run(port=5000, debug=True)
Enter fullscreen mode Exit fullscreen mode

And the equivalent in Node (Express):

// signalReceiver.js
const express = require('express');
const app = express();
app.use(express.json());

const signalQueue = [];

app.post('/webhook/signal', (req, res) => {
  const data = req.body;

  if (!data || !data.signal_type) {
    return res.status(400).json({ error: 'Missing signal_type' });
  }

  const signal = {
    id: `sig_${Date.now()}`,
    received_at: new Date().toISOString(),
    source: data.source || 'unknown',
    signal_type: data.signal_type,
    payload: data.payload || {},
    status: 'received'
  };

  signalQueue.push(signal);
  console.log(`[RECEIVED] ${signal.signal_type} from ${signal.source}`);

  // Pass through the pipeline
  const enriched = enrichSignal(signal);
  if (enriched) {
    const actionResult = triggerAction(enriched);
    logToCrm(enriched, actionResult);
  }

  res.json({ status: 'processed', signal_id: signal.id });
});

app.listen(5000, () => console.log('Signal receiver running on :5000'));
Enter fullscreen mode Exit fullscreen mode

The webhook accepts any JSON payload with a signal_type field. That's intentionally flexible. Your intent platform might send { signal_type: "intent_surge", payload: { company: "Acme", topic: "sales automation" }}. Your website might send { signal_type: "pricing_page_visit", payload: { email: "vp@acme.com", visits: 3 }}. Same pipeline handles both.

Stage 2: Enrichment

Raw signals are noisy. A pricing page visit from a random Gmail address is different from a pricing page visit from the VP of Revenue at a Series B SaaS company in your ICP. Enrichment adds the context that determines whether this signal deserves action and what kind.

# enrichment.py
import os

# In production replace with actual API calls to
# Clearbit, Apollo, ZoomInfo, or your enrichment provider
ENRICHMENT_API_KEY = os.environ.get('ENRICHMENT_API_KEY', '')

# Simple ICP scoring criteria
ICP_CRITERIA = {
    'min_employees': 50,
    'max_employees': 500,
    'industries': ['saas', 'software', 'fintech', 'technology'],
    'titles_priority': ['cro', 'vp sales', 'head of revenue',
                        'revops', 'sales operations', 'founder', 'ceo']
}


def enrich_signal(signal):
    """
    Takes a raw signal and enriches it with company
    and contact context. Returns enriched signal or
    None if it doesn't meet ICP criteria.
    """
    payload = signal.get('payload', {})
    email = payload.get('email', '')
    company = payload.get('company', '')
    domain = payload.get('domain', '')

    # Simulate enrichment lookup
    # In production: call your enrichment API here
    enrichment_data = mock_enrichment_lookup(email, company, domain)

    if not enrichment_data:
        print(f"[SKIP] No enrichment data for {email or company}")
        return None

    # ICP scoring
    icp_score = calculate_icp_score(enrichment_data)

    if icp_score < 40:
        print(f"[SKIP] ICP score too low: {icp_score} for "
              f"{enrichment_data.get('company_name')}")
        return None

    # Build enriched signal
    enriched = {
        **signal,
        'enrichment': enrichment_data,
        'icp_score': icp_score,
        'status': 'enriched'
    }

    # Determine urgency based on signal type
    enriched['urgency'] = classify_urgency(signal['signal_type'], icp_score)

    print(f"[ENRICHED] {enrichment_data.get('company_name')} | "
          f"ICP: {icp_score} | Urgency: {enriched['urgency']}")

    return enriched


def calculate_icp_score(data):
    """Simple ICP scoring. Returns 0-100."""
    score = 0
    employees = data.get('employee_count', 0)
    industry = data.get('industry', '').lower()
    title = data.get('contact_title', '').lower()

    # Company size fit
    if ICP_CRITERIA['min_employees'] <= employees <= ICP_CRITERIA['max_employees']:
        score += 30
    elif employees > ICP_CRITERIA['max_employees']:
        score += 15  # enterprise, still worth engaging

    # Industry fit
    if any(ind in industry for ind in ICP_CRITERIA['industries']):
        score += 30

    # Title fit
    if any(t in title for t in ICP_CRITERIA['titles_priority']):
        score += 40

    return min(score, 100)


def classify_urgency(signal_type, icp_score):
    """Classify urgency for routing."""
    high_urgency = ['pricing_page_visit', 'demo_request', 'competitor_mention']
    medium_urgency = ['content_download', 'intent_surge', 'return_visit']

    if signal_type in high_urgency and icp_score >= 60:
        return 'high'
    elif signal_type in medium_urgency or icp_score >= 50:
        return 'medium'
    return 'low'


def mock_enrichment_lookup(email, company, domain):
    """Replace with actual API call in production."""
    return {
        'company_name': company or 'Acme Corp',
        'employee_count': 200,
        'industry': 'SaaS',
        'funding_stage': 'Series B',
        'contact_name': 'Jane Smith',
        'contact_title': 'VP of Sales',
        'contact_email': email or 'jane@acme.com',
        'contact_linkedin': 'linkedin.com/in/janesmith'
    }
Enter fullscreen mode Exit fullscreen mode

The ICP scoring is deliberately simple. Three dimensions: company size, industry and contact title. In production you'd add technographic filters (do they use Salesforce?), funding recency, growth signals and existing deal history from your CRM.

The urgency classifier determines what happens next. A pricing page visit from a high-ICP contact is "high" urgency and gets immediate routing. A content download from a medium-fit account is "medium" and enters a different workflow.

Stage 3: Trigger

This is where signals become actions. Based on the enriched signal and its urgency the trigger stage decides what to do and executes it.

# trigger.py
import json
from datetime import datetime

# In production these would be actual API calls
# to Slack, your sequencing tool, CRM, etc.


def trigger_action(enriched_signal):
    """
    Decides and executes the appropriate action
    based on signal type and urgency.
    """
    signal_type = enriched_signal['signal_type']
    urgency = enriched_signal['urgency']
    contact = enriched_signal.get('enrichment', {})
    company = contact.get('company_name', 'Unknown')
    name = contact.get('contact_name', 'Unknown')

    action_result = {
        'signal_id': enriched_signal['id'],
        'timestamp': datetime.utcnow().isoformat(),
        'actions_taken': []
    }

    # High urgency: immediate rep notification + task creation
    if urgency == 'high':
        # Alert the account owner via Slack
        slack_msg = build_slack_alert(enriched_signal)
        send_slack_notification(slack_msg)
        action_result['actions_taken'].append('slack_alert_sent')

        # Create a CRM task with context
        task = create_crm_task(enriched_signal)
        action_result['actions_taken'].append('crm_task_created')

        # If demo request: auto-schedule availability
        if signal_type == 'demo_request':
            send_calendar_link(contact)
            action_result['actions_taken'].append('calendar_link_sent')

    # Medium urgency: add to priority outbound list
    elif urgency == 'medium':
        add_to_sequence(enriched_signal)
        action_result['actions_taken'].append('added_to_priority_sequence')

    # Low urgency: log for future reference
    else:
        action_result['actions_taken'].append('logged_for_monitoring')

    print(f"[ACTION] {company} ({name}) | "
          f"Urgency: {urgency} | "
          f"Actions: {action_result['actions_taken']}")

    return action_result


def build_slack_alert(signal):
    """Build a contextual Slack message for the rep."""
    contact = signal.get('enrichment', {})
    return {
        'channel': '#sales-signals',
        'text': (
            f":rotating_light: *High-Intent Signal*\n"
            f"*Company:* {contact.get('company_name')}\n"
            f"*Contact:* {contact.get('contact_name')} "
            f"({contact.get('contact_title')})\n"
            f"*Signal:* {signal['signal_type']}\n"
            f"*ICP Score:* {signal['icp_score']}/100\n"
            f"*Action needed:* Outreach within 4 hours\n"
            f"*LinkedIn:* {contact.get('contact_linkedin')}"
        )
    }


def send_slack_notification(msg):
    """In production: POST to Slack webhook URL."""
    print(f"[SLACK] {msg['text'][:80]}...")


def create_crm_task(signal):
    """In production: POST to Salesforce/HubSpot API."""
    contact = signal.get('enrichment', {})
    return {
        'subject': f"High-intent signal: {signal['signal_type']} "
                   f"from {contact.get('company_name')}",
        'due_date': datetime.utcnow().isoformat(),
        'priority': 'high',
        'description': (
            f"{contact.get('contact_name')} at "
            f"{contact.get('company_name')} triggered "
            f"{signal['signal_type']}. ICP score: "
            f"{signal['icp_score']}. Act within 4 hours."
        )
    }


def add_to_sequence(signal):
    """In production: POST to Outreach/Salesloft/Apollo API."""
    print(f"[SEQUENCE] Added {signal['enrichment'].get('contact_email')} "
          f"to priority sequence")


def send_calendar_link(contact):
    """In production: send via email API with Calendly/Chili Piper link."""
    print(f"[CALENDAR] Sent booking link to "
          f"{contact.get('contact_email')}")
Enter fullscreen mode Exit fullscreen mode

The key design principle here: different urgency levels trigger different action types. A high-urgency signal doesn't enter a queue. It alerts the rep immediately with full context. A medium signal enters a priority sequence. A low signal gets logged for future reference.

This is where most sales automation pipelines in the wild either don't exist or break down. The signal gets detected. Nobody defined what should happen next. So it sits in a dashboard.

Stage 4: Log to CRM

Everything gets logged. The signal, the enrichment, the action taken and the timestamp. This creates an audit trail and keeps the CRM current without the rep doing any manual data entry.

# crm_logger.py
from datetime import datetime


def log_to_crm(enriched_signal, action_result):
    """
    Logs the complete signal-to-action chain to CRM.
    In production: use Salesforce REST API or
    HubSpot API to create/update records.
    """
    contact = enriched_signal.get('enrichment', {})

    crm_record = {
        'type': 'signal_event',
        'timestamp': datetime.utcnow().isoformat(),
        'company': contact.get('company_name'),
        'contact_name': contact.get('contact_name'),
        'contact_email': contact.get('contact_email'),
        'signal_type': enriched_signal['signal_type'],
        'signal_source': enriched_signal['source'],
        'icp_score': enriched_signal['icp_score'],
        'urgency': enriched_signal['urgency'],
        'actions_taken': action_result.get('actions_taken', []),
        'signal_to_action_latency_seconds': calculate_latency(
            enriched_signal['received_at'],
            action_result['timestamp']
        )
    }

    # In production: POST to CRM API
    write_to_crm(crm_record)

    print(f"[LOGGED] {crm_record['company']} | "
          f"Latency: {crm_record['signal_to_action_latency_seconds']}s | "
          f"Actions: {crm_record['actions_taken']}")

    return crm_record


def calculate_latency(received_at, action_at):
    """Calculate signal-to-action latency in seconds."""
    fmt = '%Y-%m-%dT%H:%M:%S'
    try:
        received = datetime.fromisoformat(received_at)
        acted = datetime.fromisoformat(action_at)
        return (acted - received).total_seconds()
    except (ValueError, TypeError):
        return -1


def write_to_crm(record):
    """
    In production replace with actual CRM API call.

    Salesforce example:
        sf.Account.update(record_id, {
            'Last_Signal_Type__c': record['signal_type'],
            'Last_Signal_Date__c': record['timestamp'],
            'ICP_Score__c': record['icp_score'],
            'Signal_Action_Latency__c': record[
                'signal_to_action_latency_seconds'
            ]
        })
    """
    print(f"[CRM] Would write: {record['signal_type']} for "
          f"{record['company']}")
Enter fullscreen mode Exit fullscreen mode

Notice the signal_to_action_latency_seconds calculation. This is the metric that matters most. Every signal that passes through your pipeline gets a latency measurement. Over time you can track average latency by signal type, by rep, by urgency level. That single metric tells you whether your pipeline is actually compressing the gap or just moving data around.

Putting It Together

Here's how the full flow works end to end:

  1. Intent platform fires webhook → /webhook/signal
  2. Receiver validates and creates signal object
  3. Enrichment adds company/contact context + ICP score
  4. Urgency classifier determines response tier
  5. Trigger executes the appropriate action
  6. Logger writes everything to CRM with latency measurement

To test it locally:

# Terminal 1: Start the receiver
python signal_receiver.py

# Terminal 2: Fire a test signal
curl -X POST http://localhost:5000/webhook/signal \
  -H "Content-Type: application/json" \
  -d '{
    "source": "website",
    "signal_type": "pricing_page_visit",
    "payload": {
      "email": "vp.sales@acme.com",
      "company": "Acme Corp",
      "visits": 3,
      "pages": ["pricing", "case-study", "integrations"]
    }
  }'
Enter fullscreen mode Exit fullscreen mode

Where This Skeleton Ends and Production Begins

Let me be honest about what this code is and isn't.

What it is: A working skeleton that demonstrates the signal-to-action pipeline architecture. Fork it. Wire it to your actual APIs. Deploy it on a serverless function. It will work.

What it isn't: Production-grade GTM automation. There's no retry logic. No queue management for high volume. No deduplication (the same person visiting the pricing page 5 times shouldn't trigger 5 Slack alerts). No rate limiting on outbound APIs. No authentication on the webhook endpoint. No monitoring or alerting when the pipeline breaks.

Building this into a reliable production system is where the real complexity lives. Signal deduplication. Account-level vs contact-level routing. Multi-signal scoring (an intent surge plus a pricing page visit is a different urgency than either alone). CRM record matching and merge logic. Team-based routing rules. Compliance and opt-out handling. Error recovery. Scaling to handle thousands of signals per day.

This is exactly the engineering problem that SpurIQ productizes. The LeadIQ and DealIQ engines handle the full production pipeline: signal ingestion from multiple sources, enrichment, ICP scoring, urgency classification, action orchestration, CRM logging and latency tracking. All the stuff that takes this skeleton from a weekend project to a reliable revenue execution system.

But understanding the architecture is valuable even if you never build it yourself. When you know how the pipeline should work you can evaluate tools better, debug execution gaps faster and have smarter conversations with your RevOps team about where the signal-to-action chain is actually breaking.

Fork It

The full skeleton is designed to be extended. Some ideas for what to add:
Wire the enrichment stage to a real API (Apollo and Clearbit both have straightforward REST APIs). Connect the Slack notification to an actual webhook URL. Add a Salesforce or HubSpot API call in the logger. Build a simple dashboard that tracks average signal-to-action latency over time. Add deduplication logic (hash the signal by email + signal_type + date and skip duplicates).

Each addition moves you closer to a production pipeline and teaches you something about where revenue execution actually happens.
The signal-to-action gap is an architecture problem. This is the architecture. Build on it.

_Your GTM stack generates signals every day. This skeleton shows the pipeline that turns them into actions. The code is simple. The impact of closing the gap between signal and action is not. Whether you build this yourself or use a platform that does it for you the architecture is the same: Webhook → Enrich → Trigger → Log. Everything else is implementation detail.
_

Top comments (0)