Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 195 additions & 15 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use graph::{
use graph_chain_ethereum::EthereumAdapter;
use graph_chain_ethereum::EthereumAdapterTrait as _;
use graph_chain_ethereum::chain::BlockFinality;
use graph_store_postgres::AsyncPgConnection;
use graph_store_postgres::BlockStore;
use graph_store_postgres::ChainStore;
use graph_store_postgres::PoolCoordinator;
Expand All @@ -32,6 +33,7 @@ use graph_store_postgres::find_chain;
use graph_store_postgres::update_chain_name;
use graph_store_postgres::{ConnectionPool, command_support::catalog::block_store};

use crate::manager::prompt::prompt_for_confirmation;
use crate::network_setup::Networks;

pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> {
Expand Down Expand Up @@ -236,6 +238,59 @@ pub async fn update_chain_genesis(
Ok(())
}

struct ChainSwapOutcome {
reused_previous_backup: bool,
allocated_chain: bool,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ExistingBackupDisposition<'a> {
ProceedFresh,
PromptReuse,
AbortWrongShard(&'a str),
}

fn backup_name(chain: &str, base: &str) -> String {
format!("{chain}-{base}")
}

fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String {
format!("{backup_name}-{suffix}")
}

fn existing_backup_disposition<'a>(
existing_backup_shard: Option<&'a str>,
target_shard: &str,
) -> ExistingBackupDisposition<'a> {
match existing_backup_shard {
None => ExistingBackupDisposition::ProceedFresh,
Some(shard) if shard == target_shard => ExistingBackupDisposition::PromptReuse,
Some(shard) => ExistingBackupDisposition::AbortWrongShard(shard),
}
}

async fn next_temporary_backup_name(
conn: &mut AsyncPgConnection,
chain: &str,
base: &str,
) -> Result<String, StoreError> {
let backup_name = backup_name(chain, base);
if find_chain(conn, &backup_name).await?.is_none() {
return Ok(backup_name);
}

let mut suffix = 1usize;

loop {
let candidate = suffixed_backup_name(&backup_name, suffix);
if find_chain(conn, &candidate).await?.is_none() {
return Ok(candidate);
}

suffix += 1;
}
}

pub async fn change_block_cache_shard(
primary_store: ConnectionPool,
store: BlockStore,
Expand All @@ -250,51 +305,134 @@ pub async fn change_block_cache_shard(
.await?
.ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?;
let old_shard = chain.shard;
let canonical_backup_name = format!("{chain_name}-old");
let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?;

println!("Current shard: {}", old_shard);

let chain_store = store
.chain_store(&chain_name)
.await
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
let new_name = format!("{}-old", &chain_name);
let ident = chain_store.chain_identifier().await?;
let target_shard = Shard::new(shard.clone())?;
let existing_backup_disposition = existing_backup_disposition(
existing_backup.as_ref().map(|backup| backup.shard.as_str()),
target_shard.as_str(),
);
let reuse_existing_backup = matches!(
existing_backup_disposition,
ExistingBackupDisposition::PromptReuse
);

conn.transaction::<(), StoreError, _>(|conn| {
async {
let shard = Shard::new(shard.to_string())?;
match existing_backup_disposition {
ExistingBackupDisposition::ProceedFresh => {}
ExistingBackupDisposition::AbortWrongShard(backup_shard) => {
bail!(
"`{}` already exists on shard `{}`. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
canonical_backup_name,
backup_shard,
canonical_backup_name,
chain_name,
target_shard,
);
}
ExistingBackupDisposition::PromptReuse => {
let prompt = format!(
"`{}` already exists on shard `{}` and will be reused as the active `{}` chain.\nProceed?",
canonical_backup_name, target_shard, chain_name
);
if !prompt_for_confirmation(&prompt)? {
println!(
"Aborting. Remove `{}` with `graphman chain remove {}` if you want to create a fresh cache on shard `{}`.",
canonical_backup_name, canonical_backup_name, target_shard
);
return Ok(());
}
}
}

let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?;
let existing_backup_store = if reuse_existing_backup {
store.chain_store(&canonical_backup_name).await
} else {
None
};

store.add_chain_store(&chain, true).await?;
let allocated_chain = if reuse_existing_backup {
None
} else {
let chain =
BlockStore::allocate_chain(&mut conn, &chain_name, &target_shard, &ident).await?;
store.add_chain_store(&chain, true).await?;
Some(chain)
};

// Drop the foreign key constraint on deployment_schemas
let outcome = conn.transaction::<ChainSwapOutcome, StoreError, _>(|conn| {
async {
sql_query(
"alter table deployment_schemas drop constraint deployment_schemas_network_fkey;",
)
.execute(conn).await?;

// Update the current chain name to chain-old
update_chain_name(conn, &chain_name, &new_name).await?;

// Create a new chain with the name in the destination shard
let _ = add_chain(conn, &chain_name, &shard, ident).await?;
let temp_backup_name = if let Some(backup) = existing_backup.as_ref() {
let temp_name = next_temporary_backup_name(conn, &chain_name, "old").await?;
update_chain_name(conn, &backup.name, &temp_name).await?;
Some(temp_name)
} else {
None
};

update_chain_name(conn, &chain_name, &canonical_backup_name).await?;

if reuse_existing_backup {
debug_assert!(temp_backup_name.is_some());
update_chain_name(conn, temp_backup_name.as_ref().unwrap(), &chain_name).await?;
} else {
add_chain(conn, &chain_name, &target_shard, ident.clone()).await?;
}

// Re-add the foreign key constraint
sql_query(
"alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);",
)
.execute(conn).await?;
Ok(())

Ok(ChainSwapOutcome {
reused_previous_backup: reuse_existing_backup,
allocated_chain: allocated_chain.is_some(),
})
}.scope_boxed()
}).await?;

chain_store.update_name(&new_name).await?;
let ChainSwapOutcome {
reused_previous_backup,
allocated_chain,
} = outcome;

chain_store.update_name(&canonical_backup_name).await?;

if reused_previous_backup && let Some(backup_store) = existing_backup_store.as_ref() {
backup_store.update_name(&chain_name).await?;
}

println!(
"Changed block cache shard for {} from {} to {}",
chain_name, old_shard, shard
);
println!("Latest backup recorded as `{}`", canonical_backup_name);

if reused_previous_backup {
println!(
"Reused existing backup `{}` as the active `{}` chain",
canonical_backup_name, chain_name
);
}

if allocated_chain {
println!(
"Allocated new chain state for `{}` on shard {}",
chain_name, shard
);
}

Ok(())
}
Expand Down Expand Up @@ -329,3 +467,45 @@ pub async fn ingest(
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::{
ExistingBackupDisposition, backup_name, existing_backup_disposition, suffixed_backup_name,
};

#[test]
fn backup_name_uses_plain_name_first() {
assert_eq!(backup_name("mainnet", "old"), "mainnet-old");
}

#[test]
fn suffixed_backup_name_adds_numeric_suffixes() {
assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1");
assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42");
}

#[test]
fn existing_backup_disposition_proceeds_when_backup_missing() {
assert_eq!(
existing_backup_disposition(None, "shard_b"),
ExistingBackupDisposition::ProceedFresh
);
}

#[test]
fn existing_backup_disposition_prompts_when_backup_matches_target_shard() {
assert_eq!(
existing_backup_disposition(Some("shard_b"), "shard_b"),
ExistingBackupDisposition::PromptReuse
);
}

#[test]
fn existing_backup_disposition_aborts_when_backup_is_on_another_shard() {
assert_eq!(
existing_backup_disposition(Some("shard_a"), "shard_b"),
ExistingBackupDisposition::AbortWrongShard("shard_a")
);
}
}