diff --git a/crates/core/src/spk_client.rs b/crates/core/src/spk_client.rs index 64fe3dd8c..03124eb00 100644 --- a/crates/core/src/spk_client.rs +++ b/crates/core/src/spk_client.rs @@ -2,7 +2,7 @@ use crate::{ alloc::{boxed::Box, collections::VecDeque, vec::Vec}, collections::{BTreeMap, HashMap, HashSet}, - CheckPoint, ConfirmationBlockTime, Indexed, + BlockId, CheckPoint, ConfirmationBlockTime, Indexed, }; use bitcoin::{BlockHash, OutPoint, Script, ScriptBuf, Txid}; @@ -132,6 +132,14 @@ impl SyncRequestBuilder { self } + /// Set the target chain tip for the sync request. + /// + /// The sync will not fetch chain data beyond this target. + pub fn target(mut self, target: BlockId) -> Self { + self.inner.target = Some(target); + self + } + /// Add [`Script`]s coupled with associated indexes that will be synced against. /// /// # Example @@ -259,6 +267,7 @@ impl SyncRequestBuilder { pub struct SyncRequest { start_time: u64, chain_tip: Option>, + target: Option, spks: VecDeque<(I, ScriptBuf)>, spks_consumed: usize, spk_expected_txids: HashMap>, @@ -289,6 +298,7 @@ impl SyncRequest { inner: Self { start_time, chain_tip: None, + target: None, spks: VecDeque::new(), spks_consumed: 0, spk_expected_txids: HashMap::new(), @@ -337,6 +347,11 @@ impl SyncRequest { self.chain_tip.clone() } + /// Get the target chain tip [`BlockId`] of this request (if any). + pub fn target(&self) -> Option { + self.target + } + /// Advances the sync request and returns the next [`ScriptBuf`] with corresponding [`Txid`] /// history. /// @@ -444,6 +459,14 @@ impl FullScanRequestBuilder { self } + /// Set the target chain tip for the full scan request. + /// + /// The sync will not fetch chain data beyond this target. + pub fn target(mut self, target: BlockId) -> Self { + self.inner.target = Some(target); + self + } + /// Set the spk iterator for a given `keychain`. pub fn spks_for_keychain( mut self, @@ -495,6 +518,7 @@ impl FullScanRequestBuilder { pub struct FullScanRequest { start_time: u64, chain_tip: Option>, + target: Option, spks_by_keychain: BTreeMap> + Send>>, last_revealed: BTreeMap, inspect: Box>, @@ -520,6 +544,7 @@ impl FullScanRequest { inner: Self { start_time, chain_tip: None, + target: None, spks_by_keychain: BTreeMap::new(), last_revealed: BTreeMap::new(), inspect: Box::new(|_, _, _| ()), @@ -551,6 +576,11 @@ impl FullScanRequest { self.chain_tip.clone() } + /// Get the target chain tip [`BlockId`] of this request (if any). + pub fn target(&self) -> Option { + self.target + } + /// List all keychains contained in this request. pub fn keychains(&self) -> Vec { self.spks_by_keychain.keys().cloned().collect() diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index a7c943150..c00dee350 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -132,7 +132,11 @@ impl BdkElectrumClient { let start_time = request.start_time(); let tip_and_latest_blocks = match request.chain_tip() { - Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), + Some(chain_tip) => Some(fetch_tip_and_latest_blocks( + &self.inner, + chain_tip, + request.target(), + )?), None => None, }; @@ -150,6 +154,7 @@ impl BdkElectrumClient { spks, stop_gap, last_revealed, + request.target(), batch_size, &mut pending_anchors, )? { @@ -215,9 +220,10 @@ impl BdkElectrumClient { ) -> Result { let mut request: SyncRequest = request.into(); let start_time = request.start_time(); + let target = request.target(); let tip_and_latest_blocks = match request.chain_tip() { - Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip)?), + Some(chain_tip) => Some(fetch_tip_and_latest_blocks(&self.inner, chain_tip, target)?), None => None, }; @@ -232,6 +238,7 @@ impl BdkElectrumClient { .map(|(i, spk)| (i as u32, spk)), usize::MAX, None, + target, batch_size, &mut pending_anchors, )?; @@ -239,12 +246,14 @@ impl BdkElectrumClient { start_time, &mut tx_update, request.iter_txids(), + target, &mut pending_anchors, )?; self.populate_with_outpoints( start_time, &mut tx_update, request.iter_outpoints(), + target, &mut pending_anchors, )?; @@ -288,6 +297,7 @@ impl BdkElectrumClient { mut spks_with_expected_txids: impl Iterator, stop_gap: usize, last_revealed: Option, + target: Option, batch_size: usize, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result, Error> { @@ -335,7 +345,11 @@ impl BdkElectrumClient { match tx_res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - pending_anchors.push((tx_res.tx_hash, height)); + if target.map_or(true, |t| height as u32 <= t.height) { + pending_anchors.push((tx_res.tx_hash, height)); + } else { + tx_update.seen_ats.insert((tx_res.tx_hash, start_time)); + } } _ => { tx_update.seen_ats.insert((tx_res.tx_hash, start_time)); @@ -355,6 +369,7 @@ impl BdkElectrumClient { start_time: u64, tx_update: &mut TxUpdate, outpoints: impl IntoIterator, + target: Option, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { // Collect valid outpoints with their corresponding `spk` and `tx`. @@ -398,7 +413,11 @@ impl BdkElectrumClient { match res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - pending_anchors.push((res.tx_hash, height)); + if target.map_or(true, |t| height as u32 <= t.height) { + pending_anchors.push((res.tx_hash, height)); + } else { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } } _ => { tx_update.seen_ats.insert((res.tx_hash, start_time)); @@ -420,7 +439,11 @@ impl BdkElectrumClient { match res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - pending_anchors.push((res.tx_hash, height)); + if target.map_or(true, |t| height as u32 <= t.height) { + pending_anchors.push((res.tx_hash, height)); + } else { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } } _ => { tx_update.seen_ats.insert((res.tx_hash, start_time)); @@ -440,6 +463,7 @@ impl BdkElectrumClient { start_time: u64, tx_update: &mut TxUpdate, txids: impl IntoIterator, + target: Option, pending_anchors: &mut Vec<(Txid, usize)>, ) -> Result<(), Error> { let mut txs = Vec::<(Txid, Arc)>::new(); @@ -501,7 +525,11 @@ impl BdkElectrumClient { match res.height.try_into() { // Returned heights 0 & -1 are reserved for unconfirmed txs. Ok(height) if height > 0 => { - pending_anchors.push((tx.0, height)); + if target.map_or(true, |t| height as u32 <= t.height) { + pending_anchors.push((tx.0, height)); + } else { + tx_update.seen_ats.insert((res.tx_hash, start_time)); + } } _ => { tx_update.seen_ats.insert((res.tx_hash, start_time)); @@ -652,9 +680,16 @@ impl BdkElectrumClient { fn fetch_tip_and_latest_blocks( client: &impl ElectrumApi, prev_tip: CheckPoint, + target: Option, ) -> Result<(CheckPoint, BTreeMap), Error> { let HeaderNotification { height, .. } = client.block_headers_subscribe()?; - let new_tip_height = height as u32; + let mut new_tip_height = height as u32; + + if let Some(t) = target { + if new_tip_height > t.height { + new_tip_height = t.height; + } + } // If electrum returns a tip height that is lower than our previous tip, then checkpoints do // not need updating. We just return the previous tip and use that as the point of agreement. @@ -871,4 +906,48 @@ mod test { Ok(()) } + + #[cfg(feature = "default")] + #[test] + fn test_target_limit() -> anyhow::Result<()> { + use bdk_chain::BlockId; + + let env = TestEnv::new()?; + let client = electrum_client::Client::new(env.electrsd.electrum_url.as_str()).unwrap(); + let electrum_client = BdkElectrumClient::new(client); + + let addr = env.rpc_client().get_new_address(None, None)?.address()?.assume_checked(); + let spk = addr.script_pubkey(); + + // Send funds to address. + let txid = env.send(&addr, Amount::from_sat(50_000))?; + + let target_height: u32 = env.rpc_client().get_block_count()?.into_model().0 as u32; + let target_hash = electrum_client.inner.block_header(target_height as _)?.block_hash(); + + // Mine a block to confirm the transaction. + env.mine_blocks(1, None)?; + env.wait_until_electrum_sees_block(Duration::from_secs(6))?; + + let bogus_genesis = constants::genesis_block(Network::Testnet).block_hash(); + let cp = CheckPoint::new(0, bogus_genesis); + + let target = BlockId { + height: target_height, + hash: target_hash, + }; + + let req = SyncRequest::builder() + .chain_tip(cp) + .spks([spk]) + .target(target) + .build(); + + let res = electrum_client.sync(req, 10, false)?; + + assert!(res.tx_update.anchors.is_empty(), "anchors should be empty"); + assert!(res.tx_update.seen_ats.iter().any(|(t, _)| *t == txid), "tx should be in seen_ats"); + + Ok(()) + } } diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 2b3828952..d8d245652 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -69,8 +69,9 @@ where let keychains = request.keychains(); let chain_tip = request.chain_tip(); + let target = request.target(); let latest_blocks = if chain_tip.is_some() { - Some(fetch_latest_blocks(self).await?) + Some(fetch_latest_blocks(self, target).await?) } else { None }; @@ -90,6 +91,7 @@ where keychain_spks, stop_gap, last_revealed, + target, parallel_requests, ) .await?; @@ -122,8 +124,9 @@ where let start_time = request.start_time(); let chain_tip = request.chain_tip(); + let target = request.target(); let latest_blocks = if chain_tip.is_some() { - Some(fetch_latest_blocks(self).await?) + Some(fetch_latest_blocks(self, target).await?) } else { None }; @@ -136,6 +139,7 @@ where start_time, &mut inserted_txs, request.iter_spks_with_expected_txids(), + target, parallel_requests, ) .await?, @@ -146,6 +150,7 @@ where start_time, &mut inserted_txs, request.iter_txids(), + target, parallel_requests, ) .await?, @@ -156,6 +161,7 @@ where start_time, &mut inserted_txs, request.iter_outpoints(), + target, parallel_requests, ) .await?, @@ -184,13 +190,21 @@ where /// alternating between chain-sources. async fn fetch_latest_blocks( client: &esplora_client::AsyncClient, + target: Option, ) -> Result, Error> { - Ok(client + let mut latest_blocks = client .get_block_infos(None) .await? .into_iter() + .filter(|b| target.map_or(true, |t| b.height <= t.height)) .map(|b| (b.height, b.id)) - .collect()) + .collect::>(); + + if let Some(t) = target { + latest_blocks.entry(t.height).or_insert(t.hash); + } + + Ok(latest_blocks) } /// Used instead of [`esplora_client::BlockingClient::get_block_hash`]. @@ -308,6 +322,7 @@ async fn fetch_txs_with_keychain_spks( mut keychain_spks: I, stop_gap: usize, last_revealed: Option, + target: Option, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> where @@ -369,7 +384,13 @@ where if inserted_txs.insert(tx.txid) { update.txs.push(tx.to_tx().into()); } - insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status); + insert_anchor_or_seen_at_from_status( + &mut update, + start_time, + tx.txid, + tx.status, + target, + ); insert_prevouts(&mut update, tx.vin); } update @@ -398,6 +419,7 @@ async fn fetch_txs_with_spks( start_time: u64, inserted_txs: &mut HashSet, spks: I, + target: Option, parallel_requests: usize, ) -> Result, Error> where @@ -412,6 +434,7 @@ where spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), usize::MAX, None, + target, parallel_requests, ) .await @@ -429,6 +452,7 @@ async fn fetch_txs_with_txids( start_time: u64, inserted_txs: &mut HashSet, txids: I, + target: Option, parallel_requests: usize, ) -> Result, Error> where @@ -462,7 +486,13 @@ where if inserted_txs.insert(txid) { update.txs.push(tx_info.to_tx().into()); } - insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status); + insert_anchor_or_seen_at_from_status( + &mut update, + start_time, + txid, + tx_info.status, + target, + ); insert_prevouts(&mut update, tx_info.vin); } } @@ -481,6 +511,7 @@ async fn fetch_txs_with_outpoints( start_time: u64, inserted_txs: &mut HashSet, outpoints: I, + target: Option, parallel_requests: usize, ) -> Result, Error> where @@ -499,6 +530,7 @@ where start_time, inserted_txs, outpoints.iter().copied().map(|op| op.txid), + target, parallel_requests, ) .await?, @@ -535,6 +567,7 @@ where start_time, spend_txid, spend_status, + target, ); } } @@ -546,6 +579,7 @@ where start_time, inserted_txs, missing_txs, + target, parallel_requests, ) .await?, diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 225b574ea..e299c10c7 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -59,8 +59,9 @@ impl EsploraExt for esplora_client::BlockingClient { let start_time = request.start_time(); let chain_tip = request.chain_tip(); + let target = request.target(); let latest_blocks = if chain_tip.is_some() { - Some(fetch_latest_blocks(self)?) + Some(fetch_latest_blocks(self, target)?) } else { None }; @@ -80,6 +81,7 @@ impl EsploraExt for esplora_client::BlockingClient { keychain_spks, stop_gap, last_revealed, + target, parallel_requests, )?; tx_update.extend(update); @@ -114,8 +116,9 @@ impl EsploraExt for esplora_client::BlockingClient { let start_time = request.start_time(); let chain_tip = request.chain_tip(); + let target = request.target(); let latest_blocks = if chain_tip.is_some() { - Some(fetch_latest_blocks(self)?) + Some(fetch_latest_blocks(self, target)?) } else { None }; @@ -127,6 +130,7 @@ impl EsploraExt for esplora_client::BlockingClient { start_time, &mut inserted_txs, request.iter_spks_with_expected_txids(), + target, parallel_requests, )?); tx_update.extend(fetch_txs_with_txids( @@ -134,6 +138,7 @@ impl EsploraExt for esplora_client::BlockingClient { start_time, &mut inserted_txs, request.iter_txids(), + target, parallel_requests, )?); tx_update.extend(fetch_txs_with_outpoints( @@ -141,6 +146,7 @@ impl EsploraExt for esplora_client::BlockingClient { start_time, &mut inserted_txs, request.iter_outpoints(), + target, parallel_requests, )?); @@ -170,12 +176,20 @@ impl EsploraExt for esplora_client::BlockingClient { /// alternating between chain-sources. fn fetch_latest_blocks( client: &esplora_client::BlockingClient, + target: Option, ) -> Result, Error> { - Ok(client + let mut latest_blocks = client .get_block_infos(None)? .into_iter() + .filter(|b| target.map_or(true, |t| b.height <= t.height)) .map(|b| (b.height, b.id)) - .collect()) + .collect::>(); + + if let Some(t) = target { + latest_blocks.entry(t.height).or_insert(t.hash); + } + + Ok(latest_blocks) } /// Used instead of [`esplora_client::BlockingClient::get_block_hash`]. @@ -280,6 +294,7 @@ fn fetch_txs_with_keychain_spks mut keychain_spks: I, stop_gap: usize, last_revealed: Option, + target: Option, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> { type TxsOfSpkIndex = (u32, Vec, HashSet); @@ -338,7 +353,13 @@ fn fetch_txs_with_keychain_spks if inserted_txs.insert(tx.txid) { update.txs.push(tx.to_tx().into()); } - insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status); + insert_anchor_or_seen_at_from_status( + &mut update, + start_time, + tx.txid, + tx.status, + target, + ); insert_prevouts(&mut update, tx.vin); } update @@ -367,6 +388,7 @@ fn fetch_txs_with_spks>( start_time: u64, inserted_txs: &mut HashSet, spks: I, + target: Option, parallel_requests: usize, ) -> Result, Error> { fetch_txs_with_keychain_spks( @@ -376,6 +398,7 @@ fn fetch_txs_with_spks>( spks.into_iter().enumerate().map(|(i, spk)| (i as u32, spk)), usize::MAX, None, + target, parallel_requests, ) .map(|(update, _)| update) @@ -392,6 +415,7 @@ fn fetch_txs_with_txids>( start_time: u64, inserted_txs: &mut HashSet, txids: I, + target: Option, parallel_requests: usize, ) -> Result, Error> { let mut update = TxUpdate::::default(); @@ -426,7 +450,13 @@ fn fetch_txs_with_txids>( if inserted_txs.insert(txid) { update.txs.push(tx_info.to_tx().into()); } - insert_anchor_or_seen_at_from_status(&mut update, start_time, txid, tx_info.status); + insert_anchor_or_seen_at_from_status( + &mut update, + start_time, + txid, + tx_info.status, + target, + ); insert_prevouts(&mut update, tx_info.vin); } } @@ -445,6 +475,7 @@ fn fetch_txs_with_outpoints>( start_time: u64, inserted_txs: &mut HashSet, outpoints: I, + target: Option, parallel_requests: usize, ) -> Result, Error> { let outpoints = outpoints.into_iter().collect::>(); @@ -457,6 +488,7 @@ fn fetch_txs_with_outpoints>( start_time, inserted_txs, outpoints.iter().map(|op| op.txid), + target, parallel_requests, )?); @@ -496,6 +528,7 @@ fn fetch_txs_with_outpoints>( start_time, spend_txid, spend_status, + target, ); } } @@ -507,6 +540,7 @@ fn fetch_txs_with_outpoints>( start_time, inserted_txs, missing_txs, + target, parallel_requests, )?); Ok(update) diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 60b4f1eb3..220597462 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -43,6 +43,7 @@ fn insert_anchor_or_seen_at_from_status( start_time: u64, txid: Txid, status: TxStatus, + target: Option, ) { if let TxStatus { block_height: Some(height), @@ -51,14 +52,16 @@ fn insert_anchor_or_seen_at_from_status( .. } = status { - let anchor = ConfirmationBlockTime { - block_id: BlockId { height, hash }, - confirmation_time: time, - }; - update.anchors.insert((anchor, txid)); - } else { - update.seen_ats.insert((txid, start_time)); + if target.map_or(true, |t| height <= t.height) { + let anchor = ConfirmationBlockTime { + block_id: BlockId { height, hash }, + confirmation_time: time, + }; + update.anchors.insert((anchor, txid)); + return; + } } + update.seen_ats.insert((txid, start_time)); } /// Inserts floating txouts into `tx_graph` using [`Vin`](esplora_client::api::Vin)s returned by