Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,11 @@ async fn prefill_rpc_caches_for_tipset(state_manager: StateManager, tsk: TipsetK
warn!("failed to call `Block::from_filecoin_tipset` for cache warmup: {e:#}");
}
}

{
if let Err(e) = state_manager.execution_trace(&ts).await {
warn!("failed to call `StateManager::execution_trace` for cache warmup: {e:#}");
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
{
use crate::rpc::eth::filter::{Matcher, SkipEvent};
struct CollectEventsCachePrefillingMatcher;
Expand Down
18 changes: 6 additions & 12 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3486,16 +3486,8 @@ async fn execute_tipset_traces(
ts: &Tipset,
ext: &http::Extensions,
) -> Result<(StateTree<DbImpl>, Vec<trace::TipsetTraceEntry>), ServerError> {
let (state_root, raw_traces) = {
let sm = ctx.state_manager.shallow_clone();
let ts = ts.shallow_clone();
tokio::task::spawn_blocking(move || sm.execution_trace(&ts))
.await
.context("execution_trace task panicked")??
};

let (state_root, raw_traces) = ctx.state_manager.execution_trace(ts).await?;
let state = ctx.state_manager.get_state_tree(&state_root)?;

let mut entries = Vec::new();
for (msg_idx, ir) in non_system_traces_with_positions(raw_traces) {
let tx_hash = EthGetTransactionHashByCid::handle(ctx.clone(), (ir.msg_cid,), ext).await?;
Expand All @@ -3515,8 +3507,8 @@ async fn execute_tipset_traces(
/// `transactionIndex` from `eth_getBlockByNumber`. System-actor messages
/// are filtered out without consuming a position.
fn non_system_traces_with_positions(
raw_traces: impl IntoIterator<Item = ApiInvocResult>,
) -> impl Iterator<Item = (i64, ApiInvocResult)> {
raw_traces: impl IntoIterator<Item = Arc<ApiInvocResult>>,
) -> impl Iterator<Item = (i64, Arc<ApiInvocResult>)> {
raw_traces
.into_iter()
.filter(|ir| ir.msg.from != system::ADDRESS.into())
Expand Down Expand Up @@ -3662,6 +3654,7 @@ async fn debug_trace_transaction(
let execution_trace = entry
.invoc_result
.execution_trace
.clone()
.context("no execution trace for transaction")?;

let mut env = trace::base_environment(&state, &entry.invoc_result.msg.from).map_err(|e| {
Expand Down Expand Up @@ -4086,7 +4079,7 @@ mod test {
use crate::shim::address::Address as ShimAddress;
use crate::shim::message::Message_v3;

let invoc_with_from = |from: ShimAddress| -> ApiInvocResult {
let invoc_with_from = |from: ShimAddress| -> Arc<ApiInvocResult> {
ApiInvocResult {
msg: Message_v3 {
to: ShimAddress::new_id(1).into(),
Expand All @@ -4096,6 +4089,7 @@ mod test {
.into(),
..Default::default()
}
.into()
};

let raw_traces = vec![
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/methods/eth/trace/parity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ fn trace_evm_private(
pub struct TipsetTraceEntry {
pub tx_hash: EthHash,
pub msg_position: i64,
pub invoc_result: crate::rpc::state::ApiInvocResult,
pub invoc_result: Arc<crate::rpc::state::ApiInvocResult>,
}

impl TipsetTraceEntry {
Expand Down
28 changes: 18 additions & 10 deletions src/rpc/methods/state/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@
use crate::blocks::TipsetKey;
use crate::lotus_json::{LotusJson, lotus_json_with_self};
use crate::message::MessageRead as _;
use crate::shim::executor::ApplyRet;
use crate::shim::{
address::Address,
clock::ChainEpoch,
econ::TokenAmount,
error::ExitCode,
executor::Receipt,
executor::{ApplyRet, Receipt},
fvm_latest::trace::IpldOperation,
message::Message,
state_tree::{ActorID, ActorState},
};
use crate::utils::get_size::raw_bytes_heap_size_helper;
use cid::Cid;
use fvm_ipld_encoding::RawBytes;
use get_size2::GetSize;
use num::Zero as _;
use schemars::{JsonSchema, Schema, SchemaGenerator};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -49,11 +50,12 @@ pub struct ForestComputeStateOutput {

lotus_json_with_self!(ForestComputeStateOutput);

#[derive(Debug, Default, Serialize, Deserialize, Clone, JsonSchema)]
#[derive(Debug, Default, Serialize, Deserialize, Clone, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct ApiInvocResult {
#[serde(with = "crate::lotus_json")]
#[schemars(with = "LotusJson<Cid>")]
#[get_size(ignore)]
pub msg_cid: Cid,
#[serde(with = "crate::lotus_json")]
#[schemars(with = "LotusJson<Message>")]
Expand Down Expand Up @@ -81,11 +83,12 @@ impl PartialEq for ApiInvocResult {
}
}

#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct MessageGasCost {
#[serde(with = "crate::lotus_json")]
#[schemars(with = "LotusJson<Option<Cid>>")]
#[get_size(ignore)]
pub message: Option<Cid>,
#[serde(with = "crate::lotus_json")]
#[schemars(with = "LotusJson<TokenAmount>")]
Expand Down Expand Up @@ -151,6 +154,7 @@ impl MessageGasCost {
DeserializeFromStr,
strum::Display,
strum::EnumString,
GetSize,
)]
#[strum(serialize_all = "PascalCase")]
pub enum TraceIpldOp {
Expand Down Expand Up @@ -183,19 +187,20 @@ impl From<IpldOperation> for TraceIpldOp {
}

/// IPLD operation details attached to an [`ExecutionTrace`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct TraceIpld {
pub op: TraceIpldOp,
#[serde(with = "crate::lotus_json")]
#[schemars(with = "LotusJson<Cid>")]
#[get_size(ignore)]
pub cid: Cid,
pub size: u64,
}

lotus_json_with_self!(TraceIpld);

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct ExecutionTrace {
pub msg: MessageTrace,
Expand Down Expand Up @@ -228,7 +233,7 @@ impl ExecutionTrace {

lotus_json_with_self!(ExecutionTrace);

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct MessageTrace {
#[serde(with = "crate::lotus_json")]
Expand All @@ -243,6 +248,7 @@ pub struct MessageTrace {
pub method: u64,
#[serde(with = "crate::lotus_json")]
#[schemars(with = "LotusJson<RawBytes>")]
#[get_size(size_fn = raw_bytes_heap_size_helper)]
pub params: RawBytes,
pub params_codec: u64,
pub gas_limit: Option<u64>,
Expand All @@ -251,7 +257,7 @@ pub struct MessageTrace {

lotus_json_with_self!(MessageTrace);

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct ActorTrace {
pub id: ActorID,
Expand All @@ -262,19 +268,21 @@ pub struct ActorTrace {

lotus_json_with_self!(ActorTrace);

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct ReturnTrace {
#[get_size(ignore)]
pub exit_code: ExitCode,
#[serde(with = "crate::lotus_json")]
#[schemars(with = "LotusJson<RawBytes>")]
#[get_size(size_fn = raw_bytes_heap_size_helper)]
pub r#return: RawBytes,
pub return_codec: u64,
}

lotus_json_with_self!(ReturnTrace);

#[derive(Default, Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[derive(Default, Debug, Clone, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "PascalCase")]
pub struct GasTrace {
pub name: String,
Expand Down
10 changes: 2 additions & 8 deletions src/shim/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use get_size2::GetSize;
use serde::{Deserialize, Serialize};

use crate::shim::{address::Address, econ::TokenAmount};
use crate::utils::get_size::raw_bytes_heap_size_helper;

/// Method number indicator for calling actor methods.
pub type MethodNum = u64;
Expand All @@ -27,20 +28,13 @@ pub struct Message {
#[cfg_attr(test, arbitrary(gen(
|g| RawBytes::new(Vec::arbitrary(g))
)))]
#[get_size(size_fn = raw_bytes_heap_size)]
#[get_size(size_fn = raw_bytes_heap_size_helper)]
pub params: RawBytes,
pub gas_limit: u64,
pub gas_fee_cap: TokenAmount,
pub gas_premium: TokenAmount,
}

fn raw_bytes_heap_size(b: &RawBytes) -> usize {
// Note: this is a cheap but inaccurate estimation,
// the correct implementation should be `Vec<u8>.from(b.clone()).get_heap_size()`,
// or `b.bytes.get_heap_size()` if `bytes` is made public.
b.bytes().get_heap_size()
}

impl From<Message_v4> for Message {
fn from(other: Message_v4) -> Self {
Self {
Expand Down
19 changes: 14 additions & 5 deletions src/shim/state_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use super::actors::LoadActorStateFromBlockstore;
pub use super::fvm_shared_latest::{ActorID, state::StateRoot};
use crate::{
blocks::Tipset,
prelude::*,
shim::{actors::AccountActorStateLoad as _, address::Address, econ::TokenAmount},
};
use crate::{
networks::{ACTOR_BUNDLES_METADATA, ActorBundleMetadata},
shim::actors::account,
prelude::*,
shim::{
actors::{AccountActorStateLoad as _, account},
address::Address,
econ::TokenAmount,
},
utils::get_size::big_int_heap_size_helper,
};
use anyhow::bail;
use fvm_ipld_encoding::{
Expand All @@ -24,6 +26,7 @@ pub use fvm3::state_tree::{ActorState as ActorStateV3, StateTree as StateTreeV3}
pub use fvm4::state_tree::{
ActorState as ActorStateV4, ActorState as ActorState_latest, StateTree as StateTreeV4,
};
use get_size2::GetSize;
use num::FromPrimitive;
use num_derive::FromPrimitive;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -423,6 +426,12 @@ impl ActorState {
}
}

impl GetSize for ActorState {
fn get_heap_size(&self) -> usize {
big_int_heap_size_helper(self.balance.atto())
}
}

impl From<&ActorStateV2> for ActorState {
fn from(value: &ActorStateV2) -> Self {
Self(ActorState_latest {
Expand Down
32 changes: 28 additions & 4 deletions src/state_manager/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,39 @@ impl StateManager {
)
}

pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec<ApiInvocResult>)> {
pub async fn execution_trace(
&self,
tipset: &Tipset,
) -> anyhow::Result<(Cid, Vec<Arc<ApiInvocResult>>)> {
let key = tipset.key();
let (state_root, invoc_trace) = self
.trace_cache
.get_or_insert_async(key, {
let this = self.shallow_clone();
let tipset = tipset.shallow_clone();
async move {
tokio::task::spawn_blocking(move || this.execution_trace_inner(&tipset))
.await
.context("tokio join error")
.flatten()
}
})
.await?;
Ok((state_root.into(), invoc_trace))
}

fn execution_trace_inner(
&self,
tipset: &Tipset,
) -> anyhow::Result<(CidWrapper, Vec<Arc<ApiInvocResult>>)> {
let mut invoc_trace = vec![];

let genesis_timestamp = self.chain_store().genesis_block_header().timestamp;

let callback = |ctx: MessageCallbackCtx<'_>| {
match ctx.at {
CalledAt::Applied | CalledAt::Reward => {
invoc_trace.push(ApiInvocResult {
invoc_trace.push(Arc::new(ApiInvocResult {
msg_cid: ctx.message.cid(),
msg: ctx.message.message().clone(),
msg_rct: Some(ctx.apply_ret.msg_receipt()),
Expand All @@ -237,7 +261,7 @@ impl StateManager {
gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?,
execution_trace: structured::parse_events(ctx.apply_ret.exec_trace())
.unwrap_or_default(),
});
}));
Ok(())
}
_ => Ok(()), // ignored
Expand All @@ -255,6 +279,6 @@ impl StateManager {
VMTrace::Traced,
)?;

Ok((state_root, invoc_trace))
Ok((state_root.into(), invoc_trace))
}
}
6 changes: 6 additions & 0 deletions src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::lotus_json::{LotusJson, lotus_json_with_self};
use crate::message::ChainMessage;
use crate::networks::ChainConfig;
use crate::prelude::*;
use crate::rpc::state::ApiInvocResult;
use crate::rpc::types::SectorOnChainInfo;
use crate::shim::actors::init::{self, State};
use crate::shim::actors::*;
Expand Down Expand Up @@ -60,6 +61,7 @@ use tracing::warn;

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(8192usize);
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(8192usize);
const DEFAULT_TRACE_CACHE_SIZE: NonZeroUsize = nonzero!(32usize); // maximum ~135MiB on mainnet
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
pub type IdToAddressCache = SizeTrackingCache<AddressId, Address>;

Expand Down Expand Up @@ -167,6 +169,8 @@ pub struct StateManager {
cs: ChainStore,
/// This is a cache which indexes tipsets to their calculated state output (state root, receipt root).
cache: ForestCache<TipsetKey, ExecutedTipset>,
/// This is a cache which indexes tipsets to their traces.
trace_cache: ForestCache<TipsetKey, (CidWrapper, Vec<Arc<ApiInvocResult>>)>,
id_to_deterministic_address_cache: IdToAddressCache,
beacon: Arc<crate::beacon::BeaconSchedule>,
engine: Arc<MultiEngine>,
Expand All @@ -177,6 +181,7 @@ impl ShallowClone for StateManager {
Self {
cs: self.cs.shallow_clone(),
cache: self.cache.shallow_clone(),
trace_cache: self.trace_cache.shallow_clone(),
id_to_deterministic_address_cache: self
.id_to_deterministic_address_cache
.shallow_clone(),
Expand Down Expand Up @@ -209,6 +214,7 @@ impl StateManager {
Ok(Self {
cs,
cache: ForestCache::new("tipset_state_executed_tipset"), // For StateOutput
trace_cache: ForestCache::with_size("tipset_trace", DEFAULT_TRACE_CACHE_SIZE),
beacon,
engine,
id_to_deterministic_address_cache: SizeTrackingCache::new_with_metrics(
Expand Down
7 changes: 7 additions & 0 deletions src/utils/get_size/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ pub fn big_int_heap_size_helper(b: &BigInt) -> usize {
b.bits().div_ceil(8) as usize
}

pub fn raw_bytes_heap_size_helper(b: &fvm_ipld_encoding::RawBytes) -> usize {
// Note: this is a cheap but inaccurate estimation,
// the correct implementation should be `Vec<u8>.from(b.clone()).get_heap_size()`,
// or `b.bytes.get_heap_size()` if `bytes` is made public.
b.bytes().get_heap_size()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading