diff --git a/crates/fs-utils/src/dir_trie.rs b/crates/fs-utils/src/dir_trie.rs index 14e6aa7bdeb..d23bda0609e 100644 --- a/crates/fs-utils/src/dir_trie.rs +++ b/crates/fs-utils/src/dir_trie.rs @@ -123,7 +123,15 @@ impl DirTrie { if src_file.is_file() { let dst_file = self.file_path(file_id); Self::create_parent(&dst_file)?; - std::fs::hard_link(src_file, dst_file)?; + std::fs::hard_link(&src_file, &dst_file)?; + // fsync the file, so its nlink count is durable. + // Note that we could also fsync `src_file`. + File::open(&dst_file)?.sync_all()?; + // fsync directory, so the entry for `dst_file` is durable. + // `parent()` is known to succeed, because `self.file_path` creates + // a path with a parent. + File::open(dst_file.parent().unwrap())?.sync_all()?; + Ok(true) } else { Ok(false) @@ -178,6 +186,12 @@ impl DirTrie { let contents = contents(); file.write_all(contents.as_ref())?; file.flush()?; + // fsync the file. + file.into_inner().expect("buffered writer just flushed").sync_all()?; + // fsync the directory. `parent()` is known to succeed, because + // `self.file_path` creates a path with a parent. + File::open(self.file_path(file_id).parent().unwrap())?.sync_all()?; + counter.objects_written += 1; Ok(()) } diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 6606d22f9e4..2bbbbe9ca9b 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -43,7 +43,7 @@ use spacetimedb_table::{ page_pool::PagePool, table::Table, }; -use std::fs; +use std::fs::{self, File}; use std::ops::RangeBounds; use std::time::{Duration, Instant}; use std::{ @@ -708,6 +708,9 @@ impl SnapshotRepository { snapshot.write_all_blobs(&object_repo, blobs, prev_snapshot.as_ref(), &mut counter)?; snapshot.write_all_tables(&object_repo, tables, prev_snapshot.as_ref(), &mut counter)?; + // Ensure all the object directories are durable. + File::open(object_repo.root())?.sync_all()?; + self.write_snapshot_file(&snapshot_dir, snapshot)?; log::info!( @@ -744,6 +747,12 @@ impl SnapshotRepository { snapshot_file.write_all(hash.as_bytes())?; snapshot_file.write_all(&snapshot_bsatn)?; snapshot_file.flush()?; + // fsync file + enclosing directory. + snapshot_file + .into_inner() + .expect("buffered writer just flushed") + .sync_all()?; + File::open(&snapshot_dir.0)?.sync_all()?; } Ok(()) @@ -1102,6 +1111,7 @@ impl SnapshotRepository { if old_file.is_compressed() { std::fs::hard_link(old_path, src.with_extension("_tmp"))?; std::fs::rename(src.with_extension("_tmp"), src)?; + File::open(src.parent().unwrap())?.sync_all()?; if let Some(stats) = stats { stats.hardlinked += 1; } @@ -1134,6 +1144,7 @@ impl SnapshotRepository { log::error!("Failed to compress object file {path:?}: {err}"); })?; } + File::open(dir.root())?.sync_all()?; // Compress the snapshot file last, // which marks the whole snapshot as compressed. @@ -1142,6 +1153,7 @@ impl SnapshotRepository { compress(&old, &snapshot_file.0, None, None).inspect_err(|err| { log::error!("Failed to compress snapshot file {snapshot_file:?}: {err}"); })?; + File::open(&snapshot_dir.0)?.sync_all()?; log::info!( "Compressed snapshot {snapshot_dir:?} of replica {}: {compress_type:?}", diff --git a/crates/snapshot/src/remote.rs b/crates/snapshot/src/remote.rs index 81884983b66..2c35e1ada79 100644 --- a/crates/snapshot/src/remote.rs +++ b/crates/snapshot/src/remote.rs @@ -727,10 +727,13 @@ where Some(file_path) => { let dir = file_path.parent().expect("file not in a directory").to_owned(); fs::create_dir_all(&dir).await?; - let (tmp_file, tmp_out) = spawn_blocking(move || { - let tmp = NamedTempFile::new_in(dir)?; - let out = tmp.reopen()?; - Ok::<_, io::Error>((tmp, out)) + let (tmp_file, tmp_out) = spawn_blocking({ + let dir = dir.clone(); + move || { + let tmp = NamedTempFile::new_in(dir)?; + let out = tmp.reopen()?; + Ok::<_, io::Error>((tmp, out)) + } }) .await .unwrap()?; @@ -743,6 +746,7 @@ where .await .unwrap() .map_err(|e| e.error)?; + fs::File::open(dir).await?.sync_all().await?; } None => {