DEV Community

Daniel Romitelli
Daniel Romitelli

Posted on • Originally published at craftedbydaniel.com

The CRM Sync Engine I Had to Reverse‑Engineer: Two‑Step Fetches, 50‑Field Limits, and a Mapper That Refuses to Drift

I found the bug the way you always find the worst ones: not in a unit test, not in staging, but in a "why is this field blank again?" message after a real sync.

The Zoho CRM API call looked correct. The request even included a fields parameter. And yet the response kept arriving with a drifting set of columns—sometimes missing the ones I explicitly asked for. That was the moment I stopped treating the API docs as a contract and started treating them as a suggestion.

The CRM is a noisy sensor. The sync engine is the signal conditioner. Once you see it that way, the architecture writes itself.

This is Part 7 of my series "How to Architect an Enterprise AI System (And Why the Engineer Still Matters)". Part 6 was about building claim/unclaim workers with SKIP LOCKED.

The key insight: treat the CRM as an adversarial input stream

The non-obvious part isn't "sync records from a CRM." The non-obvious part is accepting that the CRM will:

  • ignore parameters that look mandatory,
  • enforce limits that aren't obvious until you trip them,
  • return the same logical field in multiple shapes,
  • and occasionally pretend a field exists in one module when it doesn't.

So I engineered the sync like I engineer extraction pipelines: make drift visible, make normalization explicit, and make the happy path optional.

flowchart TD
  schemaGen[Schema generator] --> mappingFile[Auto-generated mapping file]
  mappingFile --> syncJob[Listing sync scheduler]
  syncJob -->|custom view| idList[Fetch IDs]
  idList -->|batch GET by ID| rawRecords[Raw CRM records]
  rawRecords --> fieldMapper[CRM field mapper]
  fieldMapper --> locationParser[Freetext location parser]
  locationParser --> normalizedRows[Normalized rows]
  normalizedRows --> hashGate[SHA-256 change gate]
  hashGate --> upsert[Incremental processing]
Enter fullscreen mode Exit fullscreen mode

That diagram is the whole trick: I don't "sync." I converge.

The happy path the docs describe (and why it fails)

The documented story most CRMs tell is simple:

  1. Call a "list records" endpoint.
  2. Pass fields=... to control the payload.
  3. Paginate until you're done.
  4. Map fields into your database.

That works right up until you do any of these:

  • use custom views,
  • request more than 50 fields,
  • pull a module large enough to hit a 2000-record cap,
  • or rely on consistent typing.

This post is the set of workarounds I had to build after discovering five undocumented behaviors in my production Zoho CRM integration:

  1. Auto-generated field schema (because hand-maintained mappings rot)
  2. Custom views ignoring the fields parameter
  3. The 50-field API limit
  4. Date-chunked pagination to bypass the 2000-record cap
  5. Incremental sync via SHA‑256 + runtime normalization (including freetext location parsing)

1) I stopped hand-maintaining schema: the auto-generated mapping file

The first reliability decision was boring but decisive: I auto-generated my field schema from the Zoho CRM API.

When you integrate with a CRM at any real scale, manual mappings become a quiet source of corruption. Someone adds a field in the CRM UI, renames a label, changes a picklist variant, and suddenly your "stable" integration is lying.

So I made the mapping file a product of the API itself.

The generator pulls module metadata and emits a giant mapping artifact—in my case, 68 modules, 1518 fields—so the sync engine can validate what's real instead of guessing.

import requests
from typing import Any

