AI-Powered AML Detection: Building Graph-Based Anti-Money Laundering for Banking
Bank risk control is undergoing a structural shift from "human guards" to "tech guards." Traditional systems rely on rule engines plus manual review and face three sharpening contradictions: rules can never keep up with how fast laundering patterns mutate, headcount can never keep up with transaction volume growth, and compliance requirements keep tightening while customers demand faster service.
The core tension: laundering patterns mutate daily while rule engines update monthly; daily transactions hit millions while human review handles hundreds per week; due diligence requires 7 days while customers expect 7-minute onboarding.
I've spent the last two years building integrated risk control systems for banks. Here's the complete engineering approach — from graph-based AML detection to privacy-preserving computation.
From Rule Matching to Relationship Graphs
Traditional AML works on "single-point judgment": a transaction exceeds a threshold, frequency looks abnormal, or it involves a high-risk region. Trigger an alert. This works for simple laundering but is nearly blind to organized, network-based schemes.
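To make the "single-point" limitation concrete, here is a minimal sketch of such a rule check. The threshold, region codes, and field names are illustrative assumptions, not values from any real rulebook. It shows that one large transfer trips the rule while the same money split across several accounts passes every check.

```python
from dataclasses import dataclass

# Hypothetical rule parameters, chosen only for illustration
AMOUNT_THRESHOLD = 50_000
HIGH_RISK_REGIONS = {"XX", "YY"}


@dataclass
class Tx:
    src: str
    dst: str
    amount: float
    region: str


def single_point_alerts(tx: Tx) -> list:
    """Evaluate one transaction in isolation, as a classic rules engine does."""
    alerts = []
    if tx.amount >= AMOUNT_THRESHOLD:
        alerts.append("amount_over_threshold")
    if tx.region in HIGH_RISK_REGIONS:
        alerts.append("high_risk_region")
    return alerts


# One 180,000 transfer trips the amount rule
big = Tx("A", "Z", 180_000, "AA")
# The same money split across four intermediary accounts does not
split = [Tx(f"M{i}", "Z", 45_000, "AA") for i in range(4)]

print(single_point_alerts(big))                 # ['amount_over_threshold']
print([single_point_alerts(t) for t in split])  # [[], [], [], []]
```

Each split transaction is judged alone, so nothing connects the four transfers to the same destination. That connection is exactly what a graph view adds.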
A relationship graph (Knowledge Graph / Property Graph) elevates AML from "transactions" to "networks." The core idea: model accounts, transactions, entities, and addresses as graph structures, then use graph algorithms to discover hidden relationships and anomalous subgraphs.
Graph modeling:
- Nodes: accounts, individuals, companies, addresses, devices, IPs
- Edges: transfer relationships, shareholding, same-address, same-device, same-IP
- Attributes: transaction amounts, frequencies, time windows, risk labels
Under this model, a laundering network's signature is: multiple low-risk accounts gradually funneling funds into a few control accounts, forming a "many-to-one" aggregation structure. Each individual transaction looks normal, but the graph topology shows clear anomalies.
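The "many-to-one" signature can be checked with nothing more than an edge list. The sketch below is a simplified illustration of the idea; `find_fan_in`, `min_senders`, and the sample transfers are assumptions made for this example.

```python
from collections import defaultdict


def find_fan_in(transfers, min_senders=3):
    """Return {receiver: (distinct_senders, total_inflow)} for aggregation hubs.

    transfers: iterable of (src, dst, amount) tuples.
    """
    senders = defaultdict(set)
    inflow = defaultdict(float)
    for src, dst, amount in transfers:
        senders[dst].add(src)
        inflow[dst] += amount
    return {
        dst: (len(srcs), inflow[dst])
        for dst, srcs in senders.items()
        if len(srcs) >= min_senders
    }


transfers = [
    ("m1", "ctrl", 9_500), ("m2", "ctrl", 9_800),
    ("m3", "ctrl", 9_700), ("m4", "ctrl", 9_600),
    ("alice", "bob", 120),
]
print(find_fan_in(transfers))  # {'ctrl': (4, 38600.0)}
```

No single transfer here is remarkable; only the aggregation at `ctrl` stands out.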
```python
from collections import deque

import networkx as nx


class AMLGraphBuilder:
    """AML relationship graph builder."""

    def __init__(self):
        self.G = nx.DiGraph()

    def add_account(self, account_id, **attrs):
        self.G.add_node(account_id, node_type="account", **attrs)

    def add_transaction(self, src, dst, amount, date, **attrs):
        """Record a transfer; repeated src -> dst transfers share one edge."""
        tx = {"amount": amount, "date": date, **attrs}
        if self.G.has_edge(src, dst):
            edge = self.G[src][dst]
            edge["transactions"].append(tx)
            edge["total_amount"] += amount
            edge["tx_count"] += 1
        else:
            self.G.add_edge(src, dst, transactions=[tx],
                            total_amount=amount, tx_count=1)

    def detect_suspicious_communities(self):
        """Detect suspicious groups via community detection."""
        # Louvain runs on the undirected view of the transfer graph
        undirected = self.G.to_undirected()
        communities = nx.community.louvain_communities(undirected, resolution=1.2)
        suspicious = []
        for community in communities:
            subgraph = self.G.subgraph(community)
            density = nx.density(subgraph.to_undirected())
            # Average transferred amount per edge inside the community
            avg_amount = sum(
                d["total_amount"] for _, _, d in subgraph.edges(data=True)
            ) / max(subgraph.number_of_edges(), 1)
            if density > 0.6 and avg_amount > 50000:
                suspicious.append({
                    "nodes": list(community),
                    "density": round(density, 3),
                    "avg_amount": round(avg_amount, 2),
                    "risk_level": "HIGH" if density > 0.8 else "MEDIUM",
                })
        return suspicious

    def trace_fund_flow(self, source, max_depth=4):
        """Fund flow tracing: BFS from a specified account."""
        visited = set()
        flow_path = []
        queue = deque([(source, 0)])  # deque gives O(1) pops from the left
        while queue:
            node, depth = queue.popleft()
            if node in visited or depth > max_depth:
                continue
            visited.add(node)
            flow_path.append({
                "account": node,
                "depth": depth,
                "out_degree": self.G.out_degree(node),
                "total_out": sum(
                    d["total_amount"]
                    for _, _, d in self.G.out_edges(node, data=True)
                ),
            })
            for successor in self.G.successors(node):
                queue.append((successor, depth + 1))
        return flow_path
```
Community detection algorithms (like Louvain) automatically partition the graph into clusters — high internal connectivity, sparse external connections. Laundering rings form internally dense, externally isolated communities, distinctly different from normal customer networks.
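As a rough guide to what the density check means: for an undirected graph with n nodes and m edges, density is 2m / (n(n-1)), the fraction of possible account pairs that are actually connected. The small sketch below computes it by hand for two made-up five-account clusters; the edge sets are invented for illustration.

```python
def undirected_density(n_nodes: int, edges) -> float:
    """Density = 2m / (n * (n - 1)), counting each unordered pair once."""
    if n_nodes < 2:
        return 0.0
    pairs = {frozenset(e) for e in edges if e[0] != e[1]}
    return 2 * len(pairs) / (n_nodes * (n_nodes - 1))


# A 5-account ring where almost everyone transacts with everyone: 8 of 10 pairs
ring = [(a, b) for a in range(5) for b in range(a + 1, 5)][:-2]
# A 5-account retail cluster where customers all pay one merchant: 4 of 10 pairs
retail = [(0, 1), (0, 2), (0, 3), (0, 4)]

print(undirected_density(5, ring))    # 0.8
print(undirected_density(5, retail))  # 0.4
```

A hub-and-spoke retail cluster stays sparse because the spokes never transact with each other, while a ring that cycles funds among its members approaches a fully connected graph.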
Deployment tip: Use Neo4j for the full historical graph, NetworkX for in-memory real-time computation, and coordinate them through APIs.
Star Pattern Detection: Graph Algorithm in Practice
Within suspicious communities, one topology appears frequently: "star-pattern transfers." A central account simultaneously transacts with multiple peripheral accounts, forming a star-shaped radiation.
In AML scenarios, star patterns manifest in two forms:
- Star-In: Multiple peripheral accounts transfer to the center, simulating normal income aggregation
- Star-Out: Center transfers to multiple peripherals, simulating normal spending dispersal
Not every star pattern is laundering, but laundering almost always involves star structures. The detection goal is identifying anomalous stars, not all stars.
Anomalous star judgment dimensions: recency of peripheral accounts (recently opened), time synchronization (concentrated in a short period), amount similarity (amounts close but just below thresholds), inter-peripheral connections (same address, same device). These dimensions combine into a star suspicion score.
```python
from datetime import datetime

import networkx as nx
import numpy as np


class StarPatternDetector:
    """Star-pattern transfer detector."""

    def __init__(self, gini_threshold=0.35, sync_window_hours=48):
        # gini_threshold is kept for hard filtering; the ranking score below does not use it
        self.gini_threshold = gini_threshold
        self.sync_window_hours = sync_window_hours

    @staticmethod
    def gini_coefficient(values):
        """Gini coefficient: measures how uniform the amount distribution is."""
        values = sorted(values)
        n = len(values)
        if n == 0:
            return 0
        cum = np.cumsum(values)
        return (n + 1 - 2 * np.sum(cum) / cum[-1]) / n if cum[-1] > 0 else 0

    def detect_star_patterns(self, G, min_neighbors=5, min_total_amount=100000):
        """Detect star-pattern transfers in the graph."""
        stars = []
        for node in G.nodes():
            in_edges = list(G.in_edges(node, data=True))
            out_edges = list(G.out_edges(node, data=True))
            # Star-In detection
            if len(in_edges) >= min_neighbors:
                result = self._analyze_star(node, in_edges, "in", G)
                if result and result["total_amount"] >= min_total_amount:
                    stars.append(result)
            # Star-Out detection
            if len(out_edges) >= min_neighbors:
                result = self._analyze_star(node, out_edges, "out", G)
                if result and result["total_amount"] >= min_total_amount:
                    stars.append(result)
        stars.sort(key=lambda x: x["suspicion_score"], reverse=True)
        return stars

    def _analyze_star(self, center, edges, direction, G):
        """Analyze suspicion level of a single star structure."""
        amounts = [d["total_amount"] for _, _, d in edges]
        total = sum(amounts)
        gini = self.gini_coefficient(amounts)

        # Time synchronization check: 1.0 means all transfers at the same instant,
        # 0.0 means they span the whole sync window or more
        all_tx_times = []
        for _, _, d in edges:
            for tx in d.get("transactions", []):
                if isinstance(tx.get("date"), datetime):
                    all_tx_times.append(tx["date"])
        sync_ratio = 0.0
        if len(all_tx_times) >= 2:
            min_t, max_t = min(all_tx_times), max(all_tx_times)
            span = (max_t - min_t).total_seconds() / 3600
            sync_ratio = 1.0 - min(span / self.sync_window_hours, 1.0)

        # Peripheral account recency: share of neighbors opened in the last 90 days
        neighbors = [e[0] if direction == "in" else e[1] for e in edges]
        new_account_ratio = 0.0
        for n in neighbors:
            open_date = G.nodes[n].get("open_date")
            if open_date and isinstance(open_date, datetime):
                if (datetime.now() - open_date).days < 90:
                    new_account_ratio += 1.0 / len(neighbors)

        # Composite suspicion score
        score = (
            0.3 * (1 - gini) +                # More uniform amounts → more suspicious
            0.3 * sync_ratio +                # More synchronized → more suspicious
            0.2 * new_account_ratio +         # More new accounts → more suspicious
            0.2 * min(len(edges) / 20, 1.0)   # Normalized neighbor count
        )
        return {
            "center_account": center,
            "direction": direction,
            "neighbor_count": len(edges),
            "total_amount": round(total, 2),
            "gini": round(gini, 4),
            "sync_ratio": round(sync_ratio, 4),
            "new_account_ratio": round(new_account_ratio, 4),
            "suspicion_score": round(score, 4),
            "risk_level": "HIGH" if score > 0.7 else "MEDIUM" if score > 0.5 else "LOW",
        }

    def batch_detect_with_timeline(self, G, date_ranges):
        """Batch detection across time periods, observing star pattern evolution."""
        timeline_results = []
        for start, end in date_ranges:
            subG = nx.DiGraph()
            subG.add_nodes_from(G.nodes(data=True))
            for u, v, d in G.edges(data=True):
                # Keep only this period's transactions and recompute the edge
                # totals, so amounts from other periods do not leak in
                txs = [
                    tx for tx in d.get("transactions", [])
                    if isinstance(tx.get("date"), datetime)
                    and start <= tx["date"] <= end
                ]
                if txs:
                    subG.add_edge(
                        u, v, transactions=txs,
                        total_amount=sum(tx["amount"] for tx in txs),
                        tx_count=len(txs),
                    )
            stars = self.detect_star_patterns(subG)
            timeline_results.append({
                "period": f"{start.date()} ~ {end.date()}",
                "star_count": len(stars),
                "high_risk": sum(1 for s in stars if s["risk_level"] == "HIGH"),
                "total_suspicious_amount": sum(s["total_amount"] for s in stars),
            })
        return timeline_results
```
In one city commercial bank's deployment, star detection reduced the AML alert false-positive rate from 78% to 31% while increasing true-positive submissions by 2.4x. It did so not by tightening thresholds, but by using sharper features.
The key insight: this scoring formula isn't a hard judgment — it's a ranking tool. Analysts prioritize reviewing the highest-scoring stars.
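The detector above captures amount similarity through the Gini coefficient. The "just below thresholds" part of that dimension could be expressed as a separate feature. A minimal sketch follows, where the 50,000 reporting threshold, the 10% band, and the function name are assumptions for illustration rather than part of the scoring formula above.

```python
def near_threshold_ratio(amounts, threshold=50_000, band=0.10):
    """Fraction of amounts falling in [threshold * (1 - band), threshold)."""
    if not amounts:
        return 0.0
    lower = threshold * (1 - band)
    hits = sum(1 for a in amounts if lower <= a < threshold)
    return hits / len(amounts)


structured = [49_000, 48_500, 49_900, 47_000, 49_500]
organic = [1_200, 63_000, 8_400, 49_200, 310]

print(near_threshold_ratio(structured))  # 1.0
print(near_threshold_ratio(organic))     # 0.2
```

Like the other dimensions, a value such as this would be one more ranking input, not a verdict on its own.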
Intelligent Due Diligence: From 7 Days to 7 Minutes
AML system input quality depends on due diligence data completeness and timeliness. Traditional due diligence flows are serial — each step waits for the previous one, with manual handoffs creating massive idle time.
| Step | Traditional Time | Bottleneck |
|---|---|---|
| Business registration verification | 1 day | Multi-platform queries, manual comparison |
| Ultimate Beneficial Owner tracing | 2 days | Multi-layer equity look-through, traced manually layer by layer |
| Negative news scanning | 1 day | Search engines + court records separately |
| Sanctions list matching | 0.5 day | Cross-matching multiple lists |
| Beneficial owner identification | 1.5 days | UBO tracing requires human judgment |
| Risk report drafting | 1 day | Template writing, cross-review |
| Total | 7 days | — |
The solution: change serial to parallel. All data source APIs call simultaneously, then AI performs structured analysis and judgment on the aggregated results.
```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class KYCResult:
    """Due diligence result structure."""
    company_name: str
    registration_info: Dict = field(default_factory=dict)
    ubo_chain: List[Dict] = field(default_factory=list)
    sanctions_hits: List[Dict] = field(default_factory=list)
    negative_news: List[Dict] = field(default_factory=list)
    risk_flags: List[str] = field(default_factory=list)
    risk_score: float = 0.0


class IntelligentKYCEngine:
    """Intelligent due diligence engine: parallel collection + AI analysis."""

    def __init__(self, apis: Dict):
        self.apis = apis

    async def full_scan(self, company_name: str) -> KYCResult:
        """Execute the full due diligence scan in parallel."""
        # 5 data sources called concurrently: total time = the slowest one
        reg, ubo, sanction, news, relations = await asyncio.gather(
            self._query_registration(company_name),
            self._trace_ubo(company_name),
            self._check_sanctions(company_name),
            self._scan_negative_news(company_name),
            self._find_related_entities(company_name),
        )
        result = KYCResult(
            company_name=company_name,
            registration_info=reg,
            ubo_chain=ubo,
            sanctions_hits=sanction,
            negative_news=news,
        )
        # AI cross-analysis: combine all dimensions to generate risk signals
        result.risk_flags = self._cross_analyze(result, relations)
        result.risk_score = self._calculate_risk_score(result)
        return result

    def _cross_analyze(self, result: KYCResult, relations: Dict) -> List[str]:
        """Cross-analysis: multi-dimensional joint risk signal detection."""
        flags = []
        # Signal 1: a UBO found by equity look-through appears on a sanctions list
        ubo_names = {person["name"] for person in result.ubo_chain}
        sanctioned_names = {hit["name"] for hit in result.sanctions_hits}
        overlap = ubo_names & sanctioned_names
        if overlap:
            flags.append(f"UBO overlaps with sanctions list: {overlap}")
        # Signal 2: registration address matches a known shell company cluster
        reg_addr = result.registration_info.get("address", "")
        if relations.get("shell_cluster_addrs") and reg_addr in relations["shell_cluster_addrs"]:
            flags.append(f"Registration address matches shell cluster: {reg_addr}")
        # Signal 3: UBO has negative news in the last 90 days
        for news in result.negative_news:
            if news.get("recency_days", 999) <= 90:
                flags.append(f"UBO recent negative news: {news.get('title', '')[:50]}")
        # Signal 4: equity look-through exceeds 4 layers (complex structure hiding risk)
        if len(result.ubo_chain) > 4:
            flags.append(f"Equity look-through spans {len(result.ubo_chain)} layers, complex structure")
        return flags

    def _calculate_risk_score(self, result: KYCResult) -> float:
        """Weighted scoring: accumulate risk signals by weight."""
        # Keys are substrings of the flag texts produced by _cross_analyze
        weights = {
            "UBO overlaps with sanctions": 40,
            "Registration address matches shell cluster": 25,
            "recent negative news": 15,
            "layers": 10,
        }
        score = 0.0
        for flag in result.risk_flags:
            for keyword, w in weights.items():
                if keyword in flag:
                    score += w
        return min(score, 100.0)

    # Data-source adapters: placeholders to implement per provider. Each must
    # return the type its KYCResult field expects (dict or list of dicts).
    async def _query_registration(self, name): ...
    async def _trace_ubo(self, name): ...
    async def _check_sanctions(self, name): ...
    async def _scan_negative_news(self, name): ...
    async def _find_related_entities(self, name): ...
```
The result: 5 serial API calls that would take 10-15 seconds now run in parallel, completing in the time of the slowest one (usually 3-5 seconds), plus 1-2 seconds for AI cross-analysis. Total: from 7 days to under 7 minutes.
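The "total time equals the slowest call" claim is easy to verify in isolation. The sketch below replaces the real data sources with `asyncio.sleep` stand-ins; the source names and latencies are hypothetical and scaled down so the demo finishes quickly.

```python
import asyncio
import time


async def fake_source(name: str, latency: float) -> str:
    """Stand-in for one data-source API call (latency in seconds)."""
    await asyncio.sleep(latency)
    return name


async def run_serial(sources):
    # Each call waits for the previous one to finish
    return [await fake_source(n, s) for n, s in sources]


async def run_parallel(sources):
    # All calls are in flight at once; gather preserves input order
    return await asyncio.gather(*(fake_source(n, s) for n, s in sources))


def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


SOURCES = [("registration", 0.10), ("ubo", 0.30), ("sanctions", 0.05),
           ("news", 0.20), ("relations", 0.15)]

serial_result, serial_time = timed(run_serial(SOURCES))
parallel_result, parallel_time = timed(run_parallel(SOURCES))
print(f"serial:   {serial_time:.2f}s")    # roughly the sum of the latencies
print(f"parallel: {parallel_time:.2f}s")  # roughly the slowest latency
```

Both variants return the same results in the same order; only the waiting overlaps.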
The 7-days-to-7-minutes leap isn't about AI replacing judgment. It's about making data collection parallel instead of serial, and replacing manual comparison with automated cross-analysis. The final risk judgment still requires compliance confirmation, but officers now confirm high-confidence AI conclusions rather than investigating from scratch.
One-Click Risk Report Generation: The Markdown + Skill Approach
The KYC engine outputs structured data, but compliance officers need readable, archivable, auditable risk reports. This last step — data to document — is often the final efficiency bottleneck.
The traditional approach is template filling: Word template + copy-paste. Problems: rigid formatting, missing fields, version chaos. The engineering approach: define report generation as a standardized "Skill" — structured data in, standardized Markdown report out, then convert to PDF or DOCX as needed.
```python
from datetime import datetime


class RiskReportGenerator:
    """Risk report generation Skill."""

    def __init__(self):
        self.template_sections = [
            "Overview", "Entity Information", "UBO Analysis",
            "Sanctions Check", "Negative News Scan", "Risk Signal Summary",
            "Risk Rating & Recommendation",
        ]

    def generate(self, kyc_result, output_format="markdown"):
        """Generate a risk report from a KYCResult in one call."""
        sections = [
            self._build_overview(kyc_result),
            self._build_entity_info(kyc_result),
            self._build_ubo_analysis(kyc_result),
            self._build_sanctions_check(kyc_result),
            self._build_news_scan(kyc_result),
            self._build_risk_summary(kyc_result),
            self._build_recommendation(kyc_result),
        ]
        body = "\n\n".join(sections)
        header = self._build_header(kyc_result.company_name)
        footer = self._build_footer()
        report = f"{header}\n\n{body}\n\n{footer}"
        if output_format == "html":
            return self._markdown_to_html(report)
        return report

    @staticmethod
    def _markdown_to_html(report: str) -> str:
        # Assumes the third-party `markdown` package is installed
        import markdown
        return markdown.markdown(report, extensions=["tables"])

    @staticmethod
    def _build_header(company_name: str) -> str:
        return (
            f"# Corporate Client Due Diligence Risk Report\n\n"
            f"| Field | Value |\n|------|------|\n"
            f"| Client Name | {company_name} |\n"
            f"| Report Generated | {datetime.now().strftime('%Y-%m-%d %H:%M')} |\n"
            f"| Report Version | V1.0 (AI Auto-Generated) |"
        )

    @staticmethod
    def _build_overview(kyc_result) -> str:
        level = "HIGH" if kyc_result.risk_score >= 50 else \
                "MEDIUM" if kyc_result.risk_score >= 25 else "LOW"
        return (
            f"## Report Overview\n\n"
            f"> Composite Risk Score: **{kyc_result.risk_score:.0f}/100**, "
            f"Risk Level: **{level}**\n\n"
            f"- Risk Signals: {len(kyc_result.risk_flags)}\n"
            f"- Sanctions Matches: {len(kyc_result.sanctions_hits)}\n"
            f"- Negative News Items: {len(kyc_result.negative_news)}\n"
            f"- UBO Tracing Layers: {len(kyc_result.ubo_chain)}"
        )

    @staticmethod
    def _build_entity_info(kyc_result) -> str:
        reg = kyc_result.registration_info
        rows = "\n".join(f"| {k} | {v} |" for k, v in reg.items())
        return f"## Entity Information\n\n| Field | Value |\n|------|------|\n{rows}"

    @staticmethod
    def _build_ubo_analysis(kyc_result) -> str:
        if not kyc_result.ubo_chain:
            return ("## UBO Analysis\n\n> No ultimate beneficial owner identified; "
                    "supplementary due diligence required")
        rows = []
        for i, person in enumerate(kyc_result.ubo_chain, 1):
            rows.append(
                f"| {i} | {person.get('name', '-')} | "
                f"{person.get('share_ratio', '-')} | "
                f"{person.get('type', '-')} |"
            )
        table = "\n".join(rows)
        return (
            f"## UBO Analysis\n\n"
            f"| Layer | Holder | Share Ratio | Type |\n"
            f"|-------|--------|-------------|------|\n{table}"
        )

    @staticmethod
    def _build_sanctions_check(kyc_result) -> str:
        if not kyc_result.sanctions_hits:
            return "## Sanctions Check\n\n> No sanctions list matches found"
        rows = [f"- **{hit.get('name', '-')}** matched {hit.get('list', '-')} list"
                for hit in kyc_result.sanctions_hits]
        return "## Sanctions Check\n\n" + "\n".join(rows)

    @staticmethod
    def _build_news_scan(kyc_result) -> str:
        if not kyc_result.negative_news:
            return "## Negative News Scan\n\n> No relevant negative news found"
        rows = []
        for news in kyc_result.negative_news[:10]:  # cap the table at 10 items
            recent = "[RECENT]" if news.get("recency_days", 999) <= 90 else ""
            rows.append(
                f"| {news.get('date', '-')} | {news.get('title', '-')[:60]} | "
                f"{news.get('source', '-')} | {recent} |"
            )
        table = "\n".join(rows)
        return (
            f"## Negative News Scan\n\n"
            f"| Date | Title | Source | Flag |\n"
            f"|------|-------|--------|------|\n{table}"
        )

    @staticmethod
    def _build_risk_summary(kyc_result) -> str:
        if not kyc_result.risk_flags:
            return "## Risk Signal Summary\n\n> No risk signals identified"
        items = "\n".join(f"- {flag}" for flag in kyc_result.risk_flags)
        return f"## Risk Signal Summary\n\n{items}"

    @staticmethod
    def _build_recommendation(kyc_result) -> str:
        score = kyc_result.risk_score
        if score >= 50:
            action = "Recommend rejecting onboarding or escalating approval, with Enhanced Due Diligence (EDD) measures"
        elif score >= 25:
            action = "Recommend conditional onboarding, set transaction monitoring thresholds and periodic review cycles"
        else:
            action = "Recommend standard onboarding, include in standard monitoring process"
        review_cycle = "Quarterly" if score >= 50 else "Semi-annually" if score >= 25 else "Annually"
        return (
            f"## Risk Rating & Recommendation\n\n"
            f"> {action}\n\n"
            f"| Item | Recommendation |\n|------|--------------|\n"
            f"| Onboarding Decision | {action} |\n"
            f"| Monitoring Level | {'Enhanced' if score >= 50 else 'Standard'} |\n"
            f"| Review Cycle | {review_cycle} |"
        )

    @staticmethod
    def _build_footer() -> str:
        return (
            "---\n\n"
            f"*This report was auto-generated by AI for risk reference only. "
            f"Final decisions require compliance officer confirmation. "
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*"
        )
```
The engineering value here is threefold: standardization (all risk reports follow the same structure, avoiding omissions and format drift), traceability (report content derives directly from KYCResult, every conclusion traceable to raw query results), and extensibility (add detection dimensions by adding fields to the engine and sections to the template).
Markdown is the ideal intermediate format: human-readable, Git-trackable, Pandoc-convertible to PDF/DOCX/HTML, and naturally suited to compliance text archiving.
Privacy Preservation: 3 Technical Paths for Data-That-Never-Leaves
The risk control pipeline we've built — from relationship graphs to star detection, from due diligence to report generation — all runs within a single institution. But real scenarios require cross-institutional collaboration: syndicated loan due diligence, cross-bank AML fund tracing, multi-source credit model training. The core constraint: raw data cannot leave the institution's security domain.
Privacy computing philosophy: data stays, models move. Data is usable but not visible.
Three mainstream paths, each with different applicable scenarios:
Path 1: Federated Learning — Exchange Model Parameters, Not Raw Data
```python
from typing import List

import numpy as np


class FederatedAveraging:
    """FedAvg: joint cross-bank AML model training."""

    def __init__(self, n_clients: int, min_clients: int = 2):
        self.n_clients = n_clients
        self.min_clients = min_clients

    def _check_quorum(self, local_weights: List[dict]) -> None:
        if len(local_weights) < self.min_clients:
            raise ValueError(
                f"Insufficient participants: {len(local_weights)}/{self.min_clients}"
            )

    def aggregate(self, local_weights: List[dict]) -> dict:
        """Aggregate local model parameters from all parties."""
        self._check_quorum(local_weights)
        # Weighted average by each party's sample size
        total_samples = sum(w["_n_samples"] for w in local_weights)
        global_weights = {}
        for key in local_weights[0]:
            if key == "_n_samples":
                continue
            global_weights[key] = sum(
                w[key] * (w["_n_samples"] / total_samples)
                for w in local_weights
            )
        return global_weights

    def secure_aggregate(self, local_weights: List[dict]) -> dict:
        """Secure aggregation sketch: zero-sum additive masks hide each update.

        Simulated in one process for illustration. In a real protocol each
        party derives its masks with its peers and applies them locally, so
        the aggregator only ever sees masked contributions.
        """
        self._check_quorum(local_weights)
        n = len(local_weights)
        total_samples = sum(w["_n_samples"] for w in local_weights)
        keys = [k for k in local_weights[0] if k != "_n_samples"]

        # Last party's mask = -sum of the other masks, ensuring the total is 0
        masks = []
        for i in range(n):
            if i < n - 1:
                mask = {k: np.random.randn(*np.shape(local_weights[i][k])) * 0.01
                        for k in keys}
            else:
                mask = {k: -sum(m[k] for m in masks) for k in keys}
            masks.append(mask)

        # Each party scales its update by its sample share before masking.
        # Masks only cancel in an unweighted sum, so weighting must come first.
        global_weights = {}
        for k in keys:
            global_weights[k] = sum(
                w[k] * (w["_n_samples"] / total_samples) + masks[i][k]
                for i, w in enumerate(local_weights)
            )
        return global_weights
```
Federated learning works well for joint AML model training and credit scoring. Limitations: high communication overhead (full model parameters transmitted each round), sensitivity to non-IID data distributions (convergence slows when banks have very different customer structures), and inability to prevent malicious participants from submitting forged gradients.
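A worked example of the sample-weighted average may help. The sketch below uses plain lists instead of arrays, and the two banks, their sample counts, weights, and mask values are all invented for illustration. It also shows why zero-sum masks leave the aggregate unchanged while hiding each bank's individual contribution.

```python
def fedavg(updates):
    """updates: list of (n_samples, weights), weights being equal-length lists."""
    total = sum(n for n, _ in updates)
    dim = len(updates[0][1])
    return [sum(n / total * w[i] for n, w in updates) for i in range(dim)]


# Bank A: 8,000 samples; Bank B: 2,000 samples (hypothetical)
updates = [(8_000, [0.50, -1.00]), (2_000, [1.00, 0.00])]
global_w = fedavg(updates)
print([round(x, 6) for x in global_w])  # [0.6, -0.8]

# Zero-sum masks: each masked contribution looks arbitrary on its own,
# but the masks cancel once the contributions are summed
masks = [[0.37, -0.12], [-0.37, 0.12]]
total = sum(n for n, _ in updates)
masked = [[n / total * w[i] + m[i] for i in range(2)]
          for (n, w), m in zip(updates, masks)]
recovered = [sum(col) for col in zip(*masked)]
print([round(x, 6) for x in recovered])  # [0.6, -0.8]
```

Bank A holds 80% of the samples, so the global weights sit much closer to its update than to Bank B's.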
Path 2: Differential Privacy — Add Noise to Query Results
```python
import numpy as np


class DifferentialPrivacy:
    """Differential privacy mechanisms for risk control statistics and model training."""

    def __init__(self, epsilon: float = 1.0, delta: float = 1e-5):
        if epsilon <= 0:
            raise ValueError("epsilon must be positive")
        self.epsilon = epsilon
        self.delta = delta

    def laplace_mechanism(self, true_value: float, sensitivity: float) -> float:
        """Laplace mechanism: add noise to numeric query results."""
        scale = sensitivity / self.epsilon
        noise = np.random.laplace(0, scale)
        return true_value + noise

    def gaussian_mechanism(self, true_value: float, sensitivity: float) -> float:
        """Gaussian mechanism: (epsilon, delta)-DP for aggregate statistics."""
        # Classic calibration; its standard proof assumes 0 < epsilon < 1
        sigma = sensitivity * np.sqrt(2 * np.log(1.25 / self.delta)) / self.epsilon
        noise = np.random.normal(0, sigma)
        return true_value + noise

    def dp_count(self, records: list, condition_fn, sensitivity: int = 1) -> int:
        """DP count query, e.g., counting sanctions-matched customers."""
        true_count = sum(1 for r in records if condition_fn(r))
        # Clamping and rounding are post-processing and do not weaken the guarantee
        return max(0, round(self.laplace_mechanism(true_count, sensitivity)))

    def dp_histogram(self, records: list, bin_fn, sensitivity: int = 1) -> dict:
        """DP histogram, e.g., customer distribution by risk level."""
        bins = {}
        for r in records:
            key = bin_fn(r)
            bins[key] = bins.get(key, 0) + 1
        # Only non-empty bins are released here. Which bins are empty can itself
        # leak information, so enumerate a fixed bin set where that matters.
        dp_bins = {}
        for key, count in bins.items():
            dp_bins[key] = max(0, round(self.laplace_mechanism(count, sensitivity)))
        return dp_bins

    def compute_privacy_budget(self, n_queries: int, total_epsilon: float = 1.0) -> float:
        """Privacy budget allocation under sequential composition.

        The epsilons of successive queries add up, so an even split gives
        each query total_epsilon / n_queries.
        """
        if n_queries <= 0:
            raise ValueError("n_queries must be positive")
        return total_epsilon / n_queries
```
Differential privacy suits regulatory reporting statistics and cross-department data sharing — scenarios where you only need aggregates, not individual records. Advantage: provable privacy guarantees. Disadvantage: noise reduces data precision, especially impactful for small datasets.
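The small-dataset penalty can be quantified. For the Laplace mechanism the noise scale is sensitivity / epsilon, and the expected absolute error equals that scale regardless of how large the true value is. The sketch below works through the arithmetic; the budget, query count, and cohort sizes are assumed values chosen for illustration.

```python
def laplace_scale(sensitivity: float, epsilon: float) -> float:
    """Scale b of the Laplace noise; the expected absolute error equals b."""
    return sensitivity / epsilon


def expected_relative_error(true_count: int, sensitivity: float, epsilon: float) -> float:
    return laplace_scale(sensitivity, epsilon) / true_count


def per_query_epsilon(total_epsilon: float, n_queries: int) -> float:
    """Sequential composition: epsilons add up, so split the budget evenly."""
    return total_epsilon / n_queries


eps = per_query_epsilon(total_epsilon=1.0, n_queries=10)  # 0.1 per query
for count in (20, 20_000):
    err = expected_relative_error(count, sensitivity=1, epsilon=eps)
    print(f"count={count}: expected relative error {err:.2%}")
# count=20: expected relative error 50.00%
# count=20000: expected relative error 0.05%
```

The same absolute noise of about 10 is negligible against a count of 20,000 and overwhelming against a count of 20, which is why splitting a fixed budget across many queries on a small cohort quickly becomes impractical.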
Path 3: Homomorphic Encryption — Compute Directly on Ciphertext
```python
# Simplified Paillier homomorphic encryption, for illustration only
# (production use: a vetted library such as PySyft or TenSEAL)
import secrets
from math import gcd


class PaillierHE:
    """Paillier additive HE: supports ciphertext addition and scalar multiplication."""

    def __init__(self, key_size: int = 512):
        # 512-bit keys keep the demo fast; they are far too small for production
        self.key_size = key_size
        p = self._generate_prime(key_size // 2)
        q = self._generate_prime(key_size // 2)
        while q == p:
            q = self._generate_prime(key_size // 2)
        self.n = p * q
        self.n_sq = self.n ** 2
        self.g = self.n + 1
        self.lam = (p - 1) * (q - 1) // gcd(p - 1, q - 1)  # lcm(p-1, q-1)
        self.mu = pow(self.lam, -1, self.n)  # modular inverse, Python 3.8+

    @staticmethod
    def _is_probable_prime(n: int, rounds: int = 40) -> bool:
        """Miller-Rabin probabilistic primality test."""
        if n < 2:
            return False
        for sp in (2, 3, 5, 7, 11, 13, 17, 19, 23, 29):
            if n % sp == 0:
                return n == sp
        d, s = n - 1, 0
        while d % 2 == 0:
            d //= 2
            s += 1
        for _ in range(rounds):
            a = 2 + secrets.randbelow(n - 3)  # random base in [2, n - 2]
            x = pow(a, d, n)
            if x in (1, n - 1):
                continue
            for _ in range(s - 1):
                x = pow(x, 2, n)
                if x == n - 1:
                    break
            else:
                return False  # a is a witness that n is composite
        return True

    @classmethod
    def _generate_prime(cls, bits: int) -> int:
        while True:
            # Force the top bit (exact size) and the low bit (odd)
            candidate = secrets.randbits(bits) | (1 << (bits - 1)) | 1
            if cls._is_probable_prime(candidate):
                return candidate

    def encrypt(self, plaintext: int) -> int:
        """Encrypt an integer plaintext in [0, n)."""
        r = secrets.randbelow(self.n - 1) + 1
        while gcd(r, self.n) != 1:
            r = secrets.randbelow(self.n - 1) + 1
        return (pow(self.g, plaintext, self.n_sq) * pow(r, self.n, self.n_sq)) % self.n_sq

    def decrypt(self, ciphertext: int) -> int:
        """Decrypt ciphertext."""
        x = pow(ciphertext, self.lam, self.n_sq)
        l_func = (x - 1) // self.n
        return (l_func * self.mu) % self.n

    @staticmethod
    def add_encrypted(c1: int, c2: int, n_sq: int) -> int:
        """Ciphertext + Ciphertext -> Ciphertext (homomorphic addition)."""
        return (c1 * c2) % n_sq

    @staticmethod
    def scalar_multiply(ciphertext: int, scalar: int, n_sq: int) -> int:
        """Ciphertext * Scalar -> Ciphertext (homomorphic scalar multiplication)."""
        return pow(ciphertext, scalar, n_sq)


def cross_bank_risk_score_example():
    """Cross-bank risk score aggregation on encrypted data."""
    he = PaillierHE()
    n_sq = he.n_sq
    # Bank A's local score (encrypted)
    score_a_plaintext = 72
    score_a_cipher = he.encrypt(score_a_plaintext)
    # Bank B's local score (encrypted)
    score_b_plaintext = 58
    score_b_cipher = he.encrypt(score_b_plaintext)
    # Aggregator computes the weighted sum on ciphertext without decrypting either score
    weighted_a = PaillierHE.scalar_multiply(score_a_cipher, 6, n_sq)
    weighted_b = PaillierHE.scalar_multiply(score_b_cipher, 4, n_sq)
    sum_cipher = PaillierHE.add_encrypted(weighted_a, weighted_b, n_sq)
    # Decrypt to get the combined score (weights 6 and 4 stand for 0.6 and 0.4)
    combined = he.decrypt(sum_cipher) / 10.0
    assert abs(combined - (0.6 * score_a_plaintext + 0.4 * score_b_plaintext)) < 0.1
    return combined
```
Homomorphic encryption suits cross-institution risk score aggregation and joint statistics — scenarios requiring computation on encrypted data. Highest security (computing party sees no plaintext), but extreme performance overhead — 3 to 6 orders of magnitude slower than plaintext computation. Only viable for low-frequency, small-scale calculations today.
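Paillier operates on integers modulo n, which is why the example above multiplies by 6 and 4 and divides by 10 after decryption. That convention generalizes to fixed-point encoding. The sketch below uses plain integers in place of ciphertexts to show the arithmetic only; `SCALE` and the helper names are assumptions for this example, and negative values are not handled.

```python
SCALE = 100  # two decimal places; an assumed precision


def encode(x: float) -> int:
    """Map a non-negative decimal to an integer the scheme can encrypt."""
    return round(x * SCALE)


def decode(v: int, levels: int = 1) -> float:
    """Undo `levels` factors of SCALE (one per encoded operand multiplied in)."""
    return v / (SCALE ** levels)


scores = [72.5, 58.25]
weights = [0.6, 0.4]

# These integer operations mirror what scalar multiplication and homomorphic
# addition compute underneath the encryption
acc = sum(encode(w) * encode(s) for w, s in zip(weights, scores))
print(decode(acc, levels=2))  # 66.8
```

Because both the weight and the score carry one factor of `SCALE`, the product carries two, and decoding has to divide by `SCALE` squared.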
Choosing the Right Path
| Dimension | Federated Learning | Differential Privacy | Homomorphic Encryption |
|---|---|---|---|
| Protects | Raw training data | Individuals behind released statistics | Data in use during computation |
| Performance cost | Medium (communication) | Low (noise) | Extreme (3-6 orders of magnitude) |
| Best for | Joint modeling | Statistical sharing | Encrypted computation |
| Maturity | High | High | Medium-Low |
| Typical use case | Cross-bank AML model | De-identified regulatory reporting | Cross-bank score aggregation |
In practice, these are often combined: federated learning trains the model + differential privacy protects gradients + homomorphic encryption aggregates parameters. They're complementary, not mutually exclusive.
Engineering Deployment Checklist
```python
CHECKLIST = {
    "Data Layer": [
        ("Neo4j full graph deployed", False),
        ("NetworkX real-time computation container ready", False),
        ("Kafka transaction stream connected", False),
        ("KYCResult structure definition published", False),
    ],
    "Model Layer": [
        ("Star detection weights calibrated", False),
        ("Due diligence rules engine YAML configured", False),
        ("Federated learning aggregation service deployed", False),
        ("Model versioning connected to MLflow", False),
    ],
    "Application Layer": [
        ("Risk report Skill registered", False),
        ("Markdown to PDF conversion pipeline ready", False),
        ("Analyst ranking view live", False),
        ("7-minute due diligence SLA monitored", False),
    ],
    "Compliance Layer": [
        ("AI decision audit logging enabled", False),
        ("Federated learning privacy budget audit configured", False),
        ("Report dual-label mechanism (AI + human) implemented", False),
    ],
    "Operations Layer": [
        ("Graph daily full + hourly incremental scheduled", False),
        ("Star detection P99 < 500ms verified", False),
        ("API availability SLA monitoring configured", False),
    ],
}

# Print per-layer completion, e.g. "[Data Layer] 0/4 complete"
for layer, items in CHECKLIST.items():
    done = sum(1 for _, status in items if status)
    total = len(items)
    print(f"[{layer}] {done}/{total} complete")
```
Key engineering principles:
- Data layer: Neo4j for full historical graph + NetworkX for real-time computation, dual-engine coordination. Transaction data streams via Kafka with <5s graph update latency.
- Model layer: Star detection scoring weights should be calibrated on your bank's data (2 rounds of A/B testing recommended). Due diligence cross-analysis rules should use YAML configuration — change config, not code, when business rules evolve.
- Application layer: Risk report Skill uses Markdown as intermediate format, converts to PDF for archiving or HTML for display. Star detection results are sorted by suspicion score so analysts can focus on the top 20.
- Compliance layer: Every AI decision retains auditable logs: input data, model version, output result, human confirmation record. Federated learning privacy budget audited quarterly. Risk reports carry "AI-generated + human-confirmed" dual labels.
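The "change config, not code" principle for cross-analysis rules can be sketched as follows. The structure mirrors what a YAML file might load into; the schema, rule names, fact fields, and weights are hypothetical, and a plain list of dicts stands in for the parsed file so the example has no dependencies.

```python
# What a parsed rules file might look like (hypothetical schema)
RULES = [
    {"id": "ubo_sanctioned", "weight": 40, "field": "ubo_sanction_overlap", "op": "gt", "value": 0},
    {"id": "deep_equity", "weight": 10, "field": "ubo_layers", "op": "gt", "value": 4},
    {"id": "recent_news", "weight": 15, "field": "min_news_age_days", "op": "le", "value": 90},
]

# Comparison operators a rule is allowed to reference
OPS = {"gt": lambda a, b: a > b, "le": lambda a, b: a <= b}


def score(facts: dict, rules=RULES, cap: float = 100.0):
    """Return (score, fired_rule_ids); rules whose fact is missing are skipped."""
    fired = [r["id"] for r in rules
             if r["field"] in facts and OPS[r["op"]](facts[r["field"]], r["value"])]
    total = sum(r["weight"] for r in rules if r["id"] in fired)
    return min(total, cap), fired


facts = {"ubo_sanction_overlap": 0, "ubo_layers": 6, "min_news_age_days": 30}
print(score(facts))  # (25, ['deep_equity', 'recent_news'])
```

Adjusting a weight or adding a rule then becomes an edit to the rules data, and the list of fired rule IDs gives the audit log something concrete to record.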
This article integrated graph-based AML detection, star-pattern graph algorithm checking, intelligent due diligence, automated risk report generation, and three privacy-preservation paths into a complete engineering pipeline from data modeling to regulatory compliance. Every module includes runnable Python code you can use as a technical prototype for iteration.