Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7390aad
feat: implementation of PeerNetworkInterface
SupernaviX Nov 3, 2025
2281311
fix: blockfetch uses slots
SupernaviX Nov 3, 2025
2d1c5b1
feat: test PeerNetworkInterface in omnibus
SupernaviX Nov 3, 2025
41ee5de
fix: gracefully handle dropped connections
SupernaviX Nov 3, 2025
e085283
fix: handle rollbacks when switching chains
SupernaviX Nov 4, 2025
ece87d3
feat: integrate upstream cache
SupernaviX Nov 4, 2025
4647b72
fix: handle cache errors more gracefully
SupernaviX Nov 4, 2025
dabb272
feat: extract chain logic to separate helper
SupernaviX Nov 4, 2025
6159534
fix: gracefully handle disconnection during in-flight block reqs
SupernaviX Nov 4, 2025
a91d460
fix: correctly implement and test chain switching
SupernaviX Nov 4, 2025
21f7e67
docs: docs
SupernaviX Nov 5, 2025
bf8a3b7
Merge branch 'main' into sg/peer-network-interface
SupernaviX Nov 5, 2025
03ec421
fix: credit where credit is due
SupernaviX Nov 5, 2025
f9b1a4e
fix: finish a sentence
SupernaviX Nov 5, 2025
b0c5fc7
fix: run fmt on whole project
SupernaviX Nov 5, 2025
1b85707
fix: gracefully handle connecting to lagging-behind peer
SupernaviX Nov 5, 2025
89bf544
Merge branch 'main' into sg/peer-network-interface
SupernaviX Nov 11, 2025
41d802a
fix: fix cargo-shear issues
SupernaviX Nov 11, 2025
3c87d61
fix: make stake-delta-filter create cache as needed
SupernaviX Nov 11, 2025
212ca55
fix: address copilot comments
SupernaviX Nov 11, 2025
30ebc8d
fix: correctly handle disconnect from only peer
SupernaviX Nov 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ members = [
"modules/mithril_snapshot_fetcher", # Mithril snapshot fetcher
"modules/snapshot_bootstrapper", # Bootstrap state from a ledger snapshot
"modules/upstream_chain_fetcher", # Upstream chain fetcher
"modules/peer_network_interface", # Multi-peer network interface
"modules/block_unpacker", # Block to transaction unpacker
"modules/tx_unpacker", # Tx to UTXO unpacker
"modules/utxo_state", # UTXO state
Expand Down
1 change: 1 addition & 0 deletions common/src/genesis_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const MAINNET_SHELLEY_GENESIS_HASH: &str =
"1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81";

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct GenesisValues {
pub byron_timestamp: u64,
pub shelley_epoch: u64,
Expand Down
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod snapshot;
pub mod stake_addresses;
pub mod state_history;
pub mod types;
pub mod upstream_cache;
pub mod validation;

// Flattened re-exports
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use acropolis_common::{messages::RawBlockMessage, BlockInfo};
use anyhow::{anyhow, bail, Result};
use crate::{messages::RawBlockMessage, BlockInfo};
use anyhow::{anyhow, bail, Context, Result};
use std::{
fs::File,
io::{BufReader, Write},
path::Path,
path::{Path, PathBuf},
sync::Arc,
};

Expand All @@ -19,26 +19,26 @@ pub trait Storage {
}

pub struct FileStorage {
path: String,
path: PathBuf,
}

impl FileStorage {
pub fn new(path: &str) -> Self {
Self {
path: path.to_string(),
}
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref().to_path_buf();
std::fs::create_dir_all(&path)?;
Ok(Self { path })
}

fn get_file_name(&self, chunk_no: usize) -> String {
format!("{}/chunk-{chunk_no}.json", self.path)
fn get_file_name(&self, chunk_no: usize) -> PathBuf {
self.path.join(format!("chunk-{chunk_no}.json"))
}
}

pub type UpstreamCache = UpstreamCacheImpl<FileStorage>;

impl UpstreamCache {
pub fn new(path: &str) -> Self {
UpstreamCache::new_impl(FileStorage::new(path))
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
Ok(UpstreamCache::new_impl(FileStorage::new(path)?))
}
}

Expand Down Expand Up @@ -124,7 +124,9 @@ impl<S: Storage> UpstreamCacheImpl<S> {

pub fn write_record(&mut self, record: &UpstreamCacheRecord) -> Result<()> {
self.chunk_cached.push(record.clone());
self.storage.write_chunk(self.current_chunk, &self.chunk_cached)?;
self.storage
.write_chunk(self.current_chunk, &self.chunk_cached)
.context("could not write cache record")?;

self.current_record += 1;
if self.current_record >= self.density {
Expand All @@ -139,34 +141,33 @@ impl<S: Storage> UpstreamCacheImpl<S> {

impl Storage for FileStorage {
fn read_chunk(&mut self, chunk_no: usize) -> Result<Vec<UpstreamCacheRecord>> {
let name = self.get_file_name(chunk_no);
let path = Path::new(&name);
let path = self.get_file_name(chunk_no);
if !path.try_exists()? {
return Ok(vec![]);
}

let file = File::open(&name)?;
let file = File::open(&path)?;
let reader = BufReader::new(file);
match serde_json::from_reader::<BufReader<std::fs::File>, Vec<UpstreamCacheRecord>>(reader)
{
Ok(res) => Ok(res.clone()),
Err(err) => Err(anyhow!(
"Error reading upstream cache chunk JSON from {name}: '{err}'"
)),
}
serde_json::from_reader(reader).with_context(|| {
format!(
"Error reading upstream cache chunk JSON from {}",
path.display()
)
})
}

fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> {
let mut file = File::create(self.get_file_name(chunk_no))?;
file.write_all(serde_json::to_string(data)?.as_bytes())?;
let mut file =
File::create(self.get_file_name(chunk_no)).context("could not write chunk")?;
file.write_all(&serde_json::to_vec(data)?)?;
Ok(())
}
}

#[cfg(test)]
mod test {
use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord};
use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era};
use crate::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era};
use anyhow::Result;
use std::{collections::HashMap, sync::Arc};

Expand Down
10 changes: 10 additions & 0 deletions modules/genesis_bootstrapper/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ async fn main() -> Result<()> {
"https://book.world.dev.cardano.org/environments/mainnet/shelley-genesis.json",
"mainnet-shelley-genesis.json",
),
download(
&client,
"https://book.world.dev.cardano.org/environments/preview/byron-genesis.json",
"preview-byron-genesis.json",
),
download(
&client,
"https://book.world.dev.cardano.org/environments/preview/shelley-genesis.json",
"preview-shelley-genesis.json",
),
download(
&client,
"https://raw.githubusercontent.com/Hornan7/SanchoNet-Tutorials/refs/heads/main/genesis/byron-genesis.json",
Expand Down
8 changes: 8 additions & 0 deletions modules/genesis_bootstrapper/src/genesis_bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const DEFAULT_NETWORK_NAME: &str = "mainnet";
const MAINNET_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/mainnet-byron-genesis.json");
const MAINNET_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/mainnet-shelley-genesis.json");
const MAINNET_SHELLEY_START_EPOCH: u64 = 208;
const PREVIEW_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/preview-byron-genesis.json");
const PREVIEW_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/preview-shelley-genesis.json");
const PREVIEW_SHELLEY_START_EPOCH: u64 = 0;
const SANCHONET_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/sanchonet-byron-genesis.json");
const SANCHONET_SHELLEY_GENESIS: &[u8] =
include_bytes!("../downloads/sanchonet-shelley-genesis.json");
Expand Down Expand Up @@ -101,6 +104,11 @@ impl GenesisBootstrapper {
MAINNET_SHELLEY_GENESIS,
MAINNET_SHELLEY_START_EPOCH,
),
"preview" => (
PREVIEW_BYRON_GENESIS,
PREVIEW_SHELLEY_GENESIS,
PREVIEW_SHELLEY_START_EPOCH,
),
"sanchonet" => (
SANCHONET_BYRON_GENESIS,
SANCHONET_SHELLEY_GENESIS,
Expand Down
24 changes: 24 additions & 0 deletions modules/peer_network_interface/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Acropolis upstream chain fetcher module

[package]
name = "acropolis_module_peer_network_interface"
version = "0.2.0"
edition = "2024"
authors = ["Simon Gellis <simon@sundae.fi>"]
description = "Multiplexed chain fetcher Caryatid module for Acropolis"
license = "Apache-2.0"

[dependencies]
acropolis_common = { path = "../../common" }

caryatid_sdk = { workspace = true }

anyhow = { workspace = true }
config = { workspace = true }
pallas = { workspace = true }
serde = { workspace = true, features = ["rc"] }
tokio = { workspace = true }
tracing = { workspace = true }

[lib]
path = "src/peer_network_interface.rs"
18 changes: 18 additions & 0 deletions modules/peer_network_interface/NOTES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Architecture

This module uses an event-queue-based architecture. A `NetworkManager` is responsible for creating a set of `PeerConnection`s and sending commands to them. Each `PeerConnection` maintains a connection to a single peer; it responds to commands from the `NetworkManager`, and emits events to an event queue. The `NetworkManager` reads from that queue to decide which chain to follow. When blocks from the preferred chain have been fetched, it publishes those blocks to the message bus.

This module requests the body for every block announced by any chain, from the first chain which announced it. When it has the body for the next block announced, it will publish it to the message bus.

```mermaid
graph LR
EQ[Event Queue]-->NM[NetworkManager]
subgraph Peers
P1[PeerConnection 1]
P2[PeerConnection 2]
P3[PeerConnection 3]
end
NM -->|RequestBlock</br>FindIntersect| P1 & P2 & P3
Peers -->|ChainSync<br/>BlockFetched<br/>Disconnect|EQ
NM -->|BlockAvailable| MB[Message Bus]
```
18 changes: 18 additions & 0 deletions modules/peer_network_interface/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Peer network interface module

The peer network interface module uses the ChainSync and BlockFetch protocols to fetch blocks from one of several upstream sources. It chooses one peer to treat as the "preferred" chain to follow, but will gracefully switch which peer it follows during network issues.

It can either run independently, from the origin or current tip, or
be triggered by a Mithril snapshot event (the default) where it starts from
where the snapshot left off, and follows the chain from there.

Rollbacks are handled by signalling in the block data - it is downstream
subscribers' responsibility to deal with the effects of this.

## Configuration

See [./config.default.toml](./config.default.toml) for the available configuration options and their default values.

## Messages

This module publishes "raw block messages" to the configured `block-topic`. Each message includes the raw bytes composing the header and body of a block. The module follows the head of one chain at any given time, though that chain may switch during runtime. If that chain reports a rollback (or if this module switches to a different chain), the next message it emits will be the new head of the chain and have the status `RolledBack`.
24 changes: 24 additions & 0 deletions modules/peer_network_interface/config.default.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# The topic to publish blocks on
block-topic = "cardano.block.available"
# The topic to wait for when sync-point is "snapshot"
snapshot-completion-topic = "cardano.snapshot.complete"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about this topic. Does this mark the completion of a Mithril snapshot? For a future PR, I plan to deconflict this name from a snapshot boot completion.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, it marks the completion of a mithril snapshot. But for this, we want a topic which marks the completion of any snapshot. If sync-point is "snapshot", we're

  1. waiting for this event, which signals that snapshot restoration is complete (and tells us which block the system was restored to)
  2. beginning chainsync as of that block

So whether it's a mithril snapshot or the boot snapshot, we just want to know when it's done

# The topic to wait for when listening for genesis values from another module
genesis-completion-topic = "cardano.sequence.bootstrapped"

# Upstream node connections
node-addresses = [
"backbone.cardano.iog.io:3001",
"backbone.mainnet.cardanofoundation.org:3001",
"backbone.mainnet.emurgornd.com:3001",
]
# The network magic for the chain to connect to
magic-number = 764824073

# The initial point to start syncing from. Options:
# - "origin": sync from the very start of the chain
# - "tip": sync from the very end of the chain
# - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache.
# - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot.
sync-point = "snapshot"
# The cache dir to use when sync-point is "cache"
cache-dir = "upstream-cache"
Loading