diff --git a/Cargo.lock b/Cargo.lock index 2b81b54fd19..54d406fc7ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3812,6 +3812,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "envconfig", + "futures 0.3.31", "graph", "graph-runtime-derive", "graph-runtime-wasm", diff --git a/chain/ethereum/Cargo.toml b/chain/ethereum/Cargo.toml index da8ea87709f..c9a125136bf 100644 --- a/chain/ethereum/Cargo.toml +++ b/chain/ethereum/Cargo.toml @@ -16,6 +16,7 @@ tiny-keccak = "1.5.0" hex = "0.4.3" semver = { workspace = true } thiserror = { workspace = true } +futures = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tower = { workspace = true } diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 34b360d9674..94d4c8dfe50 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1174,6 +1174,11 @@ pub trait EthereumAdapter: Send + Sync + 'static { address: Address, block_ptr: BlockPtr, ) -> Result; + + /// Returns a boolean indicating whether the adapter can reach + /// the RPC provider it is configured to use. + /// This is used to determine if a provider should be considered healthy. 
+ async fn is_reachable(&self) -> bool; } #[cfg(test)] diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 3f9312e0449..67be8dc9e0e 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -57,7 +57,7 @@ use std::convert::TryFrom; use std::iter::FromIterator; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::sync::RwLock; use tokio::time::timeout; @@ -1246,7 +1246,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let alloy = self.alloy.clone(); retry("eth_getBlockByNumber(latest) no txs RPC call", logger) .redact_log_urls(true) - .no_limit() + .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); @@ -1270,6 +1270,14 @@ impl EthereumAdapterTrait for EthereumAdapter { .await } + async fn is_reachable(&self) -> bool { + let alloy = self.alloy.clone(); + tokio::time::timeout(Duration::from_secs(10), alloy.get_block_number()) + .await + .map(|r| r.is_ok()) + .unwrap_or(false) + } + async fn block_by_hash( &self, logger: &Logger, diff --git a/chain/ethereum/src/ingestor.rs b/chain/ethereum/src/ingestor.rs index 9e7a8c01725..e2d469a3954 100644 --- a/chain/ethereum/src/ingestor.rs +++ b/chain/ethereum/src/ingestor.rs @@ -1,6 +1,7 @@ use crate::{chain::BlockFinality, ENV_VARS}; use crate::{EthereumAdapter, EthereumAdapterTrait as _}; use async_trait::async_trait; +use futures::future::select_ok; use graph::blockchain::client::ChainClient; use graph::blockchain::BlockchainKind; use graph::components::network_provider::ChainName; @@ -11,7 +12,8 @@ use graph::{ blockchain::{BlockHash, BlockIngestor, BlockPtr, IngestorError}, cheap_clone::CheapClone, prelude::{ - error, info, tokio, trace, warn, ChainStore, Error, EthereumBlockWithCalls, LogCode, Logger, + debug, error, info, tokio, trace, warn, ChainStore, Error, EthereumBlockWithCalls, LogCode, + 
Logger, }, }; use std::{sync::Arc, time::Duration}; @@ -213,12 +215,111 @@ impl PollingBlockIngestor { eth_adapter.latest_block_ptr(logger).await } - async fn eth_adapter(&self) -> anyhow::Result> { - self.chain_client - .rpc()? - .cheapest() - .await - .ok_or_else(|| graph::anyhow::anyhow!("unable to get eth adapter")) + /// Executes one polling iteration. On failure delegates to `on_poll_failure` for + /// probe+switch logic. + async fn poll_once( + &self, + providers: &[Arc], + current_provider: &mut Option, + ) { + // Resolve by name; resets to first provider if the tracked one left the list. + let eth_adapter = resolve_provider(providers, current_provider, &self.logger).clone(); + // Pin the name so the next iteration knows which provider is active. + let provider_name = eth_adapter.provider().to_string(); + let logger = self.logger.new(o!("provider" => provider_name.clone())); + *current_provider = Some(provider_name); + + if let Err(err) = self.do_poll(&logger, eth_adapter).await { + error!(logger, "Trying again after block polling failed: {}", err); + on_poll_failure(providers, current_provider, &self.logger).await; + } + } +} + +/// Returns the currently-tracked provider from `providers`. +/// +/// If the tracked provider is no longer in the list (it became invalid and was removed by +/// `ProviderManager`), logs a warning, resets the state, and returns the first available +/// provider. 
+fn resolve_provider<'a, A: crate::EthereumAdapterTrait>( + providers: &'a [Arc<A>], + current_provider: &mut Option<String>, + logger: &Logger, +) -> &'a Arc<A> { + if let Some(name) = current_provider.as_ref() { + if let Some(found) = providers.iter().find(|p| p.provider() == name) { + return found; + } + warn!( + logger, + "Current RPC provider is no longer available, resetting to first provider"; + "provider" => name, + ); + *current_provider = None; + } + &providers[0] +} + +async fn on_poll_failure<A: crate::EthereumAdapterTrait>( + providers: &[Arc<A>], + current_provider: &mut Option<String>, + logger: &Logger, +) { + if providers.len() <= 1 { + return; + } + + let current_name = match current_provider.as_ref() { + Some(name) => name.clone(), + None => return, + }; + + // Probe the current provider before trying alternatives. do_poll() can fail for + // reasons unrelated to RPC availability (e.g. DB errors from chain_head_ptr or + // attempt_chain_head_update, or a BlockUnavailable from a chain reorg). All of + // these surface as IngestorError::Unknown, indistinguishable from an RPC failure + // at the match level. If the current provider responds to eth_blockNumber, the + // failure was not caused by provider unavailability — switching cannot help. + let current = providers.iter().find(|p| p.provider() == current_name); + if let Some(current) = current { + if current.is_reachable().await { + return; + } + } + + // Probe all alternatives in parallel; switch to the first that responds. 
+ let futs = providers + .iter() + .filter(|p| p.provider() != current_name) + .map(|p| { + let name = p.provider().to_string(); + debug!(logger, "Trying RPC provider"; "provider" => &name); + Box::pin(async move { + if p.is_reachable().await { + Ok(name) + } else { + Err(()) + } + }) + }); + + match select_ok(futs).await { + Ok((next_name, _)) => { + warn!( + logger, + "Switching RPC provider for block ingestor"; + "from" => &current_name, + "to" => &next_name, + ); + *current_provider = Some(next_name); + } + Err(_) => { + warn!( + logger, + "All RPC providers unreachable, continuing on current provider"; + "provider" => &current_name, + ); + } } } @@ -227,33 +328,28 @@ impl BlockIngestor for PollingBlockIngestor { async fn run(self: Box<Self>) { let mut backoff = ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30)); + // Name of the provider currently in use. `None` until the first poll. + let mut current_provider: Option<String> = None; loop { - let eth_adapter = match self.eth_adapter().await { - Ok(adapter) => { - backoff.reset(); - adapter - } - Err(err) => { - error!( - &self.logger, - "unable to get ethereum adapter, backing off... 
error: {}", - err.to_string() - ); - backoff.sleep_async().await; - continue; - } - }; - let logger = self - .logger - .new(o!("provider" => eth_adapter.provider().to_string())); + let providers = self + .chain_client + .rpc() + .expect("PollingBlockIngestor is only created for RPC chains") + .all_cheapest() + .await; - if let Err(err) = self.do_poll(&logger, eth_adapter).await { - error!(logger, "Trying again after block polling failed: {}", err); + if providers.is_empty() { + error!(self.logger, "No RPC providers available for block ingestor"); + backoff.sleep_async().await; + continue; } + backoff.reset(); + + self.poll_once(&providers, &mut current_provider).await; if ENV_VARS.cleanup_blocks { - self.cleanup_cached_blocks().await + self.cleanup_cached_blocks().await; } tokio::time::sleep(self.polling_interval).await; @@ -268,3 +364,266 @@ impl BlockIngestor for PollingBlockIngestor { BlockchainKind::Ethereum } } + +#[cfg(test)] +mod tests { + use super::on_poll_failure; + use super::*; + use crate::adapter::{ + ContractCallError, EthereumAdapter as EthereumAdapterTrait, EthereumRpcError, + }; + use async_trait::async_trait; + use graph::blockchain::{BlockPtr, ChainIdentifier}; + use graph::components::ethereum::AnyBlock; + use graph::components::ethereum::LightEthereumBlock; + use graph::data::store::ethereum::call; + use graph::data_source::common::ContractCall; + use graph::prelude::alloy::primitives::{Address, Bytes, B256, U256}; + use graph::prelude::{BlockNumber, Error, EthereumCallCache, Logger}; + use graph::slog::Discard; + use std::collections::HashSet; + use std::sync::Arc; + + struct MockEthAdapter { + provider_name: String, + reachable: bool, + } + + impl MockEthAdapter { + fn new(name: &str, reachable: bool) -> Arc { + Arc::new(Self { + provider_name: name.to_string(), + reachable, + }) + } + } + + #[async_trait] + impl EthereumAdapterTrait for MockEthAdapter { + fn provider(&self) -> &str { + &self.provider_name + } + + async fn 
is_reachable(&self) -> bool { + self.reachable + } + + async fn net_identifiers(&self) -> Result { + unimplemented!() + } + async fn latest_block_ptr( + &self, + _: &Logger, + ) -> Result { + unimplemented!() + } + async fn load_blocks( + &self, + _: Logger, + _: Arc, + _: HashSet, + ) -> Result>, Error> { + unimplemented!() + } + async fn block_by_hash(&self, _: &Logger, _: B256) -> Result, Error> { + unimplemented!() + } + async fn block_by_number( + &self, + _: &Logger, + _: BlockNumber, + ) -> Result, Error> { + unimplemented!() + } + async fn load_full_block( + &self, + _: &Logger, + _: AnyBlock, + ) -> Result { + unimplemented!() + } + async fn next_existing_ptr_to_number( + &self, + _: &Logger, + _: BlockNumber, + ) -> Result { + unimplemented!() + } + async fn contract_call( + &self, + _: &Logger, + _: &ContractCall, + _: Arc, + ) -> Result<(Option>, call::Source), ContractCallError> + { + unimplemented!() + } + async fn contract_calls( + &self, + _: &Logger, + _: &[&ContractCall], + _: Arc, + ) -> Result>, call::Source)>, ContractCallError> + { + unimplemented!() + } + async fn get_balance( + &self, + _: &Logger, + _: Address, + _: BlockPtr, + ) -> Result { + unimplemented!() + } + async fn get_code( + &self, + _: &Logger, + _: Address, + _: BlockPtr, + ) -> Result { + unimplemented!() + } + } + + fn discard_logger() -> Logger { + Logger::root(Discard, o!()) + } + + #[test] + fn test_current_provider_unavailable_resets_to_first() { + // p0 left the validated list; only p1 and p2 remain (p1 is now at index 0). 
+ let providers: Vec> = vec![ + MockEthAdapter::new("p1", true), + MockEthAdapter::new("p2", true), + ]; + let mut current_provider = Some("p0".to_string()); + let resolved = resolve_provider(&providers, &mut current_provider, &discard_logger()); + assert_eq!(resolved.provider(), "p1"); + assert_eq!(current_provider, None); + } + + #[tokio::test] + async fn test_current_reachable_no_switch() { + // Current provider is reachable: on_poll_failure should return early without + // switching, even if alternatives are also reachable. + let providers: Vec> = vec![ + MockEthAdapter::new("p0", true), + MockEthAdapter::new("p1", true), + ]; + let mut current_provider = Some("p0".to_string()); + on_poll_failure(&providers, &mut current_provider, &discard_logger()).await; + assert_eq!(current_provider, Some("p0".to_string())); + } + + #[tokio::test] + async fn test_successful_probe_switches_provider() { + let providers: Vec> = vec![ + MockEthAdapter::new("p0", false), + MockEthAdapter::new("p1", true), + ]; + let mut current_provider = Some("p0".to_string()); + on_poll_failure(&providers, &mut current_provider, &discard_logger()).await; + assert_eq!(current_provider, Some("p1".to_string())); + } + + #[tokio::test] + async fn test_all_unreachable_stays_on_current() { + let providers: Vec> = vec![ + MockEthAdapter::new("p0", false), + MockEthAdapter::new("p1", false), + ]; + let mut current_provider = Some("p0".to_string()); + on_poll_failure(&providers, &mut current_provider, &discard_logger()).await; + assert_eq!(current_provider, Some("p0".to_string())); + } + + #[tokio::test] + async fn test_single_provider_no_switch() { + let providers: Vec> = vec![MockEthAdapter::new("p0", true)]; + let mut current_provider = Some("p0".to_string()); + on_poll_failure(&providers, &mut current_provider, &discard_logger()).await; + assert_eq!(current_provider, Some("p0".to_string())); + } + + // --- Provider-list churn tests --- + // + // These tests simulate the state carried across loop 
iterations where + // all_cheapest() returns a different subset each time. + + /// Current provider survives a list shrink; its index shifts but identity is preserved. + /// + /// Before: [p0, p1, p2], active = p1 (index 1) + /// After: [p1, p2], active = p1 (index 0) + /// + /// With index-based tracking this would silently move to p2. Name-based tracking + /// must keep us on p1. + #[test] + fn test_list_shrinks_current_provider_index_remaps() { + let providers: Vec> = vec![ + MockEthAdapter::new("p1", true), + MockEthAdapter::new("p2", true), + ]; + let mut current_provider = Some("p1".to_string()); + let resolved = resolve_provider(&providers, &mut current_provider, &discard_logger()); + // p1 is now at index 0 — must not drift to p2 + assert_eq!(resolved.provider(), "p1"); + assert_eq!(current_provider, Some("p1".to_string())); + } + + /// After failing over to p1, p1 itself disappears from the validated list in the + /// next iteration. The ingestor must fall back to the first available provider (p0). + /// + /// Iteration 1: [p0(unreachable), p1(reachable)] — p0 fails, switch to p1 + /// Iteration 2: [p0(reachable), p2(reachable)] — p1 gone, reset to p0 + #[tokio::test] + async fn test_failover_target_then_disappears_resets_to_first() { + let logger = discard_logger(); + + // Iteration 1: p0 fails, switch to p1. + let providers_iter1: Vec> = vec![ + MockEthAdapter::new("p0", false), + MockEthAdapter::new("p1", true), + ]; + let mut current_provider = Some("p0".to_string()); + on_poll_failure(&providers_iter1, &mut current_provider, &logger).await; + assert_eq!(current_provider, Some("p1".to_string())); + + // Iteration 2: p1 has dropped from the validated list. 
+ let providers_iter2: Vec> = vec![ + MockEthAdapter::new("p0", true), + MockEthAdapter::new("p2", true), + ]; + let resolved = resolve_provider(&providers_iter2, &mut current_provider, &logger); + assert_eq!(resolved.provider(), "p0"); // reset to first + assert_eq!(current_provider, None); + } + + /// After failing over to p1, the original provider p0 reappears in the list. + /// The ingestor must stay on p1 — there is no proactive return to p0. + /// + /// Iteration 1: [p0(unreachable), p1(reachable)] — switch to p1 + /// Iteration 2: [p0(reachable), p1(reachable)] — p0 is back; must stay on p1 + #[tokio::test] + async fn test_original_provider_reappears_no_involuntary_return() { + let logger = discard_logger(); + + // Iteration 1: switch to p1. + let providers_iter1: Vec> = vec![ + MockEthAdapter::new("p0", false), + MockEthAdapter::new("p1", true), + ]; + let mut current_provider = Some("p0".to_string()); + on_poll_failure(&providers_iter1, &mut current_provider, &logger).await; + assert_eq!(current_provider, Some("p1".to_string())); + + // Iteration 2: p0 is back in the validated list alongside p1. + let providers_iter2: Vec> = vec![ + MockEthAdapter::new("p0", true), + MockEthAdapter::new("p1", true), + ]; + let resolved = resolve_provider(&providers_iter2, &mut current_provider, &logger); + // Must resolve to p1, not drift back to p0. + assert_eq!(resolved.provider(), "p1"); + assert_eq!(current_provider, Some("p1".to_string())); + } +} diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 98240ddea07..b68811fe571 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -249,6 +249,15 @@ impl EthereumNetworkAdapters { Self::cheapest_from(cheapest, required_capabilities, self.retest_percent) } + /// Returns all validated providers. Unvalidated providers are excluded. 
+ pub async fn all_cheapest(&self) -> Vec> { + self.manager + .providers(&self.chain_id) + .await + .map(|adapters| adapters.map(|a| a.adapter.clone()).collect()) + .unwrap_or_default() + } + pub async fn cheapest(&self) -> Option> { // EthereumAdapters are sorted by their NodeCapabilities when the EthereumNetworks // struct is instantiated so they do not need to be sorted here diff --git a/docs/config.md b/docs/config.md index bcf15fc2c56..63c3b098ed5 100644 --- a/docs/config.md +++ b/docs/config.md @@ -182,6 +182,33 @@ protocol = "near" provider = [ { label = "near", details = { type = "firehose", url = "https://..", key = "", features = ["compression", "filters"] } } ] ``` +### Block ingestor failover + +When the block ingestor's `do_poll()` call fails (after all internal per-request retries are +exhausted), `graph-node` automatically attempts to switch to a healthier provider. The logic +is: + +1. **Probe the current provider first.** `do_poll()` can fail for reasons unrelated to RPC + availability (e.g. a database error or a chain reorg). If the current provider still + responds to `eth_blockNumber`, the failure was not caused by the provider — no switch + occurs. +2. **Probe all alternatives in parallel.** If the current provider is unreachable, all other + validated providers are probed simultaneously via `eth_blockNumber` to minimise wait time + when providers are timing out. +3. **Switch to the first reachable provider.** The first provider to respond successfully + to the probe is selected as the new provider for the ingestor. + The remaining probes are cancelled at this point. +4. **If all providers are unreachable**, the ingestor stays on the current provider and + re-probes on the next `do_poll()` failure. + +There is no automatic return to the original provider. Once the ingestor switches, it stays on +the new provider until that provider fails, at which point the same probe-and-switch logic +applies. 
+ +Only validated providers are eligible as failover candidates. A provider in a temporary failure +state (e.g. unreachable at startup, pending re-validation) is excluded until it passes +validation again. + ### Controlling the number of subgraphs using a provider **This feature is experimental and might be removed in a future release** diff --git a/graph/src/util/futures.rs b/graph/src/util/futures.rs index 7dff592b342..01d775bfc6b 100644 --- a/graph/src/util/futures.rs +++ b/graph/src/util/futures.rs @@ -412,7 +412,8 @@ pub fn retry_strategy( Some(limit) => { // Items are delays *between* attempts, // so subtract 1 from limit. - Box::new(backoff.take(limit - 1)) + // Use saturating_sub to avoid underflow if limit is set to 0 + Box::new(backoff.take(limit.saturating_sub(1))) } None => Box::new(backoff), }