Skip to content

Commit 8c7590d

Browse files
authored
Merge pull request #339 from input-output-hk/sg/peer-network-interface
Implement PeerNetworkInterface
2 parents 5eb7ff9 + 30ebc8d commit 8c7590d

File tree

24 files changed

+1514
-34
lines changed

24 files changed

+1514
-34
lines changed

Cargo.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ members = [
1111
"modules/mithril_snapshot_fetcher", # Mithril snapshot fetcher
1212
"modules/snapshot_bootstrapper", # Bootstrap state from a ledger snapshot
1313
"modules/upstream_chain_fetcher", # Upstream chain fetcher
14+
"modules/peer_network_interface", # Multi-peer network interface
1415
"modules/block_unpacker", # Block to transaction unpacker
1516
"modules/tx_unpacker", # Tx to UTXO unpacker
1617
"modules/utxo_state", # UTXO state

common/src/genesis_values.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const MAINNET_SHELLEY_GENESIS_HASH: &str =
1212
"1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81";
1313

1414
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
15+
#[serde(rename_all = "kebab-case")]
1516
pub struct GenesisValues {
1617
pub byron_timestamp: u64,
1718
pub shelley_epoch: u64,

common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod snapshot;
2222
pub mod stake_addresses;
2323
pub mod state_history;
2424
pub mod types;
25+
pub mod upstream_cache;
2526
pub mod validation;
2627

2728
// Flattened re-exports

modules/upstream_chain_fetcher/src/upstream_cache.rs renamed to common/src/upstream_cache.rs

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use acropolis_common::{messages::RawBlockMessage, BlockInfo};
2-
use anyhow::{anyhow, bail, Result};
1+
use crate::{messages::RawBlockMessage, BlockInfo};
2+
use anyhow::{anyhow, bail, Context, Result};
33
use std::{
44
fs::File,
55
io::{BufReader, Write},
6-
path::Path,
6+
path::{Path, PathBuf},
77
sync::Arc,
88
};
99

@@ -19,26 +19,26 @@ pub trait Storage {
1919
}
2020

2121
pub struct FileStorage {
22-
path: String,
22+
path: PathBuf,
2323
}
2424

2525
impl FileStorage {
26-
pub fn new(path: &str) -> Self {
27-
Self {
28-
path: path.to_string(),
29-
}
26+
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
27+
let path = path.as_ref().to_path_buf();
28+
std::fs::create_dir_all(&path)?;
29+
Ok(Self { path })
3030
}
3131

32-
fn get_file_name(&self, chunk_no: usize) -> String {
33-
format!("{}/chunk-{chunk_no}.json", self.path)
32+
fn get_file_name(&self, chunk_no: usize) -> PathBuf {
33+
self.path.join(format!("chunk-{chunk_no}.json"))
3434
}
3535
}
3636

3737
pub type UpstreamCache = UpstreamCacheImpl<FileStorage>;
3838

3939
impl UpstreamCache {
40-
pub fn new(path: &str) -> Self {
41-
UpstreamCache::new_impl(FileStorage::new(path))
40+
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
41+
Ok(UpstreamCache::new_impl(FileStorage::new(path)?))
4242
}
4343
}
4444

@@ -124,7 +124,9 @@ impl<S: Storage> UpstreamCacheImpl<S> {
124124

125125
pub fn write_record(&mut self, record: &UpstreamCacheRecord) -> Result<()> {
126126
self.chunk_cached.push(record.clone());
127-
self.storage.write_chunk(self.current_chunk, &self.chunk_cached)?;
127+
self.storage
128+
.write_chunk(self.current_chunk, &self.chunk_cached)
129+
.context("could not write cache record")?;
128130

129131
self.current_record += 1;
130132
if self.current_record >= self.density {
@@ -139,34 +141,33 @@ impl<S: Storage> UpstreamCacheImpl<S> {
139141

140142
impl Storage for FileStorage {
141143
fn read_chunk(&mut self, chunk_no: usize) -> Result<Vec<UpstreamCacheRecord>> {
142-
let name = self.get_file_name(chunk_no);
143-
let path = Path::new(&name);
144+
let path = self.get_file_name(chunk_no);
144145
if !path.try_exists()? {
145146
return Ok(vec![]);
146147
}
147148

148-
let file = File::open(&name)?;
149+
let file = File::open(&path)?;
149150
let reader = BufReader::new(file);
150-
match serde_json::from_reader::<BufReader<std::fs::File>, Vec<UpstreamCacheRecord>>(reader)
151-
{
152-
Ok(res) => Ok(res.clone()),
153-
Err(err) => Err(anyhow!(
154-
"Error reading upstream cache chunk JSON from {name}: '{err}'"
155-
)),
156-
}
151+
serde_json::from_reader(reader).with_context(|| {
152+
format!(
153+
"Error reading upstream cache chunk JSON from {}",
154+
path.display()
155+
)
156+
})
157157
}
158158

159159
fn write_chunk(&mut self, chunk_no: usize, data: &[UpstreamCacheRecord]) -> Result<()> {
160-
let mut file = File::create(self.get_file_name(chunk_no))?;
161-
file.write_all(serde_json::to_string(data)?.as_bytes())?;
160+
let mut file =
161+
File::create(self.get_file_name(chunk_no)).context("could not write chunk")?;
162+
file.write_all(&serde_json::to_vec(data)?)?;
162163
Ok(())
163164
}
164165
}
165166

166167
#[cfg(test)]
167168
mod test {
168169
use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord};
169-
use acropolis_common::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era};
170+
use crate::{messages::RawBlockMessage, BlockHash, BlockInfo, BlockStatus, Era};
170171
use anyhow::Result;
171172
use std::{collections::HashMap, sync::Arc};
172173

modules/genesis_bootstrapper/build.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ async fn main() -> Result<()> {
6262
"https://book.world.dev.cardano.org/environments/mainnet/shelley-genesis.json",
6363
"mainnet-shelley-genesis.json",
6464
),
65+
download(
66+
&client,
67+
"https://book.world.dev.cardano.org/environments/preview/byron-genesis.json",
68+
"preview-byron-genesis.json",
69+
),
70+
download(
71+
&client,
72+
"https://book.world.dev.cardano.org/environments/preview/shelley-genesis.json",
73+
"preview-shelley-genesis.json",
74+
),
6575
download(
6676
&client,
6777
"https://raw.githubusercontent.com/Hornan7/SanchoNet-Tutorials/refs/heads/main/genesis/byron-genesis.json",

modules/genesis_bootstrapper/src/genesis_bootstrapper.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ const DEFAULT_NETWORK_NAME: &str = "mainnet";
3434
const MAINNET_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/mainnet-byron-genesis.json");
3535
const MAINNET_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/mainnet-shelley-genesis.json");
3636
const MAINNET_SHELLEY_START_EPOCH: u64 = 208;
37+
const PREVIEW_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/preview-byron-genesis.json");
38+
const PREVIEW_SHELLEY_GENESIS: &[u8] = include_bytes!("../downloads/preview-shelley-genesis.json");
39+
const PREVIEW_SHELLEY_START_EPOCH: u64 = 0;
3740
const SANCHONET_BYRON_GENESIS: &[u8] = include_bytes!("../downloads/sanchonet-byron-genesis.json");
3841
const SANCHONET_SHELLEY_GENESIS: &[u8] =
3942
include_bytes!("../downloads/sanchonet-shelley-genesis.json");
@@ -103,6 +106,11 @@ impl GenesisBootstrapper {
103106
MAINNET_SHELLEY_GENESIS,
104107
MAINNET_SHELLEY_START_EPOCH,
105108
),
109+
"preview" => (
110+
PREVIEW_BYRON_GENESIS,
111+
PREVIEW_SHELLEY_GENESIS,
112+
PREVIEW_SHELLEY_START_EPOCH,
113+
),
106114
"sanchonet" => (
107115
SANCHONET_BYRON_GENESIS,
108116
SANCHONET_SHELLEY_GENESIS,
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Acropolis upstream chain fetcher module
2+
3+
[package]
4+
name = "acropolis_module_peer_network_interface"
5+
version = "0.2.0"
6+
edition = "2024"
7+
authors = ["Simon Gellis <simon@sundae.fi>"]
8+
description = "Multiplexed chain fetcher Caryatid module for Acropolis"
9+
license = "Apache-2.0"
10+
11+
[dependencies]
12+
acropolis_common = { path = "../../common" }
13+
14+
caryatid_sdk = { workspace = true }
15+
16+
anyhow = { workspace = true }
17+
config = { workspace = true }
18+
pallas = { workspace = true }
19+
serde = { workspace = true, features = ["rc"] }
20+
tokio = { workspace = true }
21+
tracing = { workspace = true }
22+
23+
[lib]
24+
path = "src/peer_network_interface.rs"
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Architecture
2+
3+
This module uses an event-queue-based architecture. A `NetworkManager` is responsible for creating a set of `PeerConnection`s and sending commands to them. Each `PeerConnection` maintains a connection to a single peer; it responds to commands from the `NetworkManager`, and emits events to an event queue. The `NetworkManager` reads from that queue to decide which chain to follow. When blocks from the preferred chain have been fetched, it publishes those blocks to the message bus.
4+
5+
This module requests the body for every block announced by any chain, from the first chain which announced it. When it has the body for the next block announced, it will publish it to the message bus.
6+
7+
```mermaid
8+
graph LR
9+
EQ[Event Queue]-->NM[NetworkManager]
10+
subgraph Peers
11+
P1[PeerConnection 1]
12+
P2[PeerConnection 2]
13+
P3[PeerConnection 3]
14+
end
15+
NM -->|RequestBlock</br>FindIntersect| P1 & P2 & P3
16+
Peers -->|ChainSync<br/>BlockFetched<br/>Disconnect|EQ
17+
NM -->|BlockAvailable| MB[Message Bus]
18+
```
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Peer network interface module
2+
3+
The peer network interface module uses the ChainSync and BlockFetch protocols to fetch blocks from one of several upstream sources. It chooses one peer to treat as the "preferred" chain to follow, but will gracefully switch which peer it follows during network issues.
4+
5+
It can either run independently, from the origin or current tip, or
6+
be triggered by a Mithril snapshot event (the default) where it starts from
7+
where the snapshot left off, and follows the chain from there.
8+
9+
Rollbacks are handled by signalling in the block data - it is downstream
10+
subscribers' responsibility to deal with the effects of this.
11+
12+
## Configuration
13+
14+
See [./config.default.toml](./config.default.toml) for the available configuration options and their default values.
15+
16+
## Messages
17+
18+
This module publishes "raw block messages" to the configured `block-topic`. Each message includes the raw bytes composing the header and body of a block. The module follows the head of one chain at any given time, though that chain may switch during runtime. If that chain reports a rollback (or if this module switches to a different chain), the next message it emits will be the new head of the chain and have the status `RolledBack`.

0 commit comments

Comments
 (0)