DEV Community

兆鹏 于
Banking Operations AI: Building the 7x24 Unmanned Autonomous Loop

24/7 Unmanned Banking Operations: Building an AI-Powered Ticketing and Workflow System

Bank operations centers process tens of thousands of tickets daily — account freezes, cross-border settlements, compliance reviews, customer complaints. Every step relies on human judgment and manual handoffs. A typical ticket flow:

Customer Call → Agent Logs → L1 Review → L2 Approval → Risk Verification → Execute → Result Return → Customer Notification

This chain crosses 3-5 departments, involves 5-8 employees doing manual work, and averages 4.2 hours. Over 60% of the operations are rule-based repetition — check the account, reconcile the details, walk the approval, send the notification — all perfectly automatable.

Three fatal problems:

  1. Dispatch by guessing: Operators assign tickets based on experience. Ambiguous tickets get kicked between groups — averaging 2.3 transfers before finding the right handler.

  2. Monitoring by staring: People notice SLA breaches 5 minutes before deadline. Emergency escalation is the norm. Night and holiday understaffing means 3x more timeout tickets than workdays.

  3. Knowledge in heads: Processing rules live in senior employees' brains and scattered Excel files. New hire onboarding takes 3 months, with persistently high error rates.

The core contradiction: business volume grows linearly while human capacity is rigidly bounded. Only automated closed loops can break through.

24/7 Architecture: Four-Layer Design

True 24/7 unmanned operations isn't just writing automation scripts — it requires a complete architecture. Here's the design I've validated across multiple banks:

  • Ingestion Layer: Unified intake from all sources — ticketing systems, emails, APIs — all normalized into a standard OpsTask object
  • Decision Layer: The brain — rules engine + machine learning for auto-classification, dispatch, and priority assessment
  • Execution Layer: The hands — state-machine-driven workflow engine decomposing tickets into atomic tasks and coordinating execution
  • Monitoring Layer: The eyes — real-time SLA monitoring, anomaly detection, alerting, ensuring the loop never breaks

Unmanned doesn't mean "no people." It means "the system can close the loop even when nobody's at their desk."

Day/Night Mode Switching

This is a critical design point. During the day, with staff present, the system auto-dispatches and humans confirm. At night, with nobody on site, low-risk tickets execute fully automatically, medium-risk tickets are auto-processed and spot-checked by a human the next day, and high-risk tickets auto-escalate to the on-call person's phone. The switch isn't hardcoded; it's driven by a duty_mode config parameter that selects one of the two entries below:

DUTY_CONFIG = {
    "daytime": {
        "time_range": "08:00-20:00",
        "auto_execute_risk": "low",
        "human_confirm": True,
        "escalation_threshold": "medium"
    },
    "nighttime": {
        "time_range": "20:00-08:00",
        "auto_execute_risk": "low",
        "auto_with_audit": "medium",
        "escalation_threshold": "high",
        "escalation_channel": "sms"  # High-risk: SMS to on-call directly
    }
}
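One way the config above might be consumed at runtime is sketched below. The helper names (`resolve_duty_mode`, `decide_action`) and the returned action strings are assumptions made for illustration, and the config is repeated in trimmed form only so the sketch runs on its own.

```python
from datetime import time

# Trimmed copy of the config above so the sketch is self-contained.
DUTY_CONFIG = {
    "daytime": {"time_range": "08:00-20:00", "auto_execute_risk": "low",
                "human_confirm": True, "escalation_threshold": "medium"},
    "nighttime": {"time_range": "20:00-08:00", "auto_execute_risk": "low",
                  "auto_with_audit": "medium", "escalation_threshold": "high",
                  "escalation_channel": "sms"},
}

def _parse_range(time_range: str) -> tuple:
    """Turn "08:00-20:00" into a (start, end) pair of datetime.time values."""
    start, end = (time.fromisoformat(part) for part in time_range.split("-"))
    return start, end

def resolve_duty_mode(now: time) -> str:
    """Return the mode whose time_range contains `now` (end is exclusive)."""
    for mode, cfg in DUTY_CONFIG.items():
        start, end = _parse_range(cfg["time_range"])
        if start <= end:
            inside = start <= now < end
        else:  # range wraps past midnight, e.g. 20:00-08:00
            inside = now >= start or now < end
        if inside:
            return mode
    raise ValueError(f"No duty mode covers {now}")

def decide_action(risk: str, now: time) -> str:
    """Map a risk level ("low"/"medium"/"high") to a hypothetical action name."""
    cfg = DUTY_CONFIG[resolve_duty_mode(now)]
    if risk == cfg["auto_execute_risk"]:
        return "confirm_then_execute" if cfg.get("human_confirm") else "auto_execute"
    if risk == cfg.get("auto_with_audit"):
        return "auto_execute_with_next_day_audit"
    return "escalate"
```

Keeping the time ranges in config rather than in code means a holiday schedule could be handled by swapping the config, without redeploying the dispatcher.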

Intelligent Ticket Dispatch: Three-Layer Strategy

Ticket dispatch is the first gate of operations automation. Humans doing this manually have blind spots, and experience doesn't scale.

I designed a three-layer dispatch strategy: rules first, keywords as backup, escalation as safety net.

import re
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from enum import Enum

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

class Priority(Enum):
    P1 = 1  # Urgent: fund risk
    P2 = 2  # High: customer impact
    P3 = 3  # Medium: process blockage
    P4 = 4  # Low: routine processing

@dataclass
class OpsTask:
    """Unified operations ticket data structure"""
    task_id: str
    title: str
    description: str
    source: str           # Source: ticketing system / email / API
    category: str = ""    # Category: accounts / settlement / compliance / complaint / other
    risk_level: RiskLevel = RiskLevel.LOW
    priority: Priority = Priority.P4
    assigned_group: str = ""
    sla_hours: float = 24.0
    keywords: List[str] = field(default_factory=list)
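    create_time: str = ""
    # Fields written by the workflow engine and SLA monitor in later sections
    state: Optional[Enum] = None   # a TaskState value; None until the engine starts
    result: str = ""
    review_result: str = ""
    error_log: str = ""
    fast_track: bool = False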

# Layer 1: Rules Engine — exact match, zero latency
DISPATCH_RULES = [
    {
        "name": "Account Freeze",
        "pattern": r"(freeze|stop payment|limit).*account",
        "category": "accounts",
        "group": "Account Operations",
        "priority": Priority.P1,
        "risk": RiskLevel.HIGH,
        "sla_hours": 1.0
    },
    {
        "name": "Cross-border Settlement",
        "pattern": r"(cross-border|SWIFT|remittance).*settlement",
        "category": "settlement",
        "group": "International Settlement",
        "priority": Priority.P2,
        "risk": RiskLevel.MEDIUM,
        "sla_hours": 4.0
    },
    {
        "name": "Customer Complaint",
        "pattern": r"(complaint|dissatisfied|report)",
        "category": "complaint",
        "group": "Customer Experience",
        "priority": Priority.P2,
        "risk": RiskLevel.HIGH,
        "sla_hours": 2.0
    },
    {
        "name": "Reconciliation Discrepancy",
        "pattern": r"(reconcil|discrepancy|imbalance)",
        "category": "settlement",
        "group": "Reconciliation",
        "priority": Priority.P3,
        "risk": RiskLevel.MEDIUM,
        "sla_hours": 8.0
    }
]

def rule_based_dispatch(task: OpsTask) -> Optional[Tuple[str, Priority, RiskLevel, float]]:
    """Rule dispatch: exact match, millisecond response"""
    text = f"{task.title} {task.description}"
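    for rule in DISPATCH_RULES:
        if re.search(rule["pattern"], text, re.IGNORECASE):
            # Record the category as well; SLA policies are keyed by task.category
            task.category = rule["category"]
            return (rule["group"], rule["priority"], rule["risk"], rule["sla_hours"])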
    return None

# Layer 2: Keyword-weighted dispatch — fuzzy matching
KEYWORD_SCORES = {
    "Account Operations":  {"account": 3, "balance": 2, "open": 2, "close": 2, "freeze": 3},
    "International Settlement": {"cross-border": 3, "SWIFT": 3, "forex": 2, "remittance": 2, "settlement": 2},
    "Customer Experience": {"complaint": 3, "dissatisfied": 2, "feedback": 1, "report": 3, "service": 1},
    "Reconciliation":     {"reconcile": 3, "discrepancy": 2, "imbalance": 2, "statement": 1, "verify": 2},
    "Compliance Review":  {"AML": 3, "anti-money": 3, "suspicious": 2, "sanctions": 3, "screening": 2},
}

def keyword_dispatch(task: OpsTask, threshold: float = 4.0) -> Optional[str]:
    """Keyword-weighted dispatch: handles tickets that rules missed"""
    scores = {}
    text = f"{task.title} {task.description}"
    for group, word_weights in KEYWORD_SCORES.items():
        score = sum(weight for word, weight in word_weights.items() if word.lower() in text.lower())
        if score > 0:
            scores[group] = score

    if not scores:
        return None
    best_group = max(scores, key=scores.get)
    return best_group if scores[best_group] >= threshold else None

# Layer 3: Escalation fallback — when auto-dispatch can't determine
ESCALATION_GROUPS = {
    RiskLevel.HIGH: "Operations Supervisor",
    RiskLevel.MEDIUM: "Operations Dispatch",
    RiskLevel.LOW: "General Processing"
}

def dispatch_task(task: OpsTask) -> OpsTask:
    """Three-layer dispatch: rules → keywords → escalation"""
    # Layer 1 attempt
    result = rule_based_dispatch(task)
    if result:
        task.assigned_group, task.priority, task.risk_level, task.sla_hours = result
        return task

    # Layer 2 attempt
    group = keyword_dispatch(task)
    if group:
        task.assigned_group = group
        return task

    # Layer 3 fallback escalation
    task.assigned_group = ESCALATION_GROUPS[task.risk_level]
    task.priority = Priority.P3
    return task

Real-World Results

After 3 months running at Bank A:

| Dispatch Layer | Hit Rate | Avg Latency | Misdispatch Rate |
| --- | --- | --- | --- |
| Rules Engine | 68% | 12ms | 0.3% |
| Keyword Weighted | 19% | 35ms | 2.1% |
| Escalation Fallback | 13% | 2min (human) | 0% |

The goal isn't 100% auto-dispatch. It's covering high-frequency cases with rules, fuzzy zones with keywords, and catching everything else with the fallback. Together the two automatic layers handle 87% of tickets (68% + 19%), at an average latency of 35 milliseconds or less.
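Figures like these can be produced by replaying labeled historical tickets through the dispatcher and tallying the outcome per layer. The sketch below shows only the arithmetic; `ReplayRecord` and `summarize` are hypothetical names, and the sample records are made up for the example rather than taken from Bank A.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ReplayRecord:
    """One replayed ticket: which layer decided, and whether it was right."""
    layer: str            # "rules", "keywords" or "escalation"
    predicted_group: str
    actual_group: str     # group that finally resolved the ticket (ground truth)

def summarize(records: List[ReplayRecord]) -> Dict[str, Dict[str, float]]:
    """Hit rate is a share of all tickets; misdispatch rate is a share of that layer's tickets."""
    total = len(records)
    handled = Counter(r.layer for r in records)
    wrong = Counter(r.layer for r in records if r.predicted_group != r.actual_group)
    return {
        layer: {
            "hit_rate": count / total,
            "misdispatch_rate": wrong[layer] / count,
        }
        for layer, count in handled.items()
    }

# Made-up sample, only to show the arithmetic.
sample = [
    ReplayRecord("rules", "Account Operations", "Account Operations"),
    ReplayRecord("rules", "Reconciliation", "Reconciliation"),
    ReplayRecord("keywords", "Customer Experience", "Compliance Review"),
    ReplayRecord("escalation", "General Processing", "General Processing"),
]
report = summarize(sample)
# Share of tickets dispatched without a human: everything except the fallback layer
auto_share = sum(v["hit_rate"] for k, v in report.items() if k != "escalation")
```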

Workflow Orchestration: Configurable Business Process Engine

Once tickets are dispatched, the next step is automating the processing. Every bank operation has a standard SOP, but SOPs are usually documents, not code. The workflow engine's core goal: turn SOPs into executable workflows.

State-Machine-Driven Workflow

from enum import Enum
from typing import Callable, Dict, List
import time

class TaskState(Enum):
    CREATED = "created"
    ASSIGNED = "assigned"
    PROCESSING = "processing"
    REVIEWING = "reviewing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"

# State transition rules: define valid state migrations
TRANSITIONS = {
    TaskState.CREATED:    [TaskState.ASSIGNED],
    TaskState.ASSIGNED:   [TaskState.PROCESSING],
    TaskState.PROCESSING: [TaskState.REVIEWING, TaskState.FAILED],
    TaskState.REVIEWING:  [TaskState.COMPLETED, TaskState.PROCESSING],
    TaskState.FAILED:     [TaskState.RETRY],
    TaskState.RETRY:      [TaskState.PROCESSING],
}

class WorkflowEngine:
    """Workflow engine: state-machine-driven, supports configurable workflows"""

    def __init__(self):
        self.handlers: Dict[TaskState, List[Callable]] = {}
        self.max_retries = 3

    def register_handler(self, state: TaskState, handler: Callable):
        """Register a state handler"""
        if state not in self.handlers:
            self.handlers[state] = []
        self.handlers[state].append(handler)

    def can_transition(self, current: TaskState, target: TaskState) -> bool:
        """Validate state transition is legal"""
        return target in TRANSITIONS.get(current, [])

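    def execute(self, task: OpsTask) -> OpsTask:
        """Execute the workflow"""
        # A freshly created OpsTask carries no state yet, so start from CREATED
        if getattr(task, "state", None) is None:
            task.state = TaskState.CREATED
        retry_count = 0
        while task.state != TaskState.COMPLETED:
            handlers = self.handlers.get(task.state, [])
            if not handlers:
                break

            failed_state = task.state  # remember the step in case a handler raises
            try:
                for handler in handlers:
                    task = handler(task)

                # A review that is waiting on a human must not auto-complete
                if (task.state == TaskState.REVIEWING
                        and getattr(task, "review_result", "") == "pending_human"):
                    break

                # Auto-advance along the success path; FAILED is only
                # entered through the exception branch below
                next_states = [s for s in TRANSITIONS.get(task.state, [])
                               if s != TaskState.FAILED]
                if len(next_states) == 1:
                    task.state = next_states[0]
                elif TaskState.COMPLETED in next_states:
                    task.state = TaskState.COMPLETED
                else:
                    break  # Multiple targets need external decision

            except Exception as e:
                task.error_log = f"Step {failed_state.value}: {e}"
                retry_count += 1
                if retry_count > self.max_retries:
                    task.state = TaskState.FAILED
                    break
                # Retries remain: re-run the step that failed on the next pass
                # (the FAILED → RETRY → PROCESSING hop collapsed into one move)
                task.state = failed_state

        return task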

# Define concrete business handlers
def assign_handler(task: OpsTask) -> OpsTask:
    """Dispatch handler: calls the dispatch algorithm"""
    task = dispatch_task(task)
    task.state = TaskState.ASSIGNED
    return task

def process_account_freeze(task: OpsTask) -> OpsTask:
    """Account freeze processing logic"""
    # 1. Verify freeze authorization
    # 2. Call core system freeze API
    # 3. Log operation trail
    task.result = f"Account frozen, operation ID: FZ{int(time.time())}"
    return task

def review_handler(task: OpsTask) -> OpsTask:
    """Review handler: low-risk auto-approve, medium/high-risk requires human"""
    if task.risk_level == RiskLevel.LOW:
        task.review_result = "auto_approved"
    else:
        task.review_result = "pending_human"
    return task

# Assemble the workflow
engine = WorkflowEngine()
engine.register_handler(TaskState.CREATED, assign_handler)
engine.register_handler(TaskState.PROCESSING, process_account_freeze)
engine.register_handler(TaskState.REVIEWING, review_handler)
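The engine defines `can_transition`, but nothing in the listing calls it. One possible way to enforce the transition table and keep an audit trail at the same time is a small guarded helper, sketched here. `transition` and `IllegalTransition` are hypothetical names, and the state table is repeated so the example runs on its own.

```python
from enum import Enum
from typing import List, Tuple

class TaskState(Enum):
    CREATED = "created"
    ASSIGNED = "assigned"
    PROCESSING = "processing"
    REVIEWING = "reviewing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"

# Same table as above, repeated so the sketch is self-contained
TRANSITIONS = {
    TaskState.CREATED:    [TaskState.ASSIGNED],
    TaskState.ASSIGNED:   [TaskState.PROCESSING],
    TaskState.PROCESSING: [TaskState.REVIEWING, TaskState.FAILED],
    TaskState.REVIEWING:  [TaskState.COMPLETED, TaskState.PROCESSING],
    TaskState.FAILED:     [TaskState.RETRY],
    TaskState.RETRY:      [TaskState.PROCESSING],
}

class IllegalTransition(Exception):
    """Raised when a requested state change is not in TRANSITIONS."""

def transition(current: TaskState, target: TaskState,
               history: List[Tuple[TaskState, TaskState]]) -> TaskState:
    """Return `target` if the move is legal and record it; otherwise raise."""
    if target not in TRANSITIONS.get(current, []):
        raise IllegalTransition(f"{current.value} -> {target.value} is not allowed")
    history.append((current, target))
    return target

# Walk the success path and collect the audit trail
history: List[Tuple[TaskState, TaskState]] = []
state = TaskState.CREATED
for nxt in (TaskState.ASSIGNED, TaskState.PROCESSING,
            TaskState.REVIEWING, TaskState.COMPLETED):
    state = transition(state, nxt, history)
```

Routing every state change through one function like this gives a single place to write the operation trail that bank audits typically require.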

YAML Configuration for Hot Updates

Taking it further, we extract workflow definitions into config files for hot updates:

# workflow_account_freeze.yaml
name: Account Freeze Process
trigger:
  category: accounts
  action: freeze

steps:
  - name: Authorization Check
    handler: auth_validator
    timeout: 5s
    retry: 1

  - name: Risk Assessment
    handler: risk_assessor
    timeout: 10s
    branch:
      high: [escalate_to_compliance]
      low: [execute_freeze]

  - name: Execute Freeze
    handler: core_system_freeze
    timeout: 30s
    retry: 3
    fallback: manual_freeze

  - name: Notify Customer
    handler: sms_notifier
    timeout: 10s
    async: true

sla: 1h
escalation:
  at: 45min
  to: Operations Supervisor
  channel: sms

Operations staff can modify workflows without touching code. New business process? Copy a YAML file, change a few fields.
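Because non-developers edit these files, it may be worth validating a workflow before it is hot-loaded. The sketch below assumes the YAML has already been parsed into a dict (for example with PyYAML's `yaml.safe_load`); `parse_duration`, `validate_workflow`, and the handler registry are hypothetical names, not part of the original engine.

```python
import re
from typing import List, Set

_UNIT_SECONDS = {"s": 1, "min": 60, "h": 3600}

def parse_duration(value: str) -> int:
    """Convert "5s", "45min" or "1h" into seconds."""
    match = re.fullmatch(r"(\d+)(s|min|h)", value.strip())
    if not match:
        raise ValueError(f"Unsupported duration: {value!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]

def validate_workflow(config: dict, known_handlers: Set[str]) -> List[str]:
    """Return a list of problems; an empty list means the config looks loadable."""
    problems = []
    for step in config.get("steps", []):
        name = step.get("name", "<unnamed>")
        if step.get("handler") not in known_handlers:
            problems.append(f"{name}: unknown handler {step.get('handler')!r}")
        try:
            parse_duration(step.get("timeout", ""))
        except ValueError as exc:
            problems.append(f"{name}: {exc}")
    # Escalation only helps if it fires before the SLA deadline
    sla = parse_duration(config["sla"])
    escalate_at = parse_duration(config["escalation"]["at"])
    if escalate_at >= sla:
        problems.append("escalation.at must fall before the SLA deadline")
    return problems

# A subset of the account-freeze workflow above, as it would look after parsing
config = {
    "steps": [
        {"name": "Authorization Check", "handler": "auth_validator", "timeout": "5s"},
        {"name": "Execute Freeze", "handler": "core_system_freeze", "timeout": "30s"},
    ],
    "sla": "1h",
    "escalation": {"at": "45min"},
}
problems = validate_workflow(config, {"auth_validator", "core_system_freeze"})
```

Rejecting a file with a non-empty problem list keeps a typo in a handler name from surfacing at 3 a.m. as a stuck ticket.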

SLA Monitoring: The Last Line of Defense

Automated closed loops need monitoring. SLA is the lifeline of bank operations. A late cross-border settlement could mean massive penalty interest; a late complaint response could trigger regulatory scrutiny.

Three-Level Monitoring System

Level 1: Proactive Patrol (every 5 minutes) — Scan all in-flight ticket SLA countdowns, calculate remaining time vs. thresholds, trigger pre-alert at 50% elapsed.

Level 2: Smart Alerts (event-driven) — Orange alert at 75% SLA elapsed, red at 90%, instant alert on processing failure, trend alert when similar tickets surge 3x.

Level 3: Auto-Rescue (for unmanned hours) — Auto-reassign to idle group near SLA, switch to degraded process after 3 consecutive failures, activate backup channel on system failure, escalate to on-call when thresholds exceeded.

from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List, Optional

@dataclass
class SLAPolicy:
    """SLA policy configuration"""
    category: str
    sla_hours: float
    pre_alert_ratio: float = 0.5
    orange_alert_ratio: float = 0.75
    red_alert_ratio: float = 0.9
    auto_escalation_ratio: float = 0.95

class SLAMonitor:
    """SLA monitoring and alerting engine"""

    def __init__(self):
        self.policies: Dict[str, SLAPolicy] = {}
        self.alert_handlers: Dict[str, Callable] = {}

    def register_policy(self, policy: SLAPolicy):
        """Register an SLA policy"""
        self.policies[policy.category] = policy

    def check_sla(self, task: OpsTask) -> dict:
        """Check SLA status for a single ticket"""
        policy = self.policies.get(task.category)
        if not policy:
            return {"status": "no_policy", "task_id": task.task_id}

        elapsed = (datetime.now() - datetime.fromisoformat(task.create_time)).total_seconds()
        total = policy.sla_hours * 3600
        ratio = elapsed / total if total > 0 else 1.0

        result = {
            "task_id": task.task_id,
            "category": task.category,
            "sla_hours": policy.sla_hours,
            "elapsed_hours": round(elapsed / 3600, 2),
            "ratio": round(ratio, 3),
            "status": "normal",
            "action": None
        }

        if ratio >= policy.auto_escalation_ratio:
            result["status"] = "critical"
            result["action"] = "auto_escalate"
        elif ratio >= policy.red_alert_ratio:
            result["status"] = "red"
            result["action"] = "alert_red"
        elif ratio >= policy.orange_alert_ratio:
            result["status"] = "orange"
            result["action"] = "alert_orange"
        elif ratio >= policy.pre_alert_ratio:
            result["status"] = "pre_alert"
            result["action"] = "notify_handler"

        return result

    def patrol(self, active_tasks: List[OpsTask]) -> List[dict]:
        """Proactive patrol: batch check all in-flight tickets"""
        alerts = []
        for task in active_tasks:
            check = self.check_sla(task)
            if check["status"] not in ("normal", "no_policy"):
                alerts.append(check)

                action = check.get("action")
                if action and action in self.alert_handlers:
                    self.alert_handlers[action](check)

                # Level 3: Auto-rescue
                if check["action"] == "auto_escalate":
                    self._auto_rescue(task, check)

        return alerts

    def _auto_rescue(self, task: OpsTask, check: dict):
        """Auto-rescue: last defense during unmanned hours"""
        # Strategy 1: Try reassigning to an idle group
        idle_group = self._find_idle_group(task.category)
        if idle_group:
            task.assigned_group = idle_group
            return

        # Strategy 2: Fast-track for low-risk tickets
        if task.risk_level == RiskLevel.LOW:
            task.fast_track = True
            return

        # Strategy 3: Escalate to on-call person
        duty_person = self._get_duty_person(task.priority)
        self._send_escalation(duty_person, task, check)

    def _find_idle_group(self, category: str) -> Optional[str]:
        """Find the processing group with the lowest current load"""
        pass

    def _get_duty_person(self, priority: Priority) -> str:
        """Get the on-call person"""
        pass

    def _send_escalation(self, person: str, task: OpsTask, check: dict):
        """Send escalation notification"""
        pass

# Register SLA policies
monitor = SLAMonitor()
monitor.register_policy(SLAPolicy(category="accounts", sla_hours=1.0))
monitor.register_policy(SLAPolicy(category="settlement", sla_hours=4.0))
monitor.register_policy(SLAPolicy(category="complaint", sla_hours=2.0))
monitor.register_policy(SLAPolicy(category="compliance", sla_hours=8.0,
    pre_alert_ratio=0.4, orange_alert_ratio=0.7, red_alert_ratio=0.85))
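To sanity-check a policy before registering it, it can help to see the clock times at which each threshold is crossed. The following is a small sketch; `alert_schedule` is a hypothetical helper, and the ratios are copied from the `SLAPolicy` defaults above.

```python
from datetime import datetime, timedelta
from typing import Dict

# Default ratios from SLAPolicy, repeated so the sketch runs on its own
DEFAULT_RATIOS = {
    "pre_alert": 0.5,
    "orange": 0.75,
    "red": 0.9,
    "auto_escalate": 0.95,
}

def alert_schedule(created_at: datetime, sla_hours: float,
                   ratios: Dict[str, float] = DEFAULT_RATIOS) -> Dict[str, datetime]:
    """Return the moment each alert level is reached for one ticket."""
    total = timedelta(hours=sla_hours)
    return {level: created_at + total * ratio for level, ratio in ratios.items()}

# A settlement ticket (4-hour SLA) created at 22:00
schedule = alert_schedule(datetime(2024, 1, 1, 22, 0), sla_hours=4.0)
```

For the 1-hour accounts SLA, the gap between red (90%) and auto-escalation (95%) is only 3 minutes, which is shorter than the 5-minute patrol interval. A patrol-only setup could therefore jump from orange straight to escalation, which is one reason the event-driven Level 2 alerts matter for short SLAs.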

Alert Noise Reduction

Too many alerts cause a "boy who cried wolf" effect. Three principles I've found essential:

  1. Alert consolidation: Same group, same type, within 5 minutes → merge into one alert with count and most urgent details
  2. Alert escalation: Pre-alert only lights up the dashboard. Orange sends a message. Red sends SMS.
  3. Alert suppression: In-progress tickets don't re-alert. Already-escalated tickets don't re-escalate.

The highest state of monitoring isn't more alerts — it's fewer, sharper alerts. Every alert should correspond to a clear action. Otherwise it's noise.
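The consolidation principle (same group, same type, within 5 minutes) could be sketched as follows. The `Alert` dataclass, the `consolidate` function, and the severity ordering are assumptions made for the example; the window here is measured from the first alert in each bucket, which is only one of several reasonable choices.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

# Higher number means more urgent; statuses match those returned by check_sla
SEVERITY_ORDER = {"pre_alert": 0, "orange": 1, "red": 2, "critical": 3}

@dataclass
class Alert:
    group: str
    alert_type: str      # e.g. "sla" or "failure"
    status: str          # a key of SEVERITY_ORDER
    task_id: str
    raised_at: datetime

def consolidate(alerts: List[Alert],
                window: timedelta = timedelta(minutes=5)) -> List[dict]:
    """Merge alerts sharing (group, type) that fall within `window` of the bucket's first alert."""
    merged: List[dict] = []
    open_bucket: Dict[Tuple[str, str], dict] = {}
    for alert in sorted(alerts, key=lambda a: a.raised_at):
        key = (alert.group, alert.alert_type)
        bucket = open_bucket.get(key)
        if bucket is None or alert.raised_at - bucket["first_at"] > window:
            # Start a new consolidated alert for this group and type
            bucket = {"group": alert.group, "type": alert.alert_type,
                      "first_at": alert.raised_at, "count": 0, "most_urgent": alert}
            open_bucket[key] = bucket
            merged.append(bucket)
        bucket["count"] += 1
        if SEVERITY_ORDER[alert.status] > SEVERITY_ORDER[bucket["most_urgent"].status]:
            bucket["most_urgent"] = alert
    return merged
```

Each merged entry carries a count and the most urgent ticket, so the recipient sees one message that still points at the ticket needing action first.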

Pure Python: 6 Financial AI Agent Scenarios

Many bank teams say "we don't have an AI platform." The truth is, pure Python can build a lightweight financial AI agent. Here are 6 scenarios, all using Python standard library + open-source packages, zero API costs.

Scenario 1: Intelligent Bookkeeping — NLP Auto-Classification

"""
Intelligent bookkeeping: keyword + rule based journal entry generation
No LLM API dependency, pure local rules engine
"""
from typing import List, Tuple
import re

ACCOUNT_RULES = {
    # (keyword pattern, debit account, credit account)
    r"office supplies|stationery|printing": ("6602 Mgmt Expense - Office", "1002 Bank Deposits"),
    r"travel|air ticket|hotel|accommodation": ("6602 Mgmt Expense - Travel", "1002 Bank Deposits"),
    r"salary|payroll|social insurance|housing fund": ("2211 Employee Payables", "1002 Bank Deposits"),
    r"interest income|deposit interest": ("1002 Bank Deposits", "6011 Interest Income"),
    # Repayment must come before the generic "loan" pattern, which would otherwise match first
    r"repayment|loan recovery": ("1002 Bank Deposits", "1301 Loans"),
    r"loan|disbursement|lending": ("1301 Loans", "1002 Bank Deposits"),
}

def generate_entry(description: str, amount: float) -> dict:
    """Auto-generate journal entry from description"""
    for pattern, (debit, credit) in ACCOUNT_RULES.items():
        if re.search(pattern, description, re.IGNORECASE):
            return {
                "description": description,
                "amount": amount,
                "debit_account": debit,
                "credit_account": credit,
                "confidence": "rule_match"
            }

    # No rule match: flag for manual confirmation
    return {
        "description": description,
        "amount": amount,
        "debit_account": "Pending",
        "credit_account": "Pending",
        "confidence": "manual_required",
        "suggestion": "No matching rule, please select account manually"
    }

Scenario 2: Invoice Recognition — OCR Key Field Extraction

"""
Invoice recognition: PaddleOCR for key field extraction
Install: pip install paddlepaddle paddleocr
"""
from paddleocr import PaddleOCR
import re

ocr_engine = PaddleOCR(use_angle_cls=True, lang="ch")

def extract_invoice(image_path: str) -> dict:
    """Extract key fields from VAT invoice"""
    result = ocr_engine.ocr(image_path, cls=True)
    # Some PaddleOCR versions return None for a page with no detected text
    text_lines = [line[1][0] for line in (result[0] or [])]
    full_text = "\n".join(text_lines)

    # Field labels may be followed by a full-width (:) or ASCII (:) colon
    return {
        "invoice_code": _extract(r"Invoice Code[::]\s*(\d+)", full_text),
        "invoice_number": _extract(r"Invoice Number[::]\s*(\d+)", full_text),
        "date": _extract(r"Date[::]\s*(\d{4}[-/]\d{1,2}[-/]\d{1,2})", full_text),
        "amount": _extract(r"Amount[::]\s*([\d,.]+)", full_text),
        "tax": _extract(r"Tax[::]\s*([\d,.]+)", full_text),
        "seller": _extract(r"Seller[::]\s*(.+?)(?:\n|$)", full_text),
        "total": _extract(r"Total[::]\s*([\d,.]+)", full_text),
        "raw_text": full_text
    }

def _extract(pattern: str, text: str) -> str:
    match = re.search(pattern, text)
    return match.group(1).strip() if match else ""

Scenario 3: Expense Audit — Multi-Rule Validation Engine

"""
Expense audit: multi-rule parallel validation with structured conclusions
"""
from typing import List, Optional

class ExpenseRule:
    """Audit rule base class"""
    def __init__(self, name: str, severity: str = "warning"):
        self.name = name
        self.severity = severity  # warning / rejection

    def check(self, expense: dict) -> Optional[str]:
        raise NotImplementedError

class AmountLimitRule(ExpenseRule):
    """Amount ceiling rule"""
    def __init__(self, limit: float, dept_limits: dict = None):
        super().__init__("Amount Limit Check", "rejection")
        self.limit = limit
        self.dept_limits = dept_limits or {}

    def check(self, expense: dict) -> Optional[str]:
        dept = expense.get("department", "")
        actual_limit = self.dept_limits.get(dept, self.limit)
        if expense["amount"] > actual_limit:
            return f"Amount {expense['amount']} exceeds {dept} dept limit {actual_limit}"
        return None

class DuplicateRule(ExpenseRule):
    """Duplicate reimbursement check"""
    def __init__(self, history: List[dict]):
        super().__init__("Duplicate Reimbursement Check", "rejection")
        self.history = history

    def check(self, expense: dict) -> Optional[str]:
        for h in self.history:
            if (h.get("invoice_number") == expense.get("invoice_number")
                and h.get("applicant") == expense.get("applicant")):
                return f"Invoice {expense.get('invoice_number')} already reimbursed"
        return None

class TimeRangeRule(ExpenseRule):
    """Time validity check"""
    def __init__(self, max_days: int = 90):
        super().__init__("Expense Timeliness Check", "warning")
        self.max_days = max_days

    def check(self, expense: dict) -> Optional[str]:
        from datetime import datetime
        expense_date = datetime.fromisoformat(expense["expense_date"])
        days_diff = (datetime.now() - expense_date).days
        if days_diff > self.max_days:
            return f"Expense {days_diff} days old, exceeds {self.max_days}-day limit"
        return None

def audit_expense(expense: dict, rules: List[ExpenseRule]) -> dict:
    """Execute expense audit"""
    rejections, warnings = [], []
    for rule in rules:
        result = rule.check(expense)
        if result:
            if rule.severity == "rejection":
                rejections.append({"rule": rule.name, "reason": result})
            else:
                warnings.append({"rule": rule.name, "reason": result})

    if rejections:
        return {"conclusion": "rejected", "rejections": rejections, "warnings": warnings}
    elif warnings:
        return {"conclusion": "conditional_pass", "warnings": warnings}
    else:
        return {"conclusion": "approved"}

Scenarios 4-6: Reconciliation, Tax Calculation, Report Generation

These three follow the same pattern — rule-driven + template output:

  • Reconciliation: Match by amount + date + account name triple; unmatched items go to manual reconciliation queue
  • Tax calculation: Encode VAT, corporate income tax, stamp duty formulas as Python functions — input tax base, output per-tax liability
  • Report generation: Use openpyxl to auto-fill balance sheet, income statement, cash flow statement from templates, with auto-calculated YoY/QoQ metrics
"""
Tax calculation example: VAT + Corporate Income Tax
"""

def calc_vat(revenue: float, input_vat: float, rate: float = 0.06) -> dict:
    """VAT calculation: general taxpayer, general method (output VAT minus deductible input VAT)"""
    output_vat = revenue * rate
    payable = max(0, output_vat - input_vat)
    return {
        "output_vat": round(output_vat, 2),
        "input_vat": round(input_vat, 2),
        "vat_payable": round(payable, 2),
        "rate": f"{rate*100:.0f}%"
    }

def calc_corporate_income_tax(profit_before_tax: float,
                               tax_rate: float = 0.25) -> dict:
    """Corporate income tax calculation"""
    tax_amount = profit_before_tax * tax_rate if profit_before_tax > 0 else 0
    return {
        "profit_before_tax": profit_before_tax,
        "tax_rate": f"{tax_rate*100:.0f}%",
        "tax_amount": round(tax_amount, 2),
        "net_profit": round(profit_before_tax - tax_amount, 2)
    }
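The reconciliation scenario (match on the amount + date + account name triple, send the rest to a manual queue) might be sketched like this. `Entry` and `reconcile` are hypothetical names, and storing amounts as integer cents is an assumption made here to avoid floating-point comparison issues.

```python
from collections import defaultdict, deque
from datetime import date
from typing import List, NamedTuple, Tuple

class Entry(NamedTuple):
    entry_id: str
    amount_cents: int     # integer cents, so equality checks are exact
    value_date: date
    account_name: str

def _key(entry: Entry) -> tuple:
    """The matching triple, with the account name normalized for case and whitespace."""
    return (entry.amount_cents, entry.value_date, entry.account_name.strip().lower())

def reconcile(ledger: List[Entry], statement: List[Entry]
              ) -> Tuple[List[Tuple[Entry, Entry]], List[Entry], List[Entry]]:
    """Pair entries one-to-one on the triple.

    Returns (matched pairs, unmatched ledger entries, unmatched statement entries);
    the two unmatched lists are what would go to the manual reconciliation queue.
    """
    pending = defaultdict(deque)
    for item in statement:
        pending[_key(item)].append(item)

    matched, unmatched_ledger = [], []
    for item in ledger:
        queue = pending.get(_key(item))
        if queue:
            # Consume one statement entry per ledger entry so duplicates are not double-matched
            matched.append((item, queue.popleft()))
        else:
            unmatched_ledger.append(item)

    unmatched_statement = [item for queue in pending.values() for item in queue]
    return matched, unmatched_ledger, unmatched_statement
```

Matching one-to-one matters when two identical payments occur on the same day: if only one appears on the bank statement, the second ledger entry should surface as unmatched instead of silently pairing with the same statement line.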

Pure Python agents aren't toy solutions. Without an AI platform, the rules engine + NLP tokenization + OCR combination covers 80% of finance automation scenarios at zero API cost, with full data control and no external data leakage risk.

Engineering Deployment Checklist

Getting this into production requires a structured rollout:

Infrastructure (Weeks 1-2)

| # | Task | Deliverable | Owner |
| --- | --- | --- | --- |
| 1 | Set up Python runtime, install PaddleOCR, openpyxl dependencies | Environment doc | Ops team |
| 2 | Connect to ticketing system API (read and update tickets) | Interface layer code | Dev team |
| 3 | Deploy notification channels (email + SMS + enterprise WeChat) | Notification service | Ops team |
| 4 | Set up data storage for rules library and case library | DB schema | Data team |

Core Engines (Weeks 3-4)

| # | Task | Deliverable | Owner |
| --- | --- | --- | --- |
| 5 | Implement three-layer dispatch, import historical tickets for keyword weight training | Dispatch service | Dev team |
| 6 | Implement state-machine workflow engine, configure 3-5 core business flows | Workflow engine | Dev team |
| 7 | Implement SLA monitoring engine, configure tiered alert policies | Monitoring service | Dev team |
| 8 | Implement day/night mode auto-switching | Config center | Dev team |

Scenario Integration (Weeks 5-6)

| # | Task | Deliverable | Owner |
| --- | --- | --- | --- |
| 9 | Onboard account freeze/unfreeze flow, configure workflow YAML | Business flow config | Ops team |
| 10 | Onboard reconciliation flow, implement triple matching (amount + date + name) | Reconciliation module | Settlement team |
| 11 | Onboard expense audit flow, configure audit rule sets | Audit module | Finance team |
| 12 | Onboard invoice recognition and intelligent bookkeeping | Bookkeeping module | Finance team |

Validation & Go-Live (Weeks 7-8)

| # | Task | Deliverable | Owner |
| --- | --- | --- | --- |
| 13 | Replay historical tickets to validate dispatch accuracy ≥ 85% | Validation report | QA team |
| 14 | 72-hour continuous unmanned stress test | Stress test report | QA team |
| 15 | Security audit: access control, operation audit, sensitive data redaction | Audit report | Security team |
| 16 | Gradual rollout: start with 1-2 low-risk business scenarios | Run report | Ops team |

8 weeks from zero to gradual rollout. Don't overreach — nail one business flow before onboarding the next. Every phase has clear deliverables and acceptance criteria.


Bank operations intelligence isn't about replacing people with AI — it's about freeing people from repetitive labor to focus on work that requires judgment and creativity. 24/7 unmanned closed-loop automation means the system keeps running through nights and holidays. SLA compliance isn't maintained by people staring at dashboards — it's guaranteed by architecture. From ticket dispatch to workflow orchestration to SLA monitoring, from rules engines to pure Python agents — the core design principle is one thing: let machines do what machines should do, and let people do what people should do.
