AIbuddy_il

Posted on • Originally published at aibuddy.co.il

Building a WhatsApp AI Agent for Small Businesses with Python

Building a WhatsApp AI Agent for small businesses sounds complicated — but with Python, the WhatsApp Business API, and a modern LLM, you can have a working agent in an afternoon. This guide walks through the real implementation: webhook handling, conversation state, lead capture, and production error handling.

We built this for AI Buddy, our AI automation company serving Israeli small businesses. If you want a fully managed version without writing code, check out ClawBud.

Architecture Overview

The system has four main pieces:

  1. WhatsApp webhook — receives messages from Meta's API
  2. State manager — tracks conversation context per user
  3. LLM integration — generates intelligent responses
  4. Lead capture — extracts and stores contact info
WhatsApp → Webhook (Flask) → State Manager → LLM → Response → WhatsApp
                                   ↓
                              Lead Store (DB)

Setting Up the Webhook

Meta requires a webhook endpoint that handles both verification and message events. Here's a clean Flask implementation:

import os
import hmac
import hashlib
import json
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

VERIFY_TOKEN = os.environ["WHATSAPP_VERIFY_TOKEN"]
ACCESS_TOKEN = os.environ["WHATSAPP_ACCESS_TOKEN"]
APP_SECRET = os.environ["WHATSAPP_APP_SECRET"]
PHONE_NUMBER_ID = os.environ["WHATSAPP_PHONE_NUMBER_ID"]

def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify X-Hub-Signature-256 header from Meta."""
    expected = hmac.new(
        APP_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

@app.route("/webhook", methods=["GET"])
def verify_webhook():
    """Handle Meta's webhook verification challenge."""
    mode = request.args.get("hub.mode")
    token = request.args.get("hub.verify_token")
    challenge = request.args.get("hub.challenge")

    if mode == "subscribe" and token == VERIFY_TOKEN:
        return challenge, 200
    return "Forbidden", 403

@app.route("/webhook", methods=["POST"])
def handle_webhook():
    """Process incoming WhatsApp messages."""
    signature = request.headers.get("X-Hub-Signature-256", "")
    if not verify_signature(request.data, signature):
        return "Unauthorized", 401

    data = request.json

    try:
        entry = data["entry"][0]
        changes = entry["changes"][0]
        value = changes["value"]

        if "messages" not in value:
            return jsonify({"status": "ok"}), 200

        message = value["messages"][0]
        phone = message["from"]
        msg_type = message["type"]

        if msg_type == "text":
            text = message["text"]["body"]
            handle_message(phone, text)

    except (KeyError, IndexError) as e:
        app.logger.error(f"Webhook parse error: {e}, payload: {data}")

    return jsonify({"status": "ok"}), 200

The verify_signature function is critical for security — skip it and anyone can send fake messages to your bot.
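
Before pointing Meta at your endpoint, you can sanity-check the signature scheme locally. A minimal sketch (the test secret and payload are made up; in a real test you would POST `body` to your dev server with the computed header):

```python
import hmac
import hashlib
import json

def sign(payload: bytes, secret: str) -> str:
    """Compute the X-Hub-Signature-256 value Meta sends for this payload."""
    digest = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return f"sha256={digest}"

# Simulate a delivery: sign a body the way Meta would, then verify it
# exactly the way verify_signature above does.
secret = "test-secret"  # assumption: same value as WHATSAPP_APP_SECRET in dev
body = json.dumps({"entry": []}).encode()
header = sign(body, secret)

expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
assert hmac.compare_digest(f"sha256={expected}", header)
```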

Conversation State Management

In-memory state won't survive restarts. Use Redis for production:

import redis
import json
from datetime import datetime
from typing import Optional

r = redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379"))
STATE_TTL = 86400  # 24 hours

class ConversationState:
    def __init__(self, phone: str):
        self.phone = phone
        self.key = f"conv:{phone}"

    def get(self) -> dict:
        data = r.get(self.key)
        if data:
            return json.loads(data)
        return {
            "phone": self.phone,
            "messages": [],
            "stage": "greeting",
            "lead_data": {},
            "created_at": datetime.utcnow().isoformat()
        }

    def save(self, state: dict):
        r.setex(self.key, STATE_TTL, json.dumps(state))

    def add_message(self, role: str, content: str):
        state = self.get()
        state["messages"].append({
            "role": role,
            "content": content,
            "ts": datetime.utcnow().isoformat()
        })
        # Keep last 20 messages to avoid context overflow
        state["messages"] = state["messages"][-20:]
        self.save(state)
        return state

    def update_stage(self, stage: str):
        state = self.get()
        state["stage"] = stage
        self.save(state)

    def update_lead_data(self, data: dict):
        state = self.get()
        state["lead_data"].update(data)
        self.save(state)
        return state

The 20-message cap is intentional. Claude and GPT-4 have finite context windows, and sending a 200-message history on every request costs money and slows responses.
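
If you'd rather budget by size than by message count, a crude character-based trim works too. A sketch (the 4-characters-per-token ratio is an approximation, not an exact tokenizer):

```python
def trim_history(messages: list, max_chars: int = 8000) -> list:
    """Keep the most recent messages whose combined length fits a character
    budget. Rough heuristic: ~4 characters per token, so 8000 chars is on
    the order of 2000 tokens."""
    kept, total = [], 0
    for msg in reversed(messages):
        total += len(msg["content"])
        if total > max_chars:
            break
        kept.append(msg)
    return list(reversed(kept))
```

This slots into `add_message` in place of the fixed `[-20:]` slice.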

LLM Integration with Claude

I prefer Anthropic's Claude for business-facing bots — it's more controllable and less likely to hallucinate contact info. Here's the integration:

import anthropic
from typing import Optional

client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

SYSTEM_PROMPT = """You are a friendly business assistant for a local service company.

Your goals:
1. Greet the customer warmly
2. Understand what service they need
3. Collect: name, service type, preferred date/time, location
4. Answer questions about services and pricing
5. Book appointments or escalate to a human

Tone: Professional, warm, concise. This is WhatsApp — keep messages under 150 words.

When you have collected all lead info, include this JSON in your response (the system will extract it):
<lead_data>
{
  "name": "customer name",
  "service": "service type",
  "date": "preferred date",
  "location": "city/area",
  "phone": "already known from WhatsApp"
}
</lead_data>

If you cannot help or the customer is angry, say: "ESCALATE_TO_HUMAN" as the last line."""

def get_llm_response(messages: list, system: str = SYSTEM_PROMPT) -> str:
    """Get response from Claude with retry logic."""
    import time

    for attempt in range(3):
        try:
            # Convert our format to Anthropic format
            anthropic_messages = [
                {"role": m["role"], "content": m["content"]}
                for m in messages
                if m["role"] in ("user", "assistant")
            ]

            response = client.messages.create(
                model="claude-opus-4-5",
                max_tokens=512,
                system=system,
                messages=anthropic_messages
            )
            return response.content[0].text

        except anthropic.RateLimitError:
            if attempt < 2:
                time.sleep(2 ** attempt)  # exponential backoff
            else:
                raise
        except anthropic.APIError as e:
            app.logger.error(f"Claude API error: {e}")
            raise

def extract_lead_data(response_text: str) -> Optional[dict]:
    """Extract structured lead data from LLM response."""
    import re

    pattern = r'<lead_data>(.*?)</lead_data>'
    match = re.search(pattern, response_text, re.DOTALL)

    if match:
        try:
            return json.loads(match.group(1).strip())
        except json.JSONDecodeError:
            app.logger.warning(f"Failed to parse lead JSON: {match.group(1)}")
    return None

Note the max_tokens=512 limit. WhatsApp messages should be short. A 2000-token response is a terrible user experience.
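
Even with the token cap, a reply can occasionally run long, and the Cloud API limits a text body to 4096 characters (from memory; check the current docs). A splitter is a cheap safeguard, sketched here:

```python
def split_message(text: str, limit: int = 4096) -> list:
    """Split a long reply into chunks that fit WhatsApp's text-body limit,
    preferring to break at a newline so paragraphs stay intact."""
    chunks = []
    while len(text) > limit:
        cut = text.rfind("\n", 0, limit)
        if cut < 1:  # no usable newline: hard-cut at the limit
            cut = limit
        chunks.append(text[:cut].rstrip())
        text = text[cut:].lstrip()
    if text:
        chunks.append(text)
    return chunks
```

`send_whatsapp_message` could then loop over `split_message(response_text)` instead of sending one oversized body.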

Message Handling Pipeline

This is where everything connects:

def handle_message(phone: str, text: str):
    """Main message processing pipeline."""
    conv = ConversationState(phone)

    # Add user message to history
    state = conv.add_message("user", text)

    try:
        # Get LLM response
        response_text = get_llm_response(state["messages"])

        # Check for escalation
        if "ESCALATE_TO_HUMAN" in response_text:
            escalate_to_human(phone, state)
            response_text = response_text.replace("ESCALATE_TO_HUMAN", "").strip()

        # Extract and save lead data if present
        lead_data = extract_lead_data(response_text)
        if lead_data:
            lead_data["phone"] = phone
            conv.update_lead_data(lead_data)
            save_lead_to_crm(lead_data)
            # Remove the JSON block from the message sent to user
            import re
            response_text = re.sub(
                r'<lead_data>.*?</lead_data>', 
                '', 
                response_text, 
                flags=re.DOTALL
            ).strip()

        # Save assistant response
        conv.add_message("assistant", response_text)

        # Send to WhatsApp
        send_whatsapp_message(phone, response_text)

    except Exception as e:
        app.logger.error(f"Error handling message from {phone}: {e}")
        # Always send something — silence is the worst UX
        send_whatsapp_message(
            phone, 
            "Sorry, I'm having a technical issue. Please try again in a moment or call us directly."
        )

def send_whatsapp_message(to: str, text: str):
    """Send a WhatsApp message via the Cloud API."""
    url = f"https://graph.facebook.com/v19.0/{PHONE_NUMBER_ID}/messages"

    payload = {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {"body": text}
    }

    response = requests.post(
        url,
        headers={
            "Authorization": f"Bearer {ACCESS_TOKEN}",
            "Content-Type": "application/json"
        },
        json=payload,
        timeout=10
    )

    if response.status_code != 200:
        app.logger.error(
            f"WhatsApp send failed: {response.status_code} {response.text}"
        )
        response.raise_for_status()

Lead Capture and CRM Integration

Leads are worthless if they sit in Redis. Push them somewhere actionable:

import sqlite3
from datetime import datetime

# For production, replace with Postgres + SQLAlchemy
def init_db():
    conn = sqlite3.connect("leads.db")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS leads (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            phone TEXT NOT NULL,
            name TEXT,
            service TEXT,
            preferred_date TEXT,
            location TEXT,
            raw_data TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            status TEXT DEFAULT 'new'
        )
    """)
    conn.commit()
    conn.close()

def save_lead_to_crm(lead_data: dict):
    """Save lead to local DB and optionally push to external CRM."""
    conn = sqlite3.connect("leads.db")
    try:
        conn.execute("""
            INSERT INTO leads (phone, name, service, preferred_date, location, raw_data)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            lead_data.get("phone"),
            lead_data.get("name"),
            lead_data.get("service"),
            lead_data.get("date"),
            lead_data.get("location"),
            json.dumps(lead_data)
        ))
        conn.commit()
        app.logger.info(f"Lead saved: {lead_data.get('phone')} - {lead_data.get('name')}")

        # Push to external CRM (e.g., HubSpot, Salesforce, custom)
        push_to_external_crm(lead_data)

    except sqlite3.Error as e:
        app.logger.error(f"DB error saving lead: {e}")
    finally:
        conn.close()

def push_to_external_crm(lead_data: dict):
    """Push lead to external CRM. Adjust endpoint and auth per your CRM."""
    crm_url = os.environ.get("CRM_WEBHOOK_URL")
    if not crm_url:
        return

    try:
        requests.post(
            crm_url,
            json={
                "source": "whatsapp_bot",
                "contact": {
                    "phone": lead_data.get("phone"),
                    "name": lead_data.get("name"),
                    "notes": f"Service: {lead_data.get('service')}, Date: {lead_data.get('date')}"
                }
            },
            timeout=5
        )
    except requests.RequestException as e:
        # Don't fail the main flow if CRM is down
        app.logger.warning(f"CRM push failed (non-critical): {e}")

def escalate_to_human(phone: str, state: dict):
    """Alert a human agent when the bot can't handle the conversation."""
    # Send Slack/email notification
    slack_url = os.environ.get("SLACK_WEBHOOK_URL")
    if slack_url:
        requests.post(slack_url, json={
            "text": f"🚨 Human escalation needed for {phone}\nLast messages: {state['messages'][-3:]}"
        }, timeout=5)

Production Deployment

Run this with gunicorn behind nginx:

# requirements.txt
flask==3.0.0
anthropic==0.25.0
redis==5.0.1
requests==2.31.0
gunicorn==21.2.0

# Start command
gunicorn -w 4 -b 0.0.0.0:8000 app:app --timeout 30 --access-logfile -
# nginx config
server {
    listen 443 ssl;
    server_name your-bot-domain.com;

    location /webhook {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_read_timeout 30;
    }
}

Key production settings:

  • 4 gunicorn workers — WhatsApp delivers webhooks fast; you need concurrency
  • 30 second timeout — LLM calls can take 5-10 seconds; give them room
  • Always return 200 from webhook — if you return an error, Meta retries for hours
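
One way to honor the always-return-200 rule while LLM calls take seconds is to acknowledge first and process in the background. A thread pool is the smallest step; this is a sketch with placeholder names (`process_message` stands in for the real `handle_message` pipeline), and a real deployment is sturdier with a task queue such as Celery or RQ:

```python
from concurrent.futures import ThreadPoolExecutor

# One pool per gunicorn worker; LLM calls run off the request thread so
# the webhook route can return 200 to Meta immediately.
executor = ThreadPoolExecutor(max_workers=4)

def process_message(phone: str, text: str) -> str:
    # Stand-in for the real pipeline (state, LLM call, WhatsApp send).
    return f"handled {phone}: {text}"

def enqueue_message(phone: str, text: str):
    """Submit work and return at once; the route then responds 200."""
    return executor.submit(process_message, phone, text)
```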

Error Handling Patterns

The three most common failure modes and how to handle them:

1. LLM timeout or rate limit:

from functools import wraps
import time

def with_fallback(fallback_message):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                app.logger.error(f"{func.__name__} failed: {e}")
                return fallback_message
        return wrapper
    return decorator

@with_fallback("I'm temporarily unavailable. Please try again in a moment.")
def get_response_with_fallback(messages):
    return get_llm_response(messages)

2. Duplicate message delivery (Meta sends duplicates sometimes):

processed_messages = set()  # Use Redis SET in production

def is_duplicate(message_id: str) -> bool:
    if message_id in processed_messages:
        return True
    processed_messages.add(message_id)
    return False
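
The in-process set above also misses duplicates that land on a different gunicorn worker. A SET NX sketch that works across workers, assuming a redis-py-compatible client such as the `r` from the state-manager section:

```python
def is_duplicate(client, message_id: str, ttl: int = 3600) -> bool:
    """Return True if this WhatsApp message ID was already processed.

    SET with nx=True ("set only if absent") is a single atomic command:
    only the first caller gets a truthy result back, so the check is
    race-free across worker processes. The TTL expires old IDs so the
    keyspace stays small.
    """
    first_time = client.set(f"msg:{message_id}", 1, nx=True, ex=ttl)
    return not first_time
```

In `handle_webhook`, call it with the payload's `message["id"]` before invoking `handle_message`.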

3. User sends unsupported media (images, voice notes):

if msg_type != "text":
    send_whatsapp_message(
        phone,
        "I can only handle text messages right now. Please type your question."
    )
    return

Monitoring

Add basic metrics from day one:

import time
from collections import defaultdict

metrics = defaultdict(int)

def track(event: str):
    metrics[event] += 1
    # In production, send to Prometheus/Datadog

# In handle_message:
track("messages_received")
start = time.time()
# ... process ...
duration = time.time() - start
track(f"response_time_bucket_{int(duration)}")
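
Counters alone hide tail latency. Until you wire up Prometheus or Datadog, a stdlib rolling window gives a usable percentile; a sketch:

```python
from collections import deque

latencies = deque(maxlen=1000)  # rolling window of recent response times

def observe_latency(seconds: float):
    latencies.append(seconds)

def p95() -> float:
    """95th-percentile response time over the rolling window."""
    if not latencies:
        return 0.0
    ordered = sorted(latencies)
    return ordered[int(0.95 * (len(ordered) - 1))]
```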

Skip the Infrastructure, Use ClawBud

If you want this running in production without managing servers, Redis, webhook SSL, and API keys — ClawBud is the managed platform built by AI Buddy that handles all of this for you. You configure the agent's personality and business logic, and the infrastructure is taken care of.

What's Next

This agent handles the basics well. For a production system at scale, the next steps are:

  • Template messages for initial contact (required by Meta for outbound)
  • Language detection — in Israel, customers message in Hebrew, Arabic, and Russian
  • Appointment calendar integration — connect to Calendly or Google Calendar
  • Analytics dashboard — track conversion rate from message to booked appointment
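
Language detection is a good place to start crude: a Unicode-block check separates Hebrew, Arabic, and Cyrillic with no dependency at all. A sketch (a library such as langdetect is more robust, and Cyrillic text is not necessarily Russian):

```python
def detect_script(text: str) -> str:
    """Crude script detection by Unicode block, enough to pick a reply language."""
    for ch in text:
        code = ord(ch)
        if 0x0590 <= code <= 0x05FF:
            return "hebrew"
        if 0x0600 <= code <= 0x06FF:
            return "arabic"
        if 0x0400 <= code <= 0x04FF:
            return "cyrillic"
    return "other"
```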

The code in this post is what we actually run in production. Start simple, measure everything, iterate fast.