def generate_crm_mappings(access_token: str, api_base: str) -> dict[str, Any]:
    """
    Introspect the CRM API to auto-generate a field mapping artifact.
    In production this covered 68 modules and 1518 fields.
    """
    headers = {"Authorization": f"Zoho-oauthtoken {access_token}"}
    mapping: dict[str, Any] = {"modules": {}}

    # Step 1: Discover all API-accessible modules
    modules_resp = requests.get(f"{api_base}/settings/modules", headers=headers)
    modules = [
        m for m in modules_resp.json().get("modules", [])
        if m.get("api_supported")  # skip non-API modules
    ]

    for mod in modules:
        api_name = mod["api_name"]

        # Step 2: Pull field metadata for each module
        fields_resp = requests.get(
            f"{api_base}/settings/fields",
            headers=headers,
            params={"module": api_name},
        )
        fields = {}
        for f in fields_resp.json().get("fields", []):
            field_entry = {
                "api_name": f["api_name"],
                "data_type": f["data_type"],
                "field_label": f["field_label"],
                "required": f.get("required", False),
                "read_only": f.get("read_only", False),
                "custom_field": f.get("custom_field", False),
            }
            # Capture picklist variants (display_value vs actual_value)
            if f["data_type"] in ("picklist", "multiselectpicklist"):
                field_entry["picklist_values"] = f.get("pick_list_values", [])

            fields[f["api_name"]] = field_entry

        mapping["modules"][api_name] = {
            "module_api_name": api_name,
            "fields": fields,
        }

    return mapping
Enter fullscreen mode Exit fullscreen mode

What surprised me here wasn't that schema drift exists—it's that it's quiet. Auto-generation turns quiet drift into a diff you can review, which is the only kind of drift that doesn't bite you later.

2) The undocumented behavior that forced a two-step fetch

The nastiest discovery was this one:

In Zoho CRM, custom views ignore the fields parameter.

I learned this empirically. The request asked for a set of fields, but the response came back with whatever the custom view felt like returning. That meant my downstream mapper was being fed a partial record—sometimes missing columns that were mandatory for my pipeline.

The workaround is a pattern I now consider foundational for CRM sync:

  1. Query the custom view only to get IDs.
  2. Then do a batch GET by ID where fields is honored.

That turns the custom view into a filter, not a payload contract.

import requests
import time
from typing import Any

def two_step_fetch(
    access_token: str,
    api_base: str,
    module: str,
    custom_view_id: str,
    fields: list[str],
    batch_size: int = 100,
) -> list[dict[str, Any]]:
    """
    Two-step CRM fetch: IDs via custom view, then batch GET for full records.
    Step 1 is cheap (2 fields per record). Step 2 honors the fields parameter.
    """
    headers = {"Authorization": f"Zoho-oauthtoken {access_token}"}

    # --- Step 1: fetch IDs from the custom view ---
    record_ids: list[str] = []
    page = 1
    per_page = 200

    while True:
        resp = requests.get(
            f"{api_base}/{module}",
            headers=headers,
            params={
                "cvid": custom_view_id,
                "fields": "id",          # minimal payload
                "per_page": per_page,
                "page": page,
            },
        )
        if resp.status_code == 429:       # throttled
            time.sleep(60)
            continue

        batch = resp.json().get("data", [])
        record_ids.extend(r["id"] for r in batch)

        if len(batch) < per_page:         # last page
            break
        page += 1

    # --- Step 2: batch GET by ID (fields parameter honored here) ---
    full_records: list[dict[str, Any]] = []
    fields_param = ",".join(fields)

    for i in range(0, len(record_ids), batch_size):
        ids_chunk = record_ids[i : i + batch_size]
        ids_param = ",".join(ids_chunk)

        resp = requests.get(
            f"{api_base}/{module}",
            headers=headers,
            params={"ids": ids_param, "fields": fields_param},
        )
        full_records.extend(resp.json().get("data", []))

    return full_records
Enter fullscreen mode Exit fullscreen mode

This was one of those fixes that felt like an admission of defeat—until I realized it's actually a reliability upgrade. IDs are stable. Views are not.

3) The 50-field limit and why it changes your mapper design

Then I hit the next invisible wall:

Zoho's API caps requests at 50 fields.

