Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub type PolicyId = [u8; 28];
pub type NativeAssets = Vec<(PolicyId, Vec<NativeAsset>)>;
pub type NativeAssetsDelta = Vec<(PolicyId, Vec<NativeAssetDelta>)>;
pub type NativeAssetsMap = HashMap<PolicyId, HashMap<AssetName, u64>>;
pub type NativeAssetsDeltaMap = HashMap<PolicyId, HashMap<AssetName, i64>>;

#[derive(
Debug,
Expand Down Expand Up @@ -401,6 +402,63 @@ impl AddAssign for ValueMap {
}
}

/// Hashmap representation of ValueDelta (lovelace + multiasset)
pub struct ValueDeltaMap {
pub lovelace: i64,
pub assets: NativeAssetsDeltaMap,
}

impl From<ValueDelta> for ValueDeltaMap {
fn from(value: ValueDelta) -> Self {
let mut assets = HashMap::new();

for (policy, asset_list) in value.assets {
let policy_entry = assets.entry(policy).or_insert_with(HashMap::new);
for asset in asset_list {
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
}
}

ValueDeltaMap {
lovelace: value.lovelace,
assets,
}
}
}

impl AddAssign<ValueDelta> for ValueDeltaMap {
fn add_assign(&mut self, delta: ValueDelta) {
self.lovelace += delta.lovelace;

for (policy, assets) in delta.assets {
let policy_entry = self.assets.entry(policy).or_default();
for asset in assets {
*policy_entry.entry(asset.name).or_insert(0) += asset.amount;
}
}
}
}

impl From<ValueDeltaMap> for ValueDelta {
fn from(map: ValueDeltaMap) -> Self {
let mut assets_vec = Vec::with_capacity(map.assets.len());

for (policy, asset_map) in map.assets {
let inner_assets = asset_map
.into_iter()
.map(|(name, amount)| NativeAssetDelta { name, amount })
.collect();

assets_vec.push((policy, inner_assets));
}

ValueDelta {
lovelace: map.lovelace,
assets: assets_vec,
}
}
}

#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct ValueDelta {
pub lovelace: i64,
Expand Down
4 changes: 1 addition & 3 deletions modules/address_state/src/address_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ impl AddressState {
}

// Add deltas to volatile
if let Err(e) = state.apply_address_deltas(&address_deltas_msg.deltas) {
error!("address deltas handling error: {e:#}");
}
state.apply_address_deltas(&address_deltas_msg.deltas);

store = state.immutable.clone();
config = state.config.clone();
Expand Down
116 changes: 99 additions & 17 deletions modules/address_state/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{
collections::HashSet,
collections::{HashMap, HashSet},
path::{Path, PathBuf},
sync::Arc,
};

use acropolis_common::{
Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, UTxOIdentifier,
ValueDelta,
ValueDelta, ValueDeltaMap,
};
use anyhow::Result;

Expand Down Expand Up @@ -172,10 +172,14 @@ impl State {
&& block_info.number > self.volatile.epoch_start_block + self.volatile.security_param_k
}

pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) -> Result<()> {
pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) {
let addresses = self.volatile.window.back_mut().expect("window should never be empty");

// Keeps track seen txs to avoid overcounting totals tx count and duplicating tx identifiers
let mut seen: HashMap<Address, HashSet<TxIdentifier>> = HashMap::new();

for delta in deltas {
let tx_id = TxIdentifier::from(delta.utxo);
let entry = addresses.entry(delta.address.clone()).or_default();

if self.config.store_info {
Expand All @@ -187,18 +191,33 @@ impl State {
}
}

if self.config.store_transactions {
let txs = entry.transactions.get_or_insert(Vec::new());
txs.push(TxIdentifier::from(delta.utxo))
}
if self.config.store_transactions || self.config.store_totals {
let seen_for_addr = seen.entry(delta.address.clone()).or_default();

if self.config.store_totals {
let totals = entry.totals.get_or_insert(Vec::new());
totals.push(delta.value.clone());
if self.config.store_transactions {
let txs = entry.transactions.get_or_insert(Vec::new());
if !seen_for_addr.contains(&tx_id) {
txs.push(tx_id);
}
}
if self.config.store_totals {
let totals = entry.totals.get_or_insert(Vec::new());

if seen_for_addr.contains(&tx_id) {
if let Some(last_total) = totals.last_mut() {
// Create temporary map for summing same tx deltas efficiently
// TODO: Potentially move upstream to address deltas publisher
let mut map = ValueDeltaMap::from(last_total.clone());
map += delta.value.clone();
*last_total = ValueDelta::from(map);
}
} else {
totals.push(delta.value.clone());
}
}
seen_for_addr.insert(tx_id);
}
}

Ok(())
}

pub async fn get_addresses_totals(
Expand Down Expand Up @@ -277,7 +296,7 @@ mod tests {
let deltas = vec![delta(&addr, &utxo, 1)];

// Apply deltas
state.apply_address_deltas(&deltas)?;
state.apply_address_deltas(&deltas);

// Verify UTxO is retrievable when in volatile
let utxos = state.get_address_utxos(&addr).await?;
Expand Down Expand Up @@ -319,7 +338,7 @@ mod tests {
let created = vec![delta(&addr, &utxo, 1)];

// Apply delta to volatile
state.apply_address_deltas(&created)?;
state.apply_address_deltas(&created);

// Drain volatile to immutable pending
state.volatile.epoch_start_block = 1;
Expand All @@ -333,7 +352,7 @@ mod tests {
assert_eq!(after_persist.as_ref().unwrap(), &[utxo]);

state.volatile.next_block();
state.apply_address_deltas(&[delta(&addr, &utxo, -1)])?;
state.apply_address_deltas(&[delta(&addr, &utxo, -1)]);

// Verify UTxO was removed while in volatile
let after_spend_volatile = state.get_address_utxos(&addr).await?;
Expand Down Expand Up @@ -368,9 +387,9 @@ mod tests {

state.volatile.epoch_start_block = 1;

state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)])?;
state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)]);
state.volatile.next_block();
state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)])?;
state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)]);

// Verify Create and spend both in volatile is not included in address utxos
let volatile = state.get_address_utxos(&addr).await?;
Expand Down Expand Up @@ -400,4 +419,67 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_same_tx_deltas_sums_totals_in_volatile() -> Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice tests

let _ = tracing_subscriber::fmt::try_init();

let mut state = setup_state_and_store().await?;

let addr = dummy_address();
let delta_1 = UTxOIdentifier::new(0, 1, 0);
let delta_2 = UTxOIdentifier::new(0, 1, 1);

state.volatile.epoch_start_block = 1;

state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]);

// Verify only 1 totals entry with delta of 2
let volatile = state
.volatile
.window
.back()
.expect("Window should have a delta")
.get(&addr)
.expect("Entry should be populated")
.totals
.as_ref()
.expect("Totals should be populated");

assert_eq!(volatile.len(), 1);
assert_eq!(volatile.first().expect("Should be populated").lovelace, 2);

Ok(())
}

#[tokio::test]
async fn test_same_tx_deltas_prevents_duplicate_identifier_in_volatile() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();

let mut state = setup_state_and_store().await?;

let addr = dummy_address();
let delta_1 = UTxOIdentifier::new(0, 1, 0);
let delta_2 = UTxOIdentifier::new(0, 1, 1);

state.volatile.epoch_start_block = 1;

state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]);

// Verify only 1 transactions entry
let volatile = state
.volatile
.window
.back()
.expect("Window should have a delta")
.get(&addr)
.expect("Entry should be populated")
.transactions
.as_ref()
.expect("Transactions should be populated");

assert_eq!(volatile.len(), 1);

Ok(())
}
}
83 changes: 75 additions & 8 deletions modules/rest_blockfrost/src/handlers/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::sync::Arc;

