DEV Community

Cover image for 不引入任何区块链库,用 Python 写一个链上提醒 bot
Слава Жуланов
Слава Жуланов

Posted on

不引入任何区块链库,用 Python 写一个链上提醒 bot

PolySignal——我那个监控 Polymarket 的 Telegram bot——里有一个小子系统叫 chain_watcher。它只做一件事:把 Polymarket API 报上来的每一笔交易,和 Polygon 链上真实发生的事情做交叉核对,一旦 API 数据对不上就通过 Sentry 报警。

大约 280 行 Python,唯一依赖是 httpx。没有 web3.py,没有 eth-account,也没有 eth-abi。下面我会把"从 Python 里只用标准库 + 一个 HTTP 客户端读公开链上事件"这件事走一遍,因为 (a) 我自己确实需要做,(b) 大多数教程一上来就让你去装 SDK,但对于只读型的监听场景,SDK 完全不必。

这不是一篇反 SDK 的檄文。如果你要签交易、发资金、做严肃的 ABI 工作,请用 web3.py。我这里的边界很窄:只是从一个已知合约里读特定事件,纯 httpx + JSON-RPC 就能干得很干净。

"监听链上"到底是在干什么

一个区块链节点对外暴露 JSON-RPC 接口。你可以向它问问题:

  • "你当前在哪个 block?" → eth_blockNumber
  • "把符合这个 filter、区块号 N 到 M 之间的全部 log 给我" → eth_getLogs
  • "这个 hash 对应的交易长什么样?" → eth_getTransactionByHash

对一个提醒 bot 而言,只需要前两个。你反复轮询链顶,问:"从我上次检查过的那个 block 开始到现在,合约 X 里有没有符合这个事件签名的新 log?"如果回来有新 log,解码、推消息,就这样。

四十行轮询循环,十行 JSON-RPC 客户端,二十行事件解码,剩下都是粘合逻辑。

JSON-RPC 客户端

一个节点就是一个监听 HTTP POST 的 RPC URL。每次调用就是一个 JSON body。你不需要 SDK,只需要一个 HTTP 客户端和一点耐心。

class _ChainRPC:
    def __init__(self, url: str, timeout: float = 15.0) -> None:
        self._url = url
        self._client = httpx.AsyncClient(timeout=timeout)
        self._id = 0

    async def _call(self, method: str, params: list) -> object:
        for attempt in range(3):
            self._id += 1
            try:
                resp = await self._client.post(
                    self._url,
                    json={
                        "jsonrpc": "2.0",
                        "id": self._id,
                        "method": method,
                        "params": params,
                    },
                )
                resp.raise_for_status()
                body = resp.json()
                if body.get("error"):
                    raise RuntimeError(f"RPC {method} failed: {body['error']}")
                return body["result"]
            except (httpx.HTTPError, RuntimeError):
                if attempt == 2:
                    raise
                await asyncio.sleep(0.5 * (attempt + 1))
        raise RuntimeError("unreachable")
Enter fullscreen mode Exit fullscreen mode

整个客户端就是这些。上面再加两个简便方法:

    async def block_number(self) -> int:
        result = await self._call("eth_blockNumber", [])
        return int(result, 16)  # the node returns hex strings

    async def get_logs(self, address, from_block, to_block, topics):
        return await self._call("eth_getLogs", [{
            "address": address,
            "fromBlock": hex(from_block),
            "toBlock": hex(to_block),
            "topics": topics,
        }])
Enter fullscreen mode Exit fullscreen mode

为什么要做重试?公共 Polygon RPC(polygon-rpc.com 那一类)会零零散散返回 5xx。在同一个 tick 内做两次半秒级重试,可以平滑掉这种波动,又不会把整个轮询卡住。

过滤你真正想要的事件

合约触发的事件带有一个固定签名。对 Polymarket 的 CTF Exchange V2 来说,我关心的是 OrderFilled

OrderFilled(
    bytes32 indexed orderHash,
    address indexed maker,
    address indexed taker,
    uint256 side,
    uint256 tokenId,
    uint256 makerAmountFilled,
    uint256 takerAmountFilled,
    uint256 fee
)
Enter fullscreen mode Exit fullscreen mode