This matters because it forces you into one of two designs:

  • a "wide fetch" that fails unpredictably as your schema grows,
  • or a deliberate field selection strategy with mapping that understands partial payloads.

In my case, the sync job's mapping became intentionally narrow for the hot path—a curated column mapping—and the mapper got very strict about coercion and alignment.

The important part isn't the exact column count; it's the discipline: a stable, typed, explicitly coerced slice of the CRM schema that downstream systems can rely on.

from typing import Any, Optional

def map_listing_row(crm_record: dict[str, Any]) -> dict[str, Any]:
    """
    Map a raw CRM record into a normalized row for the local database.
    Explicit coercion on every field — no silent type drift.
    """
    return {
        "listing_id":        crm_record.get("Listing_Number"),
        "crm_id":            crm_record.get("id"),
        "address":           crm_record.get("Street_Address"),
        "city":              crm_record.get("City"),
        "state":             crm_record.get("State"),
        "zip_code":          crm_record.get("Zip_Code"),
        # Numeric fields arrive as int OR float — force to str for consistency
        "price":             _safe_str(crm_record.get("Asking_Price")),
        "square_feet":       _safe_str(crm_record.get("Square_Footage")),
        "lot_size":          _safe_str(crm_record.get("Lot_Size")),
        "bedrooms":          _safe_str(crm_record.get("Bedrooms")),
        "year_built":        _safe_str(crm_record.get("Year_Built")),
        "property_type":     crm_record.get("Property_Type"),
        "listing_status":    crm_record.get("Status"),
        "agent_name":        crm_record.get("Listing_Agent"),
        "agent_email":       crm_record.get("Agent_Email"),
        "agent_phone":       crm_record.get("Agent_Phone"),
        "description":       crm_record.get("Description"),
        "date_listed":       crm_record.get("Date_Listed"),
    }


def _safe_str(value: Any) -> Optional[str]:
    """Convert numeric fields to str. Zoho sends int sometimes, float other times."""
    return str(value) if value is not None else None
Enter fullscreen mode Exit fullscreen mode

The thing that broke first here was my assumption that "numeric is numeric." In production, numeric fields arrived as floats sometimes and ints other times. The fix wasn't clever—it was explicit: align the columns and cast where drift shows up.

4) The 2000-record cap: date-chunked pagination

Once the sync started pulling real volume, I ran into another hard cap:

Zoho enforces a 2000-record limit that makes naive pagination a trap.

If you just "page until done," you can silently stop early. That's the kind of bug that doesn't throw errors; it just erases reality.

The workaround is date-chunked pagination: instead of paging across the whole dataset, you page across time windows. You choose a date field, slice the timeline, and ensure each slice stays under the cap.

from datetime import datetime, timedelta
from typing import Any

def date_chunked_fetch(
    access_token: str,
    api_base: str,
    module: str,
    fields: list[str],
    start_date: datetime,
    end_date: datetime,
    chunk_days: int = 30,
) -> list[dict[str, Any]]:
    """
    Paginate by date windows to avoid the 2000-record hard cap.
    Each chunk stays small enough that normal pagination completes.
    """
    headers = {"Authorization": f"Zoho-oauthtoken {access_token}"}
    all_records: list[dict[str, Any]] = []
    chunk_start = start_date

    while chunk_start < end_date:
        chunk_end = min(chunk_start + timedelta(days=chunk_days), end_date)
        page = 1

        while True:
            resp = requests.get(
                f"{api_base}/{module}/search",
                headers=headers,
                params={
                    "criteria": (
                        f"(Modified_Time:greater_equal:{chunk_start.strftime('%Y-%m-%dT%H:%M:%S+00:00')})"
                        f"and(Modified_Time:less_than:{chunk_end.strftime('%Y-%m-%dT%H:%M:%S+00:00')})"
                    ),
                    "fields": ",".join(fields),
                    "per_page": 200,
                    "page": page,
                },
            )

            if resp.status_code == 204:   # no records in this window
                break

            data = resp.json().get("data", [])
            all_records.extend(data)

            if len(data) < 200:
                break
            page += 1

        chunk_start = chunk_end

    return all_records