use crate::handlers_config::HandlersConfig;
use crate::types::{
AccountAddressREST, AccountRewardREST, AccountUTxOREST, AccountWithdrawalREST, AmountList,
DelegationUpdateREST, RegistrationUpdateREST,
AccountAddressREST, AccountRewardREST, AccountTotalsREST, AccountUTxOREST,
AccountWithdrawalREST, AmountList, DelegationUpdateREST, RegistrationUpdateREST,
};
use acropolis_common::messages::{Message, RESTResponse, StateQuery, StateQueryResponse};
use acropolis_common::queries::accounts::{AccountsStateQuery, AccountsStateQueryResponse};
Expand Down Expand Up @@ -563,7 +563,7 @@ pub async fn handle_account_assets_blockfrost(
.await?;

let Some(addresses) = addresses else {
return Ok(RESTResponse::with_text(404, "Account not found"));
return Err(RESTError::not_found("Account not found"));
};

// Get utxos from address state
Expand Down Expand Up @@ -621,11 +621,78 @@ pub async fn handle_account_assets_blockfrost(

/// Handle `/accounts/{stake_address}/addresses/total` Blockfrost-compatible endpoint
pub async fn handle_account_totals_blockfrost(
_context: Arc<Context<Message>>,
_params: Vec<String>,
_handlers_config: Arc<HandlersConfig>,
context: Arc<Context<Message>>,
params: Vec<String>,
handlers_config: Arc<HandlersConfig>,
) -> Result<RESTResponse, RESTError> {
Err(RESTError::not_implemented("Account totals not implemented"))
let account = parse_stake_address(&params)?;

// Get addresses from historical accounts state
let msg = Arc::new(Message::StateQuery(StateQuery::Accounts(
AccountsStateQuery::GetAccountAssociatedAddresses {
account: account.clone(),
},
)));
let addresses = query_state(
&context,
&handlers_config.historical_accounts_query_topic,
msg,
|message| match message {
Message::StateQueryResponse(StateQueryResponse::Accounts(
AccountsStateQueryResponse::AccountAssociatedAddresses(addresses),
)) => Ok(Some(addresses)),
Message::StateQueryResponse(StateQueryResponse::Accounts(
AccountsStateQueryResponse::Error(QueryError::NotFound { .. }),
)) => Ok(None),
Message::StateQueryResponse(StateQueryResponse::Addresses(
AddressStateQueryResponse::Error(e),
)) => Err(e),
_ => Err(QueryError::internal_error(
"Unexpected message type while retrieving account addresses",
)),
},
)
.await?;

let Some(addresses) = addresses else {
return Err(RESTError::not_found("Account not found"));
};

// Get totals from address state
let msg = Arc::new(Message::StateQuery(StateQuery::Addresses(
AddressStateQuery::GetAddressesTotals { addresses },
)));
let totals = query_state(
&context,
&handlers_config.addresses_query_topic,
msg,
|message| match message {
Message::StateQueryResponse(StateQueryResponse::Addresses(
AddressStateQueryResponse::AddressesTotals(totals),
)) => Ok(totals),
Message::StateQueryResponse(StateQueryResponse::Addresses(
AddressStateQueryResponse::Error(e),
)) => Err(e),
_ => Err(QueryError::internal_error(
"Unexpected message type while retrieving account totals",
)),
},
)
.await?;

// TODO: Query historical accounts state to retrieve account tx count instead of
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good TODO / comment here.

// using the addresses totals as the addresses totals does not deduplicate
// for multi-address transactions, overstating count

let rest_response = AccountTotalsREST {
stake_address: account.to_string()?,
received_sum: totals.received.into(),
sent_sum: totals.sent.into(),
tx_count: totals.tx_count,
};

let json = serde_json::to_string_pretty(&rest_response)?;
Ok(RESTResponse::with_json(200, &json))
}

/// Handle `/accounts/{stake_address}/utxos` Blockfrost-compatible endpoint
Expand Down Expand Up @@ -659,7 +726,7 @@ pub async fn handle_account_utxos_blockfrost(
.await?;

let Some(addresses) = addresses else {
return Ok(RESTResponse::with_text(404, "Account not found"));
return Err(RESTError::not_found("Account not found"));
};

// Get utxos from address state
Expand Down
Loading