From f979bf3183ffcff9b8259df1b60d8b442b692a15 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 10 Nov 2025 22:24:29 +0000 Subject: [PATCH 1/6] fix: merge volatile totals for deltas of the same tx to avoid overcounting txs Signed-off-by: William Hankins --- common/src/types.rs | 58 ++++++++++++++++++++++ modules/address_state/src/address_state.rs | 4 +- modules/address_state/src/state.rs | 42 ++++++++++++---- 3 files changed, 90 insertions(+), 14 deletions(-) diff --git a/common/src/types.rs b/common/src/types.rs index ea76d8b7..3ae224c9 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -254,6 +254,7 @@ pub type PolicyId = [u8; 28]; pub type NativeAssets = Vec<(PolicyId, Vec)>; pub type NativeAssetsDelta = Vec<(PolicyId, Vec)>; pub type NativeAssetsMap = HashMap>; +pub type NativeAssetsDeltaMap = HashMap>; #[derive( Debug, @@ -401,6 +402,63 @@ impl AddAssign for ValueMap { } } +/// Hashmap representation of ValueDelta (lovelace + multiasset) +pub struct ValueDeltaMap { + pub lovelace: i64, + pub assets: NativeAssetsDeltaMap, +} + +impl From for ValueDeltaMap { + fn from(value: ValueDelta) -> Self { + let mut assets = HashMap::new(); + + for (policy, asset_list) in value.assets { + let policy_entry = assets.entry(policy).or_insert_with(HashMap::new); + for asset in asset_list { + *policy_entry.entry(asset.name).or_insert(0) += asset.amount; + } + } + + ValueDeltaMap { + lovelace: value.lovelace, + assets, + } + } +} + +impl AddAssign for ValueDeltaMap { + fn add_assign(&mut self, delta: ValueDelta) { + self.lovelace += delta.lovelace; + + for (policy, assets) in delta.assets { + let policy_entry = self.assets.entry(policy).or_default(); + for asset in assets { + *policy_entry.entry(asset.name).or_insert(0) += asset.amount; + } + } + } +} + +impl From for ValueDelta { + fn from(map: ValueDeltaMap) -> Self { + let mut assets_vec = Vec::with_capacity(map.assets.len()); + + for (policy, asset_map) in map.assets { + let inner_assets = asset_map + .into_iter() + .map(|(name, amount)| NativeAssetDelta { name, amount }) + .collect(); + + assets_vec.push((policy, inner_assets)); + } + + ValueDelta { + lovelace: map.lovelace, + assets: assets_vec, + } + } +} + #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] pub struct ValueDelta { pub lovelace: i64, diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs index 5426ac7d..ef07fed0 100644 --- a/modules/address_state/src/address_state.rs +++ b/modules/address_state/src/address_state.rs @@ -117,9 +117,7 @@ impl AddressState { } // Add deltas to volatile - if let Err(e) = state.apply_address_deltas(&address_deltas_msg.deltas) { - error!("address deltas handling error: {e:#}"); - } + state.apply_address_deltas(&address_deltas_msg.deltas); store = state.immutable.clone(); config = state.config.clone(); diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 7f92d72c..4e2b9022 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -6,7 +6,7 @@ use std::{ use acropolis_common::{ Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, UTxOIdentifier, - ValueDelta, + ValueDelta, ValueDeltaMap, }; use anyhow::Result; @@ -172,12 +172,19 @@ impl State { && block_info.number > self.volatile.epoch_start_block + self.volatile.security_param_k } - pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) -> Result<()> { + pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) { let addresses = self.volatile.window.back_mut().expect("window should never be empty"); + // Keeps track of previous tx to avoid duplicating tx hashes or overcounting totals tx count + let mut last_block: Option = None; + let mut last_tx_index: Option = None; + for delta in deltas { let entry = addresses.entry(delta.address.clone()).or_default(); + let same_tx = last_block == Some(delta.utxo.block_number()) + && last_tx_index == Some(delta.utxo.tx_index()); + if self.config.store_info { let utxos = entry.utxos.get_or_insert(Vec::new()); if delta.value.lovelace > 0 { @@ -189,16 +196,29 @@ impl State { if self.config.store_transactions { let txs = entry.transactions.get_or_insert(Vec::new()); - txs.push(TxIdentifier::from(delta.utxo)) + + if !same_tx { + txs.push(TxIdentifier::from(delta.utxo)); + } } if self.config.store_totals { let totals = entry.totals.get_or_insert(Vec::new()); - totals.push(delta.value.clone()); + + if same_tx { + if let Some(last_total) = totals.last_mut() { + // Create temporary map for summing same tx deltas efficently + let mut map = ValueDeltaMap::from(last_total.clone()); + map += delta.value.clone(); + *last_total = ValueDelta::from(map); + } + } else { + totals.push(delta.value.clone()); + } } + last_block = Some(delta.utxo.block_number()); + last_tx_index = Some(delta.utxo.tx_index()); } - - Ok(()) } pub async fn get_addresses_totals( @@ -277,7 +297,7 @@ mod tests { let deltas = vec![delta(&addr, &utxo, 1)]; // Apply deltas - state.apply_address_deltas(&deltas)?; + state.apply_address_deltas(&deltas); // Verify UTxO is retrievable when in volatile let utxos = state.get_address_utxos(&addr).await?; @@ -319,7 +339,7 @@ mod tests { let created = vec![delta(&addr, &utxo, 1)]; // Apply delta to volatile - state.apply_address_deltas(&created)?; + state.apply_address_deltas(&created); // Drain volatile to immutable pending state.volatile.epoch_start_block = 1; @@ -333,7 +353,7 @@ mod tests { assert_eq!(after_persist.as_ref().unwrap(), &[utxo]); state.volatile.next_block(); - state.apply_address_deltas(&[delta(&addr, &utxo, -1)])?; + state.apply_address_deltas(&[delta(&addr, &utxo, -1)]); // Verify UTxO was removed while in volatile let after_spend_volatile = state.get_address_utxos(&addr).await?; @@ -368,9 +388,9 @@ mod tests { state.volatile.epoch_start_block = 1; - state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)])?; + state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)]); state.volatile.next_block(); - state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)])?; + state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)]); // Verify Create and spend both in volatile is not included in address utxos let volatile = state.get_address_utxos(&addr).await?; From 2f254c045b4fc35ed2d9d785f4f0630d6b86885a Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 10 Nov 2025 22:30:37 +0000 Subject: [PATCH 2/6] feat: account totals endpoint handler Signed-off-by: William Hankins --- .../rest_blockfrost/src/handlers/accounts.rs | 79 +++++++++++++++++-- modules/rest_blockfrost/src/types.rs | 2 +- 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index e2b7415f..a739759b 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use crate::handlers_config::HandlersConfig; use crate::types::{ - AccountAddressREST, AccountRewardREST, AccountUTxOREST, AccountWithdrawalREST, AmountList, - DelegationUpdateREST, RegistrationUpdateREST, + AccountAddressREST, AccountRewardREST, AccountTotalsREST, AccountUTxOREST, + AccountWithdrawalREST, AmountList, DelegationUpdateREST, RegistrationUpdateREST, }; use acropolis_common::messages::{Message, RESTResponse, StateQuery, StateQueryResponse}; use acropolis_common::queries::accounts::{AccountsStateQuery, AccountsStateQueryResponse}; @@ -563,7 +563,7 @@ pub async fn handle_account_assets_blockfrost( .await?; let Some(addresses) = addresses else { - return Ok(RESTResponse::with_text(404, "Account not found")); + return Err(RESTError::not_found("Account not found")); }; // Get utxos from address state @@ -621,11 +621,74 @@ pub async fn handle_account_assets_blockfrost( /// Handle `/accounts/{stake_address}/addresses/total` Blockfrost-compatible endpoint pub async fn handle_account_totals_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Err(RESTError::not_implemented("Account totals not implemented")) + let account = parse_stake_address(¶ms)?; + + // Get addresses from historical accounts state + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountAssociatedAddresses { + account: account.clone(), + }, + ))); + let addresses = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountAssociatedAddresses(addresses), + )) => Ok(Some(addresses)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(QueryError::NotFound { .. }), + )) => Ok(None), + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving account addresses", + )), + }, + ) + .await?; + + let Some(addresses) = addresses else { + return Err(RESTError::not_found("Account not found")); + }; + + // Get totals from address state + let msg = Arc::new(Message::StateQuery(StateQuery::Addresses( + AddressStateQuery::GetAddressesTotals { addresses }, + ))); + let totals = query_state( + &context, + &handlers_config.addresses_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::AddressesTotals(utxos), + )) => Ok(utxos), + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving account totals", + )), + }, + ) + .await?; + + let rest_response = AccountTotalsREST { + stake_address: account.to_string()?, + received_sum: totals.received.into(), + sent_sum: totals.sent.into(), + tx_count: totals.tx_count, + }; + + let json = serde_json::to_string_pretty(&rest_response)?; + Ok(RESTResponse::with_json(200, &json)) } /// Handle `/accounts/{stake_address}/utxos` Blockfrost-compatible endpoint @@ -659,7 +722,7 @@ pub async fn handle_account_utxos_blockfrost( .await?; let Some(addresses) = addresses else { - return Ok(RESTResponse::with_text(404, "Account not found")); + return Err(RESTError::not_found("Account not found")); }; // Get utxos from address state diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 4c699fcf..62bc21c6 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -939,7 +939,7 @@ pub struct AccountUTxOREST { } #[derive(serde::Serialize)] -pub struct _AccountTotalsREST { +pub struct AccountTotalsREST { pub stake_address: String, pub received_sum: AmountList, pub sent_sum: AmountList, From 62b90dd7a819b67a160ce61ae8a8ae2eecbaec36 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 10 Nov 2025 22:52:41 +0000 Subject: [PATCH 3/6] feat: address totals endpoint handler Signed-off-by: William Hankins --- .../rest_blockfrost/src/handlers/accounts.rs | 4 +- .../rest_blockfrost/src/handlers/addresses.rs | 85 ++++++++++++------- modules/rest_blockfrost/src/types.rs | 8 ++ 3 files changed, 66 insertions(+), 31 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index a739759b..f67dea66 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -668,8 +668,8 @@ pub async fn handle_account_totals_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Addresses( - AddressStateQueryResponse::AddressesTotals(utxos), - )) => Ok(utxos), + AddressStateQueryResponse::AddressesTotals(totals), + )) => Ok(totals), Message::StateQueryResponse(StateQueryResponse::Addresses( AddressStateQueryResponse::Error(e), )) => Err(e), diff --git a/modules/rest_blockfrost/src/handlers/addresses.rs b/modules/rest_blockfrost/src/handlers/addresses.rs index 490f6ef0..5848380b 100644 --- a/modules/rest_blockfrost/src/handlers/addresses.rs +++ b/modules/rest_blockfrost/src/handlers/addresses.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::types::AddressTotalsREST; use crate::{handlers_config::HandlersConfig, types::AddressInfoREST}; use acropolis_common::queries::errors::QueryError; use acropolis_common::rest_error::RESTError; @@ -20,35 +21,19 @@ pub async fn handle_address_single_blockfrost( params: Vec, handlers_config: Arc, ) -> Result { - let [address_str] = ¶ms[..] else { - return Err(RESTError::param_missing("address")); - }; - - let (address, stake_address) = match Address::from_string(address_str) { - Ok(Address::None) | Ok(Address::Stake(_)) => { - return Err(RESTError::invalid_param( - "address", - "must be a payment address", - )); - } - Ok(Address::Byron(byron)) => (Address::Byron(byron), None), - Ok(Address::Shelley(shelley)) => { - let stake_addr = shelley - .stake_address_string() - .map_err(|e| RESTError::invalid_param("address", &e.to_string()))?; - - (Address::Shelley(shelley), stake_addr) - } - Err(e) => { - return Err(RESTError::invalid_param("address", &e.to_string())); - } + let address = parse_address(¶ms)?; + let stake_address = match address { + Address::Shelley(ref addr) => addr.stake_address_string()?, + _ => None, }; let address_type = address.kind().to_string(); let is_script = address.is_script(); let address_query_msg = Arc::new(Message::StateQuery(StateQuery::Addresses( - AddressStateQuery::GetAddressUTxOs { address }, + AddressStateQuery::GetAddressUTxOs { + address: address.clone(), + }, ))); let utxo_identifiers = query_state( @@ -77,7 +62,7 @@ pub async fn handle_address_single_blockfrost( None => { // Empty address - return zero balance (Blockfrost behavior) let rest_response = AddressInfoREST { - address: address_str.to_string(), + address: address.to_string()?, amount: Value { lovelace: 0, assets: Vec::new(), @@ -116,7 +101,7 @@ pub async fn handle_address_single_blockfrost( .await?; let rest_response = AddressInfoREST { - address: address_str.to_string(), + address: address.to_string()?, amount: address_balance.into(), stake_address, address_type, @@ -138,11 +123,45 @@ pub async fn handle_address_extended_blockfrost( /// Handle `/addresses/{address}/totals` Blockfrost-compatible endpoint pub async fn handle_address_totals_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Err(RESTError::not_implemented("Address totals endpoint")) + let address = parse_address(¶ms)?; + + // Get totals from address state + let msg = Arc::new(Message::StateQuery(StateQuery::Addresses( + AddressStateQuery::GetAddressTotals { + address: address.clone(), + }, + ))); + let totals = query_state( + &context, + &handlers_config.addresses_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::AddressTotals(totals), + )) => Ok(totals), + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving address totals", + )), + }, + ) + .await?; + + let rest_response = AddressTotalsREST { + address: address.to_string()?, + received_sum: totals.received.into(), + sent_sum: totals.sent.into(), + tx_count: totals.tx_count, + }; + + let json = serde_json::to_string_pretty(&rest_response)?; + Ok(RESTResponse::with_json(200, &json)) } /// Handle `/addresses/{address}/utxos` Blockfrost-compatible endpoint @@ -171,3 +190,11 @@ pub async fn handle_address_transactions_blockfrost( ) -> Result { Err(RESTError::not_implemented("Address transactions endpoint")) } + +fn parse_address(params: &[String]) -> Result { + let Some(address_str) = params.first() else { + return Err(RESTError::param_missing("address")); + }; + + Ok(Address::from_string(address_str)?) +} diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 62bc21c6..8fc576fd 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -945,3 +945,11 @@ pub struct AccountTotalsREST { pub sent_sum: AmountList, pub tx_count: u64, } + +#[derive(serde::Serialize)] +pub struct AddressTotalsREST { + pub address: String, + pub received_sum: AmountList, + pub sent_sum: AmountList, + pub tx_count: u64, +} From f035970f83318b4a410246210d02ed9deccbde39 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 10 Nov 2025 23:12:57 +0000 Subject: [PATCH 4/6] test: add tests for summing same tx totals and preventing duplicate tx entries Signed-off-by: William Hankins --- modules/address_state/src/state.rs | 63 ++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 4e2b9022..e924a6df 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -420,4 +420,67 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_same_tx_deltas_sums_totals_in_volatile() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + + let mut state = setup_state_and_store().await?; + + let addr = dummy_address(); + let delta_1 = UTxOIdentifier::new(0, 1, 0); + let delta_2 = UTxOIdentifier::new(0, 1, 1); + + state.volatile.epoch_start_block = 1; + + state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]); + + // Verify only 1 totals entry with delta of 2 + let volatile = state + .volatile + .window + .back() + .expect("Window should have a delta") + .get(&addr) + .expect("Entry should be populated") + .totals + .as_ref() + .expect("Totals should be populated"); + + assert_eq!(volatile.len(), 1); + assert_eq!(volatile.first().expect("Should be populated").lovelace, 2); + + Ok(()) + } + + #[tokio::test] + async fn test_same_tx_deltas_prevents_duplicate_identifier_in_volatile() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + + let mut state = setup_state_and_store().await?; + + let addr = dummy_address(); + let delta_1 = UTxOIdentifier::new(0, 1, 0); + let delta_2 = UTxOIdentifier::new(0, 1, 1); + + state.volatile.epoch_start_block = 1; + + state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]); + + // Verify only 1 transactions entry + let volatile = state + .volatile + .window + .back() + .expect("Window should have a delta") + .get(&addr) + .expect("Entry should be populated") + .transactions + .as_ref() + .expect("Transactions should be populated"); + + assert_eq!(volatile.len(), 1); + + Ok(()) + } } From f78f709b2238f67c73ea28aea81181029b879fe9 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 11 Nov 2025 22:40:51 +0000 Subject: [PATCH 5/6] add temporary HashMap for deduplicating transactions in address state Signed-off-by: William Hankins --- modules/address_state/src/state.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index e924a6df..3a6b9bbf 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, path::{Path, PathBuf}, sync::Arc, }; @@ -176,15 +176,12 @@ impl State { let addresses = self.volatile.window.back_mut().expect("window should never be empty"); // Keeps track of previous tx to avoid duplicating tx hashes or overcounting totals tx count - let mut last_block: Option = None; - let mut last_tx_index: Option = None; + let mut seen: HashMap> = HashMap::new(); for delta in deltas { + let tx_id = TxIdentifier::from(delta.utxo); let entry = addresses.entry(delta.address.clone()).or_default(); - let same_tx = last_block == Some(delta.utxo.block_number()) - && last_tx_index == Some(delta.utxo.tx_index()); - if self.config.store_info { let utxos = entry.utxos.get_or_insert(Vec::new()); if delta.value.lovelace > 0 { @@ -196,16 +193,19 @@ impl State { if self.config.store_transactions { let txs = entry.transactions.get_or_insert(Vec::new()); + let seen_for_addr = seen.entry(delta.address.clone()).or_default(); - if !same_tx { - txs.push(TxIdentifier::from(delta.utxo)); + if !seen_for_addr.contains(&tx_id) { + seen_for_addr.insert(tx_id); + txs.push(tx_id); } } if self.config.store_totals { let totals = entry.totals.get_or_insert(Vec::new()); + let seen_for_addr = seen.entry(delta.address.clone()).or_default(); - if same_tx { + if seen_for_addr.contains(&tx_id) { if let Some(last_total) = totals.last_mut() { // Create temporary map for summing same tx deltas efficently let mut map = ValueDeltaMap::from(last_total.clone()); @@ -216,8 +216,6 @@ impl State { totals.push(delta.value.clone()); } } - last_block = Some(delta.utxo.block_number()); - last_tx_index = Some(delta.utxo.tx_index()); } } From 1b3cbb15a529b163503060b9ed539fd94bbe5b27 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 11 Nov 2025 23:13:30 +0000 Subject: [PATCH 6/6] fix: defer marking tx as seen until after totals and txs are both processed Signed-off-by: William Hankins --- modules/address_state/src/state.rs | 41 ++++++++++--------- .../rest_blockfrost/src/handlers/accounts.rs | 4 ++ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 3a6b9bbf..dc84d068 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -175,7 +175,7 @@ impl State { pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) { let addresses = self.volatile.window.back_mut().expect("window should never be empty"); - // Keeps track of previous tx to avoid duplicating tx hashes or overcounting totals tx count + // Keeps track seen txs to avoid overcounting totals tx count and duplicating tx identifiers let mut seen: HashMap> = HashMap::new(); for delta in deltas { @@ -191,30 +191,31 @@ impl State { } } - if self.config.store_transactions { - let txs = entry.transactions.get_or_insert(Vec::new()); + if self.config.store_transactions || self.config.store_totals { let seen_for_addr = seen.entry(delta.address.clone()).or_default(); - if !seen_for_addr.contains(&tx_id) { - seen_for_addr.insert(tx_id); - txs.push(tx_id); + if self.config.store_transactions { + let txs = entry.transactions.get_or_insert(Vec::new()); + if !seen_for_addr.contains(&tx_id) { + txs.push(tx_id); + } } - } - - if self.config.store_totals { - let totals = entry.totals.get_or_insert(Vec::new()); - let seen_for_addr = seen.entry(delta.address.clone()).or_default(); - - if seen_for_addr.contains(&tx_id) { - if let Some(last_total) = totals.last_mut() { - // Create temporary map for summing same tx deltas efficently - let mut map = ValueDeltaMap::from(last_total.clone()); - map += delta.value.clone(); - *last_total = ValueDelta::from(map); + if self.config.store_totals { + let totals = entry.totals.get_or_insert(Vec::new()); + + if seen_for_addr.contains(&tx_id) { + if let Some(last_total) = totals.last_mut() { + // Create temporary map for summing same tx deltas efficiently + // TODO: Potentially move upstream to address deltas publisher + let mut map = ValueDeltaMap::from(last_total.clone()); + map += delta.value.clone(); + *last_total = ValueDelta::from(map); + } + } else { + totals.push(delta.value.clone()); } - } else { - totals.push(delta.value.clone()); } + seen_for_addr.insert(tx_id); } } } diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index f67dea66..5ac14472 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -680,6 +680,10 @@ pub async fn handle_account_totals_blockfrost( ) .await?; + // TODO: Query historical accounts state to retrieve account tx count instead of + // using the addresses totals as the addresses totals does not deduplicate + // for multi-address transactions, overstating count + let rest_response = AccountTotalsREST { stake_address: account.to_string()?, received_sum: totals.received.into(),