Enter fullscreen mode Exit fullscreen mode

This is one of those patterns that looks like overengineering until you've lived through the alternative.

5) Incremental sync with SHA‑256: only process changed records

After I made the fetch reliable, I made the processing cheap.

The sync engine computes a SHA‑256 fingerprint of the normalized record and only processes rows that changed. That gives me incremental behavior without trusting the CRM to provide perfect updated_at semantics.

This is especially important when the upstream system can mutate fields in ways that don't show up consistently in timestamps—or when you're normalizing fields so aggressively that "raw changed" and "meaningfully changed" aren't the same thing.

import hashlib
import json
from typing import Any

def compute_record_hash(record: dict[str, Any]) -> str:
    """
    SHA-256 fingerprint of a normalized record.
    Exclude the CRM's internal ID so that re-imports don't trigger false changes.
    """
    hashable = {k: v for k, v in record.items() if k != "id"}
    data_json = json.dumps(hashable, sort_keys=True)
    return hashlib.sha256(data_json.encode("utf-8")).hexdigest()


def incremental_sync(
    new_records: list[dict[str, Any]],
    existing_hashes: dict[str, str],    # {listing_id: hash}
    id_field: str = "Listing_Number",
) -> tuple[list[dict], list[dict]]:
    """
    Compare incoming records against stored hashes.
    Returns (new_records, changed_records) — skip unchanged.
    """
    to_insert: list[dict] = []
    to_update: list[dict] = []

    for record in new_records:
        record_id = record.get(id_field)
        new_hash = compute_record_hash(record)

        if record_id not in existing_hashes:
            to_insert.append(record)
        elif existing_hashes[record_id] != new_hash:
            to_update.append(record)
        # else: unchanged — skip

    return to_insert, to_update
Enter fullscreen mode Exit fullscreen mode

The fingerprint gate is simple in concept: normalize → hash → compare → process. In practice it cut my sync from 45 seconds (full reprocessing) to under 10 seconds (incremental), because only 10–20% of records typically change between runs.

Runtime normalization: the mapper that refuses to drift

The last piece—the one that keeps the whole thing honest—is runtime normalization.

I centralized the "Zoho is weird" logic into a field mapper that handles:

  • phone normalization to E.164,
  • picklists with 3-variant handling (Zoho sends the same multiselect field as a JSON array string, a comma-separated string, or an already-parsed list depending on context),
  • datetime normalization.
import json
import re
from datetime import datetime
from typing import Any, Optional

def normalize_phone_e164(phone: Optional[str]) -> Optional[str]:
    """Normalize a phone number to E.164 format."""
    if not phone:
        return None

    digits = re.sub(r"\D", "", phone)

    if len(digits) == 11 and digits.startswith("1"):
        return f"+{digits}"          # "+15551234567"
    if len(digits) == 10:
        return f"+1{digits}"         # assume US
    if digits and not phone.startswith("+"):
        return f"+{digits}"
    return phone


def normalize_picklist(value: Any) -> list[str]:
    """
    Zoho sends multiselect picklists in 3 shapes:
      1. JSON array string:  '["Series 7", "Series 66"]'
      2. Comma-separated:    "Series 7, Series 66"
      3. Already a list:     ["Series 7", "Series 66"]
    Normalize to a consistent list.
    """
    if isinstance(value, list):
        return value
    if not isinstance(value, str) or not value:
        return []
    if value.startswith("["):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            pass
    return [v.strip() for v in value.split(",") if v.strip()]


def normalize_datetime(value: Optional[str]) -> Optional[str]:
    """Normalize datetime strings to consistent ISO 8601."""
    if not value:
        return None
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return dt.isoformat()
Enter fullscreen mode Exit fullscreen mode

