From fe770fa6a1c6e6482a0259a42b2258c987aa18b2 Mon Sep 17 00:00:00 2001 From: katzman Date: Thu, 11 Jun 2026 12:14:05 -0700 Subject: [PATCH] add provider.py for handling async rpc rs --- script/smoke/README.md | 46 ++++++++++ script/smoke/chain.py | 32 +++++-- script/smoke/provider.py | 189 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 262 insertions(+), 5 deletions(-) create mode 100644 script/smoke/provider.py diff --git a/script/smoke/README.md b/script/smoke/README.md index f0138ba..fb21bed 100644 --- a/script/smoke/README.md +++ b/script/smoke/README.md @@ -23,6 +23,12 @@ precompiles are deployed and the features are switched on in the ActivationRegistry. The suite does not stand a node up for you and does not fund anyone for you: you supply the endpoint and two funded keys. +`RPC_URL` may point at a **single node or a load-balanced pool of many nodes** (a +live network typically fronts dozens of backends at slightly different heights). +The harness handles the multi-node case transparently — see +[Multi-node consistency](#multi-node-consistency) — so you can run the same suite +against a local single node and a live pooled endpoint without changes. + ## Running ```bash @@ -115,6 +121,44 @@ test. To accept a known divergence, add its check name to the `INFORMATIONAL` se in `journeys/precompile_invariants.py` — it stays reported but no longer fails the run. +## Multi-node consistency + +A live RPC endpoint is usually a **load balancer in front of many nodes** that sit +at slightly different block heights (we measured ~1–2 blocks of spread on a live +pool). The pool is sticky *per connection* but routes *new* connections to +arbitrary backends. The journeys are serial — write, then immediately read the +result — so a naive harness can confirm a write on one backend and then have the +follow-up read routed to a backend that hasn't imported that block yet, observing +**pre-write state**. The classic symptom is `isB20Initialized == false` +immediately after a `createB20` that already succeeded. + +`ConsistentHTTPProvider` (in `chain.py`) gives the whole run a single +read-your-writes view over the pool, with two layered mechanisms: + +1. **Sticky connection (steady state).** The provider holds one keep-alive + connection open for the run, pinning every request to a single backend that is + trivially consistent with its own writes — no waiting, no retries. +2. **High-water safety net (on reconnect).** It tracks the highest block any + confirmed receipt / head query revealed and pins every **state read** + (`eth_call`, `eth_getBalance`, `eth_getCode`, `eth_getStorageAt`, gas + estimation) to that block. If the connection drops and the pool re-pins us to a + lagging backend, that backend answers `block not found` (rather than silently + serving stale state); the provider drops the connection to force a re-route and + retries until a synced backend answers, or it gives up after ~30s and surfaces + the node's error. + +The **nonce is deliberately not pinned** — it must reflect the account's *latest* +head, not a historical snapshot, or the broadcast is rejected as `nonce too low`. +Instead `Chain.next_nonce` reads the pending count and takes the max with a local +monotonic counter, so every signed tx gets a unique, forward-only nonce regardless +of which backend answered. + +The net effect: a read never observes state older than a write the suite has +already confirmed, and nonces never collide or regress, no matter which backend any +individual request lands on. This is invisible to journeys — they keep calling +`.call()` and `send()` as before — and it is a no-op against a single node (the +high-water block is always present). + ## Troubleshooting | Symptom | Cause / fix | @@ -123,6 +167,7 @@ run. | `RPC_URL did not answer` | Endpoint unreachable. Check the node is up and the URL/port. | | Everything **skipped** | Target node doesn't have the b20 features active. Activate them in the ActivationRegistry, or point `RPC_URL` at a node that has them. | | `deployer ... underfunded ... no faucet configured` | Fund `DEPLOYER_PK`, or set `FAUCET_URL` + `FAUCET_NETWORK`. | +| Reads disagree with a write that just landed / `block not found` after ~30s | A pool backend is lagging far behind (or stuck). The provider retries for ~30s; if it still fails, the pool has a badly desynced node — check backend health. | ## Package layout @@ -131,6 +176,7 @@ script/smoke/ __main__.py # CLI: python -m smoke [-k]; preflight + dispatch config.py # addresses, enum/role/feature constants, env -> Config chain.py # web3 harness: send/read, revert + event assertions, RPC tracing + provider.py # ConsistentHTTPProvider: read-your-writes over a multi-node (load-balanced) pool abis.py # interface ABIs + probe/feeder artifacts, read from out/ codec.py # the one hand-written encode: createB20 params + initCalls errors.py # selector -> custom-error-name map (from the ABIs) diff --git a/script/smoke/chain.py b/script/smoke/chain.py index 9177ec8..550e7f1 100644 --- a/script/smoke/chain.py +++ b/script/smoke/chain.py @@ -5,6 +5,10 @@ live node, waits for the receipt, asserts success, and records it for the flow-level `assert_events_emitted` check. Reads and expected-revert simulations use `eth_call` against the node, so the real precompiles execute (no local EVM). + +The transport is `ConsistentHTTPProvider` (see `provider.py`), which gives the whole run a single +read-your-writes view over a load-balanced pool of nodes. Without it, a read routed to a backend +that lags the one that accepted the preceding write observes pre-write state. """ from __future__ import annotations @@ -29,6 +33,7 @@ from .abis import ASSET_ABI, FACTORY_ABI, POLICY_ABI, STABLECOIN_ABI from .codec import topic0 from .errors import ERROR_BY_SELECTOR +from .provider import ConsistentHTTPProvider def log(msg: str) -> None: @@ -52,7 +57,7 @@ class Chain: def __init__(self, cfg: config.Config) -> None: self.cfg = cfg - self.w3 = Web3(Web3.HTTPProvider(cfg.rpc_url)) + self.w3 = Web3(ConsistentHTTPProvider(cfg.rpc_url)) if not self.w3.is_connected(): die(f"RPC_URL did not answer: {cfg.rpc_url}") self.chain_id = self.w3.eth.chain_id @@ -73,6 +78,23 @@ def __init__(self, cfg: config.Config) -> None: self._receipts: list[TxReceipt] = [] self._user2_funded = False self.trace = cfg.trace + self._nonces: dict[ChecksumAddress, int] = {} + + # ── nonce ───────────────────────────────────────────────────────────────── + def next_nonce(self, address: ChecksumAddress) -> int: + """Next nonce to sign with, robust against a load-balanced pool. + + Across backends a `latest`/`pending` count can come back stale-low (a backend lagging the one + that mined our last tx) and a reused value collides. We take the max of the node's pending count + and a local monotonic counter, then advance the counter — so every signed tx in the run gets a + unique, forward-only nonce regardless of which backend answered. The nonce is intentionally read + from the head (not the consistency high-water block): it must reflect the account's latest state, + not a historical snapshot, or the broadcast is rejected as "nonce too low". + """ + pending = self.w3.eth.get_transaction_count(address, "pending") + nonce = max(pending, self._nonces.get(address, 0)) + self._nonces[address] = nonce + 1 + return nonce # ── contracts at an address ───────────────────────────────────────────── def asset_at(self, address: ChecksumAddress) -> Contract: @@ -85,7 +107,7 @@ def stablecoin_at(self, address: ChecksumAddress) -> Contract: def send(self, fn, account: LocalAccount) -> TxReceipt: """Sign + broadcast a contract function, wait, assert success, record it.""" tx = fn.build_transaction( - {"from": account.address, "nonce": self.w3.eth.get_transaction_count(account.address)} + {"from": account.address, "nonce": self.next_nonce(account.address)} ) signed = account.sign_transaction(tx) tx_hash = self.w3.eth.send_raw_transaction(signed.raw_transaction) @@ -107,7 +129,7 @@ def fund_user2(self) -> None: "value": self.cfg.gas_float_wei, "gas": 21000, "gasPrice": self.w3.eth.gas_price, - "nonce": self.w3.eth.get_transaction_count(self.DEPLOYER), + "nonce": self.next_nonce(self.DEPLOYER), "chainId": self.chain_id, } signed = self.deployer.sign_transaction(tx) @@ -347,7 +369,7 @@ def deploy( """ account = account or self.deployer factory = self.w3.eth.contract(abi=abi, bytecode=bytecode) - overrides = {"from": account.address, "nonce": self.w3.eth.get_transaction_count(account.address)} + overrides = {"from": account.address, "nonce": self.next_nonce(account.address)} if value: overrides["value"] = value tx = factory.constructor(*args).build_transaction(overrides) @@ -401,7 +423,7 @@ def send_expecting_revert(self, fn, account: LocalAccount, *, gas: int = 2_000_0 tx = fn.build_transaction( { "from": account.address, - "nonce": self.w3.eth.get_transaction_count(account.address), + "nonce": self.next_nonce(account.address), "gas": gas, } ) diff --git a/script/smoke/provider.py b/script/smoke/provider.py new file mode 100644 index 0000000..21b74c1 --- /dev/null +++ b/script/smoke/provider.py @@ -0,0 +1,189 @@ +"""Read-your-writes HTTP provider for a load-balanced node pool. + +A live RPC endpoint is usually a load balancer in front of many nodes at slightly different +heights. The journeys are serial — write, then immediately read the result — so a write confirmed +on one backend can have its follow-up read routed to a backend that has not imported that block yet, +observing pre-write state (the canonical `isB20Initialized == false` right after a successful +`createB20`). `ConsistentHTTPProvider` presents the whole run a single, monotonic, read-your-writes +view over the pool so the journeys never see that staleness. See the class docstring for the +mechanism. Against a single node it is a no-op (the high-water block is always present). +""" + +from __future__ import annotations + +import time + +import requests +from web3 import HTTPProvider + +# Methods whose block-tag parameter we pin to the consistency high-water mark, mapped to the +# positional index of that block tag in the JSON-RPC params array. These are *state* reads, where the +# guarantee we want is "never observe state older than a write we've confirmed". +# +# `eth_getTransactionCount` is deliberately NOT here: the nonce must reflect the account's *latest* +# head, not a historical snapshot. Pinning it backwards returns a stale-low nonce (the count as of an +# older block), which the broadcast backend then rejects as "nonce too low". The sticky connection +# keeps the nonce read and the broadcast on one backend, and `Chain` tracks a local monotonic nonce +# on top, so the nonce stays correct without pinning. +_PINNED_BLOCK_PARAM_INDEX = { + "eth_call": 1, + "eth_estimateGas": 1, + "eth_getBalance": 1, + "eth_getCode": 1, + "eth_getStorageAt": 2, +} + +# Block tags meaning "newest state", safe to re-point at the high-water mark. `safe`/`finalized` +# deliberately ask for older agreed state, so we leave those (and explicit numbers/hashes) alone. +_PINNABLE_TAGS = (None, "latest", "pending") + +# JSON-RPC error a backend returns when it has not yet imported the block we pinned to (it lags the +# node that produced the high-water mark). Geth/reth answer with code -32001 / a "not found" message +# rather than silently serving stale state, which is exactly what lets us detect-and-retry. +_STALE_BLOCK_CODE = -32001 +_STALE_BLOCK_MARKERS = ( + "block not found", + "header not found", + "missing trie node", + "missing header", + "unknown block", + "header for hash not found", + "state not available", + "no state available", + "missing state", +) + +# Read-pin retry budget: how long to wait for a lagging backend to import the pinned block (block +# time on the target chains is ~2s, observed head spread ~1-2 blocks, so a few seconds is plenty). +_SYNC_TIMEOUT_SECONDS = 30.0 + + +def _make_sticky_session() -> requests.Session: + """A single-connection keep-alive session, so the whole run pins to one backend in the pool. + + The pool fronting a live endpoint is sticky *per connection* but routes *new* connections to + arbitrary backends. Holding one connection open for the run keeps every request on the same + backend — trivially consistent with its own writes — so the high-water safety net below only has + to engage on the rare occasion the connection drops and the pool re-pins us to a lagging backend. + """ + session = requests.Session() + adapter = requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=1, max_retries=0) + session.mount("http://", adapter) + session.mount("https://", adapter) + session.headers["Connection"] = "keep-alive" + return session + + +def _as_block_int(value: object) -> int | None: + """Coerce a JSON-RPC block number (hex string, int, or decimal string) to int; None if unparseable.""" + if isinstance(value, bool): + return None + if isinstance(value, int): + return value + if isinstance(value, str): + try: + return int(value, 16) if value.lower().startswith("0x") else int(value) + except ValueError: + return None + return None + + +class ConsistentHTTPProvider(HTTPProvider): + """HTTP provider giving the suite one read-your-writes view over a load-balanced node pool. + + A live RPC endpoint usually fronts many nodes at slightly different heights (we measured ~1-2 + blocks of spread). A write confirmed on one backend is not instantly visible on another, so a + follow-up read routed to a lagging backend sees pre-write state — the canonical + `isB20Initialized == false` immediately after a successful `createB20` flake. + + Two mechanisms, layered: + + 1. Stickiness (the steady state): a single keep-alive connection (`_make_sticky_session`) pins + the whole run to one backend, which is consistent with its own writes — no waiting, no retries. + 2. A high-water safety net (when the connection drops and the pool re-pins us to a lagging + backend): we ratchet the highest block any confirmed receipt / head query revealed, pin every + state read to that block, and — because a backend that has not imported it answers + `block not found` instead of silently serving stale state — drop the connection to force a + re-route and retry until a synced backend answers (or we exceed the sync timeout). The nonce + lookup is *not* pinned (see `_PINNED_BLOCK_PARAM_INDEX`); `Chain` tracks it monotonically. + + The net effect: reads never observe a state older than a write the suite has already confirmed, + regardless of which backend the pool happens to route any individual request to. + """ + + def __init__(self, endpoint_uri: str, *, sync_timeout: float = _SYNC_TIMEOUT_SECONDS) -> None: + self._sticky_session = _make_sticky_session() + super().__init__(endpoint_uri, session=self._sticky_session) + self._read_block: int | None = None + self._sync_timeout = sync_timeout + + @property + def read_block(self) -> int | None: + """The current consistency high-water mark (highest block observed), or None before any.""" + return self._read_block + + def _ratchet(self, block: int | None) -> None: + if block is not None and (self._read_block is None or block > self._read_block): + self._read_block = block + + def _observe(self, method: str, response: dict) -> None: + """Ratchet the high-water mark from anything a response reveals about chain progress.""" + result = response.get("result") + if result is None: + return + if method == "eth_blockNumber": + self._ratchet(_as_block_int(result)) + elif method == "eth_getTransactionReceipt" and isinstance(result, dict): + self._ratchet(_as_block_int(result.get("blockNumber"))) + elif method in ("eth_getBlockByNumber", "eth_getBlockByHash") and isinstance(result, dict): + self._ratchet(_as_block_int(result.get("number"))) + + def _pin(self, method: str, params: object) -> tuple[object, bool]: + """Re-point a `latest`/`pending`/absent block tag at the high-water mark. Returns (params, pinned).""" + idx = _PINNED_BLOCK_PARAM_INDEX.get(method) + if idx is None or self._read_block is None: + return params, False + pinned = list(params) if isinstance(params, (list, tuple)) else [params] + tag = hex(self._read_block) + if len(pinned) <= idx: + pinned.extend([None] * (idx - len(pinned))) + pinned.append(tag) + return pinned, True + if pinned[idx] in _PINNABLE_TAGS: + pinned[idx] = tag + return pinned, True + return params, False # explicit block number / hash / safe / finalized — leave it + + @staticmethod + def _is_stale_block_error(error: object) -> bool: + if not error: + return False + if isinstance(error, dict): + if error.get("code") == _STALE_BLOCK_CODE: + return True + message = str(error.get("message", "")) + else: + message = str(error) + message = message.lower() + return any(marker in message for marker in _STALE_BLOCK_MARKERS) + + def make_request(self, method, params): + pinned_params, pinned = self._pin(method, params) + if not pinned: + response = super().make_request(method, params) + self._observe(method, response) + return response + deadline = time.monotonic() + self._sync_timeout + delay = 0.1 + while True: + response = super().make_request(method, pinned_params) + if not self._is_stale_block_error(response.get("error")): + self._observe(method, response) + return response + if time.monotonic() >= deadline: + return response # surface the node's error rather than hang forever + # The sticky connection is parked on a backend that lags the pinned block. Drop it so the + # pool re-routes us, and give backends a moment to import the block, then retry. + self._sticky_session.close() + time.sleep(min(delay, 1.0)) + delay *= 1.5