DEV Community

San Si wu


Enterprise-Grade Custom Financial Data APIs: From Architecture Design to Python Integration

As the financial industry goes through digital transformation, data has become a core corporate asset. Quantitative trading, risk management, and robo-advisory services all depend on high-quality, low-latency financial data. Generic data APIs, however, often fail to meet enterprise-specific needs: fields are incomplete, update frequencies do not match the business, and data rules are inconsistent. As a result, more and more enterprises are building or procuring customized financial data APIs.

I. Why Do We Need Customized Financial Data APIs?

Generic financial data APIs (such as Bloomberg or Wind) often show the following pain points under heavy enterprise use:

  • Inability to integrate proprietary internal data.
  • High parsing costs due to excessive or insufficient returned fields.
  • Unfriendly pricing models (per-call fees or high annual subscriptions) for high-frequency scenarios.
  • Internal compliance requirements prohibiting data from leaving private clouds.

Customized APIs match business scenarios more precisely. They return only the necessary fields, support private deployment, integrate with internal data lakes, and offer elastic billing based on actual usage.

II. Core Design Principles

1. Domain-Driven Design (DDD) First
Abstract financial data into clear domain models:

  • Market Data: Quotes, order books.
  • Reference Data: Security basics, corporate actions.
  • Fundamental Data: Financial indicators, valuations.
  • Alternative Data: Sentiment analysis, alternative metrics.

Each domain evolves independently and is linked to the others through a unified data dictionary.
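One way to express these domains in code is shown below. It is a minimal sketch that assumes frozen dataclasses and a shared `DATA_DICTIONARY` mapping canonical field names to units and meanings. Every class, field, and dictionary entry is a hypothetical illustration and does not come from any specific product.

```python
from dataclasses import dataclass, fields
from datetime import datetime
from decimal import Decimal

# Hypothetical unified data dictionary: canonical field name -> (unit, meaning).
# Every domain model may only use names registered here.
DATA_DICTIONARY = {
    "symbol": ("", "canonical instrument ID, e.g. AAPL$US"),
    "ts": ("UTC", "event timestamp"),
    "last": ("quote currency", "last traded price"),
    "volume": ("shares", "cumulative traded volume"),
    "name": ("", "security display name"),
    "exchange": ("", "listing exchange code"),
    "pe_ttm": ("ratio", "trailing twelve-month price/earnings"),
}


@dataclass(frozen=True)
class Quote:  # Market Data domain
    symbol: str
    ts: datetime
    last: Decimal
    volume: int


@dataclass(frozen=True)
class SecurityReference:  # Reference Data domain
    symbol: str
    name: str
    exchange: str


@dataclass(frozen=True)
class Valuation:  # Fundamental Data domain
    symbol: str
    ts: datetime
    pe_ttm: Decimal


def undefined_fields(model) -> list:
    """Return the model's field names that are missing from the data dictionary."""
    return [f.name for f in fields(model) if f.name not in DATA_DICTIONARY]
```

A CI check such as `assert not undefined_fields(Quote)` lets each domain add models independently. It also guarantees that shared names like `symbol` and `ts` mean the same thing everywhere.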

2. API First & OpenAPI Specification
Define every interface up front in an OpenAPI document so that SDKs and documentation can be generated automatically. Example interface design:

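paths:
  /getQuote:
    get:
      summary: Get real-time quotes
      parameters:
        - name: symbols
          in: query
          required: true
          style: form
          explode: false   # serialized as symbols=AAPL,MSFT
          schema:
            type: array
            items: { type: string }
        - name: fields
          in: query
          style: form
          explode: false   # serialized as fields=open,high,last
          schema:
            type: array
            items: { type: string, enum: [open, high, low, last, volume] }
      responses:
        "200":
          description: Quotes for the requested symbols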

3. Multi-Tenancy & Quota Control
Support independent tenants for different business lines, with configurable call frequency limits, accessible data ranges, and output format preferences.
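Per-tenant call-frequency limits are commonly enforced with a token bucket. The sketch below assumes a hypothetical `TenantPolicy` that holds a sustained rate, a burst size, and the data domains a tenant may read. In a multi-instance deployment, the bucket state would live in a shared store such as Redis rather than in process memory.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TenantPolicy:
    """Hypothetical per-tenant configuration."""
    rate_per_sec: float          # sustained calls per second
    burst: int                   # bucket capacity, i.e. the largest allowed burst
    allowed_domains: frozenset   # data ranges the tenant may read, e.g. {"market"}


@dataclass
class TokenBucket:
    rate: float
    capacity: float
    tokens: float = field(init=False)
    updated: float = field(init=False)

    def __post_init__(self):
        self.tokens = self.capacity          # start full so a new tenant can burst
        self.updated = time.monotonic()

    def try_acquire(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        # Refill in proportion to elapsed time, capped at the bucket capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class QuotaGuard:
    def __init__(self, policies: Dict[str, TenantPolicy]):
        self.policies = policies
        self.buckets = {t: TokenBucket(p.rate_per_sec, p.burst) for t, p in policies.items()}

    def check(self, tenant: str, domain: str, now: Optional[float] = None) -> bool:
        policy = self.policies.get(tenant)
        if policy is None or domain not in policy.allowed_domains:
            return False                     # unknown tenant or data range not granted
        return self.buckets[tenant].try_acquire(now)
```

The data-range check runs before the rate check. As a result, a request for a domain the tenant has not been granted never consumes a token.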

III. Overall Technical Architecture

The core architecture of a customized financial data API includes the following layers: Client → Load Balancer → API Gateway → Business Service Layer → Data Aggregation Layer → Data Sources. Key components are:

  • API Gateway: Manages routing, rate limiting, and authentication.
  • Business Services: Implement specific data logic.
  • Cache Layer: Provides millisecond-level responses using Redis.
  • Data Aggregation: Real-time data cleaning and alignment using Flink.
  • Storage Layer: Stores time-series data using ClickHouse.
  • Data Source Adapters: Plugin-based integration with various data sources.

IV. Key Challenges & Solutions

1. Consistency Alignment of Multi-Source Data
Different data sources use different timestamp conventions, price-adjustment methods, and handling of trading halts. The solution is a standardized data pipeline (ETL → Cleaning → Alignment → Validation) that outputs a single authoritative version of the data, backed by T+0 real-time validation and T+1 reconciliation.
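The alignment and validation steps can be sketched as follows. The sketch assumes each source delivers bars as dicts with a timezone-aware `ts`, and that a hypothetical `SOURCE_PRIORITY` list decides which source is authoritative. Timestamps are normalized to UTC before grouping. Disagreements larger than a basis-point tolerance are returned for T+1 reconciliation rather than silently dropped.

```python
from datetime import datetime, timezone

# Hypothetical priority order: earlier sources win when several report the same bar
SOURCE_PRIORITY = ["exchange_feed", "vendor_a", "vendor_b"]


def align(records, tolerance_bps=5.0):
    """Merge per-source close prices into one authoritative series.

    records: iterable of dicts with keys source, symbol, ts (tz-aware), close.
    Returns (golden, issues): golden maps (symbol, utc_ts) to the record chosen
    by SOURCE_PRIORITY; issues lists bars where sources disagree beyond tolerance.
    """
    grouped = {}
    for r in records:
        # Normalize every timestamp to UTC so bars from different sources line up
        key = (r["symbol"], r["ts"].astimezone(timezone.utc))
        grouped.setdefault(key, []).append(r)

    golden, issues = {}, []
    for key, candidates in grouped.items():
        # Unknown sources raise ValueError here, which is deliberate: unranked data is rejected
        candidates.sort(key=lambda r: SOURCE_PRIORITY.index(r["source"]))
        best = candidates[0]
        golden[key] = best
        for other in candidates[1:]:
            diff_bps = abs(other["close"] - best["close"]) / best["close"] * 10_000
            if diff_bps > tolerance_bps:
                issues.append((key, best["source"], other["source"], round(diff_bps, 2)))
    return golden, issues
```

A 16:00 bar stamped in Hong Kong time (UTC+8) and an 08:00 UTC bar from another feed land in the same group. Only the price difference is then left to validate.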

2. Low Latency Under High Concurrency
Quote APIs must support thousands of queries per second (QPS) with P99 latency below 50 milliseconds. Solutions include:

  • Pushing hot data to Redis or local caches.
  • Using asynchronous non-blocking models.
  • Implementing lazy loading or on-demand querying for low-frequency fields.
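The hot-data path and lazy loading can be illustrated with a read-through cache. This is a minimal in-process sketch that stands in for the Redis layer. `loader` is a hypothetical callable that queries the backing store only on a miss or after expiry. The `hits`/`misses` counters feed the cache-hit-rate metric. The sketch has no stampede protection, so concurrent misses on the same key may each call the loader.

```python
import threading
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Read-through cache: serve hot keys from memory, load cold ones lazily."""

    def __init__(self, loader: Callable[[str], Any], ttl_seconds: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._data: Dict[str, Tuple[float, Any]] = {}   # key -> (expires_at, value)
        self._lock = threading.Lock()
        self.hits = self.misses = 0

    def get(self, key: str) -> Any:
        now = self._clock()
        with self._lock:
            entry = self._data.get(key)
            if entry is not None and entry[0] > now:
                self.hits += 1
                return entry[1]
            self.misses += 1
        # Load outside the lock so one slow key does not block readers of other keys
        value = self._loader(key)
        with self._lock:
            self._data[key] = (now + self._ttl, value)
        return value
```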

3. Flexible Return of Customized Fields
Different clients need different field combinations. The solution is a field selector (e.g., fields=open,high,last): the server assembles the JSON response dynamically, so clients receive only the fields they asked for instead of filtering them on their side.
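A server-side field selector might look like the sketch below. `ALLOWED_FIELDS` mirrors the `enum` in the OpenAPI example, and `DEFAULT_FIELDS` is a hypothetical fallback. Rejecting unknown names up front keeps typos from silently producing empty columns.

```python
import json
from typing import Dict, List, Optional

ALLOWED_FIELDS = {"open", "high", "low", "last", "volume"}  # mirrors the OpenAPI enum
DEFAULT_FIELDS = ["last"]                                   # hypothetical default


def parse_fields(raw: Optional[str]) -> List[str]:
    """Turn 'open,high,last' into a validated, de-duplicated, ordered list."""
    if not raw:
        return list(DEFAULT_FIELDS)
    requested: List[str] = []
    for name in raw.split(","):
        name = name.strip()
        if name not in ALLOWED_FIELDS:
            raise ValueError(f"unknown field: {name!r}")
        if name not in requested:
            requested.append(name)
    return requested


def project(record: Dict, selected: List[str]) -> Dict:
    """Keep only the selected fields, plus the symbol key, in the response."""
    out = {"symbol": record["symbol"]}
    for name in selected:
        out[name] = record.get(name)
    return out


record = {"symbol": "AAPL$US", "open": 1, "high": 2, "low": 0.5, "last": 1.5, "volume": 10}
print(json.dumps(project(record, parse_fields("open,last"))))
```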

4. Graded Handling of Data Timeliness

  • L1 Real-Time Push (WebSocket): Used during trading hours.
  • L2 Near-Real-Time (Every 3 Seconds): Used for intra-day monitoring.
  • L3 Batch Processing (Daily Midnight): Used for fundamental data.
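One way to make the tiers explicit is a small routing table. Several details here are assumptions for illustration: the domain-to-tier mapping, the 09:30-16:00 session, and the choice to fall back from L1 to L2 outside trading hours. Real sessions vary by exchange and would come from a trading calendar.

```python
from datetime import time as dtime
from enum import Enum


class Tier(Enum):
    L1_PUSH = "websocket push"
    L2_POLL_3S = "poll every 3 seconds"
    L3_DAILY_BATCH = "nightly batch"


# Hypothetical mapping from data domain to its delivery tier
DOMAIN_TIER = {
    "market": Tier.L1_PUSH,
    "monitoring": Tier.L2_POLL_3S,
    "fundamental": Tier.L3_DAILY_BATCH,
}

# Assumed trading session, for illustration only
SESSION_OPEN, SESSION_CLOSE = dtime(9, 30), dtime(16, 0)


def delivery_tier(domain: str, now: dtime) -> Tier:
    tier = DOMAIN_TIER[domain]
    # L1 push is reserved for trading hours; outside them, degrade to polling
    if tier is Tier.L1_PUSH and not (SESSION_OPEN <= now < SESSION_CLOSE):
        return Tier.L2_POLL_3S
    return tier
```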

V. Observability & Operations Assurance

Financial-grade APIs must be observable and auditable. Monitor metrics such as QPS, error rate, data lag (in seconds), and cache hit rate. Propagate an end-to-end trace ID through every layer so that logs can be queried per request and per tenant. Configure alert rules that raise a P0 alert if any data source stops streaming for more than 3 seconds. Audit every data request by tenant, requested fields, and response time.
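The 3-second rule and the per-request audit trail can be sketched as follows. `StalenessWatchdog` and `audit_record` are hypothetical names. The alert callback defaults to `logger.critical` and would normally be wired to a paging system. The watchdog alerts once per outage and re-arms when the source resumes, so a long gap does not flood the on-call channel.

```python
import json
import logging
import time
import uuid
from typing import Callable, Dict, Iterable, Optional, Set

logger = logging.getLogger("market_api")


class StalenessWatchdog:
    """Fire a P0 alert when a source has been silent for longer than max_gap seconds."""

    def __init__(self, max_gap: float = 3.0, alert: Optional[Callable[[str], None]] = None):
        self.max_gap = max_gap
        self.alert = alert or logger.critical
        self.last_seen: Dict[str, float] = {}
        self.alerted: Set[str] = set()

    def record_tick(self, source: str, now: Optional[float] = None) -> None:
        self.last_seen[source] = time.monotonic() if now is None else now
        self.alerted.discard(source)  # source is streaming again; re-arm its alert

    def check(self, now: Optional[float] = None) -> None:
        """Call periodically, e.g. once per second from a scheduler."""
        now = time.monotonic() if now is None else now
        for source, seen in list(self.last_seen.items()):
            lag = now - seen
            if lag > self.max_gap and source not in self.alerted:
                self.alerted.add(source)  # one alert per outage, not one per check
                self.alert(f"P0: {source} silent for {lag:.1f}s")


def audit_record(tenant: str, fields: Iterable[str], latency_ms: float,
                 trace_id: Optional[str] = None) -> str:
    """Build one JSON audit line: which tenant asked for which fields, and how fast."""
    return json.dumps({
        "trace_id": trace_id or uuid.uuid4().hex,
        "tenant": tenant,
        "fields": sorted(fields),
        "latency_ms": round(latency_ms, 1),
        "ts": time.time(),
    })
```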

VI. Python Integration

With the architecture defined, the next step is integrating a concrete financial data source. This section uses the iTick API as an example of end-to-end integration in Python. iTick covers global stocks, forex, futures, and more, and offers both REST API and WebSocket access.

First, install dependencies: pip install requests websocket-client.

6.1 Get Real-Time Stock Quotes

import requests

API_TOKEN = "your_api_token_here"
BASE_URL = "https://api.itick.org"

def get_stock_quote(region, code):
    url = f"{BASE_URL}/stock/quote"
    headers = {"accept": "application/json", "token": API_TOKEN}
    params = {"region": region, "code": code}

    response = requests.get(url, headers=headers, params=params, timeout=10)
    if response.status_code == 200:
        data = response.json()
        if data.get("code") == 0:
            return data.get("data", {})
    return None

quote = get_stock_quote("US", "AAPL")
if quote:
    print(f"Apple latest price: {quote.get('ld')} USD, Change %: {quote.get('chp')}%")

6.2 Get Forex Quotes and Historical K-Lines

def get_forex_quote(currency_pair):
    url = f"{BASE_URL}/forex/quote"
    headers = {"accept": "application/json", "token": API_TOKEN}
    params = {"region": "GB", "code": currency_pair}
    response = requests.get(url, headers=headers, params=params, timeout=10)
    if response.status_code == 200:
        data = response.json()
        return data.get("data") if data.get("code") == 0 else None
    return None

def get_kline_data(region, code, ktype, limit=100):
    # ktype: 1=1min, 2=5min, 3=15min, 4=30min, 5=60min, 8=daily
    url = f"{BASE_URL}/stock/kline"
    headers = {"accept": "application/json", "token": API_TOKEN}
    params = {"region": region, "code": code, "kType": ktype, "limit": limit}
    response = requests.get(url, headers=headers, params=params, timeout=30)
    if response.status_code == 200:
        data = response.json()
        return data.get("data", []) if data.get("code") == 0 else []
    return []

eurusd = get_forex_quote("EURUSD")
klines = get_kline_data("HK", "700", ktype=8, limit=10)  # last 10 daily bars of Tencent (700.HK)

6.3 WebSocket Real-Time Push

For real-time monitoring, a WebSocket connection pushes updates as they happen, with latency that can be kept in the millisecond range:

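import websocket
import json
import threading
import time

WS_URL = "wss://api.itick.org/stock"

def on_message(ws, message):
    data = json.loads(message)
    if data.get("resAc") == "auth" and data.get("code") == 1:
        # Subscribe only after the server confirms authentication
        sub_msg = {"ac": "subscribe", "params": "AAPL$US", "types": "quote"}
        ws.send(json.dumps(sub_msg))
    elif data.get("data"):
        market_data = data["data"]
        print(f"{market_data.get('s')} Latest Price: {market_data.get('ld')}")

def send_heartbeat(ws, stop_event):
    # Application-level ping every 30 s until this connection closes
    while not stop_event.wait(30):
        try:
            ws.send(json.dumps({"ac": "ping", "params": str(int(time.time() * 1000))}))
        except websocket.WebSocketConnectionClosedException:
            break

def start_websocket():
    # Reconnect in a loop rather than recursing from inside on_close
    while True:
        stop_event = threading.Event()

        def on_open(ws):
            # Start the heartbeat only once the socket is actually open
            threading.Thread(target=send_heartbeat, args=(ws, stop_event), daemon=True).start()

        def on_close(ws, close_status_code, close_msg):
            stop_event.set()
            print("Connection closed, reconnecting in 5 seconds...")

        ws = websocket.WebSocketApp(WS_URL, header={"token": API_TOKEN},
                                    on_open=on_open, on_message=on_message,
                                    on_close=on_close)
        ws.run_forever()
        stop_event.set()  # make sure this connection's heartbeat thread exits
        time.sleep(5)

if __name__ == "__main__":
    start_websocket()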

6.4 Encapsulate as an Enterprise-Level Client

import json
from typing import Callable, Dict, List, Optional

import requests
import websocket

class ITickClient:
    def __init__(self, token: str, base_url: str = "https://api.itick.org"):
        self.token = token
        self.base_url = base_url
        # One pooled HTTP session reuses TCP/TLS connections across calls
        self.session = requests.Session()
        self.session.headers.update({"accept": "application/json", "token": token})

    def _get(self, path: str, params: Dict, timeout: int):
        """Return the response's "data" field, or None on an HTTP or API error."""
        resp = self.session.get(f"{self.base_url}/{path}", params=params, timeout=timeout)
        if resp.status_code != 200:
            return None
        data = resp.json()
        return data.get("data") if data.get("code") == 0 else None

    def get_quote(self, asset_type: str, region: str, code: str) -> Optional[Dict]:
        return self._get(f"{asset_type}/quote", {"region": region, "code": code}, timeout=10)

    def get_kline(self, asset_type: str, region: str, code: str,
                  ktype: int, limit: int = 100) -> List[Dict]:
        params = {"region": region, "code": code, "kType": ktype, "limit": limit}
        return self._get(f"{asset_type}/kline", params, timeout=30) or []

    def subscribe_realtime(self, symbols: List[str], types: List[str], on_data: Callable):
        # Blocks the calling thread; symbols use the "CODE$REGION" form, e.g. "AAPL$US"
        ws_url = "wss://api.itick.org/stock"
        def on_message(ws, message):
            data = json.loads(message)
            if data.get("resAc") == "auth" and data.get("code") == 1:
                ws.send(json.dumps({"ac": "subscribe",
                                    "params": ",".join(symbols),
                                    "types": ",".join(types)}))
            elif data.get("data"):
                on_data(data["data"])
        ws = websocket.WebSocketApp(ws_url, header={"token": self.token},
                                    on_message=on_message)
        ws.run_forever()

client = ITickClient("your_token")
quote = client.get_quote("stock", "US", "AAPL")

6.5 Production Environment Best Practices

Add a retry mechanism with exponential backoff:

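from time import sleep
from functools import wraps

import requests

def retry(max_retries=3, delay=1):
    """Retry when the call returns None or raises a network error,
    waiting delay, 2*delay, 4*delay, ... between attempts."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    result = func(*args, **kwargs)
                    if result is not None:
                        return result
                except requests.RequestException:
                    pass  # timeouts and connection errors are treated as retryable
                if attempt < max_retries - 1:  # no pointless sleep after the last try
                    sleep(delay * (2 ** attempt))
            return None
        return wrapper
    return decorator

@retry(max_retries=3)
def get_quote_with_retry(client, region, code):
    return client.get_quote("stock", region, code)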

For historical data, consider local caching to avoid repeated requests:

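import sqlite3

def cache_kline(code, kline_data, db_path="market_data.db"):
    # One table keyed by (code, timestamp): no SQL built from the symbol string,
    # and re-caching the same bars replaces them instead of duplicating rows
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("""
                CREATE TABLE IF NOT EXISTS kline (
                    code TEXT, timestamp TEXT, open REAL, high REAL,
                    low REAL, close REAL, volume REAL,
                    PRIMARY KEY (code, timestamp)
                )
            """)
            conn.executemany(
                "INSERT OR REPLACE INTO kline VALUES (?,?,?,?,?,?,?)",
                [(code, k['t'], k['o'], k['h'], k['l'], k['c'], k['v']) for k in kline_data],
            )
    finally:
        conn.close()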
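The snippet above only writes to the cache. To actually avoid repeated requests, reads have to check the cache first. Below is a minimal read-through sketch. It assumes its own hypothetical `kline_cache` table, keyed by code, K-line type, and bar timestamp. It also assumes a `fetch` callable that wraps the REST call, for example `lambda c, k, n: get_kline_data("HK", c, k, n)`.

```python
import sqlite3
from typing import Callable, Dict, List


def get_kline_cached(conn: sqlite3.Connection, code: str, ktype: int, limit: int,
                     fetch: Callable[[str, int, int], List[Dict]]) -> List[Dict]:
    """Serve bars from SQLite when enough are cached; otherwise fetch once and store them."""
    conn.execute("""CREATE TABLE IF NOT EXISTS kline_cache (
        code TEXT, ktype INTEGER, t INTEGER, o REAL, h REAL, l REAL, c REAL, v REAL,
        PRIMARY KEY (code, ktype, t))""")
    rows = conn.execute(
        "SELECT t, o, h, l, c, v FROM kline_cache WHERE code = ? AND ktype = ? "
        "ORDER BY t DESC LIMIT ?", (code, ktype, limit)).fetchall()
    if len(rows) >= limit:
        # Cache hit: return the newest `limit` bars in chronological order
        return [dict(zip("tohlcv", r)) for r in reversed(rows)]
    bars = fetch(code, ktype, limit)  # cache miss: one network round trip
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO kline_cache VALUES (?,?,?,?,?,?,?,?)",
            [(code, ktype, b["t"], b["o"], b["h"], b["l"], b["c"], b["v"]) for b in bars])
    return bars
```

Serving cached bars this way suits closed historical ranges. The most recent bar may still be forming, so a production version would also check the age of the newest cached row before trusting it.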

Conclusion

If an enterprise is building a customized financial data API from scratch, it's advisable to adopt a phased approach:

  • MVP Phase: Implement the 3-5 most frequently used interfaces as a basic REST API.
  • Optimization Phase: Introduce caching and on-demand field returns.
  • Expansion Phase: Add WebSocket real-time push and multi-source aggregation.
  • Intelligence Phase: Integrate large language models for natural-language queries.

As large language models proliferate, customized financial data APIs will evolve in three directions. The first is natural-language queries, where a request such as "Check Moutai's PE band for the past 5 years" automatically generates the corresponding API calls. The second is intelligent routing, which selects the best data source for each query. The third is semantic layers, which build in business terminology to avoid misinterpretation.

At its core, an enterprise-customized financial data API turns data governance capabilities into a service. Business departments can then access high-quality data on their own instead of repeatedly requesting it, waiting for it, and reconciling it. Whether you're a technical leader planning a data architecture or a developer integrating data sources, I hope the architectural principles and practical code in this article serve as a useful reference.

Reference Documentation: https://docs.itick.org/rest-api/forex/forex-quote
GitHub: https://github.com/itick-org/
