3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion shared/src/types.rs
@@ -35,7 +35,7 @@ pub type DelayedMsgsData = HashMap<U256, Bytes>;
#[async_trait]
pub trait BlockBuilder<T>: Send {
/// Process a single slot
fn build_block(&self, block: &PartialBlock, msgs_data: DelayedMsgsData) -> eyre::Result<T>;
fn build_block(&self, block: PartialBlock, msgs_data: DelayedMsgsData) -> eyre::Result<T>;
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
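For orientation, a minimal sketch of an implementation against the new by-value signature. This is illustrative only: the echo-style builder and the choice of PartialBlock as the output type T are assumptions, not part of the change.

use shared::types::{BlockBuilder, DelayedMsgsData, PartialBlock};

struct EchoBuilder;

impl BlockBuilder<PartialBlock> for EchoBuilder {
    // The builder now receives the PartialBlock by value, so it can move the
    // block into its output instead of cloning behind a reference.
    fn build_block(
        &self,
        block: PartialBlock,
        _msgs_data: DelayedMsgsData,
    ) -> eyre::Result<PartialBlock> {
        Ok(block)
    }
}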
64 changes: 39 additions & 25 deletions shared/test-utils/src/docker.rs
@@ -17,6 +17,7 @@ use eyre::Result;
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder};
use redis::aio::ConnectionManager;
use std::{
collections::HashMap,
env,
future::Future,
path::Path,
@@ -125,6 +126,7 @@ impl Drop for E2EProcess {
pub async fn start_component(
executable_name: &str,
api_port: u16,
env_vars: HashMap<String, String>,
args: Vec<String>,
cargs: Vec<String>,
) -> Result<E2EProcess> {
@@ -160,18 +162,14 @@ pub async fn start_component(
tag = Ok("dev".to_string());
}
let mut docker = if let Ok(tag) = tag {
E2EProcess::new(
Command::new("docker")
.arg("run")
.arg("--init")
.arg("--rm")
.arg("--net=host")
.arg(format!(
"ghcr.io/syndicateprotocol/syndicate-appchains/{executable_name}:{tag}"
))
.args(args),
executable_name,
)
let mut cmd = Command::new("docker");
cmd.arg("run").arg("--init").arg("--rm").arg("--net=host");
for (key, value) in &env_vars {
cmd.arg("-e").arg(format!("{key}={value}"));
}
cmd.arg(format!("ghcr.io/syndicateprotocol/syndicate-appchains/{executable_name}:{tag}"))
.args(args);
E2EProcess::new(&mut cmd, executable_name)
} else {
let mut cmd = Command::new("cargo");
// ring has a custom build.rs script that rebuilds whenever certain environment
@@ -184,6 +182,7 @@
.env_remove("CARGO_PKG_VERSION_PRE")
.env_remove("CARGO_MANIFEST_LINKS")
.current_dir(env!("CARGO_WORKSPACE_DIR"))
.envs(env_vars)
.arg("run");

if needs_rocksdb {
Expand Down Expand Up @@ -217,6 +216,7 @@ pub async fn start_mchain(
appchain_chain_id: u64,
finality_delay: u64,
migration_params: Option<MigrationParams>,
l1_block_num_hardfork_ts: Option<u64>,
config_manager_rpc_url: Option<String>,
config_manager_address: Option<Address>,
) -> Result<(String, E2EProcess, MProvider)> {
@@ -258,8 +258,22 @@
args.extend(vec!["--config-manager-address".to_string(), address.to_string()]);
}

let docker =
start_component("synd-mchain", port, args, vec!["--datadir".to_string(), tmp_dir]).await?;
let mut env_vars = HashMap::new();
if let Some(custom_l1_block_num_hardfork_ts) = l1_block_num_hardfork_ts {
env_vars.insert(
"L1_BLOCK_NUM_HARDFORK_TS".to_string(),
custom_l1_block_num_hardfork_ts.to_string(),
);
}

let docker = start_component(
"synd-mchain",
port,
env_vars,
args,
vec!["--datadir".to_string(), tmp_dir],
)
.await?;
let url = format!("ws://localhost:{port}");
let mchain = MProvider::new(&url).await?;
Ok((url, docker, mchain))
@@ -372,6 +386,7 @@ pub async fn launch_nitro_node(args: NitroNodeArgs) -> Result<ChainInfo> {
.arg("--execution.tx-pre-checker.strictness=20")
.arg("--ensure-rollup-deployment=false")
.arg("--init.validate-genesis-assertion=false")
.arg("--execution.sequencer.max-acceptable-timestamp-delta=8760h") // 1 year
.arg(format!(
"--chain.info-json={}",
nitro_chain_info_json(NitroChainInfoArgs {
@@ -444,7 +459,9 @@ pub async fn start_valkey() -> Result<(E2EProcess, String)> {
Ok((valkey, valkey_url))
}

pub async fn launch_enclave_server() -> Result<(E2EProcess, String, Address)> {
pub async fn launch_enclave_server(
env_vars: HashMap<String, String>,
) -> Result<(E2EProcess, String, Address)> {
info!("launching enclave server");

let project_root = env!("CARGO_WORKSPACE_DIR");
@@ -489,16 +506,13 @@ pub async fn launch_enclave_server() -> Result<(E2EProcess, String, Address)> {
}

let port = PortManager::instance().next_port().await;
let docker = E2EProcess::new(
Command::new("docker")
.arg("run")
.arg("--init")
.arg("--rm")
.arg("-p")
.arg(format!("{port}:1234"))
.arg(image_name),
"enclave-server",
)?;
let mut cmd = Command::new("docker");
cmd.arg("run").arg("--init").arg("--rm");
for (key, value) in env_vars {
cmd.arg("-e").arg(format!("{key}={value}"));
}
cmd.arg("-p").arg(format!("{port}:1234")).arg(image_name);
let docker = E2EProcess::new(&mut cmd, "enclave-server")?;

let enclave_rpc_url = format!("http://localhost:{port}");

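A sketch of a call site using the new env_vars parameter, mirroring the L1_BLOCK_NUM_HARDFORK_TS example in start_mchain above. The crate path for the test utilities, the port argument, and the CLI flags are assumptions made for illustration.

use std::collections::HashMap;
use test_utils::docker::{start_component, E2EProcess}; // crate path assumed

async fn start_mchain_with_hardfork_ts(port: u16) -> eyre::Result<E2EProcess> {
    let mut env_vars = HashMap::new();
    // Forwarded as `docker run -e KEY=VALUE ...` in the Docker path,
    // or via Command::envs in the `cargo run` fallback.
    env_vars.insert("L1_BLOCK_NUM_HARDFORK_TS".to_string(), "0".to_string());

    start_component(
        "synd-mchain",
        port,
        env_vars,
        vec!["--port".to_string(), port.to_string()], // args are illustrative
        vec!["--datadir".to_string(), "/tmp/mchain".to_string()], // cargs, as in start_mchain above
    )
    .await
}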
30 changes: 25 additions & 5 deletions synd-chain-ingestor/src/client.rs
@@ -200,7 +200,7 @@ struct BlockStream<
> {
stream: Pin<Box<Peekable<ReadyChunks<S>>>>,
buffer: VecDeque<Block>,
block_builder: Arc<B>,
block_builder: B,
indexed_block_number: u64,
inbox_addr: Option<Address>,
client: EthClient,
@@ -225,7 +225,7 @@
{
fn new(
stream: S,
block_builder: Arc<B>,
block_builder: B,
start_block: u64,
client: EthClient,
init_data: (Vec<Address>, u64),
@@ -313,6 +313,26 @@ impl<
B: BlockBuilder<Block> + Sync,
> BlockStreamT<Block> for BlockStream<S, Block, B>
{
/// Receives the next block from the stream once a block with timestamp >= the provided
/// timestamp has arrived.
///
/// This function:
/// 1. Processes the initial message on first call (converting init data into batched requests)
/// 2. Executes queued init requests or polls the stream for new block data
/// 3. Builds blocks using the block builder with delayed message data
/// 4. Manages a buffer of blocks, handling reorgs by updating stale blocks in-place
/// 5. Returns the oldest block in the buffer once its timestamp meets the requirement
///
/// # Arguments
/// * `timestamp` - The minimum timestamp threshold for the returned block
///
/// # Returns
/// The next block with `block.timestamp >= timestamp`, pulled from the back of the buffer
///
/// # Errors
/// - If the stream closes unexpectedly
/// - If a reorg affects a block that has already been removed from the buffer
/// - If block building or delayed message processing fails
#[allow(clippy::unwrap_used)]
async fn recv(&mut self, timestamp: u64) -> eyre::Result<Block> {
let mut responses = vec![];
@@ -340,7 +360,7 @@ impl<
self.inbox_addr,
)
.await?;
let block = self.block_builder.build_block(&partial_block, delayed_msgs_data)?;
let block = self.block_builder.build_block(partial_block, delayed_msgs_data)?;
let block_number = block.block_ref().number;
assert!(
block_number <= self.indexed_block_number,
@@ -498,7 +518,7 @@ pub trait IngestorProvider: Sync {
&self,
start_block: u64,
addresses: Vec<Address>,
block_builder: Arc<impl BlockBuilder<Block> + Sync + 'static>,
block_builder: impl BlockBuilder<Block> + Sync + 'static,
client: EthClient,
inbox_addr: Option<Address>,
) -> Result<impl BlockStreamT<Block>, ClientError> {
@@ -598,7 +618,7 @@ impl IngestorProvider for IngestorProviderImpl {
&self,
start_block: u64,
addresses: Vec<Address>,
block_builder: Arc<impl BlockBuilder<Block> + Sync + 'static>,
block_builder: impl BlockBuilder<Block> + Sync + 'static,
client: EthClient,
inbox_addr: Option<Address>,
) -> Result<impl BlockStreamT<Block>, ClientError> {
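Given the recv contract documented above and the removal of the Arc wrapper, a minimal sketch of the updated consumer side. It assumes a provider implementing IngestorProvider, an EthClient, and the MockBlockBuilder stub from the integration tests below; the function itself is hypothetical.

use common::types::SequencingBlock;
use synd_chain_ingestor::{client::IngestorProvider, eth_client::EthClient};

// MockBlockBuilder is the test-only stub defined in
// synd-chain-ingestor/tests/integration_test.rs.
async fn first_ready_block(
    client: &impl IngestorProvider,
    eth_client: EthClient,
    start_block: u64,
) -> eyre::Result<SequencingBlock> {
    // The block builder is now passed by value; the stream takes ownership,
    // so call sites no longer wrap it in an Arc.
    let mut block_stream =
        client.get_blocks(start_block, vec![], MockBlockBuilder, eth_client, None).await?;

    // recv() yields the oldest buffered block once its timestamp is >= the
    // requested minimum (0 here), after applying any pending reorgs in place.
    block_stream.recv(0).await
}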
16 changes: 8 additions & 8 deletions synd-chain-ingestor/tests/integration_test.rs
@@ -2,7 +2,7 @@

use common::types::SequencingBlock;
use shared::types::{BlockBuilder, PartialBlock};
use std::{sync::Arc, time::Duration};
use std::time::Duration;
use synd_chain_ingestor::{
client::{BlockStreamT, IngestorProvider, IngestorProviderConfig, IngestorProviderImpl},
eth_client::EthClient,
@@ -21,14 +21,15 @@ use tracing::info;
mod tests {
use super::*;
use shared::types::DelayedMsgsData;
use std::collections::HashMap;
use url::Url;

struct MockBlockBuilder;

impl BlockBuilder<SequencingBlock> for MockBlockBuilder {
fn build_block(
&self,
block: &PartialBlock,
block: PartialBlock,
_msgs_data: DelayedMsgsData,
) -> eyre::Result<SequencingBlock> {
Ok(SequencingBlock {
@@ -79,6 +80,7 @@ mod tests {
let sequencing_chain_ingestor = start_component(
"synd-chain-ingestor",
seq_chain_ingestor_cfg.port,
HashMap::new(),
seq_chain_ingestor_cfg.cli_args(),
Default::default(),
)
@@ -127,9 +129,8 @@
)
.await;

let mut block_stream = client
.get_blocks(start_block, vec![], Arc::new(MockBlockBuilder), eth_client, None)
.await?;
let mut block_stream =
client.get_blocks(start_block, vec![], MockBlockBuilder, eth_client, None).await?;

for _ in 0..post_init_blocks {
mine_block(&anvil.provider, 10).await?;
@@ -184,9 +185,8 @@
)
.await;

let mut block_stream = client
.get_blocks(start_block, vec![], Arc::new(MockBlockBuilder), eth_client, None)
.await?;
let mut block_stream =
client.get_blocks(start_block, vec![], MockBlockBuilder, eth_client, None).await?;

for _ in 0..post_init_blocks {
mine_block(&anvil.provider, 10).await?;
4 changes: 2 additions & 2 deletions synd-mchain/src/client.rs
@@ -501,7 +501,7 @@ mod tests {
assert_eq!(mchain.get_block_number().await, 2);
mchain
.add_batch(&MBlock {
payload: Some(ArbitrumBatch::new(Default::default(), vec![empty.clone()])),
payload: Some(ArbitrumBatch::new(Default::default(), vec![empty.clone()], 0)),
slot: Slot { seq_block_number: 2, ..Default::default() },
timestamp: 0,
})
@@ -510,7 +510,7 @@
assert_eq!(mchain.get_block_number().await, 3);
mchain
.add_batch(&MBlock {
payload: Some(ArbitrumBatch::new(Default::default(), vec![empty; 2])),
payload: Some(ArbitrumBatch::new(Default::default(), vec![empty; 2], 0)),
slot: Slot { seq_block_number: 3, ..Default::default() },
timestamp: 0,
})
23 changes: 17 additions & 6 deletions synd-mchain/src/db.rs
@@ -8,8 +8,8 @@ use jsonrpsee::types::{error::INTERNAL_ERROR_CODE, ErrorObjectOwned};
#[cfg(feature = "rocksdb")]
use rocksdb::{DBWithThreadMode, ThreadMode};
use serde::{Deserialize, Serialize};
use std::fmt;
use tracing::{debug, trace};
use std::{env, fmt};
use tracing::{debug, info, trace};

/// VERSION must be bumped whenever a breaking change is made
const VERSION: u64 = 4;
@@ -64,12 +64,18 @@ pub struct ArbitrumBatch {
pub batch_data: Bytes,
/// The delayed messages included in this batch
pub delayed_messages: Vec<DelayedMessage>,
/// The L1 block number
pub l1_block_number: u64,
}

impl ArbitrumBatch {
/// Creates a new [`ArbitrumBatch`]
pub const fn new(batch_data: Bytes, delayed_messages: Vec<DelayedMessage>) -> Self {
Self { batch_data, delayed_messages }
pub const fn new(
batch_data: Bytes,
delayed_messages: Vec<DelayedMessage>,
l1_block_number: u64,
) -> Self {
Self { batch_data, delayed_messages, l1_block_number }
}
}

@@ -108,6 +114,9 @@ pub struct Block {
pub before_message_count: u64,
/// reorg data
pub slot: Slot,
// TODO this is a breaking change, need to update MCHAIN version
/// The L1 block number
pub l1_block_number: u64,
}

impl Block {
@@ -347,6 +356,7 @@ pub trait ArbitrumDB {
};
let batch = arbitrum_batch.batch_data;
let messages = arbitrum_batch.delayed_messages;
let l1_block_number = arbitrum_batch.l1_block_number;

let mut block = Block {
timestamp: mblock.timestamp,
@@ -357,16 +367,16 @@
before_message_acc: state.message_acc,
messages: messages.iter().map(|x| (x.to_owned(), FixedBytes::ZERO)).collect(),
after_batch_acc: Default::default(),
l1_block_number,
};
let mut inbox_acc = block.before_message_acc;
let offset = self.get_migration_offset();
for (i, (msg, acc)) in block.messages.iter_mut().enumerate() {
let l1_block_num = block.slot.seq_block_number;
let message_hash = keccak256(
(
[msg.kind],
msg.sender,
l1_block_num,
block.l1_block_number,
mblock.timestamp,
U256::from(block.before_message_count + i as u64),
msg.base_fee_l1,
@@ -464,6 +474,7 @@
},
true,
)?;
info!("migration applied: {params:?}");

Ok(())
}
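A small sketch of the new constructor argument in use, mirroring the updated tests in synd-mchain/src/client.rs. Module paths for the imports are not visible in this diff, and the concrete values are placeholders.

// Imports from the synd-mchain crate omitted: the exact module paths are not
// shown here (ArbitrumBatch and DelayedMessage are defined in db.rs).
fn example_mblock(batch_data: Bytes, delayed_messages: Vec<DelayedMessage>) -> MBlock {
    // The third argument is new: the L1 block number the batch was derived from.
    // It is stored on the resulting Block and folded into each delayed-message
    // hash in place of the sequencing slot number.
    let payload = ArbitrumBatch::new(batch_data, delayed_messages, 12_345);
    MBlock {
        payload: Some(payload),
        slot: Slot { seq_block_number: 2, ..Default::default() },
        timestamp: 0,
    }
}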
3 changes: 1 addition & 2 deletions synd-mchain/src/methods/common.rs
@@ -36,14 +36,13 @@ pub fn create_log(

/// Helper function to create a mock header object
pub fn create_header(batch_count: u64, offset: u64, block: &Block) -> alloy::rpc::types::Header {
let l1_block_num = block.slot.seq_block_number;
alloy::rpc::types::Header {
inner: alloy::consensus::Header {
number: batch_count + offset,
base_fee_per_gas: Some(1),
extra_data: FixedBytes::<32>::ZERO.into(),
#[allow(clippy::unwrap_used)]
mix_hash: U256::from(l1_block_num)
mix_hash: U256::from(block.l1_block_number)
.checked_shl(64)
.unwrap()
.checked_add(U256::from(1))