Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions crates/chain-orchestrator/src/handle/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use crate::{ChainOrchestratorEvent, ChainOrchestratorStatus};
use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use rollup_node_primitives::{BlockInfo, L1MessageEnvelope};
use rollup_node_primitives::{BlockInfo, ChainImport, L1MessageEnvelope};
use scroll_db::L1MessageKey;
use scroll_network::ScrollNetworkHandle;
use scroll_network::{NewBlockWithPeer, ScrollNetworkHandle};
use tokio::sync::oneshot;

/// The commands that can be sent to the rollup manager.
Expand All @@ -29,6 +29,13 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
DatabaseQuery(DatabaseQuery),
/// Revert the rollup node state to the specified L1 block number.
RevertToL1Block((u64, oneshot::Sender<bool>)),
/// Import a block from a remote source.
ImportBlock {
/// The block to import with peer info
block_with_peer: NewBlockWithPeer,
/// Response channel
response: oneshot::Sender<Result<ChainImport, String>>,
},
/// Enable gossiping of blocks to peers.
#[cfg(feature = "test-utils")]
SetGossip((bool, oneshot::Sender<()>)),
Expand Down
14 changes: 12 additions & 2 deletions crates/chain-orchestrator/src/handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use super::ChainOrchestratorEvent;
use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use rollup_node_primitives::{BlockInfo, L1MessageEnvelope};
use rollup_node_primitives::{BlockInfo, ChainImport, L1MessageEnvelope};
use scroll_db::L1MessageKey;
use scroll_network::ScrollNetworkHandle;
use scroll_network::{NewBlockWithPeer, ScrollNetworkHandle};
use tokio::sync::{mpsc, oneshot};
use tracing::error;

Expand Down Expand Up @@ -132,6 +132,16 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
rx.await
}

/// Import a block from a remote source.
pub async fn import_block(
&self,
block_with_peer: NewBlockWithPeer,
) -> Result<Result<ChainImport, String>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
self.send_command(ChainOrchestratorCommand::ImportBlock { block_with_peer, response: tx });
rx.await
}

/// Sends a command to the rollup manager to enable or disable gossiping of blocks to peers.
#[cfg(feature = "test-utils")]
pub async fn set_gossip(&self, enabled: bool) -> Result<(), oneshot::error::RecvError> {
Expand Down
8 changes: 8 additions & 0 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,13 @@ impl<
self.notify(ChainOrchestratorEvent::UnwoundToL1Block(block_number));
let _ = tx.send(true);
}
ChainOrchestratorCommand::ImportBlock { block_with_peer, response } => {
let result = self
.import_chain(vec![block_with_peer.block.clone()], block_with_peer)
.await
.map_err(|e| e.to_string());
let _ = response.send(result);
}
#[cfg(feature = "test-utils")]
ChainOrchestratorCommand::SetGossip((enabled, tx)) => {
self.network.handle().set_gossip(enabled).await;
Expand Down Expand Up @@ -1218,6 +1225,7 @@ impl<
chain,
peer_id: block_with_peer.peer_id,
signature: block_with_peer.signature,
result,
})
}

Expand Down
9 changes: 3 additions & 6 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ async-trait.workspace = true

# alloy
alloy-chains.workspace = true
alloy-eips.workspace = true
alloy-primitives.workspace = true
alloy-provider.workspace = true
alloy-rpc-client.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-signer-local.workspace = true
alloy-signer-aws = "1.0.30"
alloy-signer = "1.0.30"
Expand Down Expand Up @@ -59,6 +61,7 @@ reth-rpc-api.workspace = true
reth-rpc-eth-api.workspace = true
reth-rpc-eth-types.workspace = true
reth-tasks.workspace = true
reth-tokio-util.workspace = true
reth-transaction-pool.workspace = true
reth-trie-db.workspace = true

Expand All @@ -76,16 +79,13 @@ aws-config = "1.8.0"
aws-sdk-kms = "1.76.0"

