When you fuzzy-match 10 million rows, you aren't "just comparing strings." A naïve dedupe implies roughly n(n−1)/2 ≈ 5×10¹³ potential pairs. At this scale, approaches that feel "quick" on small tables start to break.
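To make that number concrete, the arithmetic is worth running once (plain Python, no Spark involved):

```python
# Back-of-the-envelope: unordered pairs a naive all-vs-all dedupe implies.
def pair_count(n: int) -> int:
    return n * (n - 1) // 2

n = 10_000_000
pairs = pair_count(n)                 # 49_999_995_000_000, i.e. ~5e13
seconds = pairs / 10_000_000          # even at 10M comparisons per second...
print(f"{pairs:.2e} pairs, ~{seconds / 86_400:.0f} days of compute")
```

That is why every approach below is, one way or another, about avoiding the full pairwise comparison.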
In Databricks, most teams reach for one of three options:
- Spark-native candidate generation (LSH/MinHash): fast to start, but you end up tuning a tradeoff between missed matches and huge candidate sets.
- Entity-resolution frameworks: powerful, but often heavier than you want for "dedupe this column."
- Custom Python scoring (UDFs / pandas UDFs): easy to prototype, but at large scale jobs become dominated by Python overhead, skew, and shuffles.
A practical approach is to let Databricks handle what it's best at (data access, ETL, governance) and offload the actual matching step to a service built specifically for high-scale deduplication.
In this tutorial, we'll do that using Similarity API — an async "job" style matching service where you:
- upload a dataset once (CSV or Parquet)
- start a job
- poll status
- then download results (as Parquet or CSV)
This doesn't eliminate all cost — you still export data and ingest results — but it avoids the most fragile part: doing pairwise matching inside Spark.
Why use Similarity API for the matching step?
- Avoid Spark-side pairwise matching: no cartesian joins or UDF-based scoring at scale.
- Normalization options built in: punctuation removal, lowercasing, token sorting, and a company_names preset that strips common business suffixes (Inc/LLC/Ltd/etc.).
- Deterministic output artifact: the service returns a file you can land back into Delta (e.g., per-row annotations, membership maps, or match pairs).
- Proven at 1M+ scale: see the benchmark run (1M rows in ~7 minutes) and comparisons vs common fuzzy matching approaches.
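The preset's exact rules live server-side, but as a mental model, the company_names normalization behaves roughly like this sketch (the suffix list and rule ordering here are illustrative assumptions, not the service's actual implementation):

```python
import re

# Illustrative approximation of lowercasing, punctuation removal,
# suffix stripping, and token sorting. The real rules are server-side.
SUFFIXES = {"inc", "llc", "ltd", "corp", "co", "gmbh"}

def normalize_company(name: str) -> str:
    s = name.lower()
    s = re.sub(r"[^\w\s]", " ", s)                      # strip punctuation
    tokens = [t for t in s.split() if t not in SUFFIXES]  # drop suffixes
    return " ".join(sorted(tokens))                      # token sorting

print(normalize_company("Acme Holdings, Inc."))   # -> "acme holdings"
print(normalize_company("ACME Holdings LLC"))     # -> "acme holdings"
```

The point of normalization like this is that "Acme Holdings, Inc." and "ACME Holdings LLC" collapse to the same canonical string before any similarity scoring happens.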
The workflow: Databricks ↔ Similarity API
Prerequisites:
Network egress: This workflow assumes your Databricks compute can make outbound HTTPS requests to the Similarity API hostname. In many enterprise setups (and some serverless ones), outbound internet and DNS are restricted by policy. If so, you'll need an admin to allow outbound access (or allowlist the API domain) before the notebook can reach the service.
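A cheap way to check whether outbound DNS is even possible from your notebook, before debugging opaque API errors (stdlib only; the hostname is the one used later in this tutorial):

```python
import socket

def can_resolve(host: str, port: int = 443) -> bool:
    """Return True if the driver can resolve the hostname via DNS."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        return False

print(can_resolve("api.similarity-api.com"))
```

If this prints False, fix egress/DNS policy first; nothing in the rest of the workflow will work without it.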
Access (pricing + token): Similarity API is a paid service. To run this notebook you'll need an API token. Create an account to get one (there's typically a free trial or credits for testing), then store it in Databricks Secrets; the code below reads it from scope "similarity", key "API_TOKEN".
Important detail about Similarity API's current output contract: for "row annotations," the result includes a row_id that is the 0..n-1 positional index of the uploaded file. To join results back to your source table, we'll create an explicit index in Databricks and persist an idx → primary_key mapping.
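In other words, row_id is a file position, not a key. A toy illustration of that contract (group_id is a hypothetical annotation field, used here only to stand in for whatever the service returns):

```python
# The uploaded file's rows and the idx -> pk map must share
# the same 0..n-1 ordering for join-back to be correct.
uploaded_values = ["Acme Inc", "Acme LLC", "Globex Corp"]  # row order in the parquet
idx_to_pk = {0: "cust_17", 1: "cust_42", 2: "cust_99"}     # persisted in Delta

# A hypothetical result row from the service:
result = {"row_id": 1, "group_id": 0}
pk = idx_to_pk[result["row_id"]]
print(pk)  # -> "cust_42", the real primary key for that annotation
```

This is why Step 1 builds the index with a deterministic ordering and persists the map before anything is uploaded.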
Step 1 — Build a stable index and export a single-column parquet
We create:
- idx: a 0..n-1 positional index
- pk: your real primary key
- value: the string column you want to dedupe

Then we write:
- an index map table to Delta (idx, pk)
- a single-column Parquet containing only value (in the same row order) for upload
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time, os
# Config
SOURCE_TABLE = "main.crm.customers"
PK_COLUMN = "customer_id" # change to your true PK
STRING_COLUMN = "company_name" # change to your string column
# Use DBFS scheme for Spark paths
TMP_DIR_DBFS = f"dbfs:/tmp/similarity_api/{int(time.time())}"
PARQUET_DIR_DBFS = f"{TMP_DIR_DBFS}/input_parquet"
base = (
spark.table(SOURCE_TABLE)
.select(
F.col(PK_COLUMN).cast("string").alias("pk"),
F.col(STRING_COLUMN).cast("string").alias("value"),
)
.where(F.col("value").isNotNull() & (F.length(F.trim(F.col("value"))) > 0))
)
# Create a deterministic 0..n-1 index by ordering on pk.
# Caveat: an unpartitioned window funnels all rows through a single
# partition; acceptable for assigning idx, but budget for it at 10M+ rows.
w = Window.orderBy(F.col("pk"))
indexed = base.withColumn("idx", (F.row_number().over(w) - 1).cast("long"))
# Persist idx → pk mapping for join-back
indexed.select("idx", "pk").write.mode("overwrite") \
.format("delta").saveAsTable("main.crm.similarity_idx_map")
# Export ONLY the value column, in the same row order.
# Use coalesce(1) (not repartition(1)) to avoid a full shuffle.
indexed.select("value").coalesce(1).write.mode("overwrite") \
.parquet(PARQUET_DIR_DBFS)
print("Wrote parquet to:", PARQUET_DIR_DBFS)
Note: coalesce(1) makes Spark write a single part-*.parquet data file (plus a few small metadata files). Similarity API currently returns one signed upload URL for one object, so this "single part file" approach is the simplest way to upload. For very large datasets, you'll eventually want multi-part ingestion (multiple files) or storage-native ingestion — this version is intentionally "works now."
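Because the positional row_id contract silently breaks if the export ever produces more than one data file (say, after a config change), a defensive check before upload is cheap insurance. This is a sketch on the tutorial's side, not part of the API:

```python
import glob

def single_part_file(parquet_dir: str) -> str:
    """Assert the export produced exactly one data file, and return its path.

    Accepts either a dbfs:/ path or a local path; fails fast if the
    single-file assumption the upload relies on doesn't hold.
    """
    local_dir = parquet_dir.replace("dbfs:", "/dbfs")
    parts = sorted(glob.glob(f"{local_dir}/part-*.parquet"))
    if len(parts) != 1:
        raise RuntimeError(
            f"Expected exactly 1 part file, found {len(parts)}: {parts}"
        )
    return parts[0]
```

Calling single_part_file(PARQUET_DIR_DBFS) in place of the bare glob in Step 2 turns a silent ordering bug into an immediate, explicit failure.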
Step 2 — Create a Similarity API job and upload the Parquet file
import glob, requests
API_URL = "https://api.similarity-api.com"
TOKEN = dbutils.secrets.get(scope="similarity", key="API_TOKEN")
headers = {"Authorization": f"Bearer {TOKEN}"}
payload = {
"config": {
"input_format": "parquet",
"similarity_threshold": 0.85,
"use_case": "company_names",
"output_format": "row_annotations",
"output_file_format": "parquet",
"top_k": 50
# If you upload a multi-column parquet later, add:
# "input_column": "value"
}
}
# Convert Spark path -> local driver path for Python file access
PARQUET_DIR_LOCAL = PARQUET_DIR_DBFS.replace("dbfs:", "/dbfs")
part_file = glob.glob(f"{PARQUET_DIR_LOCAL}/part-*.parquet")[0]
# 1) Create job (NEW PATH)
resp = requests.post(
f"{API_URL}/dedupe/jobs",
headers=headers,
json=payload,
timeout=120
)
resp.raise_for_status()
data = resp.json()
job_id = data["job_id"]
upload_url = data["upload_url"]
print("job_id:", job_id)
# 2) Upload file bytes to signed URL
with open(part_file, "rb") as f:
r = requests.put(
upload_url,
data=f,
headers={"Content-Type": "application/octet-stream"},
timeout=3600
)
r.raise_for_status()
# 3) Commit (starts async run) (NEW PATH)
r = requests.post(
f"{API_URL}/dedupe/jobs/{job_id}/commit",
headers=headers,
timeout=120
)
r.raise_for_status()
print("Committed. rows_total:", r.json().get("rows_total"))
Step 3 — Poll, download results, and land back into Delta
Similarity API returns a signed result_url (HTTPS). Spark typically won't read HTTPS URLs directly as Parquet, so we download to DBFS first and then load with spark.read.parquet.
import time, requests, os
def wait_for_results(job_id: str) -> str:
while True:
resp = requests.get(
f"{API_URL}/dedupe/jobs/{job_id}", # NEW PATH
headers=headers,
timeout=120
)
resp.raise_for_status()
res = resp.json()
print(f"Stage: {res.get('stage')} ({res.get('progress')}%) | Status: {res.get('job_status')}")
if res.get("job_status") == "completed":
if "result_url" not in res:
raise RuntimeError("Job completed but no result_url returned.")
return res["result_url"]
if res.get("job_status") == "failed":
raise RuntimeError(f"Job failed: {res.get('error')}")
time.sleep(10)
result_url = wait_for_results(job_id)
# Save results to DBFS (driver local path for Python)
OUT_DIR_DBFS = f"{TMP_DIR_DBFS}/results"
OUT_DIR_LOCAL = OUT_DIR_DBFS.replace("dbfs:", "/dbfs")
os.makedirs(OUT_DIR_LOCAL, exist_ok=True)
local_path = f"{OUT_DIR_LOCAL}/result.parquet"
with requests.get(result_url, stream=True, timeout=3600) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
if chunk:
f.write(chunk)
# Spark reads from dbfs:/...
results_df = spark.read.parquet(f"{OUT_DIR_DBFS}/result.parquet")
results_df.write.mode("overwrite") \
.format("delta").saveAsTable("main.crm.similarity_results")
# Join back to your original pk using the idx map
idx_map = spark.table("main.crm.similarity_idx_map") # idx, pk
joined = results_df.join(
idx_map, results_df["row_id"] == idx_map["idx"], "left"
)
joined.write.mode("overwrite") \
.format("delta").saveAsTable("main.crm.customers_dedupe_annotations")
print("Wrote Delta tables: main.crm.similarity_results, main.crm.customers_dedupe_annotations")
At this point, you've got a Delta table keyed by your original pk with whatever annotations Similarity API returned (representatives, membership, similarity scores, etc.). You can inspect schema with:
spark.table("main.crm.customers_dedupe_annotations").printSchema()
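The exact annotation columns depend on the output format you requested, so check the printed schema first. As a shape illustration only (pk and group_id are assumed names), finding the real duplicate clusters boils down to a group-and-filter; here it is in plain Python over a few sample rows, and the same groupBy/filter translates directly to PySpark:

```python
from collections import defaultdict

# Hypothetical annotation rows; substitute whatever columns
# printSchema() actually shows (e.g. representative_id, score).
rows = [
    {"pk": "cust_17", "group_id": 0},
    {"pk": "cust_42", "group_id": 0},
    {"pk": "cust_99", "group_id": 1},
]

clusters = defaultdict(list)
for r in rows:
    clusters[r["group_id"]].append(r["pk"])

# Only groups with more than one member are actual duplicate clusters.
dupes = {g: members for g, members in clusters.items() if len(members) > 1}
print(dupes)  # -> {0: ['cust_17', 'cust_42']}
```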
Security note: Similarity API processes uploaded data only to compute the requested matching results. Customer data is not sold, shared, or used for advertising or model training. To minimize exposure, this workflow exports only the single string column required for deduplication.
Conclusion
At 10M rows, the bottleneck isn't string similarity — it's building a reliable end-to-end workflow that doesn't devolve into Spark shuffles, UDF overhead, and constant tuning.
By letting Databricks handle data access and governance, and offloading the matching step to Similarity API, you get a workflow that's reproducible, configurable, and doesn't require maintaining custom matching infrastructure.