以太坊 log 的 topics 字段最多承载四个东西:

  1. 事件签名的 Keccak 哈希(topics[0])。
  2. 每一个 indexed 参数,左侧补齐到 32 字节。

OrderFilled 来说,topics[0] 是一个已知常量,topics[2] / topics[3] 是 maker 和 taker 地址(左侧补到 32 字节)。要找"任何涉及到我关注钱包的成交",我会发两次 eth_getLogs——一次按 maker topic 过滤,一次按 taker:

for filter_topics in (
    [ORDER_FILLED_TOPIC, None, watched_address_topics],         # as maker
    [ORDER_FILLED_TOPIC, None, None, watched_address_topics],   # as taker
):
    raw_logs += await rpc.get_logs(CONTRACT, from_block, to_block, filter_topics)
Enter fullscreen mode Exit fullscreen mode

watched_address_topics 是一组左侧补齐到 32 字节的地址 hex。重活节点替你干,你拿到的只有命中的那些 log。

解码一条 log

一条 log 大致长成 {topics: [...], data: "0x...", transactionHash: "0x...", ...}topics 是若干个 32 字节 hex;data 字段是所有非 indexed 参数顺序拼接的 hex,每个参数补到 32 字节。对一个布局已知、固定字段的事件来说,解码不需要任何 ABI 库——直接切 hex 就行。

def decode_order_filled(raw: dict) -> OrderFill:
    topics = raw["topics"]
    data = raw["data"].removeprefix("0x")
    words = [data[i : i + 64] for i in range(0, len(data), 64)]
    return OrderFill(
        tx_hash=raw["transactionHash"].lower(),
        order_hash=topics[1],
        maker="0x" + topics[2][-40:].lower(),
        taker="0x" + topics[3][-40:].lower(),
        side=int(words[0], 16),
        token_id=int(words[1], 16),
        maker_amount_filled=int(words[2], 16),
        taker_amount_filled=int(words[3], 16),
    )
Enter fullscreen mode Exit fullscreen mode

就这样。每个事件返回一个 struct 风格的 tuple。

一些值得保留的"防御性细节"

两个我在 watcher 里专门处理过的生产现实:

1. 给区块跨度设上限。 RPC 抖动恢复后,"从上次看过的 block 到现在"可能跨好几千个 block。公共节点会拒绝过大的 eth_getLogs 请求,watcher 就会卡在那。所以每个 tick 把跨度封顶(我用的是 1000 个 block——按 Polygon 大约每 2 秒一个 block 算,差不多 30 分钟),watcher 在后续多个 tick 里分段追上来。

2. 这是一个交叉核对循环,不是重复入库。 我不是把链当作主数据源,而是用链来*核对*API。watcher 把链上成交记录下来,等几分钟让 API 报上同一笔交易,如果 API 一直没有,就通过 Sentry 报警。一次有 bug 的 API 响应会触发一个我能看见的信号;如果链和 API 都静默,任何一层都看不出问题,那就麻烦了。

这一段恰好是"没有 SDK"反而比"有 SDK"更强的地方:我和字节之间没有任何库。出问题的时候,trace 很短。

什么时候应该跳过 SDK

诚实地把代价说清楚:

  • 要签发交易就别跳过。 Nonce 管理、gas 估算、签名——这就是 web3.py 存在的意义。
  • 要做 ABI 内省就别跳过。 动态 ABI 解析确实是个库级别的问题。
  • 要一个客户端同时跑 ETH 加好几条 L2 还要处理各自怪癖,就别跳过。 库会替你把这些差异抹平。

对一个只盯单链单事件的被动监听器来说,以上几条都不成立。八行 import,一个 HTTP 客户端,~280 行代码。

跑在哪里

这套逻辑跑在 PolySignal 里——这是我做的 Telegram bot,会在 Polymarket 上高水平钱包成交时给用户发提醒。chain_watcher 是它里面让我能睡得着觉的那一层——尤其在 Polymarket API 偶尔抽风的时候。

如果你想看完整模块,含注释和交叉核对循环大约 280 行。乐意贴 Gist,也欢迎在评论里问问题。

Top comments (0)