I used to scatter this kind of logic through the codebase. Centralizing it made failures legible: when something looks wrong downstream, I know exactly where "truth" is defined.

The freetext location parser: turning human prose into (city, state)

The most satisfying part of this sync engine is also the most human: location parsing.

CRM location fields aren't clean. People type:

  • "Greater Dallas-Fort Worth Area"
  • non-standard state abbreviations
  • city/state blends
  • metro area names that don't map 1:1 to a city

So I wrote a freetext parser that recognizes 50 states, non-standard abbreviations, and 100+ metro area patterns, and normalizes that mess into consistent city and state fields.

import re
from typing import Optional

# Full state names → abbreviations
STATE_NAMES: dict[str, str] = {
    "alabama": "AL", "alaska": "AK", "arizona": "AZ", "arkansas": "AR",
    "california": "CA", "colorado": "CO", "connecticut": "CT", "delaware": "DE",
    "florida": "FL", "georgia": "GA", "hawaii": "HI", "idaho": "ID",
    "illinois": "IL", "indiana": "IN", "iowa": "IA", "kansas": "KS",
    "kentucky": "KY", "louisiana": "LA", "maine": "ME", "maryland": "MD",
    "massachusetts": "MA", "michigan": "MI", "minnesota": "MN",
    "mississippi": "MS", "missouri": "MO", "montana": "MT", "nebraska": "NE",
    "nevada": "NV", "new hampshire": "NH", "new jersey": "NJ",
    "new mexico": "NM", "new york": "NY", "north carolina": "NC",
    "north dakota": "ND", "ohio": "OH", "oklahoma": "OK", "oregon": "OR",
    "pennsylvania": "PA", "rhode island": "RI", "south carolina": "SC",
    "south dakota": "SD", "tennessee": "TN", "texas": "TX", "utah": "UT",
    "vermont": "VT", "virginia": "VA", "washington": "WA",
    "west virginia": "WV", "wisconsin": "WI", "wyoming": "WY",
    "district of columbia": "DC",
}

# Non-standard abbreviations people actually type
INFORMAL_ABBREVS: dict[str, str] = {
    "cal": "CA", "calif": "CA", "colo": "CO", "conn": "CT",
    "fla": "FL", "penn": "PA", "tenn": "TN", "tex": "TX",
    "wash": "WA", "wis": "WI", "mass": "MA", "minn": "MN",
}

# Valid 2-letter state codes
VALID_CODES: set[str] = set(STATE_NAMES.values())

# Metro area patterns → state (order matters: "washington dc" before "washington")
METRO_PATTERNS: list[tuple[str, str]] = [
    ("washington dc", "DC"), ("washington d.c.", "DC"),
    ("san francisco", "CA"), ("bay area", "CA"), ("silicon valley", "CA"),
    ("los angeles", "CA"), ("san diego", "CA"), ("san jose", "CA"),
    ("new york", "NY"), ("manhattan", "NY"), ("brooklyn", "NY"),
    ("chicago", "IL"), ("dallas", "TX"), ("fort worth", "TX"),
    ("houston", "TX"), ("san antonio", "TX"), ("austin", "TX"),
    ("phoenix", "AZ"), ("seattle", "WA"), ("denver", "CO"),
    ("boston", "MA"), ("atlanta", "GA"), ("miami", "FL"),
    ("tampa", "FL"), ("orlando", "FL"), ("minneapolis", "MN"),
    ("detroit", "MI"), ("portland", "OR"), ("las vegas", "NV"),
    ("charlotte", "NC"), ("nashville", "TN"), ("washington", "WA"),
]