# test-utils
alloy-eips = { workspace = true, optional = true }
alloy-rpc-types-eth = { workspace = true, optional = true }
alloy-rpc-types-engine = { workspace = true, optional = true }
reth-e2e-test-utils = { workspace = true, optional = true }
reth-engine-local = { workspace = true, optional = true }
reth-provider = { workspace = true, optional = true }
reth-rpc-layer = { workspace = true, optional = true }
reth-rpc-server-types = { workspace = true, optional = true }
reth-storage-api = { workspace = true, optional = true }
reth-tokio-util = { workspace = true, optional = true }
scroll-alloy-rpc-types-engine = { workspace = true, optional = true }
scroll-alloy-rpc-types.workspace = true

Expand Down Expand Up @@ -154,14 +154,11 @@ test-utils = [
"reth-e2e-test-utils",
"reth-rpc-server-types",
"reth-rpc-layer",
"reth-tokio-util",
"scroll-alloy-rpc-types-engine",
"alloy-rpc-types-engine",
"reth-primitives-traits/test-utils",
"reth-network-p2p/test-utils",
"rollup-node-chain-orchestrator/test-utils",
"scroll-network/test-utils",
"alloy-eips",
"reth-storage-api",
"alloy-rpc-types-eth",
]
21 changes: 21 additions & 0 deletions crates/node/src/add_ons/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use std::sync::Arc;
mod handle;
pub use handle::ScrollAddOnsHandle;

mod remote_block_source;
pub use remote_block_source::RemoteBlockSourceAddOn;

mod rpc;
pub use rpc::{
RollupNodeAdminApiClient, RollupNodeAdminApiServer, RollupNodeApiClient, RollupNodeApiServer,
Expand Down Expand Up @@ -133,6 +136,8 @@ where

let (tx, rx) = tokio::sync::oneshot::channel();
let rpc_config = rollup_node_manager_addon.config().rpc_args.clone();
let remote_block_source_config =
rollup_node_manager_addon.config().remote_block_source_args.clone();

// Register rollupNode API and rollupNodeAdmin API if enabled
let rollup_node_rpc_ext = Arc::new(RollupNodeRpcExt::<N::Network>::new(rx));
Expand Down Expand Up @@ -161,6 +166,22 @@ where
.map_err(|_| eyre::eyre!("failed to send rollup manager handle"))?;
}

// Launch remote block source if enabled
if remote_block_source_config.enabled {
let remote_source = RemoteBlockSourceAddOn::new(
remote_block_source_config,
rollup_manager_handle.clone(),
)
.await?;
ctx.node
.task_executor()
.spawn_critical_with_shutdown_signal("remote_block_source", |shutdown| async move {
if let Err(e) = remote_source.run_until_shutdown(shutdown).await {
tracing::error!(target: "scroll::remote_source", ?e, "Remote block source failed");
}
});
}

Ok(ScrollAddOnsHandle { rollup_manager_handle, rpc_handle })
}
}
Expand Down
183 changes: 183 additions & 0 deletions crates/node/src/add_ons/remote_block_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//! Remote block source add-on for importing blocks from a remote L2 node
//! and building new blocks on top.

use crate::args::RemoteBlockSourceArgs;
use alloy_primitives::Signature;
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_client::RpcClient;
use alloy_transport::layers::RetryBackoffLayer;
use futures::StreamExt;
use reth_network_api::{FullNetwork, PeerId};
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tasks::shutdown::Shutdown;
use reth_tokio_util::EventStream;
use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle};
use scroll_alloy_network::Scroll;
use scroll_network::NewBlockWithPeer;
use tokio::time::{interval, Duration};

/// Remote block source add-on that imports blocks from a trusted remote L2 node
/// and triggers block building on top of each imported block.
#[derive(Debug)]
pub struct RemoteBlockSourceAddOn<N>
where
N: FullNetwork<Primitives = ScrollNetworkPrimitives>,
{
/// Configuration for the remote block source.
config: RemoteBlockSourceArgs,
/// Handle to the chain orchestrator for sending commands.
handle: ChainOrchestratorHandle<N>,
/// Tracks the last block number we imported from remote.
/// This is different from local head because we build blocks on top of imports.
last_imported_block: u64,
}

