Chris Widmer

The Connector Graveyard: What Multi-Model Pipeline Code Actually Looks Like

Every ML team building multi-model pipelines has a graveyard.

It is not a literal place. It is a directory called connectors/ or adapters/ or maybe just utils/ with a collection of files named things like ner_to_scorer_bridge.py and v2_classifier_output_transform.py and, if the team has been around long enough, legacy_extractor_compat_DO_NOT_DELETE.py.

Each file solves exactly one problem for exactly one model pair. Each one was written by someone who has since moved on. Each one is a small, undocumented piece of institutional knowledge that the pipeline cannot run without.

This post is about why that graveyard exists, what it actually costs, and what eliminating it looks like in practice.


A concrete pipeline

Let's use a legal document processing pipeline. It is representative of what teams actually build, and the schema incompatibilities are real.

The pipeline needs to do three things in sequence:

  1. Extract named entities (parties, dates, amounts) from an incoming contract
  2. Classify the extracted entities by obligation type
  3. Score the obligations against a compliance policy

Three models. Three teams. Three completely different schemas.

Here is what those schemas actually look like.


Model 1: dslim/bert-base-NER (the extractor)

Input:

{
    "input_text": "The licensor grants a worldwide license to Acme Corp."
}

Output:

[
    {"entity": "B-ORG", "score": 0.9996, "index": 11, "word": "A", "start": 43, "end": 44},
    {"entity": "I-ORG", "score": 0.9449, "index": 12, "word": "##c", "start": 44, "end": 45},
    {"entity": "I-ORG", "score": 0.9990, "index": 13, "word": "##me", "start": 45, "end": 47},
    {"entity": "I-ORG", "score": 0.9994, "index": 14, "word": "Corp", "start": 48, "end": 52}
]

Notice that "Acme Corp" comes back as four wordpiece tokens, not two words. The model uses BERT tokenization. The connector code has to reassemble them.
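
For reference, correct reassembly has to do two things: strip the "##" continuation markers and put the space back in front of "Corp". A minimal sketch of that logic on this exact output:

tokens = ["A", "##c", "##me", "Corp"]
# "##" means "continue the previous word"; anything else starts a new word.
text = "".join(t[2:] if t.startswith("##") else " " + t for t in tokens).strip()
assert text == "Acme Corp"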


Model 2: a fine-tuned obligation classifier

Input:

{
    "text": "Acme Corp",
    "entity_type": "ORG",
    "context_window": "grants a worldwide license to Acme Corp",
    "threshold": 0.7
}

Output:

{
    "obligation_type": "LICENSEE",
    "confidence": 0.94,
    "metadata": {
        "model_version": "2.1.0",
        "inference_time_ms": 43
    }
}

Model 3: a compliance scorer

Input:

{
    "obligations": [
        {
            "party": "Acme Corp",
            "role": "LICENSEE",
            "prior_confidence": 0.94
        }
    ],
    "policy_id": "gdpr-commercial-v3",
    "strict_mode": true
}

Output:

{
    "score": 0.87,
    "flags": [],
    "passed": true
}

Three models. Three completely different input shapes. Three completely different output shapes. And none of this is documented anywhere except in the connector code itself.


What the connector code actually looks like

Here is the connector between Model 1 and Model 2. This is realistic code. You have probably written something like it.

def transform_ner_output_to_classifier_input(
    ner_output: list,
    original_text: str,
    confidence_threshold: float = 0.7
) -> list:
    """
    Transform NER output into classifier input format.

    WARNING: NER uses wordpiece tokenization. Must reassemble tokens.
    "Acme Corp" comes back as ["A", "##c", "##me", "Corp"] - see bug 847

    WARNING: Only pass entities above threshold. Classifier chokes on
    low-confidence entities - see incident 2024-11-03
    """
    entities = []
    current_entity = None
    current_tokens = []
    current_start = None
    current_scores = []

    for token in ner_output:
        tag = token["entity"]
        word = token["word"]
        score = token["score"]

        if tag.startswith("B-"):
            if current_entity is not None:
                avg_score = sum(current_scores) / len(current_scores)
                if avg_score >= confidence_threshold:
                    assembled = "".join(
                        t.replace("##", "") for t in current_tokens
                    )
                    start_idx = max(0, current_start - 30)
                    end_idx = min(len(original_text), token["start"] + 30)
                    context = original_text[start_idx:end_idx]

                    entities.append({
                        "text": assembled,
                        "entity_type": current_entity,
                        "context_window": context,
                        "threshold": confidence_threshold
                    })

            current_entity = tag[2:]
            current_tokens = [word]
            current_start = token.get("start", 0)
            current_scores = [score]

        elif tag.startswith("I-") and current_entity is not None:
            current_tokens.append(word)
            current_scores.append(score)
        else:
            current_entity = None
            current_tokens = []
            current_scores = []

    if current_entity is not None:
        avg_score = sum(current_scores) / len(current_scores)
        if avg_score >= confidence_threshold:
            assembled = "".join(t.replace("##", "") for t in current_tokens)
            entities.append({
                "text": assembled,
                "entity_type": current_entity,
                "context_window": original_text[-60:],
                "threshold": confidence_threshold
            })

    return entities

That is 60 lines of connector code to move data between two models. It handles wordpiece reassembly, confidence filtering, context window extraction, and two separate bugs that were discovered in production after the pipeline went live.

Now the connector between Model 2 and Model 3:

def transform_classifier_outputs_to_scorer_input(
    classifier_outputs: list,
    policy_id: str,
    strict_mode: bool = True
) -> dict:
    """
    Aggregate classifier results into scorer input format.

    NOTE: scorer expects "role" not "obligation_type" - same concept, different name
    NOTE: scorer expects "party" not "text" - same concept, different name
    NOTE: prior_confidence is in metadata.confidence in v2.1+, top-level in v2.0

    If classifier returns no obligations above 0.8, scorer returns 422.
    Handle that in the caller. See bug 1089.
    """
    obligations = []

    for item in classifier_outputs:
        if isinstance(item, list):
            result = item[0] if item else None
        elif isinstance(item, dict):
            result = item
        else:
            continue

        if result is None:
            continue

        role = result.get("obligation_type") or result.get("role")
        party = result.get("text") or result.get("party")

        metadata = result.get("metadata", {})
        confidence = metadata.get("confidence") or result.get("confidence", 0.0)

        if role and party:
            obligations.append({
                "party": party,
                "role": role,
                "prior_confidence": confidence
            })

    return {
        "obligations": obligations,
        "policy_id": policy_id,
        "strict_mode": strict_mode
    }

Another 55 lines. Comments explaining that obligation_type and role are the same concept with different names in different schemas. Comments explaining that text and party are the same concept. A note about a 422 error that was discovered in production.

Two connectors. 115 lines of fragile, poorly documented, bug-riddled translation code. And this is a simple three-model pipeline.
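
For completeness, here is roughly the orchestration that sits on top of those two connectors. The run_* functions are hypothetical stand-ins for however each model actually gets invoked, and the empty-obligations check is one way to honor the caller-side workaround the second connector's docstring demands (bug 1089):

def run_contract_pipeline(contract_text: str) -> dict:
    # Model 1 -> connector 1 -> Model 2 -> connector 2 -> Model 3
    ner_output = run_ner({"input_text": contract_text})           # hypothetical model call
    classifier_inputs = transform_ner_output_to_classifier_input(ner_output, contract_text)
    classifier_outputs = [run_classifier(req) for req in classifier_inputs]  # hypothetical

    scorer_input = transform_classifier_outputs_to_scorer_input(
        classifier_outputs, policy_id="gdpr-commercial-v3"
    )
    if not scorer_input["obligations"]:
        # Scorer returns 422 on an empty obligation list (bug 1089); short-circuit here.
        return {"score": 0.0, "flags": ["NO_OBLIGATIONS"], "passed": False}
    return run_scorer(scorer_input)                               # hypothetical model call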


What happens when a model updates

In November 2024, the team running the obligation classifier updated it from v2.0 to v2.1. The change was minor: they moved the confidence score from the top level of the response into a metadata object for consistency with their other models.

Before:

{"obligation_type": "LICENSEE", "confidence": 0.94}

After:

{"obligation_type": "LICENSEE", "metadata": {"confidence": 0.94, "model_version": "2.1.0"}}

The connector code was not updated. The prior_confidence field in the scorer input silently defaulted to 0.0 for every obligation. The scorer, receiving obligations with zero confidence, began flagging everything as high risk.

The pipeline did not crash. No exception was raised. The output was wrong for three days before someone noticed the compliance scores had changed. The root cause was a single line in a connector file.

This is not a hypothetical failure mode. It is the most common failure mode in multi-model pipelines. Silent schema drift.
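
The line that caused it is worth spelling out, because the failure is built into how the field was read. A reconstruction of the pre-incident read, next to the fix that now lives in the connector above:

result = {"obligation_type": "LICENSEE", "metadata": {"confidence": 0.94, "model_version": "2.1.0"}}

# Pre-incident connector (reconstructed): the .get() default silently swallows the schema move.
confidence = result.get("confidence", 0.0)  # -> 0.0 for every v2.1 response

# Post-incident connector, as shown earlier: check metadata first, then fall back to the top level.
confidence = result.get("metadata", {}).get("confidence") or result.get("confidence", 0.0)  # -> 0.94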


The same pipeline with SYNAPSE adapters

Here is what the same pipeline looks like with SYNAPSE. Each model has two functions. No connectors. No field name translation logic. No wordpiece reassembly code buried in a utility file.

Adapter for dslim/bert-base-NER:

from synapse_sdk import AdapterBase, CanonicalIR
from typing import Any

class NERAdapter(AdapterBase):
    MODEL_ID = "dslim/bert-base-NER"
    ADAPTER_VERSION = "1.0.0"

    def ingress(self, ir: CanonicalIR) -> dict[str, Any]:
        return {"input_text": ir.payload.content}

    def egress(self, output: list, original_ir: CanonicalIR, latency_ms: int) -> CanonicalIR:
        updated = original_ir.copy()

        entities = []
        current_word = []
        current_label = None
        current_scores = []

        for token in output:
            tag, word, score = token["entity"], token["word"], token["score"]
            if tag.startswith("B-"):
                if current_label:
                    entities.append({
                        "text": "".join(current_word).replace("##", ""),
                        "label": current_label,
                        "confidence": sum(current_scores) / len(current_scores)
                    })
                current_label = tag[2:]
                current_word = [word]
                current_scores = [score]
            elif tag.startswith("I-"):
                current_word.append(word)
                current_scores.append(score)

        if current_label:
            entities.append({
                "text": "".join(current_word).replace("##", ""),
                "label": current_label,
                "confidence": sum(current_scores) / len(current_scores)
            })

        updated.payload.entities = entities
        updated.provenance.append(self.build_provenance(
            confidence=max(e["confidence"] for e in entities) if entities else 0.0,
            latency_ms=latency_ms,
        ))
        return updated

Adapter for the obligation classifier:

class ClassifierAdapter(AdapterBase):
    MODEL_ID = "internal/obligation-classifier-v2"
    ADAPTER_VERSION = "1.0.0"

    def ingress(self, ir: CanonicalIR) -> list[dict]:
        return [
            {
                "text": e["text"],
                "entity_type": e["label"],
                "context_window": ir.payload.content[:60],
                "threshold": ir.task_header.quality_floor or 0.7,
            }
            for e in (ir.payload.entities or [])
        ]

    def egress(self, output: list, original_ir: CanonicalIR, latency_ms: int) -> CanonicalIR:
        updated = original_ir.copy()
        # Pair each result with the entity it classified (ingress sent one request per
        # entity). Confidence is top-level in v2.0 and under metadata in v2.1+; absorb
        # both here so downstream adapters never see the difference.
        classified = [
            {"text": e["text"],
             "obligation_type": r["obligation_type"],
             "confidence": r.get("confidence") or r.get("metadata", {}).get("confidence", 0.0)}
            for e, r in zip(original_ir.payload.entities or [], output)
        ]
        updated.payload.data = {"classified_entities": classified}
        updated.provenance.append(self.build_provenance(
            confidence=classified[0]["confidence"] if classified else 0.0,
            latency_ms=latency_ms,
        ))
        return updated

Adapter for the compliance scorer:

class ScorerAdapter(AdapterBase):
    MODEL_ID = "internal/compliance-scorer-v3"
    ADAPTER_VERSION = "1.0.0"

    def ingress(self, ir: CanonicalIR) -> dict:
        entities = ir.payload.data.get("classified_entities", [])
        return {
            "obligations": [
                {
                    "party": e["text"],
                    "role": e["obligation_type"],
                    "prior_confidence": e["confidence"],
                }
                for e in entities
            ],
            "policy_id": "gdpr-commercial-v3",
            "strict_mode": True,
        }

    def egress(self, output: dict, original_ir: CanonicalIR, latency_ms: int) -> CanonicalIR:
        updated = original_ir.copy()
        updated.payload.data = {"compliance_result": output}
        updated.provenance.append(self.build_provenance(
            confidence=output.get("score", 0.0),
            latency_ms=latency_ms,
        ))
        return updated

What changed

|  | Connector approach | SYNAPSE adapter approach |
| --- | --- | --- |
| Lines of translation code | 115 lines across 2 connector files | 0 lines of translation code |
| Field name mapping | Scattered across connectors | Each adapter owns its own mapping |
| Wordpiece reassembly | In a shared utility, one bug found in prod | In the NER adapter egress, tested against 20 standard fixtures |
| Schema update impact | Silent failure, 3-day incident | Adapter version bumped, validator catches it before deployment |
| New model added | New connector file for each existing model | One new adapter file, two functions |

When the classifier team updated from v2.0 to v2.1 and moved confidence into the metadata object, the ClassifierAdapter egress was the only place that had to know about it: that translation happens once. The canonical IR payload always carries confidence as a normalized field, and the ScorerAdapter ingress reads from the canonical IR, not from the classifier's native schema. The schema change does not propagate to the scorer because the canonical IR absorbs it.

That is the difference. Not a different architecture. Not a different approach to AI pipelines. Just a shared schema with strict contracts around it, so that schema changes in one model are isolated at the adapter boundary rather than cascading through connector code.
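
To make that isolation concrete, here is a sketch of what the orchestration might look like on the adapter side. The call_* functions are hypothetical model invocations that return the model's native output plus a latency measurement; everything else uses only the ingress/egress signatures shown above:

def run_contract_pipeline(ir: CanonicalIR) -> CanonicalIR:
    steps = [
        (NERAdapter(), call_ner),                 # call_* are hypothetical model invocations
        (ClassifierAdapter(), call_classifier),
        (ScorerAdapter(), call_scorer),
    ]
    for adapter, call_model in steps:
        native_input = adapter.ingress(ir)                     # canonical IR -> native schema
        native_output, latency_ms = call_model(native_input)   # model runs in its own schema
        ir = adapter.egress(native_output, ir, latency_ms)     # native schema -> canonical IR
    return ir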


The economics

One well-maintained adapter per model. Written once. Updated when the model's schema changes. Tested against 20 standard fixtures before deployment. Validated against 13 conformance rules.

If you have 5 specialist models in production, you have 5 adapters. Adding a sixth model costs you one adapter, not five new connectors.

If you have 10 models, you have 10 adapters, not the 45 connectors it takes to bridge every possible pair of them.

The connectors/ directory stays empty. The graveyard does not grow.


If you want to try writing an adapter for a model you are already using, the SDK is at github.com/synapse-ir/adapter-sdk and the specification is at github.com/synapse-ir/spec.

pip install synapse-adapter-sdk
synapse-validate --adapter my_module.MyAdapter --all-fixtures

The BOUNTIES.md lists models where the first adapter earns a Founding Contributor listing.

This is post 2 in the Building SYNAPSE series. Post 1 covered what MCP solves and what it deliberately does not.
