diff --git a/CHANGELOG.md b/CHANGELOG.md index 5aa27e6..cbf96ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,8 +15,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `BStackGenOp::Len { out }` / `BSTACK_GEN_LEN` (Rust + C, `set` + `atomic`): writes the current logical payload size into `out` and, unlike the mutating variants, does not end the sequence — the in-sequence equivalent of `len`, useful when a later step's offset or length depends on the current payload size. - `BStackGenOp::Discard { len }` (Rust) / `BSTACK_GEN_POP` with a `NULL` `u.pop.buf` (C) (`set` + `atomic`): removes the last `len` bytes from the end of the file without reading them back, shrinking the payload and ending the sequence — the in-sequence, buffer-free equivalent of `discard` and the counterpart of `Pop`. Useful for truncating a tail whose size is only known once earlier `Read`s/`Len` have resolved, without allocating a throwaway buffer. In Rust this is a dedicated variant (slices cannot be null); in C it is expressed idiomatically as a `Pop` whose destination pointer is `NULL`. Errors on the same conditions as `Pop`. +### Fixed + +- **`atrunc`, `splice`, `splice_into`, `replace` (Rust, `atomic`) and `bstack_atrunc`, `bstack_splice` (C, `BSTACK_FEATURE_ATOMIC`) — committed-length write not durably synced**: The header `clen` write that commits the new payload length was the last step of these operations and was never followed by `durable_sync`/`plat_durable_sync`, so a crash could leave the on-disk `clen` update only in the OS page cache. Every commit of a new `clen` — including best-effort rollback writes after a failed commit — is now followed by a sync. Crate-level durability table updated to reflect the additional sync. + ### Changed +- **`BStack::len` (Rust) / `bstack_len` (C) and `BStack::is_empty` / `bstack_is_empty` no longer make a syscall**: The committed payload length is now cached in memory and kept in sync by every write-lock-held operation that commits a new length to the header. `len`/`is_empty` read this cache under the read lock instead of calling `File::metadata` (Rust) or `fstat`/`GetFileSizeEx` (C). Behaviour and signatures are unchanged. - **`SlabBStackAllocator` and `CheckedSlabBStackAllocator` — `alloc` / `dealloc` / `realloc` are now lock-free under the `atomic` feature** (`alloc` + `set` features): The allocator-level `Mutex` that previously serialised free-list push/pop is gone from these paths. Free-list pop now drives a single `BStack::process_gen` sequence (read `free_head`, read the popped block's `next`, advance `free_head` — all under one held `BStack` write lock, closing the ABA window a `get`/`cas` pair would leave open); free-list push splices a single block or a whole freed run onto the head with one `BStack::cross_exchange`; tail grow/shrink use `BStack::try_extend_zeros` / `BStack::try_discard` (atomic check-and-act under `BStack`'s own write lock). `SlabBStackAllocator` drops its allocator-level `Mutex` entirely and is `Sync` purely through `BStack`'s interior mutability. `CheckedSlabBStackAllocator` retains a `Mutex` solely for `recover` (see below); none of `alloc` / `dealloc` / `realloc` take it. The on-disk format is unchanged — no magic-number bump. - **`CheckedSlabBStackAllocator::recover` runs under its own mutex** (`alloc` + `set` features, `atomic`): the `Mutex` is held for the full call solely to keep recovery single-flight, preventing two concurrent runs from reclaiming the same leaked block twice. The scan itself (free-list walk, arena classification, and its one optional tail discard) runs as a single `BStack::process_gen` sequence, so the `BStack` write lock — not the `Mutex` — serialises it against the lock-free `alloc` / `dealloc` / `realloc`. Ordinary `alloc` / `dealloc` / `realloc` never take the `Mutex`. diff --git a/README.md b/README.md index b2836f7..64a79d7 100644 --- a/README.md +++ b/README.md @@ -621,7 +621,10 @@ maps to `io::ErrorKind::WouldBlock` in Rust). The lock is released when the ## Thread safety -`BStack` wraps the file in a `RwLock`. +`BStack` wraps the file in a `RwLock`. The committed payload length is also +cached in memory and kept in sync with the on-disk header by every +write-lock-held operation, so `len`/`is_empty` can be answered under the read +lock without any `File::metadata` syscall. | Operation | Lock (Unix / Windows) | Lock (other) | |--------------------------------------------------------------|-----------------------|--------------| diff --git a/c/bstack.c b/c/bstack.c index 8ef6f61..c28c4a5 100644 --- a/c/bstack.c +++ b/c/bstack.c @@ -82,6 +82,13 @@ struct bstack { pthread_rwlock_t lock; pthread_mutex_t cache_mutex; #endif + /* Cached copy of the on-disk header's committed payload length (clen). + * Seeded from the validated header at construction time (after recovery) + * and kept in sync by every write-lock-held operation that commits a new + * clen to the header, via write_committed_len. bstack_len and + * bstack_is_empty read it under the same lock used for the on-disk + * state, so no extra synchronisation is needed. */ + uint64_t clen; /* Monotonically growing partition boundary. Bytes in [0, locked) are * immutable and can be read without the rwlock on supported platforms. * Not persisted — resets to 0 on every open. */ @@ -300,9 +307,14 @@ static int write_le64(bstack_fd_t fd, uint64_t file_offset, uint64_t val) * Header helpers * ---------------------------------------------------------------------- */ -static int write_committed_len(bstack_fd_t fd, uint64_t len) +/* Overwrite the committed-length field at file offset 8 and update the + * in-memory cache (*clen) to match. */ +static int write_committed_len(bstack_fd_t fd, uint64_t *clen, uint64_t len) { - return write_le64(fd, 8, len); + if (write_le64(fd, 8, len) != 0) + return -1; + *clen = len; + return 0; } static int init_header(bstack_fd_t fd) @@ -399,6 +411,7 @@ bstack_t *bstack_open(const char *path) return NULL; } + uint64_t clen = 0; if (raw_size == 0) { /* New file — write header and sync. */ if (init_header(fd) != 0 || plat_durable_sync(fd) != 0) { @@ -413,8 +426,8 @@ bstack_t *bstack_open(const char *path) return NULL; } else { /* Existing file — validate header and crash-recover if needed. */ - uint64_t clen; - if (read_header(fd, &clen) != 0) { + uint64_t committed_len; + if (read_header(fd, &committed_len) != 0) { int saved = errno; close_fd(fd); errno = saved; @@ -422,10 +435,10 @@ bstack_t *bstack_open(const char *path) } uint64_t actual = raw_size - HEADER_SIZE; - if (actual != clen) { - uint64_t correct = (clen < actual) ? clen : actual; + if (actual != committed_len) { + uint64_t correct = (committed_len < actual) ? committed_len : actual; if (plat_ftruncate(fd, HEADER_SIZE + correct) != 0 || - write_committed_len(fd, correct) != 0 || + write_committed_len(fd, &clen, correct) != 0 || plat_durable_sync(fd) != 0) { int saved = errno; @@ -433,6 +446,8 @@ bstack_t *bstack_open(const char *path) errno = saved; return NULL; } + } else { + clen = committed_len; } } @@ -442,6 +457,7 @@ bstack_t *bstack_open(const char *path) return NULL; } bs->fd = fd; + bs->clen = clen; bs->locked = ATOMIC_INIT(0); bs->cache_enabled = 0; bs->cache_buf = NULL; @@ -516,12 +532,16 @@ int bstack_push(bstack_t *bs, const uint8_t *data, size_t len, } uint64_t new_len = logical_offset + (uint64_t)len; - if (write_committed_len(bs->fd, new_len) != 0 || + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) { - /* Rollback: remove written data and reset committed length. */ + /* Rollback: remove written data and reset committed length. The + * cache is reset up front so it reflects the rolled-back file even + * if the best-effort header rewrite below fails. */ plat_ftruncate(bs->fd, raw_size); - write_committed_len(bs->fd, logical_offset); + bs->clen = logical_offset; + write_committed_len(bs->fd, &bs->clen, logical_offset); + plat_durable_sync(bs->fd); goto fail_unlock; } @@ -566,12 +586,16 @@ int bstack_extend(bstack_t *bs, size_t n, uint64_t *out_offset) goto fail_unlock; uint64_t new_len = logical_offset + (uint64_t)n; - if (write_committed_len(bs->fd, new_len) != 0 || + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) { - /* Rollback: truncate and reset committed length. */ + /* Rollback: truncate and reset committed length. The cache is reset + * up front so it reflects the rolled-back file even if the + * best-effort header rewrite below fails. */ plat_ftruncate(bs->fd, raw_size); - write_committed_len(bs->fd, logical_offset); + bs->clen = logical_offset; + write_committed_len(bs->fd, &bs->clen, logical_offset); + plat_durable_sync(bs->fd); goto fail_unlock; } @@ -627,8 +651,13 @@ int bstack_pop(bstack_t *bs, size_t n, goto fail_unlock; } - if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 || - write_committed_len(bs->fd, new_len) != 0 || + if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0) + goto fail_unlock; + /* The truncation is the commit point: the tail bytes are gone and + * recovery would adopt the smaller file size, so update the cache now — + * before the header write, which can fail and skip it. */ + bs->clen = new_len; + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) goto fail_unlock; @@ -782,8 +811,13 @@ int bstack_discard(bstack_t *bs, size_t n) return -1; } - if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 || - write_committed_len(bs->fd, new_len) != 0 || + if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0) + goto fail_unlock; + /* The truncation is the commit point: the tail bytes are gone and + * recovery would adopt the smaller file size, so update the cache now — + * before the header write, which can fail and skip it. */ + bs->clen = new_len; + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) goto fail_unlock; @@ -806,17 +840,8 @@ int bstack_discard(bstack_t *bs, size_t n) int bstack_len(bstack_t *bs, uint64_t *out_len) { BS_RDLOCK(bs); - - uint64_t raw_size; - if (file_size(bs->fd, &raw_size) != 0) { - int saved = errno; - BS_RDUNLOCK(bs); - errno = saved; - return -1; - } - + *out_len = bs->clen; BS_RDUNLOCK(bs); - *out_len = raw_size - HEADER_SIZE; return 0; } @@ -827,17 +852,8 @@ int bstack_len(bstack_t *bs, uint64_t *out_len) int bstack_is_empty(bstack_t *bs, int *out_empty) { BS_RDLOCK(bs); - - uint64_t raw_size; - if (file_size(bs->fd, &raw_size) != 0) { - int saved = errno; - BS_RDUNLOCK(bs); - errno = saved; - return -1; - } - + *out_empty = (bs->clen == 0) ? 1 : 0; BS_RDUNLOCK(bs); - *out_empty = (raw_size == HEADER_SIZE) ? 1 : 0; return 0; } @@ -1122,13 +1138,14 @@ int bstack_zero(bstack_t *bs, uint64_t offset, size_t n) /* Shared body for atrunc and splice (after the removed bytes are read). * Caller already holds the write lock. raw_size / data_size / tail_offset / * final_data_len are pre-computed by the caller. */ -static int atomic_write_tail(bstack_fd_t fd, +static int atomic_write_tail(bstack_t *bs, uint64_t raw_size, uint64_t tail_offset, uint64_t final_data_len, const uint8_t *buf, size_t buf_len, size_t n) { + bstack_fd_t fd = bs->fd; if (buf_len > n) { /* Net extension: extend first so crashes roll back cleanly, then * write buf over the old tail + the new space, sync, commit clen. */ @@ -1143,7 +1160,9 @@ static int atomic_write_tail(bstack_fd_t fd, plat_ftruncate(fd, raw_size); /* best-effort rollback */ return -1; } - return write_committed_len(fd, final_data_len); + if (write_committed_len(fd, &bs->clen, final_data_len) != 0) + return -1; + return plat_durable_sync(fd); } else { /* Net truncation or same size: write buf into old tail, truncate, * sync, commit clen. A crash after truncate is committed by @@ -1153,9 +1172,15 @@ static int atomic_write_tail(bstack_fd_t fd, return -1; if (plat_ftruncate(fd, HEADER_SIZE + final_data_len) != 0) return -1; + /* The truncation is the commit point (recovery adopts the smaller + * file size), so update the cache now — before the sync and header + * write, which can fail and skip it. */ + bs->clen = final_data_len; if (plat_durable_sync(fd) != 0) return -1; - return write_committed_len(fd, final_data_len); + if (write_committed_len(fd, &bs->clen, final_data_len) != 0) + return -1; + return plat_durable_sync(fd); } } @@ -1189,7 +1214,7 @@ int bstack_atrunc(bstack_t *bs, size_t n, uint64_t tail_offset = HEADER_SIZE + new_tail_start; uint64_t final_data_len = new_tail_start + (uint64_t)buf_len; - if (atomic_write_tail(bs->fd, raw_size, tail_offset, + if (atomic_write_tail(bs, raw_size, tail_offset, final_data_len, buf, buf_len, n) != 0) goto fail_unlock; @@ -1238,7 +1263,7 @@ int bstack_splice(bstack_t *bs, goto fail_unlock; } - if (atomic_write_tail(bs->fd, raw_size, tail_offset, + if (atomic_write_tail(bs, raw_size, tail_offset, final_data_len, new_buf, new_len, n) != 0) goto fail_unlock; @@ -1277,10 +1302,14 @@ int bstack_try_extend(bstack_t *bs, uint64_t s, goto fail_unlock; } uint64_t new_len = data_size + (uint64_t)buf_len; - if (write_committed_len(bs->fd, new_len) != 0 || + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) { + /* The cache is reset up front so it reflects the rolled-back file + * even if the best-effort header rewrite below fails. */ plat_ftruncate(bs->fd, raw_size); - write_committed_len(bs->fd, data_size); + bs->clen = data_size; + write_committed_len(bs->fd, &bs->clen, data_size); + plat_durable_sync(bs->fd); goto fail_unlock; } @@ -1338,8 +1367,13 @@ int bstack_try_discard(bstack_t *bs, uint64_t s, size_t n, int *ok) return -1; } - if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 || - write_committed_len(bs->fd, new_len) != 0 || + if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0) + goto fail_unlock; + /* The truncation is the commit point: the tail bytes are gone and + * recovery would adopt the smaller file size, so update the cache now — + * before the header write, which can fail and skip it. */ + bs->clen = new_len; + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) goto fail_unlock; @@ -1410,7 +1444,7 @@ int bstack_replace(bstack_t *bs, size_t n, uint64_t final_data_len = new_tail_start + (uint64_t)new_len; - if (atomic_write_tail(bs->fd, raw_size, tail_offset, + if (atomic_write_tail(bs, raw_size, tail_offset, final_data_len, new_buf, new_len, n) != 0) { free(new_buf); goto fail_unlock; @@ -1447,12 +1481,15 @@ int bstack_try_extend_zeros(bstack_t *bs, uint64_t s, size_t n, int *ok) uint64_t new_len = data_size + (uint64_t)n; if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 || - write_committed_len(bs->fd, new_len) != 0 || + write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) { - /* Best-effort rollback. */ + /* Best-effort rollback. The cache is reset up front so it reflects + * the rolled-back file even if the header rewrite below fails. */ plat_ftruncate(bs->fd, raw_size); - write_committed_len(bs->fd, data_size); + bs->clen = data_size; + write_committed_len(bs->fd, &bs->clen, data_size); + plat_durable_sync(bs->fd); goto fail_unlock; } @@ -1878,13 +1915,17 @@ int bstack_process_gen(bstack_t *bs, goto fail_unlock; } uint64_t new_len = data_size + (uint64_t)len; - if (write_committed_len(bs->fd, new_len) != 0 || + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) { /* Rollback: remove written data and reset committed - * length. */ + * length. The cache is reset up front so it reflects + * the rolled-back file even if the header rewrite below + * fails. */ plat_ftruncate(bs->fd, raw_size_now); - write_committed_len(bs->fd, data_size); + bs->clen = data_size; + write_committed_len(bs->fd, &bs->clen, data_size); + plat_durable_sync(bs->fd); goto fail_unlock; } } @@ -1909,8 +1950,14 @@ int bstack_process_gen(bstack_t *bs, if (plat_pread(bs->fd, buf, len, read_offset) != 0) goto fail_unlock; } - if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 || - write_committed_len(bs->fd, new_len) != 0 || + if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0) + goto fail_unlock; + /* The truncation is the commit point: the tail bytes are + * gone and recovery would adopt the smaller file size, so + * update the cache now — before the header write, which can + * fail and skip it. */ + bs->clen = new_len; + if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 || plat_durable_sync(bs->fd) != 0) goto fail_unlock; } diff --git a/c/bstack.h b/c/bstack.h index 6ec5baf..5aca11a 100644 --- a/c/bstack.h +++ b/c/bstack.h @@ -120,13 +120,16 @@ int bstack_discard(bstack_t *bs, size_t n); /* * Write the current logical payload size (excluding the 16-byte header) - * into *out_len. Takes the read lock; concurrent calls are allowed. + * into *out_len. This value is cached in memory, so no syscall is made; + * it takes the read lock, so it can run concurrently with other readers + * but blocks while a writer is in progress. */ int bstack_len(bstack_t *bs, uint64_t *out_len); /* * Write 1 into *out_empty if the payload is empty (len == 0), else 0. - * Takes the read lock; concurrent calls are allowed. + * Like bstack_len, this is a cached read under the read lock and makes + * no syscall. */ int bstack_is_empty(bstack_t *bs, int *out_empty); diff --git a/src/lib.rs b/src/lib.rs index 8cf0077..268c070 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,8 +70,8 @@ //! | `discard` | `ftruncate` → `lseek(8)` → `write(clen)` → `durable_sync` | //! | `set` *(feature)* | `lseek(offset)` → `write(data)` → `durable_sync` | //! | `zero` *(feature)* | `lseek(offset)` → `write(zeros)` → `durable_sync` | -//! | `atrunc` *(feature: atomic, net extension)* | `set_len(new_end)` → `lseek(tail)` → `write(buf)` → `durable_sync` → `lseek(8)` → `write(clen)` | -//! | `atrunc` *(feature: atomic, net truncation)* | `lseek(tail)` → `write(buf)` → `set_len(new_end)` → `durable_sync` → `lseek(8)` → `write(clen)` | +//! | `atrunc` *(feature: atomic, net extension)* | `set_len(new_end)` → `lseek(tail)` → `write(buf)` → `durable_sync` → `lseek(8)` → `write(clen)` → `durable_sync` | +//! | `atrunc` *(feature: atomic, net truncation)* | `lseek(tail)` → `write(buf)` → `set_len(new_end)` → `durable_sync` → `lseek(8)` → `write(clen)` → `durable_sync` | //! | `splice`, `splice_into` *(feature: atomic)* | `lseek(tail)` → `read(n)` → *(then as `atrunc`)* | //! | `try_extend` *(feature: atomic)* | `lseek(END)` — conditional `push` sequence if size matches | //! | `try_discard` *(feature: atomic)* | `lseek(END)` — conditional `discard` sequence if size matches | @@ -135,7 +135,11 @@ //! //! # Thread safety //! -//! `BStack` wraps the file in a [`std::sync::RwLock`]. +//! `BStack` wraps the file in a [`std::sync::RwLock`]. The committed payload +//! length is also cached in memory and kept in sync with the on-disk header +//! by every write-lock-held operation, so [`len`](BStack::len) and +//! [`is_empty`](BStack::is_empty) can be answered under the read lock without +//! any `File::metadata` syscall. //! //! | Operation | Lock (Unix / Windows) | Lock (other) | //! |-----------|-----------------------|--------------| @@ -606,10 +610,13 @@ fn init_header(file: &mut File) -> io::Result<()> { file.write_all(&0u64.to_le_bytes()) } -/// Overwrite the committed-length field at file offset 8. -fn write_committed_len(file: &mut File, len: u64) -> io::Result<()> { +/// Overwrite the committed-length field at file offset 8 and update the +/// in-memory cache (`clen`) to match. +fn write_committed_len(file: &mut File, clen: &mut u64, len: u64) -> io::Result<()> { file.seek(SeekFrom::Start(8))?; - file.write_all(&len.to_le_bytes()) + file.write_all(&len.to_le_bytes())?; + *clen = len; + Ok(()) } /// Read `len` bytes from absolute file position `offset` without modifying @@ -760,7 +767,16 @@ fn read_header(file: &mut File) -> io::Result { /// See the [crate-level documentation](crate) for the file format, durability /// guarantees, crash recovery, multi-process safety, and thread-safety model. pub struct BStack { - lock: RwLock, + /// The file handle together with a cached copy of the on-disk header's + /// committed payload length (`clen`). + /// + /// `clen` (the `.1` field) is seeded from the validated header at + /// construction time (after recovery) and kept in sync by every + /// write-lock-held operation that commits a new `clen` to the header, via + /// `write_committed_len`. [`BStack::len`] and [`BStack::is_empty`] read it + /// under the same lock used for the on-disk state, so no extra + /// synchronisation is needed. + lock: RwLock<(File, u64)>, /// Monotonically growing partition boundary. Bytes in `[0, locked)` are /// immutable and can be read without the rwlock on supported platforms. /// Not persisted — resets to 0 on every open. @@ -828,6 +844,7 @@ impl BStack { let raw_size = file.metadata()?.len(); + let mut clen = 0u64; if raw_size == 0 { init_header(&mut file)?; durable_sync(&file)?; @@ -846,8 +863,10 @@ impl BStack { // value is the last successfully synced boundary). let correct_len = committed_len.min(actual_data_len); file.set_len(HEADER_SIZE + correct_len)?; - write_committed_len(&mut file, correct_len)?; + write_committed_len(&mut file, &mut clen, correct_len)?; durable_sync(&file)?; + } else { + clen = committed_len; } } @@ -861,7 +880,7 @@ impl BStack { fd, #[cfg(windows)] handle, - lock: RwLock::new(file), + lock: RwLock::new((file, clen)), locked: AtomicU64::new(0), cache_enabled: false, cache: Mutex::new(Vec::new()), @@ -886,7 +905,8 @@ impl BStack { /// fallback `set_len`. pub fn push(&self, data: impl AsRef<[u8]>) -> io::Result { let data = data.as_ref(); - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let logical_offset = file_end - HEADER_SIZE; @@ -900,10 +920,14 @@ impl BStack { } let new_len = logical_offset + data.len() as u64; - if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) { - // Roll back: truncate data and reset header. + if let Err(e) = write_committed_len(file, clen, new_len).and_then(|_| durable_sync(file)) { + // Roll back: truncate data and reset header. The cache is reset + // up front so it reflects the rolled-back file even if the + // best-effort header rewrite below fails. let _ = file.set_len(file_end); - let _ = write_committed_len(&mut file, logical_offset); + *clen = logical_offset; + let _ = write_committed_len(file, clen, logical_offset); + let _ = durable_sync(file); return Err(e); } @@ -927,7 +951,8 @@ impl BStack { /// Returns any [`io::Error`] from `set_len`, `durable_sync`, or the /// fallback `set_len`. pub fn extend(&self, n: u64) -> io::Result { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let logical_offset = file_end - HEADER_SIZE; @@ -939,10 +964,14 @@ impl BStack { file.set_len(new_file_end)?; let new_len = logical_offset + n; - if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) { - // Roll back: truncate and reset header. + if let Err(e) = write_committed_len(file, clen, new_len).and_then(|_| durable_sync(file)) { + // Roll back: truncate and reset header. The cache is reset up + // front so it reflects the rolled-back file even if the + // best-effort header rewrite below fails. let _ = file.set_len(file_end); - let _ = write_committed_len(&mut file, logical_offset); + *clen = logical_offset; + let _ = write_committed_len(file, clen, logical_offset); + let _ = durable_sync(file); return Err(e); } @@ -965,7 +994,8 @@ impl BStack { /// payload size. Also propagates any I/O error from `read_exact`, /// `set_len`, `write_all`, or `durable_sync`. pub fn pop(&self, n: u64) -> io::Result> { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let raw_size = file.seek(SeekFrom::End(0))?; let data_size = raw_size - HEADER_SIZE; if n > data_size { @@ -986,8 +1016,12 @@ impl BStack { let mut buf = vec![0u8; n as usize]; file.read_exact(&mut buf)?; file.set_len(HEADER_SIZE + new_data_len)?; - write_committed_len(&mut file, new_data_len)?; - durable_sync(&file)?; + // The truncation is the commit point: the tail bytes are gone and + // recovery would adopt the smaller file size, so update the cache now + // — before the header write, which `?` could skip on error. + *clen = new_data_len; + write_committed_len(file, clen, new_data_len)?; + durable_sync(file)?; Ok(buf) } @@ -1015,7 +1049,8 @@ impl BStack { pub fn peek(&self, offset: u64) -> io::Result> { #[cfg(any(unix, windows))] { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); if offset > data_size { return Err(io::Error::new( @@ -1023,11 +1058,12 @@ impl BStack { format!("peek offset ({offset}) exceeds payload size ({data_size})"), )); } - pread_exact(&file, HEADER_SIZE + offset, (data_size - offset) as usize) + pread_exact(file, HEADER_SIZE + offset, (data_size - offset) as usize) } #[cfg(not(any(unix, windows)))] { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let raw_size = file.seek(SeekFrom::End(0))?; let data_size = raw_size.saturating_sub(HEADER_SIZE); if offset > data_size { @@ -1095,7 +1131,8 @@ impl BStack { } #[cfg(any(unix, windows))] { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); if end > data_size { return Err(io::Error::new( @@ -1103,7 +1140,7 @@ impl BStack { format!("get: end ({end}) exceeds payload size ({data_size})"), )); } - pread_exact(&file, HEADER_SIZE + start, (end - start) as usize) + pread_exact(file, HEADER_SIZE + start, (end - start) as usize) } #[cfg(not(any(unix, windows)))] { @@ -1112,7 +1149,8 @@ impl BStack { let cache = self.cache.lock().unwrap(); return Ok(cache[start as usize..end as usize].to_vec()); } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let raw_size = file.seek(SeekFrom::End(0))?; let data_size = raw_size.saturating_sub(HEADER_SIZE); if end > data_size { @@ -1158,7 +1196,8 @@ impl BStack { })?; #[cfg(any(unix, windows))] { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); if end > data_size { return Err(io::Error::new( @@ -1168,11 +1207,12 @@ impl BStack { ), )); } - pread_exact_into(&file, HEADER_SIZE + offset, buf) + pread_exact_into(file, HEADER_SIZE + offset, buf) } #[cfg(not(any(unix, windows)))] { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); if end > data_size { return Err(io::Error::new( @@ -1233,7 +1273,8 @@ impl BStack { } #[cfg(any(unix, windows))] { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); if end > data_size { return Err(io::Error::new( @@ -1241,7 +1282,7 @@ impl BStack { format!("get_into: end ({end}) exceeds payload size ({data_size})"), )); } - pread_exact_into(&file, HEADER_SIZE + start, buf) + pread_exact_into(file, HEADER_SIZE + start, buf) } #[cfg(not(any(unix, windows)))] { @@ -1251,7 +1292,8 @@ impl BStack { buf.copy_from_slice(&cache[start as usize..end as usize]); return Ok(()); } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); if end > data_size { return Err(io::Error::new( @@ -1285,7 +1327,8 @@ impl BStack { return Ok(()); } let n = buf.len() as u64; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let raw_size = file.seek(SeekFrom::End(0))?; let data_size = raw_size - HEADER_SIZE; if n > data_size { @@ -1305,8 +1348,12 @@ impl BStack { file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?; file.read_exact(buf)?; file.set_len(HEADER_SIZE + new_data_len)?; - write_committed_len(&mut file, new_data_len)?; - durable_sync(&file)?; + // The truncation is the commit point: the tail bytes are gone and + // recovery would adopt the smaller file size, so update the cache now + // — before the header write, which `?` could skip on error. + *clen = new_data_len; + write_committed_len(file, clen, new_data_len)?; + durable_sync(file)?; Ok(()) } @@ -1328,7 +1375,8 @@ impl BStack { if n == 0 { return Ok(()); } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let raw_size = file.seek(SeekFrom::End(0))?; let data_size = raw_size - HEADER_SIZE; if n > data_size { @@ -1346,8 +1394,12 @@ impl BStack { )); } file.set_len(HEADER_SIZE + new_data_len)?; - write_committed_len(&mut file, new_data_len)?; - durable_sync(&file)?; + // The truncation is the commit point: the tail bytes are gone and + // recovery would adopt the smaller file size, so update the cache now + // — before the header write, which `?` could skip on error. + *clen = new_data_len; + write_committed_len(file, clen, new_data_len)?; + durable_sync(file)?; Ok(()) } @@ -1383,7 +1435,8 @@ impl BStack { "set: offset + len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; // Load `locked` under the write lock — otherwise a concurrent // `lock_up_to` could extend the locked region between our check and // our write, letting us mutate a now-immutable byte. @@ -1403,7 +1456,7 @@ impl BStack { } file.seek(SeekFrom::Start(HEADER_SIZE + offset))?; file.write_all(data)?; - durable_sync(&file) + durable_sync(file) } /// Overwrite `n` bytes with zeros in place starting at logical `offset`. @@ -1437,7 +1490,8 @@ impl BStack { "zero: offset + n overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; // Load `locked` under the write lock (see `set` for rationale). let locked = self.locked.load(Ordering::Acquire); if offset < locked { @@ -1456,7 +1510,7 @@ impl BStack { file.seek(SeekFrom::Start(HEADER_SIZE + offset))?; let zeros = vec![0u8; n as usize]; file.write_all(&zeros)?; - durable_sync(&file) + durable_sync(file) } } @@ -1500,7 +1554,8 @@ impl BStack { if n == 0 && buf_len == 0 { return Ok(()); } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let data_size = file_end - HEADER_SIZE; if n > data_size { @@ -1530,11 +1585,12 @@ impl BStack { let _ = file.set_len(file_end); return Err(e); } - if let Err(e) = durable_sync(&file) { + if let Err(e) = durable_sync(file) { let _ = file.set_len(file_end); return Err(e); } - write_committed_len(&mut file, final_data_len)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } else { // Net truncation or same size: write buf into the old tail first, // truncate, sync, then commit the new length. @@ -1543,8 +1599,13 @@ impl BStack { file.write_all(buf)?; } file.set_len(HEADER_SIZE + final_data_len)?; - durable_sync(&file)?; - write_committed_len(&mut file, final_data_len)?; + // The truncation is the commit point (recovery adopts the smaller + // file size), so update the cache now — before the sync and header + // write, which `?` could skip on error. + *clen = final_data_len; + durable_sync(file)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } Ok(()) } @@ -1573,7 +1634,8 @@ impl BStack { if n == 0 && buf_len == 0 { return Ok(Vec::new()); } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let data_size = file_end - HEADER_SIZE; if n > data_size { @@ -1607,11 +1669,12 @@ impl BStack { let _ = file.set_len(file_end); return Err(e); } - if let Err(e) = durable_sync(&file) { + if let Err(e) = durable_sync(file) { let _ = file.set_len(file_end); return Err(e); } - write_committed_len(&mut file, final_data_len)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } else { // Net truncation or same size: write buf, truncate, sync, commit. if !buf.is_empty() { @@ -1619,8 +1682,13 @@ impl BStack { file.write_all(buf)?; } file.set_len(HEADER_SIZE + final_data_len)?; - durable_sync(&file)?; - write_committed_len(&mut file, final_data_len)?; + // The truncation is the commit point (recovery adopts the smaller + // file size), so update the cache now — before the sync and header + // write, which `?` could skip on error. + *clen = final_data_len; + durable_sync(file)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } Ok(removed) @@ -1652,7 +1720,8 @@ impl BStack { if n == 0 && new_len == 0 { return Ok(()); } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let data_size = file_end - HEADER_SIZE; if n > data_size { @@ -1685,11 +1754,12 @@ impl BStack { let _ = file.set_len(file_end); return Err(e); } - if let Err(e) = durable_sync(&file) { + if let Err(e) = durable_sync(file) { let _ = file.set_len(file_end); return Err(e); } - write_committed_len(&mut file, final_data_len)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } else { // Net truncation or same size: write new, truncate, sync, commit. if !new.is_empty() { @@ -1697,8 +1767,13 @@ impl BStack { file.write_all(new)?; } file.set_len(HEADER_SIZE + final_data_len)?; - durable_sync(&file)?; - write_committed_len(&mut file, final_data_len)?; + // The truncation is the commit point (recovery adopts the smaller + // file size), so update the cache now — before the sync and header + // write, which `?` could skip on error. + *clen = final_data_len; + durable_sync(file)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } Ok(()) } @@ -1720,7 +1795,8 @@ impl BStack { #[cfg(feature = "atomic")] pub fn try_extend(&self, s: u64, buf: impl AsRef<[u8]>) -> io::Result { let buf = buf.as_ref(); - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let data_size = file_end - HEADER_SIZE; if data_size != s { @@ -1734,9 +1810,13 @@ impl BStack { return Err(e); } let new_len = data_size + buf.len() as u64; - if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) { + if let Err(e) = write_committed_len(file, clen, new_len).and_then(|_| durable_sync(file)) { + // Reset the cache up front so it reflects the rolled-back file + // even if the best-effort header rewrite below fails. let _ = file.set_len(file_end); - let _ = write_committed_len(&mut file, data_size); + *clen = data_size; + let _ = write_committed_len(file, clen, data_size); + let _ = durable_sync(file); return Err(e); } Ok(true) @@ -1759,7 +1839,8 @@ impl BStack { /// `set_len`, `write_committed_len`, or `durable_sync`. #[cfg(feature = "atomic")] pub fn try_extend_zeros(&self, s: u64, n: u64) -> io::Result { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let data_size = file_end - HEADER_SIZE; if data_size != s { @@ -1775,9 +1856,13 @@ impl BStack { ) })?; file.set_len(HEADER_SIZE + new_len)?; - if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) { + if let Err(e) = write_committed_len(file, clen, new_len).and_then(|_| durable_sync(file)) { + // Reset the cache up front so it reflects the rolled-back file + // even if the best-effort header rewrite below fails. let _ = file.set_len(file_end); - let _ = write_committed_len(&mut file, data_size); + *clen = data_size; + let _ = write_committed_len(file, clen, data_size); + let _ = durable_sync(file); return Err(e); } Ok(true) @@ -1803,11 +1888,13 @@ impl BStack { #[cfg(feature = "atomic")] pub fn try_discard(&self, s: u64, n: u64) -> io::Result { if n == 0 { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); return Ok(data_size == s); } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let raw_size = file.seek(SeekFrom::End(0))?; let data_size = raw_size - HEADER_SIZE; if data_size != s { @@ -1828,8 +1915,12 @@ impl BStack { )); } file.set_len(HEADER_SIZE + new_data_len)?; - write_committed_len(&mut file, new_data_len)?; - durable_sync(&file)?; + // The truncation is the commit point: the tail bytes are gone and + // recovery would adopt the smaller file size, so update the cache now + // — before the header write, which `?` could skip on error. + *clen = new_data_len; + write_committed_len(file, clen, new_data_len)?; + durable_sync(file)?; Ok(true) } @@ -1872,7 +1963,8 @@ impl BStack { } #[cfg(any(unix, windows))] { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); let mut results = Vec::with_capacity(ranges.len()); for r in &ranges { @@ -1886,7 +1978,7 @@ impl BStack { )); } results.push(pread_exact( - &file, + file, HEADER_SIZE + r.start, (r.end - r.start) as usize, )?); @@ -1895,7 +1987,8 @@ impl BStack { } #[cfg(not(any(unix, windows)))] { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); let mut results = Vec::with_capacity(ranges.len()); for r in &ranges { @@ -1947,7 +2040,8 @@ impl BStack { } #[cfg(any(unix, windows))] { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); for (ptr, buf) in bufs { let end = ptr.checked_add(buf.len() as u64).ok_or_else(|| { @@ -1962,13 +2056,14 @@ impl BStack { format!("get_batched_into: end ({end}) exceeds payload size ({data_size})",), )); } - pread_exact_into(&file, HEADER_SIZE + ptr, buf)?; + pread_exact_into(file, HEADER_SIZE + ptr, buf)?; } Ok(()) } #[cfg(not(any(unix, windows)))] { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); for (ptr, buf) in bufs { let end = ptr.checked_add(buf.len() as u64).ok_or_else(|| { @@ -2016,7 +2111,8 @@ impl BStack { { #[cfg(any(unix, windows))] { - let file = self.lock.read().unwrap(); + let guard = self.lock.read().unwrap(); + let file = &guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); while let Some((offset, buf)) = f() { let end = offset.checked_add(buf.len() as u64).ok_or_else(|| { @@ -2031,13 +2127,14 @@ impl BStack { format!("get_batched_gen: end ({end}) exceeds payload size ({data_size})"), )); } - pread_exact_into(&file, HEADER_SIZE + offset, buf)?; + pread_exact_into(file, HEADER_SIZE + offset, buf)?; } Ok(()) } #[cfg(not(any(unix, windows)))] { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); while let Some((offset, buf)) = f() { let end = offset.checked_add(buf.len() as u64).ok_or_else(|| { @@ -2085,7 +2182,8 @@ impl BStack { where F: FnOnce(&[u8]) -> Vec, { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let file_end = file.seek(SeekFrom::End(0))?; let data_size = file_end - HEADER_SIZE; if n > data_size { @@ -2119,11 +2217,12 @@ impl BStack { let _ = file.set_len(file_end); return Err(e); } - if let Err(e) = durable_sync(&file) { + if let Err(e) = durable_sync(file) { let _ = file.set_len(file_end); return Err(e); } - write_committed_len(&mut file, final_data_len)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } else { // Net truncation or same size: write new tail, truncate, sync, commit. if !new_tail.is_empty() { @@ -2131,8 +2230,13 @@ impl BStack { file.write_all(&new_tail)?; } file.set_len(HEADER_SIZE + final_data_len)?; - durable_sync(&file)?; - write_committed_len(&mut file, final_data_len)?; + // The truncation is the commit point (recovery adopts the smaller + // file size), so update the cache now — before the sync and header + // write, which `?` could skip on error. + *clen = final_data_len; + durable_sync(file)?; + write_committed_len(file, clen, final_data_len)?; + durable_sync(file)?; } Ok(()) } @@ -2273,7 +2377,8 @@ impl BStack { "swap: offset + len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; // Load `locked` under the write lock (see `set` for rationale). let locked = self.locked.load(Ordering::Acquire); if offset < locked { @@ -2294,7 +2399,7 @@ impl BStack { file.read_exact(&mut old)?; file.seek(SeekFrom::Start(HEADER_SIZE + offset))?; file.write_all(buf)?; - durable_sync(&file)?; + durable_sync(file)?; Ok(old) } @@ -2328,7 +2433,8 @@ impl BStack { "swap_into: offset + len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; // Load `locked` under the write lock (see `set` for rationale). let locked = self.locked.load(Ordering::Acquire); if offset < locked { @@ -2349,7 +2455,7 @@ impl BStack { file.read_exact(&mut tmp)?; file.seek(SeekFrom::Start(HEADER_SIZE + offset))?; file.write_all(buf)?; - durable_sync(&file)?; + durable_sync(file)?; buf.copy_from_slice(&tmp); Ok(()) } @@ -2394,7 +2500,8 @@ impl BStack { "cas: offset + len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; // Load `locked` under the write lock (see `set` for rationale). let locked = self.locked.load(Ordering::Acquire); if offset < locked { @@ -2418,7 +2525,7 @@ impl BStack { } file.seek(SeekFrom::Start(HEADER_SIZE + offset))?; file.write_all(new)?; - durable_sync(&file)?; + durable_sync(file)?; Ok(true) } @@ -2464,7 +2571,8 @@ impl BStack { )); } } - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let locked = self.locked.load(Ordering::Acquire); if a < locked { return Err(io::Error::new( @@ -2508,7 +2616,7 @@ impl BStack { file.write_all(&buf_b)?; file.seek(SeekFrom::Start(HEADER_SIZE + b))?; file.write_all(&buf_a)?; - durable_sync(&file) + durable_sync(file) } /// Copy `n` bytes from `from..from+n` to `to..to+n` under a single write lock. @@ -2537,7 +2645,8 @@ impl BStack { let to_end = to.checked_add(n).ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "copy: to + n overflows u64") })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let locked = self.locked.load(Ordering::Acquire); if to < locked { return Err(io::Error::new( @@ -2566,7 +2675,7 @@ impl BStack { file.read_exact(&mut buf)?; file.seek(SeekFrom::Start(HEADER_SIZE + to))?; file.write_all(&buf)?; - durable_sync(&file) + durable_sync(file) } /// Read bytes in the half-open logical range `[start, end)`, pass them to @@ -2602,7 +2711,8 @@ impl BStack { )); } let n = end - start; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); if end > data_size { return Err(io::Error::new( @@ -2626,7 +2736,7 @@ impl BStack { if n > 0 { file.seek(SeekFrom::Start(HEADER_SIZE + start))?; file.write_all(&buf)?; - durable_sync(&file)?; + durable_sync(file)?; } Ok(()) } @@ -2706,7 +2816,8 @@ impl BStack { where F: FnMut() -> Option>, { - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let (file, clen) = &mut *guard; let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); let locked = self.locked.load(Ordering::Acquire); loop { @@ -2743,7 +2854,7 @@ impl BStack { pread_exact_raw_handle(self.handle, HEADER_SIZE + offset, buf)?; } } else { - pread_exact_into(&file, HEADER_SIZE + offset, buf)?; + pread_exact_into(file, HEADER_SIZE + offset, buf)?; } } #[cfg(not(any(unix, windows)))] @@ -2783,7 +2894,7 @@ impl BStack { if !data.is_empty() { file.seek(SeekFrom::Start(HEADER_SIZE + offset))?; file.write_all(data)?; - durable_sync(&file)?; + durable_sync(file)?; } return Ok(()); } @@ -2862,7 +2973,7 @@ impl BStack { file.write_all(&buf_b)?; file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; file.write_all(&buf_a)?; - durable_sync(&file)?; + durable_sync(file)?; } return Ok(()); } @@ -2875,12 +2986,17 @@ impl BStack { return Err(e); } let new_len = logical_offset + data.len() as u64; - if let Err(e) = write_committed_len(&mut file, new_len) - .and_then(|_| durable_sync(&file)) + if let Err(e) = write_committed_len(file, clen, new_len) + .and_then(|_| durable_sync(file)) { - // Roll back: truncate data and reset header. + // Roll back: truncate data and reset header. The + // cache is reset up front so it reflects the + // rolled-back file even if the best-effort header + // rewrite below fails. let _ = file.set_len(file_end); - let _ = write_committed_len(&mut file, logical_offset); + *clen = logical_offset; + let _ = write_committed_len(file, clen, logical_offset); + let _ = durable_sync(file); return Err(e); } } @@ -2907,8 +3023,13 @@ impl BStack { file.seek(SeekFrom::Start(HEADER_SIZE + new_data_len))?; file.read_exact(buf)?; file.set_len(HEADER_SIZE + new_data_len)?; - write_committed_len(&mut file, new_data_len)?; - durable_sync(&file)?; + // The truncation is the commit point: the tail bytes + // are gone and recovery would adopt the smaller file + // size, so update the cache now — before the header + // write, which `?` could skip on error. + *clen = new_data_len; + write_committed_len(file, clen, new_data_len)?; + durable_sync(file)?; } return Ok(()); } @@ -2932,8 +3053,13 @@ impl BStack { } if len > 0 { file.set_len(HEADER_SIZE + new_data_len)?; - write_committed_len(&mut file, new_data_len)?; - durable_sync(&file)?; + // The truncation is the commit point: the tail bytes + // are gone and recovery would adopt the smaller file + // size, so update the cache now — before the header + // write, which `?` could skip on error. + *clen = new_data_len; + write_committed_len(file, clen, new_data_len)?; + durable_sync(file)?; } return Ok(()); } @@ -2998,7 +3124,8 @@ impl BStack { "eq_crds: b_offset + b_len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let locked = self.locked.load(Ordering::Acquire); if !b_buf.is_empty() && b_offset < locked { return Err(io::Error::new( @@ -3041,7 +3168,7 @@ impl BStack { file.read_exact(&mut old_b)?; file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; file.write_all(b_buf)?; - durable_sync(&file)?; + durable_sync(file)?; Ok(Some(old_b)) } @@ -3087,7 +3214,8 @@ impl BStack { "ne_crds: b_offset + b_len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let locked = self.locked.load(Ordering::Acquire); if !b_buf.is_empty() && b_offset < locked { return Err(io::Error::new( @@ -3130,7 +3258,7 @@ impl BStack { file.read_exact(&mut old_b)?; file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; file.write_all(b_buf)?; - durable_sync(&file)?; + durable_sync(file)?; Ok(Some(old_b)) } @@ -3191,7 +3319,8 @@ impl BStack { "masked_eq_crds: b_offset + b_len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let locked = self.locked.load(Ordering::Acquire); if !b_buf.is_empty() && b_offset < locked { return Err(io::Error::new( @@ -3239,7 +3368,7 @@ impl BStack { file.read_exact(&mut old_b)?; file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; file.write_all(b_buf)?; - durable_sync(&file)?; + durable_sync(file)?; Ok(Some(old_b)) } @@ -3298,7 +3427,8 @@ impl BStack { "masked_ne_crds: b_offset + b_len overflows u64", ) })?; - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let locked = self.locked.load(Ordering::Acquire); if !b_buf.is_empty() && b_offset < locked { return Err(io::Error::new( @@ -3346,7 +3476,7 @@ impl BStack { file.read_exact(&mut old_b)?; file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; file.write_all(b_buf)?; - durable_sync(&file)?; + durable_sync(file)?; Ok(Some(old_b)) } } @@ -3357,25 +3487,25 @@ impl BStack { /// Return the current **logical** payload size in bytes (excludes the /// 16-byte header). /// - /// Takes the read lock, so it can run concurrently with other `len` calls - /// but blocks while any write-lock operation is in progress. The returned - /// value always reflects a clean operation boundary. + /// Reads the in-memory `clen` cache under the read lock, so it can run + /// concurrently with other `len` calls but blocks while any write-lock + /// operation is in progress. No syscall is made. The returned value + /// always reflects a clean operation boundary. /// /// # Errors /// - /// Propagates any [`io::Error`] from [`File::metadata`]. + /// Never actually fails; returns [`io::Result`] for source compatibility. pub fn len(&self) -> io::Result { - let file = self.lock.read().unwrap(); - Ok(file.metadata()?.len().saturating_sub(HEADER_SIZE)) + Ok(self.lock.read().unwrap().1) } /// Return `true` if the stack contains no payload bytes. /// /// # Errors /// - /// Propagates any [`io::Error`] from [`File::metadata`]. + /// Never actually fails; returns [`io::Result`] for source compatibility. pub fn is_empty(&self) -> io::Result { - Ok(self.len()? == 0) + Ok(self.lock.read().unwrap().1 == 0) } /// Returns the current locked length. `0` means no bytes are locked. @@ -3419,7 +3549,8 @@ impl BStack { // Acquire the write lock to serialise against any in-flight writers. #[allow(unused_mut)] // `mut` is not needed on Unix and Windows, but other platforms may need it for the file handle. - let mut file = self.lock.write().unwrap(); + let mut guard = self.lock.write().unwrap(); + let file = &mut guard.0; let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); let current_locked = self.locked.load(Ordering::Relaxed); if n < current_locked { @@ -3516,7 +3647,7 @@ impl BStack { // Release store: all writes completed under the write lock above are // visible to any thread that subsequently loads `locked` with Acquire. self.locked.store(n, Ordering::Release); - drop(file); + drop(guard); Ok(()) }