impl<N> RemoteBlockSourceAddOn<N>
where
N: FullNetwork<Primitives = ScrollNetworkPrimitives> + Send + Sync + 'static,
{
/// Creates a new remote block source add-on.
pub async fn new(
config: RemoteBlockSourceArgs,
handle: ChainOrchestratorHandle<N>,
) -> eyre::Result<Self> {
let last_imported_block = handle.status().await?.l2.fcs.head_block_info().number;
Ok(Self { config, handle, last_imported_block })
}

/// Runs the remote block source until shutdown.
pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> {
let Some(url) = self.config.url.clone() else {
tracing::error!(target: "scroll::remote_source", "URL required when remote-source is enabled");
return Err(eyre::eyre!("URL required when remote-source is enabled"));
};

// Build remote provider with retry layer
let retry_layer = RetryBackoffLayer::new(10, 100, 330);
let client = RpcClient::builder().layer(retry_layer).http(url);
let remote = ProviderBuilder::<_, _, Scroll>::default().connect_client(client);

// Get event listener for waiting on block completion
let mut event_stream = match self.handle.get_event_listener().await {
Ok(stream) => stream,
Err(e) => {
tracing::error!(target: "scroll::remote_source", ?e, "Failed to get event listener");
return Err(eyre::eyre!(e));
}
};

let mut poll_interval = interval(Duration::from_millis(self.config.poll_interval_ms));

loop {
tokio::select! {
biased;
_guard = &mut shutdown => break,
_ = poll_interval.tick() => {
if let Err(e) = self.follow_and_build(&remote, &mut event_stream).await {
tracing::error!(target: "scroll::remote_source", ?e, "Sync error");
}
}
}
}

Ok(())
}

/// Follows the remote node and builds blocks on top of imported blocks.
async fn follow_and_build<P: Provider<Scroll>>(
&mut self,
remote: &P,
event_stream: &mut EventStream<ChainOrchestratorEvent>,
) -> eyre::Result<()> {
loop {
// Get remote head
let remote_block = remote
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
.full()
.await?
.ok_or_else(|| eyre::eyre!("Remote block not found"))?;

let remote_head = remote_block.header.number;

// Compare against last imported block
if remote_head <= self.last_imported_block {
tracing::trace!(target: "scroll::remote_source",
last_imported = self.last_imported_block,
remote_head,
"Already synced with remote");
return Ok(());
}

let blocks_behind = remote_head - self.last_imported_block;
tracing::info!(target: "scroll::remote_source",
last_imported = self.last_imported_block,
remote_head,
blocks_behind,
"Catching up");

// Fetch and import the next block from remote
let next_block_num = self.last_imported_block + 1;
let block = remote
.get_block_by_number(next_block_num.into())
.full()
.await?
.ok_or_else(|| eyre::eyre!("Block {} not found", next_block_num))?
.into_consensus()
.map_transactions(|tx| tx.inner.into_inner());

// Create NewBlockWithPeer with dummy peer_id and signature (trusted source)
let block_with_peer = NewBlockWithPeer {
peer_id: PeerId::default(),
block,
signature: Signature::new(Default::default(), Default::default(), false),
};

// Import the block (this will cause a reorg if we had a locally built block at this
// height)
let chain_import = match self.handle.import_block(block_with_peer).await {
Ok(Ok(chain_import)) => {
self.last_imported_block = next_block_num;
chain_import
}
Ok(Err(e)) => {
return Err(eyre::eyre!("Import block failed: {}", e));
}
Err(e) => {
return Err(eyre::eyre!("chain orchestrator command channel error: {}", e));
}
};

if !chain_import.result.is_valid() {
tracing::info!(target: "scroll::remote_source",
result = ?chain_import.result,
"Imported block is not valid according to forkchoice, skipping build");
continue;
}

// Trigger block building on top of the imported block
self.handle.build_block();

// Wait for BlockSequenced event
tracing::debug!(target: "scroll::remote_source", "Waiting for block to be built...");
loop {
match event_stream.next().await {
Some(ChainOrchestratorEvent::BlockSequenced(block)) => {
tracing::info!(target: "scroll::remote_source",
block_number = block.header.number,
block_hash = ?block.hash_slow(),
"Block built successfully, proceeding to next");
break;
}
Some(_) => {
// Ignore other events, keep waiting
}
None => {
return Err(eyre::eyre!("Event stream ended unexpectedly"));
}
}
}

// Loop continues to process next block
}
}
}
Loading
Loading