def parse_location(text: Optional[str]) -> tuple[Optional[str], Optional[str]]:
    """
    Parse freetext location into (city, state_code).

    Handles:
      "Sioux Falls, South Dakota, United States"  → ("Sioux Falls", "SD")
      "Seattle, WA"                                → ("Seattle", "WA")
      "Greater Chicago Area"                       → ("Chicago", "IL")
      "PHOENIX, ARIZONA 85016"                     → ("Phoenix", "AZ")
      "Austin, Texas Metropolitan Area"            → ("Austin", "TX")
    """
    if not text or not text.strip():
        return None, None

    # Normalize separators
    normalized = text.replace(" - ", ", ")
    parts = [p.strip() for p in normalized.split(",")]

    city: Optional[str] = None
    state: Optional[str] = None

    for part in parts:
        if not part:
            continue

        # Strip ZIP codes and suffixes
        cleaned = re.sub(r"\s*\d{5}(-\d{4})?\s*$", "", part)
        for suffix in (" metropolitan area", " metro area", " area",
                        " united states", " usa", " us"):
            if cleaned.lower().endswith(suffix):
                cleaned = cleaned[: -len(suffix)].strip()

        lower = cleaned.lower().strip(".")

        # Skip noise tokens
        if lower in ("united states", "usa", "us", ""):
            continue

        # Priority 1: full state name
        if lower in STATE_NAMES and state is None:
            state = STATE_NAMES[lower]
            continue

        # Priority 2: informal abbreviation
        if lower in INFORMAL_ABBREVS and state is None:
            state = INFORMAL_ABBREVS[lower]
            continue

        # Priority 3: exact 2-letter code
        upper = cleaned.upper().strip(".")
        code_match = re.match(r"^([A-Z]{2})(?:\s+\d|$)", upper)
        if code_match and code_match.group(1) in VALID_CODES and state is None:
            state = code_match.group(1)
            continue
        if len(upper) == 2 and upper in VALID_CODES and state is None:
            state = upper
            continue

        # Otherwise it's probably the city
        if city is None and "area" not in lower:
            city = cleaned.title()

    # Fallback: metro area pattern matching
    if state is None and text:
        text_lower = text.lower()
        for pattern, st in METRO_PATTERNS:
            if pattern in text_lower:
                state = st
                if city is None:
                    city = pattern.title()
                break

    return city, state
Enter fullscreen mode Exit fullscreen mode

What broke initially was my naïve assumption that "location" is a field. In practice it's a paragraph, and you either respect that or you ship garbage.

What went wrong: Zoho's "truth" wasn't stable

Three specific production discoveries changed how I build CRM integrations:

  • Custom views ignoring fields meant I couldn't trust the response payload.
  • Numeric types drifting between float and int meant I couldn't trust types.
  • A field existing conceptually but not actually existing (like Full_Name not being a real field on the Contacts module) meant I couldn't trust module parity.

None of these show up in an SDK guide. They show up when you run the sync long enough for reality to express itself.

Nuances and tradeoffs

A sync engine like this is a set of deliberate tradeoffs:

  • Two-step fetch costs extra calls, but buys deterministic payloads.
  • Narrow column mapping limits breadth, but prevents schema drift from corrupting downstream systems.
  • Date-chunked pagination adds complexity, but avoids silent truncation under record caps.
  • SHA‑256 incremental sync adds compute, but avoids reprocessing and reduces the blast radius of upstream noise.
  • Freetext parsing is maintenance work, but it turns human-entered prose into structured data that the rest of the platform can actually use.

The pattern underneath all of it is the same: I'd rather pay complexity once in code than pay confusion forever in operations.

Closing

The CRM didn't hand me a clean API contract; it handed me a set of behaviors I had to discover the hard way. The sync engine only became reliable once I stopped asking it to "pull records" and started forcing it to prove every field, every type, every page, and every change—because in production, correctness is something you enforce, not something you assume. No framework gave me that. No SDK anticipated it. The engineer who sat with the failing sync long enough to understand why it failed—that's what made the system reliable.


🎧 Listen to the Enterprise AI Architecture audiobook
📖 Read the full 13-part series with an AI assistant

Top comments (0)