Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1893,7 +1893,7 @@ async fn gas_search(
msg.gas_limit = limit;
let (_invoc_res, apply_ret, _, _) = data
.state_manager
.call_with_gas(&mut msg.into(), prior_messages, Some(ts), VMFlush::Skip)
.call_with_gas(msg.into(), prior_messages, Some(ts), VMFlush::Skip)
.await?;
Ok(apply_ret.msg_receipt().exit_code().is_success())
}
Expand Down Expand Up @@ -2137,6 +2137,7 @@ async fn eth_get_code(
match ctx
.state_manager
.call_on_state(state_root, &message, Some(ts))
.await
{
Ok(res) => {
break 'invoc res;
Expand Down Expand Up @@ -2230,6 +2231,7 @@ async fn get_storage_at(
match ctx
.state_manager
.call_on_state(state_root, &message, Some(ts))
.await
{
Ok(res) => {
break 'invoc res;
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/methods/gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl GasEstimateGasLimit {
// Pretend that the message is signed. This has an influence on the gas
// cost. We obviously can't generate a valid signature. Instead, we just
// fill the signature with zeros. The validity is not checked.
let mut chain_msg = match from_a.protocol() {
let chain_msg: ChainMessage = match from_a.protocol() {
Protocol::Secp256k1 => {
SignedMessage::new_unchecked(msg, Signature::new_secp256k1(vec![0; SECP_SIG_LEN]))
.into()
Expand All @@ -253,7 +253,7 @@ impl GasEstimateGasLimit {
let (invoc_res, apply_ret, _, _) = data
.state_manager
.call_with_gas(
&mut chain_msg,
chain_msg,
&prior_messages,
Some(ts.shallow_clone()),
VMFlush::Skip,
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/methods/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ const INITIAL_PLEDGE_DEN: u64 = 100;
pub enum StateCall {}

impl StateCall {
pub fn run(
pub async fn run(
state_manager: &StateManager,
message: &Message,
tsk: Option<TipsetKey>,
) -> anyhow::Result<ApiInvocResult> {
let tipset = state_manager
.chain_store()
.load_required_tipset_or_heaviest(&tsk)?;
Ok(state_manager.call(message, Some(tipset))?)
Ok(state_manager.call(message, Some(tipset)).await?)
}
}

Expand All @@ -110,7 +110,7 @@ impl RpcMethod<2> for StateCall {
(message, ApiTipsetKey(tsk)): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
Ok(Self::run(&ctx.state_manager, &message, tsk)?)
Ok(Self::run(&ctx.state_manager, &message, tsk).await?)
}
}

Expand Down
10 changes: 4 additions & 6 deletions src/state_manager/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ impl StateManager {
/// indicated message, assuming it was executed in the indicated tipset.
pub async fn replay(&self, ts: Tipset, mcid: Cid) -> Result<ApiInvocResult, Error> {
let this = self.shallow_clone();
tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await?
self.run_on_fvm_pool(move || this.replay_blocking(ts, mcid))
.await?
}

/// Blocking version of `replay`
Expand Down Expand Up @@ -62,11 +63,8 @@ impl StateManager {
target_message_cid: Cid,
) -> Result<(Cid, ApiInvocResult, Cid), Error> {
let this = self.shallow_clone();
tokio::task::spawn_blocking(move || {
this.replay_for_prestate_blocking(ts, target_message_cid)
})
.await
.map_err(|e| Error::Other(format!("{e}")))?
self.run_on_fvm_pool(move || this.replay_for_prestate_blocking(ts, target_message_cid))
.await?
}

fn replay_for_prestate_blocking(
Expand Down
127 changes: 72 additions & 55 deletions src/state_manager/message_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,33 @@ impl StateManager {

/// runs the given message and returns its result without any persisted
/// changes.
pub fn call(&self, message: &Message, tipset: Option<Tipset>) -> Result<ApiInvocResult, Error> {
pub async fn call(
&self,
message: &Message,
tipset: Option<Tipset>,
) -> Result<ApiInvocResult, Error> {
let ts = tipset.unwrap_or_else(|| self.heaviest_tipset());
let chain_rand = self.chain_rand(ts.shallow_clone());
self.call_raw(None, message, chain_rand, &ts)
let message = message.clone();
let this = self.shallow_clone();
self.run_on_fvm_pool(move || this.call_raw(None, &message, chain_rand, &ts))
.await?
}

/// Same as [`StateManager::call`] but runs the message on the given state and not
/// on the parent state of the tipset.
pub fn call_on_state(
pub async fn call_on_state(
&self,
state_cid: Cid,
message: &Message,
tipset: Option<Tipset>,
) -> Result<ApiInvocResult, Error> {
let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset());
let chain_rand = self.chain_rand(ts.shallow_clone());
self.call_raw(Some(state_cid), message, chain_rand, &ts)
let message = message.clone();
let this = self.shallow_clone();
self.run_on_fvm_pool(move || this.call_raw(Some(state_cid), &message, chain_rand, &ts))
.await?
}

pub async fn apply_on_state_with_gas(
Expand All @@ -127,7 +137,7 @@ impl StateManager {
// Pretend that the message is signed. This has an influence on the gas
// cost. We obviously can't generate a valid signature. Instead, we just
// fill the signature with zeros. The validity is not checked.
let mut chain_msg = match from_a.protocol() {
let chain_msg: ChainMessage = match from_a.protocol() {
Protocol::Secp256k1 => SignedMessage::new_unchecked(
msg.clone(),
Signature::new_secp256k1(vec![0; SECP_SIG_LEN]),
Expand All @@ -144,7 +154,7 @@ impl StateManager {
};

let (_invoc_res, apply_ret, duration, state_root) = self
.call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush)
.call_with_gas(chain_msg, &[], Some(ts), vm_flush)
.await?;

Ok((
Expand All @@ -166,7 +176,7 @@ impl StateManager {
/// messages and returns the values computed in the VM.
pub async fn call_with_gas(
&self,
message: &mut ChainMessage,
message: ChainMessage,
prior_messages: &[ChainMessage],
tipset: Option<Tipset>,
vm_flush: VMFlush,
Expand All @@ -178,55 +188,62 @@ impl StateManager {
.map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?;
let chain_rand = self.chain_rand(ts.clone());

// Since we're simulating a future message, pretend we're applying it in the
// "next" tipset
// Simulate as if applied in the next tipset.
let epoch = ts.epoch() + 1;
let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone());
// FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
// FVM, but that introduces some constraints, and possible deadlocks.
let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> {
let mut vm = VM::new(
ExecutionContext {
heaviest_tipset: ts.clone(),
state_tree_root: state_root,
epoch,
rand: Box::new(chain_rand),
base_fee: ts.block_headers().first().parent_base_fee.clone(),
circ_supply: genesis_info.get_vm_circulating_supply(
let chain_config = self.chain_config().shallow_clone();
let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone());
let chain_index = self.chain_index().shallow_clone();
let engine = self.engine.shallow_clone();
let base_fee = ts.block_headers().first().parent_base_fee.clone();
let timestamp = ts.min_timestamp();
let prior_messages: Vec<ChainMessage> = prior_messages.to_vec();

self.run_on_fvm_pool(move || -> Result<_, Error> {
// FVM requires a 64MiB stack; ThreadedExecutor is the alternative
// but adds constraints.
stacker::grow(64 << 20, move || -> anyhow::Result<_> {
let circ_supply =
genesis_info.get_vm_circulating_supply(epoch, chain_index.db(), &state_root)?;
let mut vm = VM::new(
ExecutionContext {
heaviest_tipset: ts,
state_tree_root: state_root,
epoch,
self.chain_index().db(),
&state_root,
)?,
chain_config: self.chain_config().shallow_clone(),
chain_index: self.chain_index().shallow_clone(),
timestamp: ts.min_timestamp(),
},
&self.engine,
VMTrace::NotTraced,
)?;

for msg in prior_messages {
vm.apply_message(msg)?;
}
let from_actor = vm
.get_actor(&message.from())
.map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))?
.ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;

message.set_sequence(from_actor.sequence);
let (ret, duration) = vm.apply_message(message)?;
let state_root = match vm_flush {
VMFlush::Flush => Some(vm.flush()?),
VMFlush::Skip => None,
};
Ok((ret, duration, state_root))
})?;

Ok((
InvocResult::new(message.message().clone(), &ret),
ret,
duration,
state_cid,
))
rand: Box::new(chain_rand),
base_fee,
circ_supply,
chain_config,
chain_index,
timestamp,
},
&engine,
VMTrace::NotTraced,
)?;

for m in &prior_messages {
vm.apply_message(m)?;
}
let mut message = message;
let from_actor = vm
.get_actor(&message.from())
.map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))?
.ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?;

message.set_sequence(from_actor.sequence);
let (ret, duration) = vm.apply_message(&message)?;
let state_root = match vm_flush {
VMFlush::Flush => Some(vm.flush()?),
VMFlush::Skip => None,
};
Ok((
InvocResult::new(message.message().clone(), &ret),
ret,
duration,
state_root,
))
})
.map_err(|e| Error::Other(format!("FVM execution failed: {e:#}")))
})
.await?
}
}
37 changes: 37 additions & 0 deletions src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ pub struct StateManager {
id_to_deterministic_address_cache: IdToAddressCache,
beacon: Arc<crate::beacon::BeaconSchedule>,
engine: Arc<MultiEngine>,
/// Caps concurrent FVM tasks at `num_cpus`. Any code path that runs
/// `VM::new` or `vm.apply_message` MUST go through
/// [`StateManager::run_on_fvm_pool`] — bypassing this semaphore silently
/// un-throttles every other FVM caller.
fvm_concurrency: Arc<tokio::sync::Semaphore>,
}

impl ShallowClone for StateManager {
Expand All @@ -182,6 +187,7 @@ impl ShallowClone for StateManager {
.shallow_clone(),
beacon: self.beacon.shallow_clone(),
engine: self.engine.shallow_clone(),
fvm_concurrency: self.fvm_concurrency.shallow_clone(),
}
}
}
Expand All @@ -205,6 +211,9 @@ impl StateManager {
pub fn new_with_engine(cs: ChainStore, engine: Arc<MultiEngine>) -> anyhow::Result<Self> {
let genesis = cs.genesis_block_header();
let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp));
let cpus = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(8);

Ok(Self {
cs,
Expand All @@ -215,9 +224,37 @@ impl StateManager {
"id_to_deterministic_address",
DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE,
),
fvm_concurrency: Arc::new(tokio::sync::Semaphore::new(cpus)),
})
}

/// Run a CPU-bound FVM closure on the blocking pool, throttled to
/// `num_cpus` concurrent invocations via this `StateManager`'s
/// `fvm_concurrency` semaphore.
pub(crate) async fn run_on_fvm_pool<F, R>(&self, f: F) -> Result<R, Error>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let permit = self
.fvm_concurrency
.clone()
.acquire_owned()
.await
.map_err(|_| Error::Other("FVM concurrency semaphore closed".to_string()))?;
// Tuple shape forces the closure to capture `permit`; dropping it
// becomes a compile error rather than a silent throttle loss.
let (result, _permit) = tokio::task::spawn_blocking(move || (f(), permit))
.await
.map_err(|e| {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
}
Error::Other(format!("FVM task join error: {e}"))
})?;
Ok(result)
}

/// Returns the currently tracked heaviest tipset.
pub fn heaviest_tipset(&self) -> Tipset {
self.chain_store().heaviest_tipset()
Expand Down
4 changes: 2 additions & 2 deletions src/state_manager/state_computation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl StateManager {
enable_tracing: VMTrace,
) -> Result<ExecutedTipset, Error> {
let this = self.shallow_clone();
tokio::task::spawn_blocking(move || {
self.run_on_fvm_pool(move || {
this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
})
.await?
Expand Down Expand Up @@ -198,7 +198,7 @@ impl StateManager {
enable_tracing: VMTrace,
) -> Result<ExecutedTipset, Error> {
let this = self.shallow_clone();
tokio::task::spawn_blocking(move || {
self.run_on_fvm_pool(move || {
this.compute_state_blocking(height, messages, tipset, callback, enable_tracing)
})
.await?
Expand Down
Loading