Anyone involved in quantitative trading knows that the core of a backtesting system is not how fancy the strategy is, but how reliable the data is.
If the historical market data itself is flawed, even the most perfect backtest results are just "garbage in, garbage out".
This article discusses, from a practical perspective, how to batch fetch historical market data via API and implement a rigorous backtesting data cleaning pipeline. I've encountered all these pitfalls myself.
1. Why Is Historical Market Data So Difficult?
Many people think historical market data is simply "stock code + date + open/high/low/close + volume". But when you actually get started, you realize there are numerous issues:
- Different data sources have different formats: some use forward-adjusted prices, some use backward-adjusted, and some are unadjusted
- Trading suspension days, ex-dividend/ex-rights days, and price limit data are easily overlooked
- API rate limiting, resumable downloads, and missing data need to be handled
- The data formats and rules for domestic A-shares, US stocks, and futures differ significantly
A qualified quantitative backtesting system must ensure data completeness, consistency, and unbiasedness from the source.
2. Engineering Design for Batch Fetching
2.1 Basic Approach
Don't try to pull all historical data at once, and don't hardcode dates. A reasonable design should be:
Configure stock pool → Check existing local data → Fetch only missing intervals → Merge and deduplicate → Validate consistency
2.2 Code Example: Batch Fetching with Resumable Downloads
The following uses iTick API to fetch historical daily data (forward-adjusted) and implements local caching with resumable downloads.
import requests
import pandas as pd
import time
from pathlib import Path
API_TOKEN = "your_token_here" # Replace with actual Token
BASE_URL = "https://api.itick.org"
def build_headers():
"""Construct request headers with API Token authentication"""
return {
"token": API_TOKEN,
"Content-Type": "application/json"
}
def fetch_stock_history(stock_code, region="HK", k_type=8, start_date="20000101",
end_date="20231231", cache_dir="./data/raw"):
"""
Batch fetching with caching and automatic resumable download
Parameters:
stock_code : Stock code (HK example: 00700)
region : Market code (HK/US/SZ/SH etc.)
k_type : K-line type (8:daily, 9:weekly, 10:monthly)
start_date : Start date (format YYYYMMDD)
end_date : End date (format YYYYMMDD)
cache_dir : Local cache directory
"""
Path(cache_dir).mkdir(parents=True, exist_ok=True)
cache_file = Path(cache_dir) / f"{stock_code}.parquet"
# Load existing data if available, only fetch missing intervals
if cache_file.exists():
df_old = pd.read_parquet(cache_file)
df_old['trade_date'] = pd.to_datetime(df_old['trade_date'])
last_date = df_old['trade_date'].max()
start_date = (last_date + pd.Timedelta(days=1)).strftime('%Y%m%d')
if start_date > end_date:
return df_old
print(f"{stock_code}: Local data exists up to {last_date.date()}, starting incremental fetch...")
else:
df_old = pd.DataFrame()
# Convert date range to timestamps (iTick kType mode requires et parameter to control end time)
start_ts = int(pd.Timestamp(start_date).timestamp())
end_ts = int(pd.Timestamp(end_date).timestamp())
all_data = []
current_end_ts = end_ts
batch_days = 100 # Maximum ~100 trading days per batch
while True:
# Calculate current batch's start/end interval (based on day count)
batch_start_ts = max(start_ts, current_end_ts - batch_days * 86400)
params = {
"region": region,
"code": stock_code,
"kType": k_type,
"limit": 500, # Maximum 500 K-lines per request
"et": current_end_ts
}
try:
url = f"{BASE_URL}/stock/kline"
resp = requests.get(url, headers=build_headers(), params=params, timeout=15)
if resp.status_code != 200:
print(f"Fetch failed: {stock_code}, status code {resp.status_code}")
time.sleep(2)
continue
data = resp.json()
if data.get("code") == 0 and data.get("data"):
batch_data = data["data"]
all_data.extend(batch_data)
print(f"{stock_code}: Fetched {len(batch_data)} records")
# Check if there's earlier data
earliest_ts = batch_data[-1].get("t", 0) if batch_data else 0
if earliest_ts <= start_ts or len(batch_data) < 500:
break
current_end_ts = earliest_ts - 86400 # Continue fetching earlier data
else:
print(f"Fetch failed: {stock_code}, error message: {data.get('msg')}")
break
time.sleep(0.5) # Rate limiting control
except Exception as e:
print(f"Fetch exception: {stock_code}, error: {e}")
time.sleep(5)
continue
if not all_data:
return df_old
# Data transformation and merging
df_new = pd.DataFrame(all_data)
# Convert timestamps to dates
df_new['trade_date'] = pd.to_datetime(df_new['t'], unit='s')
# Rename fields to unified format
df_new = df_new.rename(columns={
'o': 'open', 'h': 'high', 'l': 'low',
'c': 'close', 'v': 'volume'
})
df_new = df_new[['trade_date', 'open', 'high', 'low', 'close', 'volume']]
df_combined = pd.concat([df_old, df_new], ignore_index=True) if not df_old.empty else df_new
df_combined = df_combined.drop_duplicates(subset=['trade_date']).sort_values('trade_date')
df_combined.to_parquet(cache_file, index=False)
print(f"{stock_code}: Data saved to {cache_file}, total {len(df_combined)} records")
return df_combined
This function accomplishes several key tasks:
- Checks local cache (Parquet format) and only fetches missing intervals
- Controls fetch volume through
limitand batch intervals, supporting automatic pagination for large amounts of historical data - Implements retry logic for exceptions and rate limiting sleep
- Automatically converts timestamps to standardized date fields
2.3 Concurrent Fetching for Multiple Stocks
Single-threaded sequential fetching is inefficient. You can use thread pools for concurrency, but still need to control the number of concurrent threads to avoid API rate limiting:
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_batch(stock_list, region="HK", max_workers=3):
"""
Batch fetch historical data for multiple stocks
max_workers: Recommended concurrency ≤ 5 to prevent rate limiting
"""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(fetch_stock_history, code, region): code
for code in stock_list
}
for future in as_completed(futures):
code = futures[future]
try:
results[code] = future.result()
print(f"{code}: Fetch completed")
except Exception as e:
print(f"{code}: Fetch failed, error: {e}")
return results
It's recommended to keep concurrent threads no more than 5, otherwise you risk being blocked by the data source.
3. Backtesting Data Cleaning Checklist
The raw fetched data still needs several steps before it can be directly used for backtesting. Here's my summarized cleaning process—don't skip any step.
3.1 Time Axis Processing
# Ensure trading days are continuous without gaps
def align_trading_days(df, trading_calendar=None):
df['trade_date'] = pd.to_datetime(df['trade_date'])
df = df.sort_values('trade_date').set_index('trade_date')
if trading_calendar is None:
# Generate complete calendar (business day frequency)
full_calendar = pd.date_range(start=df.index.min(), end=df.index.max(), freq='B')
else:
full_calendar = trading_calendar
df = df.reindex(full_calendar)
return df
Use business day frequency (freq='B') to generate a complete calendar. Missing dates will automatically be filled with NaN, which can be filled or marked later.
3.2 Ex-Dividend/Ex-Rights and Price Adjustment Unification
This is the biggest pitfall!
Many beginners use unadjusted data directly for backtesting, only to find that prices suddenly gap down 30% one day (actually due to ex-rights), causing the strategy to mistakenly interpret it as a sharp decline and execute incorrect trades.
Best Practice: Use forward-adjusted (qfq) data throughout to maintain continuous and comparable historical prices. However, note that forward adjustment can cause early prices to become negative (extreme dividends), requiring truncation:
# Remove negative or extremely small prices after forward adjustment
df = df[(df['close'] > 0.01) & (df['high'] > 0.01)]
3.3 Price Limit Marking
During backtesting, if your strategy generates buy signals at the upper price limit, you actually cannot execute the trade. You need to mark these in advance:
# Calculate price limits (A-share main board ±10%, STAR/ChiNext ±20%, HK stocks have no price limits)
def calc_limit_prices(df, stock_code):
# Determine market based on stock code
if stock_code.startswith('688') or stock_code.startswith('300'):
limit_pct = 0.20 # STAR Market/ChiNext Board
elif stock_code.startswith('600') or stock_code.startswith('000'):
limit_pct = 0.10 # A-share Main Board
else:
# HK stocks have no price limits, return directly
df['is_limit_up'] = False
df['is_limit_down'] = False
return df
df['prev_close'] = df['close'].shift(1)
df['upper_limit'] = df['prev_close'] * (1 + limit_pct)
df['lower_limit'] = df['prev_close'] * (1 - limit_pct)
# Mark limit-up/limit-down boards
df['is_limit_up'] = (df['open'] >= df['upper_limit'] - 0.001) & (df['close'] >= df['upper_limit'] - 0.001)
df['is_limit_down'] = (df['open'] <= df['lower_limit'] + 0.001) & (df['close'] <= df['lower_limit'] + 0.001)
return df
When executing backtests, if you encounter is_limit_up with a buy signal, you should skip or adjust the strategy.
3.4 Trading Suspension Data Handling
During trading suspensions, there are no transactions. You should not fill with the previous day's price (this would lead to unreasonable returns in backtesting). The correct approach:
# Volume on suspension days should be 0 or NaN, no forward filling
df['volume'] = df['volume'].fillna(0)
# For price fields, keep NaN on suspension days; the backtesting engine should skip these days when encountering NaN
3.5 Data Alignment (Multi-Stock Backtesting)
For multi-stock backtesting, you need to align all stocks to the same trading calendar:
def align_multi_stocks(stock_dfs, trading_days):
"""
stock_dfs: dict {code: DataFrame}
trading_days: list of trading days (pd.DatetimeIndex)
"""
aligned = {}
for code, df in stock_dfs.items():
df_aligned = df.set_index('trade_date').reindex(trading_days)
aligned[code] = df_aligned
return aligned
4. Data Quality Validation
After cleaning, always run automated validation:
def validate_data(df, stock_code):
checks = {
"Has duplicate dates": df.index.duplicated().sum() == 0,
"Has null prices": df[['open','high','low','close']].isna().any().any() == False,
"Low price higher than high price": (df['low'] <= df['high']).all(),
"Volume is non-negative": (df['volume'] >= 0).all(),
"Price series has abnormal monotonicity": (
(df['close'] - df['close'].shift(1)).abs() / df['close'].shift(1) < 0.2
).all(), # Excluding price limits
}
for name, result in checks.items():
print(f"{stock_code} - {name}: {'Pass' if result else 'Fail'}")
return all(checks.values())
5. Storage and Version Management Recommendations
- Format: Strongly recommend Parquet or Feather, which are 10x+ faster than CSV and take up less space.
- Directory Structure:
data/
raw/ # Raw API-fetched data (saved by stock)
cleaned/ # Cleaned data (adjusted, aligned, filled)
meta/ # Stock lists, trading calendars, ex-rights factor backups
- Version Control: Don't put historical data in Git; use DVC (Data Version Control) or direct cloud storage (S3, OSS).
6. Personal Recommendations
- Always preserve the original fetched data; cleaning scripts can be re-executed. Otherwise, if you discover an error in the cleaning logic someday, you'll have to refetch everything.
- Don't be a perfectionist. Backtesting data can't be 100% accurate, but it must be unbiased (errors should appear randomly on both buy and sell sides).
- Test with small samples first. Fetch 3 years of data for one stock, manually verify ex-dividend/ex-rights days and price limit days, and only proceed with batch processing after confirming the process is correct.
- Have backup data sources. For your core stock pool, prepare at least two data sources for cross-validation.
Finally, remember this: Backtesting is for eliminating bad strategies, not for proving good ones. And the starting point of all this is reliable historical market data. I hope this article helps you avoid unnecessary detours.
Reference Documentation: https://docs.itick.org/websocket/stocks
GitHub: https://github.com/itick-org/
Top comments (0)