Every ML team building multi-model pipelines has a graveyard.
It is not a literal place. It is a directory called `connectors/` or `adapters/` or maybe just `utils/`, with a collection of files named things like `ner_to_scorer_bridge.py`, `v2_classifier_output_transform.py`, and, if the team has been around long enough, `legacy_extractor_compat_DO_NOT_DELETE.py`.
Each file solves exactly one problem for exactly one model pair. Each one was written by someone who has since moved on. Each one is a small, undocumented piece of institutional knowledge that the pipeline cannot run without.
This post is about why that graveyard exists, what it actually costs, and what eliminating it looks like in practice.
## A concrete pipeline
Let's use a legal document processing pipeline. It is representative of what teams actually build, and the schema incompatibilities are real.
The pipeline needs to do three things in sequence:
- Extract named entities (parties, dates, amounts) from an incoming contract
- Classify the extracted entities by obligation type
- Score the obligations against a compliance policy
Three models. Three teams. Three completely different schemas.
Here is what those schemas actually look like.
### Model 1: dslim/bert-base-NER (the extractor)
Input:
```json
{
  "input_text": "The licensor grants a worldwide license to Acme Corp."
}
```
Output:
```json
[
  {"entity": "B-ORG", "score": 0.9996, "index": 11, "word": "A", "start": 43, "end": 44},
  {"entity": "I-ORG", "score": 0.9449, "index": 12, "word": "##c", "start": 44, "end": 45},
  {"entity": "I-ORG", "score": 0.9990, "index": 13, "word": "##me", "start": 45, "end": 47},
  {"entity": "I-ORG", "score": 0.9994, "index": 14, "word": "Corp", "start": 48, "end": 52}
]
```
Notice that "Acme Corp" comes back as four wordpiece tokens, not two words. The model uses BERT tokenization. The connector code has to reassemble them.
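Reassembly is mechanical but easy to get subtly wrong: a `##` piece continues the previous word, while a piece without the prefix starts a new word, which means the space has to be put back. A minimal sketch (the helper name is mine, not from any library):

```python
def join_wordpieces(pieces: list[str]) -> str:
    """Rebuild surface text from BERT wordpiece tokens."""
    text = ""
    for piece in pieces:
        if piece.startswith("##"):
            text += piece[2:]       # continuation of the previous word
        elif text:
            text += " " + piece     # new word: reinsert the space
        else:
            text = piece            # first word
    return text

print(join_wordpieces(["A", "##c", "##me", "Corp"]))  # Acme Corp
```

Note that a naive `"".join(t.replace("##", "") for t in pieces)` produces `"AcmeCorp"` with the interior space lost, which is exactly the kind of off-by-one-concept bug that hides in connector files.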
### Model 2: a fine-tuned obligation classifier
Input:
```json
{
  "text": "Acme Corp",
  "entity_type": "ORG",
  "context_window": "grants a worldwide license to Acme Corp",
  "threshold": 0.7
}
```
Output:
```json
{
  "obligation_type": "LICENSEE",
  "confidence": 0.94,
  "metadata": {
    "model_version": "2.1.0",
    "inference_time_ms": 43
  }
}
```
### Model 3: a compliance scorer
Input:
```json
{
  "obligations": [
    {
      "party": "Acme Corp",
      "role": "LICENSEE",
      "prior_confidence": 0.94
    }
  ],
  "policy_id": "gdpr-commercial-v3",
  "strict_mode": true
}
```
Output:
```json
{
  "score": 0.87,
  "flags": [],
  "passed": true
}
```
Three models. Three completely different input shapes. Three completely different output shapes. And none of this is documented anywhere except in the connector code itself.
## What the connector code actually looks like
Here is the connector between Model 1 and Model 2. This is realistic code. You have probably written something like it.
```python
def transform_ner_output_to_classifier_input(
    ner_output: list,
    original_text: str,
    confidence_threshold: float = 0.7,
) -> list:
    """
    Transform NER output into classifier input format.

    WARNING: NER uses wordpiece tokenization. Must reassemble tokens.
    "Acme Corp" comes back as ["A", "##c", "##me", "Corp"] - see bug 847

    WARNING: Only pass entities above threshold. Classifier chokes on
    low-confidence entities - see incident 2024-11-03
    """
    entities = []
    current_entity = None
    current_tokens = []
    current_start = None
    current_scores = []

    def flush(end_char: int) -> None:
        """Emit the in-progress entity if its average score clears the threshold."""
        avg_score = sum(current_scores) / len(current_scores)
        if avg_score < confidence_threshold:
            return
        # Rejoin wordpieces: "##" pieces continue the previous word;
        # anything else starts a new word with a space.
        assembled = ""
        for t in current_tokens:
            if t.startswith("##"):
                assembled += t[2:]
            elif assembled:
                assembled += " " + t
            else:
                assembled = t
        start_idx = max(0, current_start - 30)
        end_idx = min(len(original_text), end_char + 30)
        entities.append({
            "text": assembled,
            "entity_type": current_entity,
            "context_window": original_text[start_idx:end_idx],
            "threshold": confidence_threshold,
        })

    for token in ner_output:
        tag = token["entity"]
        word = token["word"]
        score = token["score"]
        if tag.startswith("B-"):
            if current_entity is not None:
                flush(token["start"])
            current_entity = tag[2:]
            current_tokens = [word]
            current_start = token.get("start", 0)
            current_scores = [score]
        elif tag.startswith("I-") and current_entity is not None:
            current_tokens.append(word)
            current_scores.append(score)
        else:
            # An "O" tag ends the current entity; flush it instead of
            # silently dropping it.
            if current_entity is not None:
                flush(token.get("end", len(original_text)))
            current_entity = None
            current_tokens = []
            current_scores = []

    if current_entity is not None:
        flush(len(original_text))

    return entities
```
That is roughly 60 lines of connector code to move data between two models. It handles wordpiece reassembly, confidence filtering, context-window extraction, and two separate bugs that were discovered in production after the pipeline went live.
Now the connector between Model 2 and Model 3:
```python
def transform_classifier_outputs_to_scorer_input(
    classifier_outputs: list,
    policy_id: str,
    strict_mode: bool = True,
) -> dict:
    """
    Aggregate classifier results into scorer input format.

    NOTE: scorer expects "role" not "obligation_type" - same concept, different name
    NOTE: scorer expects "party" not "text" - same concept, different name
    NOTE: prior_confidence is in metadata.confidence in v2.1+, top-level in v2.0

    If classifier returns no obligations above 0.8, scorer returns 422.
    Handle that in the caller. See bug 1089.
    """
    obligations = []
    for item in classifier_outputs:
        if isinstance(item, list):
            result = item[0] if item else None
        elif isinstance(item, dict):
            result = item
        else:
            continue
        if result is None:
            continue
        role = result.get("obligation_type") or result.get("role")
        party = result.get("text") or result.get("party")
        metadata = result.get("metadata", {})
        confidence = metadata.get("confidence") or result.get("confidence", 0.0)
        if role and party:
            obligations.append({
                "party": party,
                "role": role,
                "prior_confidence": confidence,
            })
    return {
        "obligations": obligations,
        "policy_id": policy_id,
        "strict_mode": strict_mode,
    }
Another 40 lines. Comments explaining that `obligation_type` and `role` are the same concept with different names in different schemas. Comments explaining that `text` and `party` are the same concept. A note about a 422 error that was discovered in production.
Two connectors. Roughly 100 lines of fragile, poorly documented, bug-riddled translation code. And this is a simple three-model pipeline.
## What happens when a model updates
In November 2024, the team running the obligation classifier updated it from v2.0 to v2.1. The change was minor: they moved the confidence score from the top level of the response into a metadata object for consistency with their other models.
Before:

```json
{"obligation_type": "LICENSEE", "confidence": 0.94}
```

After:

```json
{"obligation_type": "LICENSEE", "metadata": {"confidence": 0.94, "model_version": "2.1.0"}}
```
The connector code was not updated. The prior_confidence field in the scorer input silently defaulted to 0.0 for every obligation. The scorer, receiving obligations with zero confidence, began flagging everything as high risk.
The pipeline did not crash. No exception was raised. The output was wrong for three days before someone noticed the compliance scores had changed. The root cause was a single line in a connector file.
This is not a hypothetical failure mode. It is the most common failure mode in multi-model pipelines: silent schema drift.
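The fix, wherever it ends up living, is a single normalization applied at one boundary. A sketch of a version-tolerant reader, assuming only the two response shapes shown above (the function name is mine):

```python
def read_confidence(result: dict) -> float:
    """Accept both classifier schemas: v2.0 (top-level) and v2.1 (metadata)."""
    if "confidence" in result:  # v2.0 shape
        return result["confidence"]
    # v2.1 shape; fall back to 0.0 only if neither shape matches
    return result.get("metadata", {}).get("confidence", 0.0)

v20 = {"obligation_type": "LICENSEE", "confidence": 0.94}
v21 = {"obligation_type": "LICENSEE",
       "metadata": {"confidence": 0.94, "model_version": "2.1.0"}}
print(read_confidence(v20), read_confidence(v21))  # 0.94 0.94
```

The point is not that this function is hard to write. It is that, in the connector approach, nothing tells you it needs to be written until the scores have already been wrong for days.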
## The same pipeline with SYNAPSE adapters
Now look at what the same pipeline looks like with SYNAPSE. Each model gets one adapter with two functions, ingress and egress. No connectors. No field-name translation logic. No wordpiece reassembly code buried in a utility file.
Adapter for dslim/bert-base-NER:
```python
from synapse_sdk import AdapterBase, CanonicalIR
from typing import Any


class NERAdapter(AdapterBase):
    MODEL_ID = "dslim/bert-base-NER"
    ADAPTER_VERSION = "1.0.0"

    def ingress(self, ir: CanonicalIR) -> dict[str, Any]:
        return {"input_text": ir.payload.content}

    def egress(self, output: list, original_ir: CanonicalIR, latency_ms: int) -> CanonicalIR:
        updated = original_ir.copy()
        entities = []
        current_word = []
        current_label = None
        current_scores = []

        def flush() -> None:
            # Rejoin wordpieces: "##" pieces continue the previous word;
            # anything else starts a new word with a space.
            text = ""
            for piece in current_word:
                if piece.startswith("##"):
                    text += piece[2:]
                elif text:
                    text += " " + piece
                else:
                    text = piece
            entities.append({
                "text": text,
                "label": current_label,
                "confidence": sum(current_scores) / len(current_scores),
            })

        for token in output:
            tag, word, score = token["entity"], token["word"], token["score"]
            if tag.startswith("B-"):
                if current_label:
                    flush()
                current_label = tag[2:]
                current_word = [word]
                current_scores = [score]
            elif tag.startswith("I-") and current_label:
                current_word.append(word)
                current_scores.append(score)
            else:
                # An "O" tag ends the current entity.
                if current_label:
                    flush()
                current_label = None
                current_word = []
                current_scores = []
        if current_label:
            flush()

        updated.payload.entities = entities
        updated.provenance.append(self.build_provenance(
            confidence=max(e["confidence"] for e in entities) if entities else 0.0,
            latency_ms=latency_ms,
        ))
        return updated
```
Adapter for the obligation classifier:
```python
class ClassifierAdapter(AdapterBase):
    MODEL_ID = "internal/obligation-classifier-v2"
    ADAPTER_VERSION = "1.0.0"

    def ingress(self, ir: CanonicalIR) -> list[dict]:
        return [
            {
                "text": e["text"],
                "entity_type": e["label"],
                "context_window": ir.payload.content[:60],
                "threshold": ir.task_header.quality_floor or 0.7,
            }
            for e in (ir.payload.entities or [])
        ]

    def egress(self, output: list, original_ir: CanonicalIR, latency_ms: int) -> CanonicalIR:
        updated = original_ir.copy()
        classified = []
        # The classifier returns one result per submitted entity, in order,
        # without echoing the entity text back, so pair each result with
        # the entity it was called for.
        for entity, result in zip(original_ir.payload.entities or [], output):
            # v2.0 put confidence at the top level; v2.1 moved it into a
            # metadata object. Normalize it once, here, at the boundary.
            confidence = (
                result.get("confidence")
                or result.get("metadata", {}).get("confidence", 0.0)
            )
            classified.append({
                "text": entity["text"],
                "obligation_type": result["obligation_type"],
                "confidence": confidence,
            })
        updated.payload.data = {"classified_entities": classified}
        updated.provenance.append(self.build_provenance(
            confidence=classified[0]["confidence"] if classified else 0.0,
            latency_ms=latency_ms,
        ))
        return updated
```
Adapter for the compliance scorer:
```python
class ScorerAdapter(AdapterBase):
    MODEL_ID = "internal/compliance-scorer-v3"
    ADAPTER_VERSION = "1.0.0"

    def ingress(self, ir: CanonicalIR) -> dict:
        entities = ir.payload.data.get("classified_entities", [])
        return {
            "obligations": [
                {
                    "party": e["text"],
                    "role": e["obligation_type"],
                    "prior_confidence": e["confidence"],
                }
                for e in entities
            ],
            "policy_id": "gdpr-commercial-v3",
            "strict_mode": True,
        }

    def egress(self, output: dict, original_ir: CanonicalIR, latency_ms: int) -> CanonicalIR:
        updated = original_ir.copy()
        updated.payload.data = {"compliance_result": output}
        updated.provenance.append(self.build_provenance(
            confidence=output.get("score", 0.0),
            latency_ms=latency_ms,
        ))
        return updated
```
## What changed
| | Connector approach | SYNAPSE adapter approach |
|---|---|---|
| Lines of translation code | ~100 lines across 2 connector files | 0 lines of translation code |
| Field name mapping | Scattered across connectors | Each adapter owns its own mapping |
| Wordpiece reassembly | In a shared utility; one bug found in prod | In the NER adapter egress, tested against 20 standard fixtures |
| Schema update impact | Silent failure, 3-day incident | Adapter version bumped; validator catches it before deployment |
| New model added | New connector file for each existing model | One new adapter file, two functions |
When the classifier team updated from v2.0 to v2.1 and moved confidence into the metadata object, the ClassifierAdapter egress function handles that translation once. The canonical IR payload always contains confidence as a normalized field. The ScorerAdapter ingress reads from the canonical IR, not from the classifier's native schema. The schema change in the classifier does not propagate to the scorer because the canonical IR absorbs it.
That is the difference. Not a different architecture. Not a different approach to AI pipelines. Just a shared schema with strict contracts around it, so that schema changes in one model are isolated at the adapter boundary rather than cascading through connector code.
## The economics
One well-maintained adapter per model. Written once. Updated when the model's schema changes. Tested against 20 standard fixtures before deployment. Validated against 13 conformance rules.
If you have 5 specialist models in production, you have 5 adapters. Adding a sixth model costs you one adapter, not five new connectors.
If you have 10 models, you have 10 adapters. Not 45 connectors.
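The arithmetic behind those numbers: with pairwise connectors, every model may eventually need to talk to every other, so the count grows quadratically, while adapters grow linearly with the number of models.

```python
def pairwise_connectors(n_models: int) -> int:
    # every unordered pair of models needs its own connector
    return n_models * (n_models - 1) // 2

for n in (3, 5, 10):
    print(n, "models:", n, "adapters vs", pairwise_connectors(n), "connectors")
```

In practice not every pair of models is ever wired together, so the real connector count sits somewhere between linear and quadratic. The adapter count never does: it is always exactly the number of models.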
The connectors/ directory stays empty. The graveyard does not grow.
If you want to try writing an adapter for a model you are already using, the SDK is at github.com/synapse-ir/adapter-sdk and the specification is at github.com/synapse-ir/spec.
```shell
pip install synapse-adapter-sdk
synapse-validate --adapter my_module.MyAdapter --all-fixtures
```
The BOUNTIES.md lists models where the first adapter earns a Founding Contributor listing.
This is post 2 in the Building SYNAPSE series. Post 1 covered what MCP solves and what it deliberately does not.