From e4208cb1f288febc7f6ada06002b0075f8b60384 Mon Sep 17 00:00:00 2001 From: Hubert Bugaj Date: Fri, 22 May 2026 13:23:13 +0200 Subject: [PATCH] fix: limit FVM concurrency + use blocking tasks --- src/rpc/methods/eth.rs | 4 +- src/rpc/methods/gas.rs | 4 +- src/rpc/methods/state.rs | 6 +- src/state_manager/execution.rs | 10 +- src/state_manager/message_simulation.rs | 127 ++++++++++++++---------- src/state_manager/mod.rs | 37 +++++++ src/state_manager/state_computation.rs | 4 +- 7 files changed, 123 insertions(+), 69 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 18908282a310..4c5d6ae94df9 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1893,7 +1893,7 @@ async fn gas_search( msg.gas_limit = limit; let (_invoc_res, apply_ret, _, _) = data .state_manager - .call_with_gas(&mut msg.into(), prior_messages, Some(ts), VMFlush::Skip) + .call_with_gas(msg.into(), prior_messages, Some(ts), VMFlush::Skip) .await?; Ok(apply_ret.msg_receipt().exit_code().is_success()) } @@ -2137,6 +2137,7 @@ async fn eth_get_code( match ctx .state_manager .call_on_state(state_root, &message, Some(ts)) + .await { Ok(res) => { break 'invoc res; @@ -2230,6 +2231,7 @@ async fn get_storage_at( match ctx .state_manager .call_on_state(state_root, &message, Some(ts)) + .await { Ok(res) => { break 'invoc res; diff --git a/src/rpc/methods/gas.rs b/src/rpc/methods/gas.rs index ec507edc55a3..2645d099af61 100644 --- a/src/rpc/methods/gas.rs +++ b/src/rpc/methods/gas.rs @@ -235,7 +235,7 @@ impl GasEstimateGasLimit { // Pretend that the message is signed. This has an influence on the gas // cost. We obviously can't generate a valid signature. Instead, we just // fill the signature with zeros. The validity is not checked. - let mut chain_msg = match from_a.protocol() { + let chain_msg: ChainMessage = match from_a.protocol() { Protocol::Secp256k1 => { SignedMessage::new_unchecked(msg, Signature::new_secp256k1(vec![0; SECP_SIG_LEN])) .into() @@ -253,7 +253,7 @@ impl GasEstimateGasLimit { let (invoc_res, apply_ret, _, _) = data .state_manager .call_with_gas( - &mut chain_msg, + chain_msg, &prior_messages, Some(ts.shallow_clone()), VMFlush::Skip, diff --git a/src/rpc/methods/state.rs b/src/rpc/methods/state.rs index e16367ba31f9..f3cc89b4b2a4 100644 --- a/src/rpc/methods/state.rs +++ b/src/rpc/methods/state.rs @@ -81,7 +81,7 @@ const INITIAL_PLEDGE_DEN: u64 = 100; pub enum StateCall {} impl StateCall { - pub fn run( + pub async fn run( state_manager: &StateManager, message: &Message, tsk: Option, @@ -89,7 +89,7 @@ impl StateCall { let tipset = state_manager .chain_store() .load_required_tipset_or_heaviest(&tsk)?; - Ok(state_manager.call(message, Some(tipset))?) + Ok(state_manager.call(message, Some(tipset)).await?) } } @@ -110,7 +110,7 @@ impl RpcMethod<2> for StateCall { (message, ApiTipsetKey(tsk)): Self::Params, _: &http::Extensions, ) -> Result { - Ok(Self::run(&ctx.state_manager, &message, tsk)?) + Ok(Self::run(&ctx.state_manager, &message, tsk).await?) } } diff --git a/src/state_manager/execution.rs b/src/state_manager/execution.rs index e86a2d13f419..e1e569aed463 100644 --- a/src/state_manager/execution.rs +++ b/src/state_manager/execution.rs @@ -15,7 +15,8 @@ impl StateManager { /// indicated message, assuming it was executed in the indicated tipset. pub async fn replay(&self, ts: Tipset, mcid: Cid) -> Result { let this = self.shallow_clone(); - tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await? + self.run_on_fvm_pool(move || this.replay_blocking(ts, mcid)) + .await? } /// Blocking version of `replay` @@ -62,11 +63,8 @@ impl StateManager { target_message_cid: Cid, ) -> Result<(Cid, ApiInvocResult, Cid), Error> { let this = self.shallow_clone(); - tokio::task::spawn_blocking(move || { - this.replay_for_prestate_blocking(ts, target_message_cid) - }) - .await - .map_err(|e| Error::Other(format!("{e}")))? + self.run_on_fvm_pool(move || this.replay_for_prestate_blocking(ts, target_message_cid)) + .await? } fn replay_for_prestate_blocking( diff --git a/src/state_manager/message_simulation.rs b/src/state_manager/message_simulation.rs index 028c902c0c77..bacf5c78e8ae 100644 --- a/src/state_manager/message_simulation.rs +++ b/src/state_manager/message_simulation.rs @@ -95,15 +95,22 @@ impl StateManager { /// runs the given message and returns its result without any persisted /// changes. - pub fn call(&self, message: &Message, tipset: Option) -> Result { + pub async fn call( + &self, + message: &Message, + tipset: Option, + ) -> Result { let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); let chain_rand = self.chain_rand(ts.shallow_clone()); - self.call_raw(None, message, chain_rand, &ts) + let message = message.clone(); + let this = self.shallow_clone(); + self.run_on_fvm_pool(move || this.call_raw(None, &message, chain_rand, &ts)) + .await? } /// Same as [`StateManager::call`] but runs the message on the given state and not /// on the parent state of the tipset. - pub fn call_on_state( + pub async fn call_on_state( &self, state_cid: Cid, message: &Message, @@ -111,7 +118,10 @@ impl StateManager { ) -> Result { let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset()); let chain_rand = self.chain_rand(ts.shallow_clone()); - self.call_raw(Some(state_cid), message, chain_rand, &ts) + let message = message.clone(); + let this = self.shallow_clone(); + self.run_on_fvm_pool(move || this.call_raw(Some(state_cid), &message, chain_rand, &ts)) + .await? } pub async fn apply_on_state_with_gas( @@ -127,7 +137,7 @@ impl StateManager { // Pretend that the message is signed. This has an influence on the gas // cost. We obviously can't generate a valid signature. Instead, we just // fill the signature with zeros. The validity is not checked. - let mut chain_msg = match from_a.protocol() { + let chain_msg: ChainMessage = match from_a.protocol() { Protocol::Secp256k1 => SignedMessage::new_unchecked( msg.clone(), Signature::new_secp256k1(vec![0; SECP_SIG_LEN]), @@ -144,7 +154,7 @@ impl StateManager { }; let (_invoc_res, apply_ret, duration, state_root) = self - .call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush) + .call_with_gas(chain_msg, &[], Some(ts), vm_flush) .await?; Ok(( @@ -166,7 +176,7 @@ impl StateManager { /// messages and returns the values computed in the VM. pub async fn call_with_gas( &self, - message: &mut ChainMessage, + message: ChainMessage, prior_messages: &[ChainMessage], tipset: Option, vm_flush: VMFlush, @@ -178,55 +188,62 @@ impl StateManager { .map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?; let chain_rand = self.chain_rand(ts.clone()); - // Since we're simulating a future message, pretend we're applying it in the - // "next" tipset + // Simulate as if applied in the next tipset. let epoch = ts.epoch() + 1; - let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone()); - // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from - // FVM, but that introduces some constraints, and possible deadlocks. - let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> { - let mut vm = VM::new( - ExecutionContext { - heaviest_tipset: ts.clone(), - state_tree_root: state_root, - epoch, - rand: Box::new(chain_rand), - base_fee: ts.block_headers().first().parent_base_fee.clone(), - circ_supply: genesis_info.get_vm_circulating_supply( + let chain_config = self.chain_config().shallow_clone(); + let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone()); + let chain_index = self.chain_index().shallow_clone(); + let engine = self.engine.shallow_clone(); + let base_fee = ts.block_headers().first().parent_base_fee.clone(); + let timestamp = ts.min_timestamp(); + let prior_messages: Vec = prior_messages.to_vec(); + + self.run_on_fvm_pool(move || -> Result<_, Error> { + // FVM requires a 64MiB stack; ThreadedExecutor is the alternative + // but adds constraints. + stacker::grow(64 << 20, move || -> anyhow::Result<_> { + let circ_supply = + genesis_info.get_vm_circulating_supply(epoch, chain_index.db(), &state_root)?; + let mut vm = VM::new( + ExecutionContext { + heaviest_tipset: ts, + state_tree_root: state_root, epoch, - self.chain_index().db(), - &state_root, - )?, - chain_config: self.chain_config().shallow_clone(), - chain_index: self.chain_index().shallow_clone(), - timestamp: ts.min_timestamp(), - }, - &self.engine, - VMTrace::NotTraced, - )?; - - for msg in prior_messages { - vm.apply_message(msg)?; - } - let from_actor = vm - .get_actor(&message.from()) - .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))? - .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?; - - message.set_sequence(from_actor.sequence); - let (ret, duration) = vm.apply_message(message)?; - let state_root = match vm_flush { - VMFlush::Flush => Some(vm.flush()?), - VMFlush::Skip => None, - }; - Ok((ret, duration, state_root)) - })?; - - Ok(( - InvocResult::new(message.message().clone(), &ret), - ret, - duration, - state_cid, - )) + rand: Box::new(chain_rand), + base_fee, + circ_supply, + chain_config, + chain_index, + timestamp, + }, + &engine, + VMTrace::NotTraced, + )?; + + for m in &prior_messages { + vm.apply_message(m)?; + } + let mut message = message; + let from_actor = vm + .get_actor(&message.from()) + .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))? + .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?; + + message.set_sequence(from_actor.sequence); + let (ret, duration) = vm.apply_message(&message)?; + let state_root = match vm_flush { + VMFlush::Flush => Some(vm.flush()?), + VMFlush::Skip => None, + }; + Ok(( + InvocResult::new(message.message().clone(), &ret), + ret, + duration, + state_root, + )) + }) + .map_err(|e| Error::Other(format!("FVM execution failed: {e:#}"))) + }) + .await? } } diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 6b84217b2293..791796477e70 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -170,6 +170,11 @@ pub struct StateManager { id_to_deterministic_address_cache: IdToAddressCache, beacon: Arc, engine: Arc, + /// Caps concurrent FVM tasks at `num_cpus`. Any code path that runs + /// `VM::new` or `vm.apply_message` MUST go through + /// [`StateManager::run_on_fvm_pool`] — bypassing this semaphore silently + /// un-throttles every other FVM caller. + fvm_concurrency: Arc, } impl ShallowClone for StateManager { @@ -182,6 +187,7 @@ impl ShallowClone for StateManager { .shallow_clone(), beacon: self.beacon.shallow_clone(), engine: self.engine.shallow_clone(), + fvm_concurrency: self.fvm_concurrency.shallow_clone(), } } } @@ -205,6 +211,9 @@ impl StateManager { pub fn new_with_engine(cs: ChainStore, engine: Arc) -> anyhow::Result { let genesis = cs.genesis_block_header(); let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp)); + let cpus = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(8); Ok(Self { cs, @@ -215,9 +224,37 @@ impl StateManager { "id_to_deterministic_address", DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE, ), + fvm_concurrency: Arc::new(tokio::sync::Semaphore::new(cpus)), }) } + /// Run a CPU-bound FVM closure on the blocking pool, throttled to + /// `num_cpus` concurrent invocations via this `StateManager`'s + /// `fvm_concurrency` semaphore. + pub(crate) async fn run_on_fvm_pool(&self, f: F) -> Result + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let permit = self + .fvm_concurrency + .clone() + .acquire_owned() + .await + .map_err(|_| Error::Other("FVM concurrency semaphore closed".to_string()))?; + // Tuple shape forces the closure to capture `permit`; dropping it + // becomes a compile error rather than a silent throttle loss. + let (result, _permit) = tokio::task::spawn_blocking(move || (f(), permit)) + .await + .map_err(|e| { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + Error::Other(format!("FVM task join error: {e}")) + })?; + Ok(result) + } + /// Returns the currently tracked heaviest tipset. pub fn heaviest_tipset(&self) -> Tipset { self.chain_store().heaviest_tipset() diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs index 66a2a9241dfb..9a65551a1447 100644 --- a/src/state_manager/state_computation.rs +++ b/src/state_manager/state_computation.rs @@ -149,7 +149,7 @@ impl StateManager { enable_tracing: VMTrace, ) -> Result { let this = self.shallow_clone(); - tokio::task::spawn_blocking(move || { + self.run_on_fvm_pool(move || { this.compute_tipset_state_blocking(tipset, callback, enable_tracing) }) .await? @@ -198,7 +198,7 @@ impl StateManager { enable_tracing: VMTrace, ) -> Result { let this = self.shallow_clone(); - tokio::task::spawn_blocking(move || { + self.run_on_fvm_pool(move || { this.compute_state_blocking(height, messages, tipset, callback, enable_tracing) }) .await?