Siyana Hristova

Posted on • Edited on • Originally published at similarity-api.com

Fuzzy-match millions of rows in Databricks (2026)

When you fuzzy-match 10 million rows, you aren't "just comparing strings." A naïve dedupe implies roughly n(n−1)/2 ≈ 5×10¹³ potential pairs. At this scale, approaches that feel "quick" on small tables start to break.
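The arithmetic behind that number is worth a one-liner:

```python
# Candidate pairs for a naive all-vs-all dedupe of n rows.
n = 10_000_000
pairs = n * (n - 1) // 2
print(f"{pairs:.2e}")  # -> 5.00e+13
```

At one microsecond per comparison, that's over a year and a half of single-threaded compute, which is why candidate generation (or an external matching service) is non-negotiable at this scale.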

In Databricks, most teams reach for one of three options:

  • Spark-native candidate generation (LSH/MinHash)
    Fast to start, but you end up tuning a tradeoff between missed matches and huge candidate sets.

  • Entity-resolution frameworks
    Powerful, but often heavier than you want for "dedupe this column."

  • Custom Python scoring (UDFs / pandas UDFs)
    Easy to prototype, but at large scale jobs become dominated by Python overhead, skew, and shuffles.
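To make the first option's tradeoff concrete, here is a toy, pure-Python sketch of the MinHash + banding idea (not Spark's `MinHashLSH`, and md5 stands in for proper hash families):

```python
import hashlib

def minhash_signature(tokens, num_hashes=32):
    """One min-hash per seeded hash function: the more tokens two sets
    share, the more signature positions they tend to agree on."""
    return [
        min(int(hashlib.md5(f"{seed}:{t}".encode()).hexdigest(), 16)
            for t in tokens)
        for seed in range(num_hashes)
    ]

def bands(sig, num_bands=8):
    """Split the signature into bands; two rows become a candidate pair
    if ANY band matches exactly. Wider bands cut candidate volume but
    miss more true matches; narrower bands do the opposite -- that is
    the recall-vs-candidate-set knob described above."""
    rows = len(sig) // num_bands
    return [tuple(sig[i * rows:(i + 1) * rows]) for i in range(num_bands)]

a = bands(minhash_signature({"acme", "corp", "inc"}))
b = bands(minhash_signature({"acme", "corp", "incorporated"}))
print("candidate pair:", any(x == y for x, y in zip(a, b)))
```

Every production LSH setup is ultimately tuning `num_hashes` and `num_bands` against this probability curve, which is the "constant tuning" cost the bullet above refers to.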

A practical approach is to let Databricks handle what it's best at (data access, ETL, governance) and offload the actual matching step to a service built specifically for high-scale deduplication.

In this tutorial, we'll do that using Similarity API — an async "job" style matching service where you:

  • upload a dataset once (CSV or Parquet)
  • start a job
  • poll status
  • then download results (as Parquet or CSV)

This doesn't eliminate all cost — you still export data and ingest results — but it avoids the most fragile part: doing pairwise matching inside Spark.

Why use Similarity API for the matching step?

  • Avoid Spark-side pairwise matching: no cartesian joins or UDF-based scoring at scale.
  • Normalization options built in: punctuation removal, lowercasing, token sorting, and a company_names preset that strips common business suffixes (Inc/LLC/Ltd/etc.).
  • Deterministic output artifact: the service returns a file you can land back into Delta (e.g., per-row annotations, membership maps, or match pairs).
  • Proven at 1M+ scale: see the benchmark run (1M rows in ~7 minutes) and comparisons vs common fuzzy matching approaches.
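The normalization bullet maps onto transformations you could sketch yourself. This is an illustrative approximation of a company-name preset, not the service's exact implementation (the real suffix list and rules may differ):

```python
import re

# Assumed suffix list for illustration only; the service's
# company_names preset may strip a different set.
SUFFIXES = {"inc", "llc", "ltd", "corp", "co", "gmbh"}

def normalize_company(name: str) -> str:
    """Lowercase, strip punctuation, drop common business suffixes,
    then sort tokens so word order doesn't matter."""
    tokens = re.sub(r"[^\w\s]", " ", name.lower()).split()
    tokens = [t for t in tokens if t not in SUFFIXES]
    return " ".join(sorted(tokens))

print(normalize_company("Acme, Inc."))   # -> "acme"
print(normalize_company("ACME LLC"))     # -> "acme"
```

Having this done server-side matters because normalization bugs (e.g., "Inc." vs "Inc" vs "Incorporated") are a classic source of missed matches in hand-rolled pipelines.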

The workflow: Databricks ↔ Similarity API

Prerequisites:

  • Network egress: this workflow assumes your Databricks compute can make outbound HTTPS requests to the Similarity API hostname. In many enterprise (and some serverless) setups, outbound internet/DNS is restricted by policy; if so, an admin will need to allow outbound access or allowlist the API domain before the notebook can reach the service.

  • Access (pricing + token): Similarity API is a paid service. To run this notebook you'll need an API token: create an account to get one (there is typically a free trial or credits for testing), then store it in Databricks Secrets under the scope and key the code below expects (scope similarity, key API_TOKEN).

Important detail about Similarity API's current output contract: for "row annotations," the result includes a row_id that is the 0..n-1 positional index of the uploaded file. To join results back to your source table, we'll create an explicit index in Databricks and persist an idx → primary_key mapping.
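The positional contract is easy to get wrong, so here is the join-back in miniature (plain Python, with hypothetical annotation rows standing in for the service's output):

```python
# Source rows with real primary keys, in the exact order they were uploaded.
uploaded = [("C-9", "Acme Inc"), ("C-2", "ACME"), ("C-5", "Beta Ltd")]

# idx -> pk map, built BEFORE export (idx is the 0-based upload position).
idx_to_pk = {i: pk for i, (pk, _value) in enumerate(uploaded)}

# Hypothetical result rows: the service annotates by positional row_id only.
results = [{"row_id": 0, "group": 1},
           {"row_id": 1, "group": 1},
           {"row_id": 2, "group": 2}]

# Join back: translate each row_id to the original primary key.
annotated = [{"pk": idx_to_pk[r["row_id"]], **r} for r in results]
print(annotated[0]["pk"])  # -> "C-9"
```

The Spark version below does exactly this, just with a persisted Delta table instead of an in-memory dict, so the mapping survives across notebook runs.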

Step 1 — Build a stable index and export a single-column parquet

We create:

  • idx: 0..n-1
  • pk: your real primary key
  • value: the string column you want to dedupe

Then we write:

  • an index map table to Delta (idx, pk)
  • a single-column Parquet containing only value (in the same row order) for upload

from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time, os

# Config
SOURCE_TABLE  = "main.crm.customers"
PK_COLUMN     = "customer_id"    # change to your true PK
STRING_COLUMN = "company_name"   # change to your string column

# Use DBFS scheme for Spark paths
TMP_DIR_DBFS     = f"dbfs:/tmp/similarity_api/{int(time.time())}"
PARQUET_DIR_DBFS = f"{TMP_DIR_DBFS}/input_parquet"

base = (
    spark.table(SOURCE_TABLE)
    .select(
        F.col(PK_COLUMN).cast("string").alias("pk"),
        F.col(STRING_COLUMN).cast("string").alias("value"),
    )
    .where(F.col("value").isNotNull() & (F.length(F.trim(F.col("value"))) > 0))
)

# Create a deterministic 0..n-1 index by ordering on pk.
# Note: a window with a global ORDER BY (no PARTITION BY) funnels all
# rows through a single partition; Spark will warn about this. It's
# acceptable for a one-off export, since we need a total row order anyway.
w = Window.orderBy(F.col("pk"))
indexed = base.withColumn("idx", (F.row_number().over(w) - 1).cast("long"))

# Persist idx → pk mapping for join-back
indexed.select("idx", "pk").write.mode("overwrite") \
    .format("delta").saveAsTable("main.crm.similarity_idx_map")

# Export ONLY the value column, in the same row order.
# Use coalesce(1) (not repartition(1)) to avoid a full shuffle.
indexed.select("value").coalesce(1).write.mode("overwrite") \
    .parquet(PARQUET_DIR_DBFS)

print("Wrote parquet to:", PARQUET_DIR_DBFS)

Note: coalesce(1) makes Spark write a single part-*.parquet data file (plus a few small metadata files). Similarity API currently returns one signed upload URL for one object, so this "single part file" approach is the simplest way to upload. For very large datasets, you'll eventually want multi-part ingestion (multiple files) or storage-native ingestion — this version is intentionally "works now."

Step 2 — Create a Similarity API job and upload the Parquet file

import glob, requests

API_URL = "https://api.similarity-api.com"
TOKEN   = dbutils.secrets.get(scope="similarity", key="API_TOKEN")
headers = {"Authorization": f"Bearer {TOKEN}"}

payload = {
    "config": {
        "input_format":         "parquet",
        "similarity_threshold": 0.85,
        "use_case":             "company_names",
        "output_format":        "row_annotations",
        "output_file_format":   "parquet",
        "top_k":                50
        # If you upload a multi-column parquet later, add:
        # "input_column": "value"
    }
}

# Convert Spark path -> local driver path for Python file access
PARQUET_DIR_LOCAL = PARQUET_DIR_DBFS.replace("dbfs:", "/dbfs")
part_file = glob.glob(f"{PARQUET_DIR_LOCAL}/part-*.parquet")[0]

# 1) Create job (NEW PATH)
resp = requests.post(
    f"{API_URL}/dedupe/jobs",
    headers=headers,
    json=payload,
    timeout=120
)
resp.raise_for_status()
data = resp.json()
job_id     = data["job_id"]
upload_url = data["upload_url"]
print("job_id:", job_id)

# 2) Upload file bytes to signed URL
with open(part_file, "rb") as f:
    r = requests.put(
        upload_url,
        data=f,
        headers={"Content-Type": "application/octet-stream"},
        timeout=3600
    )
    r.raise_for_status()

# 3) Commit (starts async run) (NEW PATH)
r = requests.post(
    f"{API_URL}/dedupe/jobs/{job_id}/commit",
    headers=headers,
    timeout=120
)
r.raise_for_status()
print("Committed. rows_total:", r.json().get("rows_total"))

Step 3 — Poll, download results, and land back into Delta

Similarity API returns a signed result_url (HTTPS). Spark typically won't read HTTPS URLs directly as Parquet, so we download to DBFS first and then load with spark.read.parquet.

import time, requests, os

def wait_for_results(job_id: str) -> str:
    while True:
        resp = requests.get(
            f"{API_URL}/dedupe/jobs/{job_id}",   # NEW PATH
            headers=headers,
            timeout=120
        )
        resp.raise_for_status()
        res = resp.json()
        print(f"Stage: {res.get('stage')} ({res.get('progress')}%) | Status: {res.get('job_status')}")
        if res.get("job_status") == "completed":
            if "result_url" not in res:
                raise RuntimeError("Job completed but no result_url returned.")
            return res["result_url"]
        if res.get("job_status") == "failed":
            raise RuntimeError(f"Job failed: {res.get('error')}")
        time.sleep(10)

result_url = wait_for_results(job_id)

# Save results to DBFS (driver local path for Python)
OUT_DIR_DBFS  = f"{TMP_DIR_DBFS}/results"
OUT_DIR_LOCAL = OUT_DIR_DBFS.replace("dbfs:", "/dbfs")
os.makedirs(OUT_DIR_LOCAL, exist_ok=True)
local_path = f"{OUT_DIR_LOCAL}/result.parquet"

with requests.get(result_url, stream=True, timeout=3600) as r:
    r.raise_for_status()
    with open(local_path, "wb") as f:
        for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
            if chunk:
                f.write(chunk)

# Spark reads from dbfs:/...
results_df = spark.read.parquet(f"{OUT_DIR_DBFS}/result.parquet")
results_df.write.mode("overwrite") \
    .format("delta").saveAsTable("main.crm.similarity_results")

# Join back to your original pk using the idx map
idx_map = spark.table("main.crm.similarity_idx_map")  # idx, pk
joined = results_df.join(
    idx_map, results_df["row_id"] == idx_map["idx"], "left"
)
joined.write.mode("overwrite") \
    .format("delta").saveAsTable("main.crm.customers_dedupe_annotations")

print("Wrote Delta tables: main.crm.similarity_results, main.crm.customers_dedupe_annotations")

At this point, you've got a Delta table keyed by your original pk with whatever annotations Similarity API returned (representatives, membership, similarity scores, etc.). You can inspect schema with:

spark.table("main.crm.customers_dedupe_annotations").printSchema()
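A common next step is collapsing each duplicate group to one canonical record. A minimal in-memory sketch follows; the column names ("group_id", "similarity") are assumptions about the annotation schema, so check the printSchema() output above for the real names before adapting this:

```python
# Hypothetical annotation rows after join-back (schema is assumed).
rows = [
    {"pk": "C-9", "group_id": 1, "similarity": 0.97},
    {"pk": "C-2", "group_id": 1, "similarity": 0.91},
    {"pk": "C-5", "group_id": 2, "similarity": 1.00},
]

# Keep the highest-similarity row per group as the canonical record.
canonical = {}
for r in rows:
    best = canonical.get(r["group_id"])
    if best is None or r["similarity"] > best["similarity"]:
        canonical[r["group_id"]] = r

print(sorted(c["pk"] for c in canonical.values()))  # -> ['C-5', 'C-9']
```

In Spark this is the usual row_number()-over-a-window-partitioned-by-group pattern; any tie-breaking rule (most recent record, most complete record) can replace the similarity comparison.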

Security note: Similarity API processes uploaded data only to compute the requested matching results. Customer data is not sold, shared, or used for advertising or model training. To minimize exposure, this workflow exports only the single string column required for deduplication.

Conclusion

At 10M rows, the bottleneck isn't string similarity — it's building a reliable end-to-end workflow that doesn't devolve into Spark shuffles, UDF overhead, and constant tuning.

By letting Databricks handle data access and governance, and offloading the matching step to Similarity API, you get a workflow that's reproducible, configurable, and doesn't require maintaining custom matching infrastructure.
