San Si wu

Financial Data Persistence via WebSocket & SQL (Full-Stack)

For developers building quantitative trading systems, financial data analytics platforms, or market dashboards, the core requirement is never just "getting data." It is getting data reliably, querying it flexibly, and reusing it effectively.

Many developers start by calling financial data APIs directly for ad-hoc analysis. As data volumes grow and requirements evolve, three pain points appear: repeated API calls, disorganized data, and inefficient queries. A more durable design has four parts: connect to a regulated data provider over WebSocket to receive raw data in real time, standardize the data and persist it to a database, let each pushed message trigger the update automatically, and expose custom query interfaces so the data stays manageable, queryable, and reusable.

This article walks through a complete Java implementation of that pipeline, covering WebSocket data ingestion, database persistence, real-time automatic updates, and query interface development, with reusable code at every step.

Core Logic: The Complete Pipeline from WebSocket to Query Interfaces

Unlike scheduled API polling, a WebSocket-based pipeline receives data as soon as the provider pushes it, so updates need no scheduler. The pipeline has four stages:

  1. WebSocket Connection to Data Providers: Use a Java WebSocket client to establish a persistent connection with regulated data providers' WebSocket interfaces and receive real-time push of raw financial data (in JSON format).
  2. Data Processing & Database Persistence: Standardize, deduplicate, and enrich raw WebSocket data, then store it in a MySQL database with a normalized table structure.
  3. Real-Time Automatic Updates: Leverage WebSocket’s persistent connection to trigger automatic processing and persistence whenever new data is pushed by the provider.
  4. Query Interface Encapsulation: Build SQL query interfaces with Spring Boot to support on-demand filtering and aggregation, for consumption by frontends, backends, or quantitative systems.

Key Benefits:

  • Real-time data push and automatic updates (no scheduled tasks required)
  • Reusable persistent connections reduce request overhead (more efficient than traditional API polling)
  • One-time data persistence with repeated reuse, drastically improving query efficiency and enabling full control over data permissions

Practical Implementation: 4 Steps to Build the End-to-End Pipeline

This implementation uses real-time A-share market data as an example, following the pipeline: iTick WebSocket → Java Client Ingestion → MySQL Persistence → Spring Boot Query Interface. The code is fully reusable and enables automatic real-time data updates.

Step 1: Environment Setup & Dependency Configuration

This project uses Java 8+. Core dependencies include: WebSocket client (okhttp), JSON parsing (fastjson), MySQL connectivity (mybatis-plus), and Web interface (spring-boot-starter-web). Add these dependencies to your pom.xml (for Maven projects):

<!-- Core Dependency Configuration -->
<dependencies>
    <!-- Spring Boot Web for building query interfaces -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.10</version>
    </dependency>

    <!-- WebSocket Client (okhttp: lightweight and stable) -->
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.11.0</version>
    </dependency>

    <!-- JSON Parsing (fastjson) -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.32</version>
    </dependency>

    <!-- MySQL Connectivity & ORM (mybatis-plus: simplifies database operations) -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.3.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
        <scope>runtime</scope>
    </dependency>

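    <!-- Data Validation & Utility Classes -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>

    <!-- Lombok (the entity class uses @Data to generate getters and setters) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
        <scope>provided</scope>
    </dependency>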
</dependencies>

Note: Convert dependencies for Gradle projects as needed.

Step 2: WebSocket Client Development for Real-Time Data Ingestion

Implement a WebSocket client with okhttp to establish a persistent connection with the data provider, receive real-time A-share market data, and handle connection exceptions/reconnection logic to ensure stable data ingestion.

2.1 Entity Class Definition (Mapping Push Data Format)

First, define an entity class for real-time A-share market data, mapping to JSON fields from WebSocket push for easy parsing and persistence.

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;

/**
 * Real-Time A-Share Market Data Entity (Maps to MySQL Table)
 */
@Data
@TableName("a_stock_realtime")
public class StockRealtime {
    // Composite primary key: stock code + push time (prevents duplicate rows).
    // MyBatis-Plus allows at most one @TableId per class, so the composite key is
    // defined on the table itself and both fields are mapped as ordinary columns.
    private String tsCode; // Stock code as stored by the client (e.g., 600519)
    private Date pushTime; // Real-Time Data Push Timestamp

    private BigDecimal open; // Opening Price
    private BigDecimal high; // Highest Price
    private BigDecimal low; // Lowest Price
    private BigDecimal close; // Latest Price (stored in the "close" column)
    private BigDecimal volume; // Real-Time Trading Volume
    private BigDecimal amount; // Real-Time Trading Value
    private BigDecimal changeRate; // Real-Time Price Change (%)
}

2.2 WebSocket Client Implementation (Data Ingestion + Automatic Processing)

Develop the WebSocket client to handle connection establishment, data reception, parsing, and trigger subsequent processing/persistence—enabling a closed loop of "data ingestion → automatic processing."

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * iTick WebSocket Client (Ingests Real-Time Stock Push Data)
 */
@Component
public class FinancialWebSocketClient {

    @Autowired
    private StockDataService stockDataService;

    // iTick Stock WebSocket Connection URL
    private static final String WEB_SOCKET_URL = "wss://api.itick.org/stock";
    // iTick API Token (Obtain via Official Website Registration)
    private static final String API_TOKEN = "YOUR_API_TOKEN_HERE";
    // Subscribed Stock List (Format: symbol$region, multiple separated by commas)
    private static final String SUBSCRIBE_SYMBOLS = "600519$SH";
    // Subscribed Data Type
    private static final String SUBSCRIBE_TYPES = "quote";

    private OkHttpClient okHttpClient;
    private WebSocket webSocket;
    private volatile boolean isConnected = false;

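    public void start() {
        // Build the client once; reconnect() calls start() again and should reuse it
        if (okHttpClient == null) {
            okHttpClient = new OkHttpClient.Builder()
                    .connectTimeout(10, TimeUnit.SECONDS)
                    .readTimeout(60, TimeUnit.SECONDS)
                    .writeTimeout(10, TimeUnit.SECONDS)
                    .build();
        }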

        // Build Request (Add Token Authentication Header)
        Request request = new Request.Builder()
                .url(WEB_SOCKET_URL)
                .header("token", API_TOKEN)
                .build();

        // Establish WebSocket Connection
        webSocket = okHttpClient.newWebSocket(request, new WebSocketListener() {
            @Override
            public void onOpen(WebSocket webSocket, Response response) {
                super.onOpen(webSocket, response);
                isConnected = true;
                System.out.println("iTick WebSocket connection successful. Starting real-time data ingestion...");
                // Send Subscription Command
                sendSubscribeMessage();
                // Start Heartbeat Thread
                startHeartbeat();
            }

            @Override
            public void onMessage(WebSocket webSocket, String text) {
                super.onMessage(webSocket, text);
                System.out.println("Received data: " + text);
                if (StringUtils.isNotBlank(text)) {
                    parseAndProcessData(text);
                }
            }

            @Override
            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                super.onFailure(webSocket, t, response);
                System.err.println("WebSocket connection error: " + t.getMessage());
                isConnected = false;
                reconnect();
            }

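            @Override
            public void onClosed(WebSocket webSocket, int code, String reason) {
                super.onClosed(webSocket, code, reason);
                System.out.println("WebSocket connection closed. Reason: " + reason);
                // close() sets isConnected to false before closing the socket, so a flag
                // that is already false means the shutdown was requested locally
                boolean closedLocally = !isConnected;
                isConnected = false;
                if (!closedLocally) {
                    reconnect();
                }
            }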
        });
    }

    /**
     * Send Subscription Command (Compliant with iTick Protocol)
     * Format: {"ac":"subscribe", "params":"AAPL$US,600519$SH", "types":"quote"}
     */
    private void sendSubscribeMessage() {
        JSONObject subscribeMsg = new JSONObject();
        subscribeMsg.put("ac", "subscribe");
        subscribeMsg.put("params", SUBSCRIBE_SYMBOLS);
        subscribeMsg.put("types", SUBSCRIBE_TYPES);
        webSocket.send(subscribeMsg.toJSONString());
        System.out.println("Subscription request sent: " + subscribeMsg.toJSONString());
    }

    /**
     * Start Heartbeat Mechanism (Send ping every 30 seconds to maintain connection)
     */
    private void startHeartbeat() {
        // Bind this thread to the current socket so it exits once a reconnect replaces it
        final WebSocket currentSocket = webSocket;
        Thread heartbeatThread = new Thread(() -> {
            while (currentSocket != null && isConnected && webSocket == currentSocket) {
                try {
                    TimeUnit.SECONDS.sleep(30);
                    if (isConnected && webSocket == currentSocket) {
                        JSONObject pingMsg = new JSONObject();
                        pingMsg.put("ac", "ping");
                        pingMsg.put("params", String.valueOf(System.currentTimeMillis()));
                        currentSocket.send(pingMsg.toJSONString());
                        System.out.println("Heartbeat packet sent");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        heartbeatThread.setDaemon(true);
        heartbeatThread.start();
    }

    /**
     * Reconnection Mechanism (fixed 5-second delay before each attempt)
     */
    private void reconnect() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("Attempting reconnection...");
            start();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up on this reconnect attempt
            Thread.currentThread().interrupt();
        }
    }

    private void parseAndProcessData(String jsonText) {
        try {
            JSONObject response = JSON.parseObject(jsonText);
            // Check Response Code (code=1 indicates success)
            Integer code = response.getInteger("code");
            if (code == null || code != 1) {
                System.out.println("Invalid response code: " + response.getString("msg"));
                return;
            }

            JSONObject data = response.getJSONObject("data");
            if (data == null) {
                return;
            }

            // Process only "quote" type data
            String dataType = data.getString("type");
            if (!"quote".equals(dataType)) {
                return;
            }

            StockRealtime stockRealtime = new StockRealtime();
            // Parse Stock Code (Format: AAPL$US, extract part before $)
            String fullSymbol = data.getString("s");
            if (StringUtils.isNotBlank(fullSymbol)) {
                String[] parts = fullSymbol.split("\\$");
                stockRealtime.setTsCode(parts[0]);
            }
            stockRealtime.setPushTime(new Date());
            // iTick Field Mapping
            stockRealtime.setOpen(getBigDecimal(data, "o"));      // Opening Price
            stockRealtime.setHigh(getBigDecimal(data, "h"));      // Highest Price
            stockRealtime.setLow(getBigDecimal(data, "l"));       // Lowest Price
            stockRealtime.setClose(getBigDecimal(data, "ld"));    // Latest Price
            stockRealtime.setVolume(getBigDecimal(data, "v"));    // Trading Volume
            stockRealtime.setChangeRate(getBigDecimal(data, "chp")); // Price Change (%)
            // Optional: Set Trading Value (Calculate if not provided by iTick: Latest Price × Volume)
            if (data.containsKey("tu")) {
                stockRealtime.setAmount(getBigDecimal(data, "tu"));
            } else if (stockRealtime.getClose() != null && stockRealtime.getVolume() != null) {
                stockRealtime.setAmount(stockRealtime.getClose().multiply(stockRealtime.getVolume()));
            }

            // Call Data Processing Service
            stockDataService.processAndSaveData(stockRealtime);

        } catch (Exception e) {
            System.err.println("Data parsing failed: " + e.getMessage());
            e.printStackTrace();
        }
    }

    private BigDecimal getBigDecimal(JSONObject json, String key) {
        Object value = json.get(key);
        if (value == null) {
            return null;
        }
        if (value instanceof BigDecimal) {
            return (BigDecimal) value;
        }
        return new BigDecimal(value.toString());
    }

    public void close() {
        isConnected = false;
        if (webSocket != null) {
            webSocket.close(1000, "Connection closed actively");
        }
        if (okHttpClient != null) {
            okHttpClient.dispatcher().executorService().shutdown();
        }
    }
}
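
The client's reconnect() waits five seconds before every attempt. If a growing delay is preferred during long outages, the wait can be computed separately from the connection code. The following is a minimal sketch under stated assumptions: ReconnectBackoff is a hypothetical helper, not part of okhttp or iTick, and the base and maximum delays are illustrative values.

```java
/** Hypothetical helper: reconnect delays that double after each failed attempt. */
final class ReconnectBackoff {
    private final long baseSeconds;
    private final long maxSeconds;
    private int attempt = 0;

    ReconnectBackoff(long baseSeconds, long maxSeconds) {
        this.baseSeconds = baseSeconds;
        this.maxSeconds = maxSeconds;
    }

    /** Returns base * 2^attempt, capped at maxSeconds, then advances the attempt counter. */
    synchronized long nextDelaySeconds() {
        // Cap the shift so the doubling cannot overflow a long
        int shift = Math.min(attempt, 20);
        long delay = Math.min(maxSeconds, baseSeconds << shift);
        attempt++;
        return delay;
    }

    /** Call after a successful connection so the next outage starts from the base delay. */
    synchronized void reset() {
        attempt = 0;
    }
}
```

One way to use it would be to sleep for nextDelaySeconds() inside reconnect() and to call reset() from onOpen. Adding random jitter to each delay is a common refinement when many clients reconnect at once.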

Step 3: Data Processing & Persistence for Automatic Updates

After ingesting data via WebSocket, the client calls the data processing service, which validates and deduplicates each record and then persists it to MySQL through MyBatis-Plus. Because every pushed message triggers this call, the stored data updates automatically.

3.1 Data Processing Service

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;

/**
 * Stock Data Processing Service
 */
@Service
public class StockDataService extends ServiceImpl<StockRealtimeMapper, StockRealtime> {

    /**
     * Data Processing + Persistence (Core Method for Automatic Updates)
     * @param stockRealtime Ingested real-time data
     */
    @Transactional(rollbackFor = Exception.class)
    public void processAndSaveData(StockRealtime stockRealtime) {
        // 1. Data Processing: Validate required fields, deduplicate, standardize format
        if (validateData(stockRealtime)) {
            // 2. Deduplication: Check if data for this stock + push time already exists
            QueryWrapper<StockRealtime> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("ts_code", stockRealtime.getTsCode())
                    .eq("push_time", stockRealtime.getPushTime());
            boolean exists = this.exists(queryWrapper);

            // 3. Persist if not exists (Skip to avoid duplicates)
            if (!exists) {
                this.save(stockRealtime);
                System.out.println("Data persisted successfully: " + stockRealtime.getTsCode() + " " + stockRealtime.getPushTime());
            }
        }
    }

    /**
     * Data Validation (Core of Processing: Filter Invalid Data)
     */
    private boolean validateData(StockRealtime stockRealtime) {
        // Validate Required Fields
        if (StringUtils.isBlank(stockRealtime.getTsCode()) 
                || stockRealtime.getPushTime() == null) {
            System.out.println("Invalid data: Stock code or push time is empty");
            return false;
        }
        // Default missing numeric fields to zero so that no NULL values are stored
        if (stockRealtime.getOpen() == null) stockRealtime.setOpen(BigDecimal.ZERO);
        if (stockRealtime.getHigh() == null) stockRealtime.setHigh(BigDecimal.ZERO);
        if (stockRealtime.getLow() == null) stockRealtime.setLow(BigDecimal.ZERO);
        if (stockRealtime.getClose() == null) stockRealtime.setClose(BigDecimal.ZERO);
        if (stockRealtime.getVolume() == null) stockRealtime.setVolume(BigDecimal.ZERO);
        if (stockRealtime.getAmount() == null) stockRealtime.setAmount(BigDecimal.ZERO);
        if (stockRealtime.getChangeRate() == null) stockRealtime.setChangeRate(BigDecimal.ZERO);
        return true;
    }
}
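
The service above writes one row per pushed message. For high-volume feeds, rows can instead be accumulated and handed to the database in groups. The following is a minimal sketch under stated assumptions: BatchBuffer is a hypothetical helper, and the writer callback stands in for whatever batch insert the project uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects items and passes them to a writer in fixed-size batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> writer;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> writer) {
        this.batchSize = batchSize;
        this.writer = writer;
    }

    /** Adds one item and flushes automatically once the batch is full. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Writes whatever is pending; safe to call when the buffer is empty. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Copy before clearing so the writer receives a stable list
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        writer.accept(batch);
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

With MyBatis-Plus, the writer could plausibly be a method reference to the service's saveBatch method. A timer that calls flush() every few seconds, plus a final flush() on shutdown, keeps rows from waiting indefinitely during quiet periods.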

3.2 MyBatis-Plus Mapper (Database Operations)

Define a Mapper interface to simplify MySQL operations—no manual SQL writing required (MyBatis-Plus generates CRUD operations automatically).

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

/**
 * Real-Time Stock Market Data Mapper (Database Operations)
 */
@Mapper
public interface StockRealtimeMapper extends BaseMapper<StockRealtime> {
    // No manual SQL needed—MyBatis-Plus provides CRUD methods
}

3.3 Database Configuration & Table Creation

Configure MySQL connection details in application.yml. MyBatis-Plus maps entities to existing tables and does not create them, so the a_stock_realtime table must exist before the application starts.

# application.yml Configuration
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/financial_data_db?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8
    username: root
    password: your_database_password
mybatis-plus:
  mapper-locations: classpath:mapper/**/*.xml
  type-aliases-package: com.example.financial.entity
  configuration:
    map-underscore-to-camel-case: true # Underscore to CamelCase Mapping
  global-config:
    db-config:
      table-prefix: # No table prefix
      id-type: input # Manual primary key input (Composite Primary Key)
    banner: false

Note: Create the MySQL database in advance (database name: financial_data_db), along with the a_stock_realtime table; MyBatis-Plus does not generate DDL from the entity class. Define the composite primary key (ts_code + push_time) on the table so that duplicate rows are rejected.
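
As a starting point for the table itself, the sketch below holds a possible DDL statement and runs it over a plain JDBC connection. The column names follow the entity's fields after camel-case to underscore mapping. The column types, precisions, and the StockRealtimeSchema class name are assumptions for illustration and should be adjusted to the provider's actual value ranges.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper holding an assumed DDL for the a_stock_realtime table. */
final class StockRealtimeSchema {
    // Assumed column types; only the table and column names come from the entity class
    static final String CREATE_TABLE_SQL =
            "CREATE TABLE IF NOT EXISTS a_stock_realtime (\n"
          + "  ts_code     VARCHAR(16)    NOT NULL,\n"
          + "  push_time   DATETIME(3)    NOT NULL,\n"
          + "  open        DECIMAL(18, 4),\n"
          + "  high        DECIMAL(18, 4),\n"
          + "  low         DECIMAL(18, 4),\n"
          + "  close       DECIMAL(18, 4),\n"
          + "  volume      DECIMAL(20, 4),\n"
          + "  amount      DECIMAL(24, 4),\n"
          + "  change_rate DECIMAL(10, 4),\n"
          + "  PRIMARY KEY (ts_code, push_time)\n"
          + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";

    /** Creates the table if it does not exist yet, using a caller-supplied connection. */
    static void createTable(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.execute(CREATE_TABLE_SQL);
        }
    }
}
```

Because the primary key starts with ts_code followed by push_time, it also serves queries that filter on one stock and sort by time, which is the access pattern the query interfaces use.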

3.4 Start WebSocket Client (Auto-Connect on Project Startup)

Write a startup class to automatically launch the WebSocket client on project initialization, establish the connection, and start real-time data ingestion/persistence.

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.example.financial") // Scan All Components
public class FinancialDataApplication implements CommandLineRunner {

    private final FinancialWebSocketClient webSocketClient;

    // Inject WebSocket Client via Constructor
    public FinancialDataApplication(FinancialWebSocketClient webSocketClient) {
        this.webSocketClient = webSocketClient;
    }

    public static void main(String[] args) {
        SpringApplication.run(FinancialDataApplication.class, args);
    }

    // Launch WebSocket Client Automatically After Project Startup
    @Override
    public void run(String... args) throws Exception {
        webSocketClient.start();
    }
}

Step 4: Build Spring Boot Query Interfaces for Flexible Consumption

Once data is persisted automatically, build query interfaces with Spring Boot to support custom SQL (or parameter-based filtering) for consumption by frontends, quantitative systems, etc.

4.1 Interface Controller (Core Query Interfaces)

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.financial.entity.StockRealtime;
import com.example.financial.service.StockDataService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Financial Data Query Interface Controller
 */
@RestController
@RequestMapping("/api/financial")
public class FinancialQueryController {

    @Autowired
    private StockDataService stockDataService;

    // Runs caller-supplied SQL; Spring Boot configures it from the datasource settings
    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Custom SQL Query Interface (Supports Flexible Filtering)
    @PostMapping("/query")
    public Map<String, Object> queryData(@RequestBody Map<String, String> request) {
        Map<String, Object> result = new HashMap<>();
        try {
            // 1. Get Request Parameters (SQL Statement + API Key to Prevent Unauthorized Access)
            String sql = request.get("sql");
            String apiKey = request.get("apiKey");

            // 2. API Key Validation (Custom Key to Prevent Interface Abuse)
            String validApiKey = "your_custom_api_key";
            if (!validApiKey.equals(apiKey)) {
                result.put("code", 403);
                result.put("msg", "Invalid API key: Access denied");
                result.put("data", null);
                return result;
            }

            // 3. Validate SQL (allow a single SELECT statement only). This is a coarse guard,
            //    so the database account used here should also be limited to read-only access.
            String trimmedSql = sql == null ? "" : sql.trim();
            if (!trimmedSql.toUpperCase().startsWith("SELECT") || trimmedSql.contains(";")) {
                result.put("code", 400);
                result.put("msg", "Only SELECT queries are supported");
                result.put("data", null);
                return result;
            }

            // 4. Execute SQL Query. QueryWrapper.last() only appends a fragment to the SQL that
            //    MyBatis-Plus generates, so a complete statement is run through JdbcTemplate.
            List<Map<String, Object>> data = jdbcTemplate.queryForList(trimmedSql);

            // 5. Return Results
            result.put("code", 200);
            result.put("msg", "Query successful");
            result.put("count", data.size());
            result.put("data", data);
        } catch (Exception e) {
            result.put("code", 500);
            result.put("msg", "Query failed: " + e.getMessage());
            result.put("data", null);
        }
        return result;
    }

    // Optional: Pagination Query Interface (Optimized for Large Datasets)
    @PostMapping("/query/page")
    public Map<String, Object> queryPage(@RequestBody Map<String, Object> request) {
        Map<String, Object> result = new HashMap<>();
        try {
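            int pageNum = Integer.parseInt(request.get("pageNum").toString());
            int pageSize = Integer.parseInt(request.get("pageSize").toString());
            // Stock code is an optional filter, so tolerate its absence
            Object tsCodeParam = request.get("tsCode");
            String tsCode = tsCodeParam == null ? null : tsCodeParam.toString();

            // Pagination Query (page() only applies LIMIT and fills in the total when a
            // MybatisPlusInterceptor with PaginationInnerInterceptor is registered as a bean)
            IPage<StockRealtime> page = new Page<>(pageNum, pageSize);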
            QueryWrapper<StockRealtime> queryWrapper = new QueryWrapper<>();
            if (tsCode != null && !tsCode.isEmpty()) {
                queryWrapper.eq("ts_code", tsCode);
            }
            // Sort by Push Time (Newest First)
            queryWrapper.orderByDesc("push_time");

            IPage<StockRealtime> resultPage = stockDataService.page(page, queryWrapper);

            result.put("code", 200);
            result.put("msg", "Pagination query successful");
            result.put("total", resultPage.getTotal());
            result.put("pages", resultPage.getPages());
            result.put("data", resultPage.getRecords());
        } catch (Exception e) {
            result.put("code", 500);
            result.put("msg", "Pagination query failed: " + e.getMessage());
            result.put("data", null);
        }
        return result;
    }
}
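
A prefix check on the word SELECT is only a first filter for caller-supplied SQL. The sketch below shows a somewhat stricter, deliberately conservative check that could run before the query executes. ReadOnlySqlGuard is a hypothetical helper, and its rules are assumptions about what this interface should accept; it does not replace a read-only database account.

```java
import java.util.Locale;
import java.util.regex.Pattern;

/** Hypothetical helper: coarse check that a string is one plain SELECT statement. */
final class ReadOnlySqlGuard {
    private static final Pattern STARTS_WITH_SELECT = Pattern.compile("^SELECT\\s");
    private static final Pattern INTO_KEYWORD = Pattern.compile("\\bINTO\\b");

    static boolean isSingleSelect(String sql) {
        if (sql == null) {
            return false;
        }
        String trimmed = sql.trim();
        // Tolerate one trailing semicolon, then reject any other statement separator
        if (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        if (trimmed.isEmpty() || trimmed.contains(";")) {
            return false;
        }
        // Reject comment markers, which are often used to hide extra clauses
        if (trimmed.contains("--") || trimmed.contains("/*") || trimmed.contains("#")) {
            return false;
        }
        String upper = trimmed.toUpperCase(Locale.ROOT);
        if (!STARTS_WITH_SELECT.matcher(upper).find()) {
            return false;
        }
        // Reject SELECT ... INTO, which can write files or variables in MySQL
        return !INTO_KEYWORD.matcher(upper).find();
    }
}
```

The check errs on the side of rejection: a legitimate query whose string literal contains a semicolon or the word "into" is refused as well. That trade-off seems reasonable for an internal analytics endpoint, but it is a design choice rather than a requirement.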

4.2 Interface Call Example (Postman/Java)

Once the interface is running, send a POST request with SQL and API key via Postman or Java to retrieve filtered real-time financial data. Example (Java):

import com.alibaba.fastjson.JSONObject;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;

/**
 * Financial Query Interface Call Example
 */
public class QueryApiTest {
    public static void main(String[] args) throws IOException {
        // 1. Interface URL (After Project Startup)
        String apiUrl = "http://localhost:8080/api/financial/query";

        // 2. Request Parameters (SQL Statement + API Key)
        JSONObject payload = new JSONObject();
        // The WebSocket client stores the part of the symbol before "$", so 600519$SH is saved as 600519
        payload.put("sql", "SELECT ts_code, push_time, close, volume, change_rate FROM a_stock_realtime WHERE ts_code = '600519' ORDER BY push_time DESC LIMIT 10");
        payload.put("apiKey", "your_custom_api_key");

        // 3. Send POST Request
        OkHttpClient client = new OkHttpClient();
        MediaType mediaType = MediaType.parse("application/json;charset=utf-8");
        RequestBody requestBody = RequestBody.create(mediaType, payload.toJSONString());
        Request request = new Request.Builder()
                .url(apiUrl)
                .post(requestBody)
                .build();

        // 4. Receive Response
        try (Response response = client.newCall(request).execute()) {
            if (response.isSuccessful() && response.body() != null) {
                String responseBody = response.body().string();
                System.out.println("Interface response: " + responseBody);
            } else {
                System.out.println("Interface call failed: " + response.code());
            }
        }
    }
}

Note: The interface supports custom SQL queries and pagination for large datasets. Extend with multi-table joins, conditional filtering, etc., to fit your use case.

Critical Pitfall Avoidance (Must-Read for Implementation)

  1. WebSocket Connection Stability: Handle connection exceptions and reconnection logic to avoid data interruptions. Add heartbeat detection (send heartbeats to the data provider) to prevent persistent connection termination.
  2. Data Compliance: Use regulated data providers' WebSocket interfaces only after obtaining authorization—avoid scraping unauthorized data to mitigate legal risks. Adhere to the provider’s push frequency and data scope limits.
  3. Persistence Performance Optimization: For high-volume real-time push data, use batch persistence (e.g., accumulate 100 records before batch insertion) to reduce database interactions. Index frequently queried fields (ts_code, push_time) to improve query speed.
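  4. Interface Security: Add API key validation to query interfaces (never expose publicly). Restricting requests to SELECT statements narrows what a caller can run, but a prefix check alone does not stop SQL injection, so also connect with a read-only database account. Implement rate limiting to avoid abuse.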
  5. Data Deduplication & Validation: WebSocket may push duplicate data—use the "stock code + push time" composite primary key for deduplication. Strictly validate data formats to avoid invalid/abnormal values in the database.
  6. Resource Release: Actively close WebSocket and database connections on project shutdown to prevent resource leaks. Call the client's close() method from a shutdown hook, for example a method annotated with @PreDestroy.
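
The rate limiting mentioned in item 4 can start out very simple. The sketch below counts requests per API key in fixed time windows. FixedWindowRateLimiter is a hypothetical helper, the limits are illustrative, and the current time is passed in explicitly so the behavior is easy to test.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: allows at most maxRequests per key within each time window. */
final class FixedWindowRateLimiter {
    private final int maxRequests;
    private final long windowMillis;
    // key -> {window start in millis, requests counted in that window}
    private final Map<String, long[]> windows = new HashMap<>();

    FixedWindowRateLimiter(int maxRequests, long windowMillis) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
    }

    /** Returns true and counts the request if the key is still under its limit. */
    synchronized boolean tryAcquire(String key, long nowMillis) {
        long[] state = windows.get(key);
        // Open a fresh window for new keys and for keys whose window has expired
        if (state == null || nowMillis - state[0] >= windowMillis) {
            state = new long[] {nowMillis, 0};
            windows.put(key, state);
        }
        if (state[1] >= maxRequests) {
            return false;
        }
        state[1]++;
        return true;
    }
}
```

A controller could call tryAcquire(apiKey, System.currentTimeMillis()) before running a query and return an error response when it yields false. A fixed window permits short bursts at window boundaries; a token bucket or a gateway-level limiter smooths that out if it matters.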

Use Cases & Extension Directions

Use Cases

  • Quantitative Trading: Build a real-time data foundation with interfaces directly integrated into backtesting/trading systems for low-latency market data access.
  • Financial Dashboard Development: Frontends retrieve real-time data via query interfaces to render market charts and alerts for real-time visualization.
  • Enterprise-Grade Financial Systems: Provide standardized real-time data query services for risk management, real-time monitoring, and business systems to enable data reuse.
  • Personal Replay/Analysis: Build a personal real-time financial data warehouse with custom SQL queries for market replay and anomaly tracking.

Extension Directions

  • Multi-Source Data Integration: iTick WebSocket supports simultaneous connections to stocks, cryptocurrencies, forex, etc.—unify persistence for one-stop real-time querying.
  • Interface Enhancement: Add role-based access control (RBAC), query logging, and data caching (e.g., Redis) to improve performance and security.
  • Monitoring & Alerting: Monitor WebSocket connections, data persistence, and interface calls—send alerts (SMS, enterprise WeChat) for exceptions (e.g., disconnection, persistence failures).
  • Persistence Optimization: Use MySQL table sharding (time-based sharding) for historical data to avoid oversized tables and improve query/persistence performance.
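
For the time-based sharding mentioned in the last item, one common convention is a table per month whose name is derived from the push time. The sketch below shows only that naming step. The suffix format and the MonthlyShardNaming class are assumptions, and how the name is passed to the persistence layer is left open.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: maps a push time to a monthly shard table name. */
final class MonthlyShardNaming {
    private static final String BASE_TABLE = "a_stock_realtime";
    private static final DateTimeFormatter MONTH_SUFFIX = DateTimeFormatter.ofPattern("yyyyMM");

    /** Returns the base table name followed by an underscore and the year-month of pushTime. */
    static String tableFor(LocalDateTime pushTime) {
        return BASE_TABLE + "_" + pushTime.format(MONTH_SUFFIX);
    }
}
```

Each monthly table would need the same columns and primary key as the base table, and queries that span months have to read from several tables.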

Conclusion

The Java+WebSocket financial data pipeline is centered on "real-time push and automatic updates"—ingest real-time pushes via persistent WebSocket connections, trigger automatic processing/persistence, and eliminate the inefficiency and latency of traditional scheduled API polling.

This solution enables reliable real-time data ingestion, standardized storage, and flexible querying—with high stability and reusability. It fits use cases ranging from individual developers (quantitative trading, personal analysis) to enterprise-grade applications (financial dashboards, risk systems)—delivering an efficient, compliant implementation.

Extend with multi-source integration, performance optimization, and monitoring to further enhance data utilization and system stability.
