Skip to content

Commit 3e49734

Browse files
authored
feat: async header transform (#365)
* async header transform * concurrent headers transform * batch transform headers
1 parent 797d440 commit 3e49734

File tree

4 files changed

+29
-22
lines changed

4 files changed

+29
-22
lines changed

crates/net/network/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub struct NetworkConfig<C, N: NetworkPrimitives = EthNetworkPrimitives> {
9999
/// If non-empty, peers that don't have these blocks will be filtered out.
100100
pub required_block_hashes: Vec<B256>,
101101
/// A transformation hook applied to the downloaded headers.
102-
pub header_transform: Box<dyn HeaderTransform<N::BlockHeader>>,
102+
pub header_transform: Arc<dyn HeaderTransform<N::BlockHeader>>,
103103
}
104104

105105
// === impl NetworkConfig ===
@@ -232,7 +232,7 @@ pub struct NetworkConfigBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
232232
/// Optional network id
233233
network_id: Option<u64>,
234234
/// The header transform type.
235-
header_transform: Option<Box<dyn HeaderTransform<N::BlockHeader>>>,
235+
header_transform: Option<Arc<dyn HeaderTransform<N::BlockHeader>>>,
236236
}
237237

238238
impl NetworkConfigBuilder<EthNetworkPrimitives> {
@@ -605,7 +605,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
605605
/// Sets the header transform type.
606606
pub fn header_transform(
607607
mut self,
608-
header_transform: Box<dyn HeaderTransform<N::BlockHeader>>,
608+
header_transform: Arc<dyn HeaderTransform<N::BlockHeader>>,
609609
) -> Self {
610610
self.header_transform = Some(header_transform);
611611
self
@@ -717,7 +717,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
717717
nat,
718718
handshake,
719719
required_block_hashes,
720-
header_transform: header_transform.unwrap_or_else(|| Box::new(())),
720+
header_transform: header_transform.unwrap_or_else(|| Arc::new(())),
721721
}
722722
}
723723
}

crates/net/network/src/fetch/mod.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
5656
/// Sender for download requests, used to detach a [`FetchClient`]
5757
download_requests_tx: UnboundedSender<DownloadRequest<N>>,
5858
/// A transformation hook applied to the downloaded headers.
59-
header_transform: Box<dyn HeaderTransform<N::BlockHeader>>,
59+
header_transform: Arc<dyn HeaderTransform<N::BlockHeader>>,
6060
}
6161

6262
// === impl StateSyncer ===
@@ -65,7 +65,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
6565
pub(crate) fn new(
6666
peers_handle: PeersHandle,
6767
num_active_peers: Arc<AtomicUsize>,
68-
header_transform: Box<dyn HeaderTransform<N::BlockHeader>>,
68+
header_transform: Arc<dyn HeaderTransform<N::BlockHeader>>,
6969
) -> Self {
7070
let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
7171
Self {
@@ -279,10 +279,15 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
279279
resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
280280

281281
if let Some(resp) = resp {
282-
// apply the header transform and delegate the response
283-
let _ = resp.response.send(res.map(|h| {
284-
(peer_id, h.into_iter().map(|h| self.header_transform.map(h)).collect()).into()
285-
}));
282+
let header_transform = self.header_transform.clone();
283+
tokio::spawn(async move {
284+
let res = match res {
285+
Ok(headers) => Ok(header_transform.map(headers).await),
286+
Err(e) => Err(e),
287+
};
288+
289+
let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
290+
});
286291
}
287292

288293
if let Some(peer) = self.peers.get_mut(&peer_id) {
@@ -496,7 +501,7 @@ mod tests {
496501
let mut fetcher = StateFetcher::<EthNetworkPrimitives>::new(
497502
manager.handle(),
498503
Default::default(),
499-
Box::new(()),
504+
Arc::new(()),
500505
);
501506

502507
poll_fn(move |cx| {
@@ -521,7 +526,7 @@ mod tests {
521526
let mut fetcher = StateFetcher::<EthNetworkPrimitives>::new(
522527
manager.handle(),
523528
Default::default(),
524-
Box::new(()),
529+
Arc::new(()),
525530
);
526531
// Add a few random peers
527532
let peer1 = B512::random();
@@ -548,7 +553,7 @@ mod tests {
548553
let mut fetcher = StateFetcher::<EthNetworkPrimitives>::new(
549554
manager.handle(),
550555
Default::default(),
551-
Box::new(()),
556+
Arc::new(()),
552557
);
553558
// Add a few random peers
554559
let peer1 = B512::random();
@@ -577,7 +582,7 @@ mod tests {
577582
let mut fetcher = StateFetcher::<EthNetworkPrimitives>::new(
578583
manager.handle(),
579584
Default::default(),
580-
Box::new(()),
585+
Arc::new(()),
581586
);
582587
let peer_id = B512::random();
583588

@@ -611,7 +616,7 @@ mod tests {
611616
let mut fetcher = StateFetcher::<EthNetworkPrimitives>::new(
612617
manager.handle(),
613618
Default::default(),
614-
Box::new(()),
619+
Arc::new(()),
615620
);
616621
let peer_id = B512::random();
617622

crates/net/network/src/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl<N: NetworkPrimitives> NetworkState<N> {
102102
discovery: Discovery,
103103
peers_manager: PeersManager,
104104
num_active_peers: Arc<AtomicUsize>,
105-
header_transform: Box<dyn HeaderTransform<N::BlockHeader>>,
105+
header_transform: Arc<dyn HeaderTransform<N::BlockHeader>>,
106106
) -> Self {
107107
let state_fetcher =
108108
StateFetcher::new(peers_manager.handle(), num_active_peers, header_transform);
@@ -582,7 +582,7 @@ mod tests {
582582
queued_messages: Default::default(),
583583
client: BlockNumReader(Box::new(NoopProvider::default())),
584584
discovery: Discovery::noop(),
585-
state_fetcher: StateFetcher::new(handle, Default::default(), Box::new(())),
585+
state_fetcher: StateFetcher::new(handle, Default::default(), Arc::new(())),
586586
}
587587
}
588588

crates/net/network/src/transform/header.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22
33
use reth_primitives_traits::BlockHeader;
44

5-
/// An instance of the trait applies a mapping to the input header.
5+
/// An instance of the trait applies a mapping to the input headers.
6+
#[async_trait::async_trait]
67
pub trait HeaderTransform<H: BlockHeader>: std::fmt::Debug + Send + Sync {
7-
/// Applies a mapping to the input header.
8-
fn map(&self, header: H) -> H;
8+
/// Applies a mapping to the input headers.
9+
async fn map(&self, headers: Vec<H>) -> Vec<H>;
910
}
1011

12+
#[async_trait::async_trait]
1113
impl<H: BlockHeader> HeaderTransform<H> for () {
12-
fn map(&self, header: H) -> H {
13-
header
14+
async fn map(&self, headers: Vec<H>) -> Vec<H> {
15+
headers
1416
}
1517
}
1618

0 commit comments

Comments
 (0)