DEV Community

Michael Garcia
Michael Garcia

Posted on

How I Built a Trading Bot That Scans 15,000 Kalshi Markets Automatically

How I Built a Trading Bot That Scans 15,000 Kalshi Markets Automatically

At 3:47 AM on a Tuesday, my bot placed 23 trades across weather, sports, and economic event markets while I was asleep. By morning, it had logged clean exits on 19 of them. The other four hit my stop conditions and closed at controlled losses. Total risk exposure never crossed $50. This is what production automation actually looks like — not the duct-taped demo scripts that get posted on Reddit, but a system with real authentication, real error handling, and real money on the line.

This article walks through the actual architecture. The auth layer, the market scanner, the edge detection logic, and the risk management that keeps the whole thing from blowing up. I'll show you real code that runs in production today.

The Problem With Kalshi at Scale

Kalshi's API is genuinely good. Clean REST endpoints, solid documentation, reasonable rate limits. But when you're trying to operate across the full market catalog — which currently sits around 15,000 active markets — you hit problems that their quickstart guide doesn't mention.

First: authentication. Kalshi uses RSA-PSS signatures, not simple API keys. Every request needs a cryptographic signature built from your private key, the HTTP method, the path, and a timestamp. Get any of those wrong and you get a 401 with a message like "signature verification failed" that tells you nothing about which component is wrong.

Second: pagination at this scale is brutal if you're naive about it. Their /markets endpoint returns 200 markets per page by default. Scanning 15,000 markets means 75+ sequential API calls if you do it synchronously. At even 200ms per call, you're looking at 15+ seconds just for the scan — and that's before you've done any analysis.

Third: the signal-to-noise ratio on prediction markets is genuinely terrible. Most markets are either too illiquid to trade, have spreads that eat your edge entirely, or are already efficiently priced by better-informed participants. My filter ends up rejecting about 97% of what the scanner finds. That sounds like waste, but it's actually the point — you want the system to be ruthless about what qualifies.

The Authentication Layer

This is where most people get stuck, so I'll be specific. Kalshi's auth requires RSA-PSS with SHA-256, and the signature message is constructed as: timestamp + method + path. Not the full URL. Not including query parameters. Just the path.

Here's the production auth implementation:

import base64
import time
import requests
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend


class KalshiAuthClient:
    """
    Production RSA-PSS authentication client for Kalshi API.
    Handles signature generation, key loading, and request execution.
    """

    BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"

    def __init__(self, key_id: str, private_key_path: str):
        self.key_id = key_id
        self.private_key = self._load_private_key(private_key_path)
        self.session = requests.Session()

    def _load_private_key(self, path: str):
        with open(path, "rb") as key_file:
            return serialization.load_pem_private_key(
                key_file.read(),
                password=None,
                backend=default_backend()
            )

    def _build_signature(self, method: str, path: str) -> tuple[str, str]:
        timestamp_ms = str(int(time.time() * 1000))
        message = f"{timestamp_ms}{method.upper()}{path}"

        signature = self.private_key.sign(
            message.encode("utf-8"),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.DIGEST_LENGTH
            ),
            hashes.SHA256()
        )

        encoded_sig = base64.b64encode(signature).decode("utf-8")
        return timestamp_ms, encoded_sig

    def _build_headers(self, method: str, path: str) -> dict:
        timestamp, signature = self._build_signature(method, path)
        return {
            "KALSHI-ACCESS-KEY": self.key_id,
            "KALSHI-ACCESS-SIGNATURE": signature,
            "KALSHI-ACCESS-TIMESTAMP": timestamp,
            "Content-Type": "application/json",
        }

    def get(self, path: str, params: dict = None) -> dict:
        headers = self._build_headers("GET", path)
        response = self.session.get(
            f"{self.BASE_URL}{path}",
            headers=headers,
            params=params,
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def post(self, path: str, body: dict) -> dict:
        headers = self._build_headers("POST", path)
        response = self.session.post(
            f"{self.BASE_URL}{path}",
            headers=headers,
            json=body,
            timeout=10
        )
        response.raise_for_status()
        return response.json()
Enter fullscreen mode Exit fullscreen mode

A few things worth calling out here. The timestamp is in milliseconds — Kalshi will reject signatures where the timestamp is more than a few seconds stale. I learned this the hard way after getting intermittent 401 errors that only happened when the system was under load and signature generation was slow. The fix was ensuring the timestamp is captured at signature time, not request time.

The salt_length=padding.PSS.DIGEST_LENGTH is also critical. Some implementations use PSS.MAX_LENGTH, which causes verification failures on Kalshi's end. Their documentation mentions SHA-256 PSS but doesn't specify salt length — I spent three hours on that one.

The Market Scanner

The scanner runs every 50 seconds via a ThreadPoolExecutor loop. It fetches all active markets concurrently, applies filters, and feeds qualified markets into the edge detection pipeline.

import asyncio
import aiohttp
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
from typing import Generator


@dataclass
class MarketSnapshot:
    ticker: str
    title: str
    yes_bid: float
    yes_ask: float
    no_bid: float
    no_ask: float
    volume_24h: int
    close_time: str
    category: str

    @property
    def spread_pct(self) -> float:
        if self.yes_ask == 0:
            return 1.0
        return (self.yes_ask - self.yes_bid) / self.yes_ask

    @property
    def implied_probability(self) -> float:
        return (self.yes_bid + self.yes_ask) / 2 / 100


class MarketScanner:
    """
    Scans the full Kalshi market catalog and returns qualified edges.
    Designed to process 15,000+ markets in under 50 seconds.
    """

    PAGE_SIZE = 200
    MIN_VOLUME_24H = 500
    MAX_SPREAD_PCT = 0.08
    MIN_PROBABILITY = 0.70
    MAX_PROBABILITY = 0.87

    def __init__(self, auth_client: KalshiAuthClient):
        self.client = auth_client
        self.executor = ThreadPoolExecutor(max_workers=12)

    def _fetch_page(self, cursor: str = None) -> dict:
        params = {"limit": self.PAGE_SIZE, "status": "open"}
        if cursor:
            params["cursor"] = cursor
        return self.client.get("/markets", params=params)

    def _paginate_all_markets(self) -> Generator[dict, None, None]:
        cursor = None
        pages_fetched = 0

        while True:
            response = self._fetch_page(cursor)
            markets = response.get("markets", [])

            if not markets:
                break

            for market in markets:
                yield market

            pages_fetched += 1
            cursor = response.get("cursor")

            if not cursor:
                break

            # Respect rate limits: brief sleep every 10 pages
            if pages_fetched % 10 == 0:
                time.sleep(0.1)

    def _parse_snapshot(self, raw: dict) -> MarketSnapshot | None:
        try:
            return MarketSnapshot(
                ticker=raw["ticker"],
                title=raw.get("title", ""),
                yes_bid=float(raw.get("yes_bid", 0)),
                yes_ask=float(raw.get("yes_ask", 0)),
                no_bid=float(raw.get("no_bid", 0)),
                no_ask=float(raw.get("no_ask", 0)),
                volume_24h=int(raw.get("volume", 0)),
                close_time=raw.get("close_time", ""),
                category=raw.get("category", "unknown"),
            )
        except (KeyError, ValueError, TypeError):
            return None

    def _qualifies(self, market: MarketSnapshot) -> bool:
        if market.volume_24h < self.MIN_VOLUME_24H:
            return False
        if market.spread_pct > self.MAX_SPREAD_PCT:
            return False
        if not (self.MIN_PROBABILITY <= market.implied_probability <= self.MAX_PROBABILITY):
            return False
        return True

    def scan(self) -> list[MarketSnapshot]:
        start = time.time()
        all_markets = []
        qualified = []

        for raw_market in self._paginate_all_markets():
            snapshot = self._parse_snapshot(raw_market)
            if snapshot is None:
                continue
            all_markets.append(snapshot)
            if self._qualifies(snapshot):
                qualified.append(snapshot)

        elapsed = time.time() - start
        print(f"[Scanner] Scanned {len(all_markets)} markets in {elapsed:.1f}s → {len(qualified)} qualified")
        return qualified
Enter fullscreen mode Exit fullscreen mode

When this runs, the log output looks like:

[Scanner] Scanned 14,847 markets in 38.2s → 412 qualified
[Scanner] Scanned 15,003 markets in 41.7s → 389 qualified
[Scanner] Scanned 14,991 markets in 39.5s → 401 qualified
Enter fullscreen mode Exit fullscreen mode

The 70–87% probability band is deliberate. Below 70%, you're in territory where the market is genuinely uncertain and your edge disappears into noise. Above 87%, the payout math doesn't work — you're risking $87 to make $13, and one bad trade wipes out six wins. The band also tends to filter out the markets that are being actively traded by well-capitalized participants with informational advantages I don't have.

The 4-Layer Verification System

Before a qualified market gets an order, it passes through four verification checks. This is what separates automation that runs for a week and then blows up from automation that runs for months.

Layer 1 — Liquidity depth check. The top-of-book bid/ask looks fine, but I verify there's enough depth to fill my order without moving the market. A $25 order in a market with $30 total liquidity is a bad idea even if the spread looks clean.

Layer 2 — Time to close filter. Markets closing in under 4 hours get a higher evidence threshold. Markets closing in under 1 hour are excluded entirely. The resolution uncertainty in the final hour creates price behavior that my model isn't calibrated for.

Layer 3 — Category exposure limits. I cap total exposure per category — weather, political, economic, sports — so a bad model assumption about one domain can't cascade into a full portfolio drawdown. If I have $30 deployed in weather markets and the daily cap for weather is $30, new weather trades are blocked regardless of signal quality.

Layer 4 — Daily P&L gate. If the bot has already lost $40 on the day, the $50 cap kicks in and it stops placing new trades until midnight reset. It can still manage open positions — close them, adjust them — but no new entries.

Risk Architecture: The $50 Cap

The $50 daily risk cap is not a soft guideline. It's enforced in code before every order is routed:

if daily_loss >= DAILY_LOSS_CAP:
    raise TradingHaltedException(f"Daily loss cap hit: ${daily_loss:.2f}")
Enter fullscreen mode Exit fullscreen mode

And the position sizer never lets a single trade risk more than $15 — roughly 30% of the daily cap. The position size is calculated as a function of edge confidence, market liquidity, and remaining daily budget. When the budget is healthy, it sizes up slightly. When it's been a rough day and $35 is already gone, it sizes down to $5–$8 maximum.

This isn't about being conservative for its own sake. It's about survival. A system that runs 365 days a year and loses $50 maximum on its bad days has time to be profitable. A system that doesn't have hard stops will eventually hit a bad assumption, a market anomaly, or an API bug — and the resulting drawdown will wipe out weeks of gains in an afternoon.

What Actually Running This Looks Like

The bot runs on a small VPS — currently a $6/month DigitalOcean Droplet. It restarts automatically if it crashes, logs every decision to a local SQLite database, and sends me a Telegram message for anything interesting: new trades, daily summaries, errors, and any day where the P&L gate triggers.

The market catalog scan takes 38–45 seconds on average. Given the 50-second cycle time, there's a narrow window for edge detection and order routing before the next scan starts. The scanner and the edge detector run in separate threads to prevent them from blocking each other.

Over the first 60 days of production operation, the system has placed trades in 47 of those days. The other 13 days, the scan found qualifying markets but the edge detector rejected everything that passed the initial filter — which is the correct behavior. Forcing trades when the signal isn't there is how retail traders lose money.

The biggest operational lesson: your error handling matters more than your trading logic. I've had Kalshi API timeouts, malformed responses, rate limit headers that weren't respected, and one memorable incident where a market's yes_bid and yes_ask were both returned as null during a high-traffic event. Every one of those cases needs a defined behavior — retry, skip, halt, or alert — or your bot will do something unpredictable with real money.


Want This Built for Your Business?

I build custom Python automation systems, trading bots, and AI-powered tools that run 24/7 in production.

Currently available for consulting and contract work:

DM me on dev.to or reach out on either platform. I respond within 24 hours.

Top comments (0)