diff --git a/CHANGELOG.md b/CHANGELOG.md index e72dd39..5aa27e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `BStack::process_gen` (Rust) / `bstack_process_gen` (C) (`set` + `atomic`): generator/callback-driven primitive that acquires the write lock once and holds it across a sequence of dependent reads ending in at most one mutating operation (`Write`, `Swap`, `Push`, or `Pop`), which always ends the sequence. Closes the ABA window that a `get_batched_gen` (read, release lock) + `cas` (re-acquire, compare, write) pairing would otherwise leave open for allocator-mutex-free pop-style algorithms — see `examples/atomic_linked_list.rs` / `examples/atomic_linked_list.c` for a worked free-list push/pop demonstration. -- `BStackGenOp<'a>` (Rust) / `bstack_gen_op_t` (C) (`set` + `atomic`): non-exhaustive enum (Rust) / tagged union (C) of operations yielded by `process_gen`'s closure/callback — `Read { offset, buf }`, `Len { out }`, `Write { offset, data }`, `Swap { a_offset, b_offset, len }`, `Push { data }`, and `Pop { buf }`. `Write`, `Swap`, `Push`, and `Pop` are the only mutating variants — exactly one is permitted per call, and any one of them ends the sequence immediately; `Read`/`Len` do not end the sequence. The Rust enum derives `Debug` (intentionally not `PartialEq`/`Eq`/`Hash` — see the type's doc comment). +- `BStackGenOp<'a>` (Rust) / `bstack_gen_op_t` (C) (`set` + `atomic`): non-exhaustive enum (Rust) / tagged union (C) of operations yielded by `process_gen`'s closure/callback — `Read { offset, buf }`, `Len { out }`, `Write { offset, data }`, `Swap { a_offset, b_offset, len }`, `Push { data }`, `Pop { buf }`, and `Discard { len }` (Rust; in C, a `Pop` with a `NULL` destination buffer). `Write`, `Swap`, `Push`, `Pop`, and `Discard` are the only mutating variants — exactly one is permitted per call, and any one of them ends the sequence immediately; `Read`/`Len` do not end the sequence. The Rust enum derives `Debug` (intentionally not `PartialEq`/`Eq`/`Hash` — see the type's doc comment). - `BStackGenOp::Push { data }` / `BSTACK_GEN_PUSH` and `BStackGenOp::Pop { buf }` / `BSTACK_GEN_POP` (Rust + C, `set` + `atomic`): in-sequence equivalents of `push`/`pop` for `process_gen` — `Push` appends `data` and `Pop` removes the last `buf.len()` bytes into `buf`, growing/shrinking the payload. Like `Write` and `Swap`, exactly one of `Write`/`Swap`/`Push`/`Pop` is permitted per call and any one of them ends the sequence immediately. `Pop` errors if it would remove more than the current payload or shrink it below the locked length. - `BStackGenOp::Len { out }` / `BSTACK_GEN_LEN` (Rust + C, `set` + `atomic`): writes the current logical payload size into `out` and, unlike the mutating variants, does not end the sequence — the in-sequence equivalent of `len`, useful when a later step's offset or length depends on the current payload size. +- `BStackGenOp::Discard { len }` (Rust) / `BSTACK_GEN_POP` with a `NULL` `u.pop.buf` (C) (`set` + `atomic`): removes the last `len` bytes from the end of the file without reading them back, shrinking the payload and ending the sequence — the in-sequence, buffer-free equivalent of `discard` and the counterpart of `Pop`. Useful for truncating a tail whose size is only known once earlier `Read`s/`Len` have resolved, without allocating a throwaway buffer. In Rust this is a dedicated variant (slices cannot be null); in C it is expressed idiomatically as a `Pop` whose destination pointer is `NULL`. Errors on the same conditions as `Pop`. + +### Changed + +- **`SlabBStackAllocator` and `CheckedSlabBStackAllocator` — `alloc` / `dealloc` / `realloc` are now lock-free under the `atomic` feature** (`alloc` + `set` features): The allocator-level `Mutex` that previously serialised free-list push/pop is gone from these paths. Free-list pop now drives a single `BStack::process_gen` sequence (read `free_head`, read the popped block's `next`, advance `free_head` — all under one held `BStack` write lock, closing the ABA window a `get`/`cas` pair would leave open); free-list push splices a single block or a whole freed run onto the head with one `BStack::cross_exchange`; tail grow/shrink use `BStack::try_extend_zeros` / `BStack::try_discard` (atomic check-and-act under `BStack`'s own write lock). `SlabBStackAllocator` drops its allocator-level `Mutex` entirely and is `Sync` purely through `BStack`'s interior mutability. `CheckedSlabBStackAllocator` retains a `Mutex` solely for `recover` (see below); none of `alloc` / `dealloc` / `realloc` take it. The on-disk format is unchanged — no magic-number bump. +- **`CheckedSlabBStackAllocator::recover` runs under its own mutex** (`alloc` + `set` features, `atomic`): the `Mutex` is held for the full call solely to keep recovery single-flight, preventing two concurrent runs from reclaiming the same leaked block twice. The scan itself (free-list walk, arena classification, and its one optional tail discard) runs as a single `BStack::process_gen` sequence, so the `BStack` write lock — not the `Mutex` — serialises it against the lock-free `alloc` / `dealloc` / `realloc`. Ordinary `alloc` / `dealloc` / `realloc` never take the `Mutex`. ## [0.2.4] - 2026-06-07 diff --git a/PLANNED.md b/PLANNED.md index 95e0473..0bef226 100644 --- a/PLANNED.md +++ b/PLANNED.md @@ -180,66 +180,6 @@ The same change applies to the corresponding methods on `BStackGuardedSlice`, an --- -## Lock-free free list in `SlabBStackAllocator` and `CheckedSlabBStackAllocator` - -**Feature flag:** `atomic` -**Breaking change:** No (internal implementation change only) - -### Motivation - -Under the `atomic` feature, `SlabBStackAllocator` guards every free-list mutation with an internal `Mutex<()>`. This mutex serialises `push_free_block`, `push_free_blocks`, and `pop_free_block` across threads, preventing concurrent alloc/dealloc from racing on `free_head`. While correct, the mutex is a point of contention: all threads allocating or deallocating single-block regions must queue behind it, even though the underlying `BStack` already provides atomic compound operations — `cross_exchange` for a lock-free push, and `get_batched_gen` + `cas` for a compare-and-swap pop — that could serve the same role without an allocator-level lock. - -The goal is to remove the `Mutex<()>` entirely and replace every free-list path with sequences of BStack primitives that are safe under concurrent `&self` access. - -`CheckedSlabBStackAllocator` carries the same `Mutex<()>` and the same free-list pop/push structure as `SlabBStackAllocator`, so whatever solution is adopted here applies to it by extension with no additional design work. - -### Design - -#### Push: lock-free prepend via `cross_exchange` - -To push block `b` (at payload offset `b_addr`) onto the free list: - -1. **Plant a self-pointer placeholder.** Write `b_addr` as a little-endian `u64` into the first eight bytes of `b` — i.e., call `stack.set(b_addr, b_addr.to_le_bytes())`. This seeds the slot that will become `b->next` with a safe, in-bounds value. - -2. **Atomically splice `b` in as the new head.** Call `stack.cross_exchange(b_addr, FREE_HEAD_OFFSET, 8)`. `cross_exchange` swaps the eight bytes at `b_addr` with the eight bytes at `FREE_HEAD_OFFSET` under a single write lock. Before the call the slot at `b_addr` holds `b_addr` and the slot at `FREE_HEAD_OFFSET` holds the current head `H`; after the call `FREE_HEAD_OFFSET` holds `b_addr` (b is now head) and `b_addr` holds `H` (b's next pointer is the old head). The self-pointer written in step 1 is never observed by any reader: `cross_exchange` atomically replaces it with `H` at the same moment it publishes `b` as the new head. - -The call to `set` in step 1 and the call to `cross_exchange` in step 2 are not jointly atomic — a crash between them leaves the self-pointer sitting in `b->next` with `free_head` still pointing to the old head. This leaks `b` rather than corrupting the list, matching the crash-safety class already documented for `push_free_block`. - -Push is inherently race-free without any allocator-level mutex: even if two threads push concurrently, each `cross_exchange` is atomic with respect to the other, and each thread's block ends up correctly linked into the list (though their relative order at the head is not deterministic). - -#### Pop: single-lock dependent sequence via `process_gen` - -To pop the head block from the free list: - -1. **Run the whole pop as one `process_gen` sequence.** Drive `stack.process_gen` through a small state machine: - - *Step 0* — issue `Read { offset: FREE_HEAD_OFFSET, buf: head_buf }` to read the current head pointer. - - *Step 1* — once `head_buf` is populated, parse `head_val`. If `head_val == SENTINEL`, the list is empty: return `None`, ending the sequence with nothing popped — fall through to the tail-extension branch. Otherwise remember `head_val` and issue `Read { offset: head_val, buf: next_buf }` to read that block's `next` pointer. - - *Step 2* — issue `Write { offset: FREE_HEAD_OFFSET, data: next_buf }`, replacing the head with the next block and ending the sequence. The caller now owns `head_val`. - -The crucial property is that `process_gen` acquires the BStack write lock *before* the first read and holds it, unreleased, across every subsequent read and the terminating write. The read of `free_head`, the read of its `next` pointer, and the write that advances `free_head` all happen as one indivisible critical section — not as separate lock acquisitions that another thread's operations could interleave between. - -#### Why a CAS-based design would be unsafe, and how `process_gen` avoids it - -A more "obvious" design would pair `get_batched_gen` (read `head` and `head->next` under a read lock, then release it) with `cas` (re-acquire the write lock, compare, and conditionally write `free_head`). That pairing leaves a race window between releasing the read lock and acquiring the write lock — and the ABA problem exploits exactly that window: `free_head` can return to the same byte value it held at read time even though the list structure underneath has completely changed. - -**Concrete example.** Suppose the free list is `head → H0 → H1 → H2 → …`: - -1. **Thread A** reads `head = H0` and `H0->next = H1`, then releases its read lock. -2. **Thread B** pops H0 (`free_head`: H0 → H1). H0 is now live. -3. **Thread B** pops H1 (`free_head`: H1 → H2). H1 is now live. -4. **Thread B** deallocates H0 — push: writes `H0->next = H2` (the current head), then sets `head = H0`. The free list is now `H0 → H2 → …`. -5. **Thread A** fires its CAS: re-reads `FREE_HEAD_OFFSET`, sees `H0` — matching what it read in step 1 — and writes `H1`. The CAS "succeeds". - -`free_head` is now `H1`, but H1 is still live (allocated to Thread B in step 3): the next `alloc` hands it out a second time — a silent double allocation that no error reports. A no-retry-on-failure policy would not help here; the corruption comes from a *successful* CAS, one that cannot distinguish "head is still H0 because nothing changed" from "head is H0 again because it cycled back". - -`process_gen` makes this scenario structurally impossible rather than merely improbable. Because Thread A would hold the *same* write lock continuously from its first read of `free_head` through its final write, none of Thread B's steps — pop H0, pop H1, push H0 back — could execute in between; every one of them needs that same write lock and so simply blocks until Thread A's whole sequence, including the terminating write, has completed. There is no window in which `free_head` can change value and cycle back, so the byte-value re-comparison that CAS relies on — and that ABA defeats — is unnecessary. No generation counter, no on-disk format change, and no retry policy are needed; the allocator only has to drive the read-read-write sequence through `process_gen`, which already owns the single lock acquisition that makes it atomic. - -### Open questions - -- **Batch push under concurrency.** `push_free_blocks` currently reads `free_head`, builds the entire linked chain in a buffer, then writes the chain and updates `free_head` in two separate calls — not atomic with respect to concurrent pushes. A lock-free batch push requires either (a) holding the allocator mutex for batch operations only, (b) building the chain tail-to-head and using the same `cross_exchange` trick, or (c) a new BStack primitive. The single-block push via `cross_exchange` is already safe; the batch case needs its own solution before the mutex can be removed entirely. - ---- - ## Typed region and I/O parameter types **Feature flag:** None (additive API surface) diff --git a/README.md b/README.md index 5b4820c..e360dbb 100644 --- a/README.md +++ b/README.md @@ -165,6 +165,15 @@ impl BStack { #[cfg(all(feature = "set", feature = "atomic"))] pub fn cross_exchange(&self, a: u64, b: u64, n: u64) -> io::Result<()>; + /// Run a sequence of dependent reads, optionally followed by a single write, under + /// one held write lock. `f` is called in a loop and drives the sequence through + /// `BStackGenOp::{Read, Len, Write, Swap, Push, Pop, Discard}`; at most one of + /// `Write`/`Swap`/`Push`/`Pop`/`Discard` is permitted and ends the sequence. + /// Requires the `set` and `atomic` features. + #[cfg(all(feature = "set", feature = "atomic"))] + pub fn process_gen<'a, F>(&self, f: F) -> io::Result<()> + where F: FnMut() -> Option>; + /// Copy `n` bytes from `from` to `to` under one write lock. Regions may overlap. /// Requires the `set` and `atomic` features. #[cfg(all(feature = "set", feature = "atomic"))] @@ -478,7 +487,7 @@ bstack = { version = "0.2", features = ["set", "atomic"] } - **`swap_into(offset, buf)`** *(requires `set`)* — Like `swap` but exchanges in-place: on entry `buf` holds new bytes, on return holds old bytes. - **`cas(offset, old, new)`** *(requires `set`)* — Overwrite bytes at `offset` with `new` only if they currently equal `old`. Returns `true` if exchanged. File size never changes. - **`process(start, end, f)`** *(requires `set`)* — Read `[start, end)`, pass to `f` as `&mut [u8]` for in-place mutation, write back. File size never changes. `start == end` is a no-op. -- **`process_gen(f)`** *(requires `set`)* — Like `get_batched_gen` but the closure can also end the sequence with a mutation: it yields `BStackGenOp::Read`, `Len`, `Write`, `Swap`, `Push`, or `Pop` values one at a time, and the write lock is held continuously from before the first step through the terminating `Write`/`Swap`/`Push`/`Pop` (at most one, ending the sequence). `Read` and `Len` do not end the sequence. Useful for dependent read-read-write protocols — e.g. mutex-free free-list pop at the allocator layer — where a CAS-based retry loop would leave an ABA window open. `Push` and `Pop` are the only steps that change the file size. +- **`process_gen(f)`** *(requires `set`)* — Like `get_batched_gen` but the closure can also end the sequence with a mutation: it yields `BStackGenOp::Read`, `Len`, `Write`, `Swap`, `Push`, `Pop`, or `Discard` values one at a time (in C, `Discard` is a `Pop` with a `NULL` destination buffer), and the write lock is held continuously from before the first step through the terminating `Write`/`Swap`/`Push`/`Pop`/`Discard` (at most one, ending the sequence). `Read` and `Len` do not end the sequence. Useful for dependent read-read-write protocols — e.g. mutex-free free-list pop at the allocator layer — where a CAS-based retry loop would leave an ABA window open. `Push`, `Pop`, and `Discard` are the only steps that change the file size; `Discard` truncates a tail without reading it back. - **`cross_exchange(a, b, n)`** *(requires `set`)* — Atomically swap two non-overlapping byte regions of length `n` under one write lock. - **`copy(from, to, n)`** *(requires `set`)* — Copy `n` bytes from `from` to `to` under one write lock. Regions may overlap. - **`eq_crds(a_offset, a_expected, b_offset, b_buf)`** *(requires `set`)* — Write `b_buf` at `b_offset` only if bytes at `a_offset` equal `a_expected` (compare-and-swap across two regions). Returns `Some(old_b)` on success, `None` otherwise. @@ -957,7 +966,7 @@ Each free-list mutation is two `BStack` calls: write the next-pointer into the b Without the `atomic` feature it is **not `Sync`**: free-list mutations require a read then a write of `free_head` as separate `BStack` calls — a TOCTOU race under concurrent `&self` access that can result in two callers receiving the same block. -With the `atomic` feature it **is `Sync`**. An internal mutex serialises all compound operations that span multiple `BStack` calls: free-list pop/push and the tail-length check that precedes any `extend` or `discard`. The tail-path operations themselves use `try_discard` / `try_extend_zeros`, which check-and-act atomically under `BStack`'s own write lock without needing the allocator lock. +With the `atomic` feature it **is `Sync`** with no allocator-level lock at all. Free-list pop drives a single `BStack::process_gen` sequence that holds `BStack`'s write lock across the read of `free_head`, the read of the popped block's `next` pointer, and the write that advances `free_head` — closing the ABA window a `get`/`cas` pair would leave open. Free-list push splices a single block (or a whole freed run) onto the head with one `BStack::cross_exchange`. Tail grow/shrink use `try_extend_zeros` / `try_discard`, which check-and-act atomically under `BStack`'s own write lock. Every concurrent `&self` operation is therefore safe through `BStack`'s interior mutability alone — no `Mutex`. #### Constructors @@ -1058,7 +1067,7 @@ Free-list mutations write block payloads before updating `free_head`. The overhe Without the `atomic` feature it is **not `Sync`**: free-list mutations read then write `free_head` as separate `BStack` calls — a TOCTOU race under concurrent `&self` access. -With the `atomic` feature it **is `Sync`**. An internal mutex serialises all compound operations: free-list pop/push, the tail-length checks preceding `extend` or `discard`, and the `recover` scan. Tail deallocations use `try_discard` (atomically checks tail and removes bytes under `BStack`'s own write lock) without needing the allocator lock. The shrink path acquires the lock before the tail check because the overhead must be written before discarding. +With the `atomic` feature it **is `Sync`**. `alloc` / `dealloc` / `realloc` take no allocator-level lock: free-list pop uses a single `BStack::process_gen` sequence, free-list push uses `BStack::cross_exchange`, and tail grow/shrink use `try_extend_zeros` / `try_discard` — all check-and-act atomically under `BStack`'s own write lock (the shrink path writes the overhead before the tail check, since the overhead must be committed before discarding). The one retained `Mutex` is held only by `recover`, to keep recovery single-flight (two concurrent runs could otherwise reclaim the same leaked block twice); the recovery scan itself is serialised against alloc/dealloc/realloc by the `BStack` write lock it holds across one `process_gen` sequence, not by the `Mutex`. #### Constructors @@ -1154,7 +1163,7 @@ All user-visible offsets (returned by `push`, accepted by `peek`/`get`) are | `swap`, `swap_into` *(set+atomic)* | `lseek(offset)` → `read` → `lseek(offset)` → `write(buf)` → sync | | `cas` *(set+atomic)* | `lseek(offset)` → `read` → compare → conditional `write(new)` → sync | | `process` *(set+atomic)* | `lseek(start)` → `read(end−start)` → *(callback)* → `lseek(start)` → `write(buf)` → sync | -| `process_gen` *(set+atomic)* | closure-driven: zero or more `lseek`/`pread` reads and `Len` size queries, ending in at most one mutating step — `lseek` → `write` → sync for `Write`, the two-region read/read/write/write → sync of `cross_exchange` for `Swap`, an append (then `set_len` on rollback) → sync for `Push`, or `lseek` → `read` → `set_len` → sync for `Pop` | +| `process_gen` *(set+atomic)* | closure-driven: zero or more `lseek`/`pread` reads and `Len` size queries, ending in at most one mutating step — `lseek` → `write` → sync for `Write`, the two-region read/read/write/write → sync of `cross_exchange` for `Swap`, an append (then `set_len` on rollback) → sync for `Push`, `lseek` → `read` → `set_len` → sync for `Pop`, or `set_len` → sync for `Discard` (a `Pop` with no read) | | `replace` *(atomic)* | `lseek(tail)` → `read(n)` → *(callback)* → *(then as `atrunc`)* | | `cross_exchange` *(set+atomic)* | `lseek(a)` → `read(n)` → `lseek(b)` → `read(n)` → `lseek(a)` → `write` → `lseek(b)` → `write` → sync | | `copy` *(set+atomic)* | `lseek(from)` → `read(n)` → `lseek(to)` → `write(n)` → sync | diff --git a/c/bstack.c b/c/bstack.c index 6a1962c..8ef6f61 100644 --- a/c/bstack.c +++ b/c/bstack.c @@ -1902,9 +1902,13 @@ int bstack_process_gen(bstack_t *bs, BS_WRUNLOCK(bs); errno = EINVAL; return -1; } if (len > 0) { - uint64_t read_offset = HEADER_SIZE + new_len; - if (plat_pread(bs->fd, buf, len, read_offset) != 0) - goto fail_unlock; + /* buf == NULL discards the bytes without copying them out — + * the in-sequence equivalent of bstack_discard. */ + if (buf != NULL) { + uint64_t read_offset = HEADER_SIZE + new_len; + if (plat_pread(bs->fd, buf, len, read_offset) != 0) + goto fail_unlock; + } if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 || write_committed_len(bs->fd, new_len) != 0 || plat_durable_sync(bs->fd) != 0) diff --git a/c/bstack.h b/c/bstack.h index 67e2270..6ec5baf 100644 --- a/c/bstack.h +++ b/c/bstack.h @@ -411,7 +411,9 @@ typedef enum { * equivalent of bstack_push. * - BSTACK_GEN_POP: remove the last u.pop.len bytes from the end of the file * into u.pop.buf, shrinking the payload, and ending the sequence — the - * in-sequence equivalent of bstack_pop. + * in-sequence equivalent of bstack_pop. If u.pop.buf is NULL the bytes are + * dropped without being copied out — the in-sequence equivalent of + * bstack_discard. * - BSTACK_GEN_LEN: write the current logical payload size into * *u.len.out and call gen again — the in-sequence equivalent of * bstack_len. Does not end the sequence. @@ -446,7 +448,7 @@ typedef struct { size_t len; } push; struct { - uint8_t *buf; + uint8_t *buf; /* destination, or NULL to discard the bytes */ size_t len; } pop; struct { @@ -532,7 +534,8 @@ int bstack_process(bstack_t *bs, uint64_t start, uint64_t end, * - Returning 1 with *out_op set to BSTACK_GEN_POP removes the last * u.pop.len bytes from the end of the file into u.pop.buf, shrinking the * payload, and ends the sequence; gen is not called again — the - * in-sequence equivalent of bstack_pop. + * in-sequence equivalent of bstack_pop. A NULL u.pop.buf drops the bytes + * without copying them out — the in-sequence equivalent of bstack_discard. * - Returning 1 with *out_op set to BSTACK_GEN_LEN writes the current logical * payload size into *u.len.out and calls gen again — the in-sequence * equivalent of bstack_len, useful when a later step's offset or length diff --git a/c/bstack_alloc.c b/c/bstack_alloc.c index 9ca7947..708de1c 100644 --- a/c/bstack_alloc.c +++ b/c/bstack_alloc.c @@ -3299,6 +3299,9 @@ static uint64_t slab_blocks_needed(uint64_t len, uint64_t block_size) return (len - 1) / block_size + 1; } +/* free_head read/write helpers: only the non-atomic free-list paths use them; + * the atomic paths drive free_head through process_gen / cross_exchange. */ +#ifndef BSTACK_FEATURE_ATOMIC static int slab_read_free_head(bstack_t *bs, uint64_t *out) { uint8_t buf[8]; @@ -3314,6 +3317,7 @@ static int slab_write_free_head(bstack_t *bs, uint64_t val) write_le64(buf, val); return bstack_set(bs, SLAB_FREE_HEAD_OFFSET, buf, 8); } +#endif /* !BSTACK_FEATURE_ATOMIC */ /* * Pop the head block from the free list. @@ -3321,6 +3325,75 @@ static int slab_write_free_head(bstack_t *bs, uint64_t val) * Zeros the block after popping; failure is propagated to preserve the * allocator contract that returned bytes are zero-initialized. */ +#ifdef BSTACK_FEATURE_ATOMIC +/* + * Atomic pop: drive a single bstack_process_gen sequence — read free_head, + * read the popped block's next-pointer, write that next back into free_head — + * under one held write lock, so the read-read-write is indivisible with + * respect to any other thread's free-list operation (closing the ABA window a + * get/cas pair would leave open). The popped block is zeroed in a separate + * call afterwards, since by then it is exclusively owned by the caller. + */ +struct slab_pop_ctx { + uint8_t head_buf[8]; + uint8_t next_buf[8]; + uint64_t popped; + int have_popped; + int step; +}; + +static int slab_pop_gen(bstack_gen_op_t *out_op, void *userctx) +{ + struct slab_pop_ctx *ctx = userctx; + switch (ctx->step++) { + case 0: /* read the current free-list head */ + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = SLAB_FREE_HEAD_OFFSET; + out_op->u.read.buf = ctx->head_buf; + out_op->u.read.len = 8; + return 1; + case 1: { /* empty list ends with no write; else read head's next-pointer */ + uint64_t head = read_le64(ctx->head_buf); + if (head == SLAB_SENTINEL) return 0; + ctx->popped = head; + ctx->have_popped = 1; + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = head; + out_op->u.read.buf = ctx->next_buf; + out_op->u.read.len = 8; + return 1; + } + case 2: /* advance free_head to the popped block's next-pointer */ + out_op->kind = BSTACK_GEN_WRITE; + out_op->u.write.offset = SLAB_FREE_HEAD_OFFSET; + out_op->u.write.data = ctx->next_buf; + out_op->u.write.len = 8; + return 1; + default: + return 0; + } +} + +static int slab_pop_free_block(bstack_t *bs, uint64_t block_size, + uint64_t *out_block) +{ + struct slab_pop_ctx ctx; + memset(&ctx, 0, sizeof ctx); + + if (bstack_process_gen(bs, slab_pop_gen, &ctx) != 0) return -1; + if (!ctx.have_popped) { *out_block = SLAB_SENTINEL; return 0; } + + /* Zero the block after popping; on failure the popped block is leaked but + * not returned to callers in a non-zero state. */ +#if UINT64_MAX > SIZE_MAX + if (block_size > (uint64_t)SIZE_MAX) { errno = EINVAL; return -1; } +#endif + if (bstack_zero(bs, ctx.popped, (size_t)block_size) != 0) return -1; + + *out_block = ctx.popped; + return 0; +} +#else /* !BSTACK_FEATURE_ATOMIC */ static int slab_pop_free_block(bstack_t *bs, uint64_t block_size, uint64_t *out_block) { @@ -3346,9 +3419,28 @@ static int slab_pop_free_block(bstack_t *bs, uint64_t block_size, *out_block = head; return 0; } +#endif /* BSTACK_FEATURE_ATOMIC */ /* * Prepend the block at block_start to the free list. + */ +#ifdef BSTACK_FEATURE_ATOMIC +/* + * Lock-free splice via bstack_cross_exchange: block_start is first seeded with + * a self-pointer placeholder, then atomically swapped with free_head under one + * write lock — free_head becomes block_start and block_start's next-pointer + * becomes the old head, in a single indivisible step. A crash between the two + * calls leaks block_start rather than corrupting the list. + */ +static int slab_push_free_block(bstack_t *bs, uint64_t block_start) +{ + uint8_t buf[8]; + write_le64(buf, block_start); /* placeholder: replaced by old head below */ + if (bstack_set(bs, block_start, buf, 8) != 0) return -1; + return bstack_cross_exchange(bs, block_start, SLAB_FREE_HEAD_OFFSET, 8); +} +#else /* !BSTACK_FEATURE_ATOMIC */ +/* * Writes the next-pointer into the block before updating free_head: * a crash after the first write but before the second leaks the block rather * than corrupting the list. @@ -3363,25 +3455,39 @@ static int slab_push_free_block(bstack_t *bs, uint64_t block_start) if (bstack_set(bs, block_start, buf, 8) != 0) return -1; return slab_write_free_head(bs, block_start); } +#endif /* BSTACK_FEATURE_ATOMIC */ /* * Prepend `count` contiguous blocks starting at `first_block` to the free list. - * Uses exactly 3 IO calls regardless of count: one read of free_head, one bulk - * write of all next-pointers into the freed region, and one write of free_head. - * Crash behaviour matches slab_push_free_block: a crash after the bulk write - * but before free_head update leaks the entire batch rather than corrupting - * the list. + * + * Without atomic: exactly 3 IO calls regardless of count — one read of + * free_head, one bulk write of all next-pointers into the freed region, and + * one write of free_head. A crash after the bulk write but before the + * free_head update leaks the entire batch rather than corrupting the list. + * + * With atomic: generalises slab_push_free_block to a whole run. The chain + * first_block -> ... -> last_block is built in one buffer, with last_block's + * next-pointer set to the placeholder first_block. A single bulk bstack_set + * writes the chain (still unreachable from free_head), then + * bstack_cross_exchange atomically swaps last_block's next-pointer with + * free_head — free_head becomes first_block and last_block's next-pointer + * becomes the old head, splicing the whole run in under one write lock. */ static int slab_push_free_blocks(bstack_t *bs, uint64_t first_block, uint64_t count, uint64_t block_size) { - uint64_t old_head, i, buf_size; + uint64_t i, buf_size; uint8_t *buf; +#ifndef BSTACK_FEATURE_ATOMIC + uint64_t old_head; +#endif if (count == 0) return 0; if (count == 1) return slab_push_free_block(bs, first_block); +#ifndef BSTACK_FEATURE_ATOMIC if (slab_read_free_head(bs, &old_head) != 0) return -1; +#endif if (count > UINT64_MAX / block_size) { errno = EINVAL; return -1; } buf_size = count * block_size; @@ -3395,14 +3501,26 @@ static int slab_push_free_blocks(bstack_t *bs, uint64_t first_block, uint64_t next = first_block + (i + 1) * block_size; write_le64(buf + (size_t)(i * block_size), next); } +#ifdef BSTACK_FEATURE_ATOMIC + /* Placeholder: replaced with the old free_head by cross_exchange below. */ + write_le64(buf + (size_t)((count - 1) * block_size), first_block); +#else write_le64(buf + (size_t)((count - 1) * block_size), old_head); +#endif if (bstack_set(bs, first_block, buf, (size_t)buf_size) != 0) { free(buf); return -1; } free(buf); +#ifdef BSTACK_FEATURE_ATOMIC + { + uint64_t last_block = first_block + (count - 1) * block_size; + return bstack_cross_exchange(bs, last_block, SLAB_FREE_HEAD_OFFSET, 8); + } +#else return slab_write_free_head(bs, first_block); +#endif } /* ---- vtable ------------------------------------------------------------ */ @@ -3424,11 +3542,9 @@ static int slab_vtbl_alloc(bstack_allocator_t *base, uint64_t len, } if (len <= a->block_size) { - int pop_r; - FF_LOCK(a); - pop_r = slab_pop_free_block(a->bs, a->block_size, &block); - FF_UNLOCK(a); - if (pop_r != 0) return -1; + /* Free-list pop is lock-free under atomic (single process_gen + * sequence); single-threaded otherwise. */ + if (slab_pop_free_block(a->bs, a->block_size, &block) != 0) return -1; if (block != SLAB_SENTINEL) { out->allocator = base; out->offset = block; out->len = len; return 0; @@ -3491,14 +3607,9 @@ static int slab_vtbl_dealloc(bstack_allocator_t *base, bstack_slice_t s) #endif } - /* Not at tail (or single-block): push to the free list under the lock. */ - { - int r; - FF_LOCK(a); - r = slab_push_free_blocks(a->bs, s.offset, n_blocks, a->block_size); - FF_UNLOCK(a); - return r; - } + /* Not at tail (or single-block): push to the free list (lock-free under + * atomic via cross_exchange; single-threaded otherwise). */ + return slab_push_free_blocks(a->bs, s.offset, n_blocks, a->block_size); } static int slab_vtbl_realloc(bstack_allocator_t *base, bstack_slice_t s, @@ -3587,11 +3698,11 @@ static int slab_vtbl_realloc(bstack_allocator_t *base, bstack_slice_t s, /* Not at tail (or tail moved under atomic): grow non-tail. * Read old data into a zeroed new_backing-sized buffer, push it as a - * single atomic write, then free the old blocks under the lock. */ + * single atomic write, then free the old blocks (lock-free under + * atomic via cross_exchange). */ { uint8_t *buf; uint64_t push_offset; - int r; #if UINT64_MAX > SIZE_MAX if (new_backing > (uint64_t)SIZE_MAX) { errno = EINVAL; return -1; } @@ -3610,10 +3721,8 @@ static int slab_vtbl_realloc(bstack_allocator_t *base, bstack_slice_t s, } free(buf); - FF_LOCK(a); - r = slab_push_free_blocks(a->bs, s.offset, old_n, a->block_size); - FF_UNLOCK(a); - if (r != 0) return -1; + if (slab_push_free_blocks(a->bs, s.offset, old_n, + a->block_size) != 0) return -1; out->allocator = base; out->offset = push_offset; @@ -3658,18 +3767,13 @@ static int slab_vtbl_realloc(bstack_allocator_t *base, bstack_slice_t s, return 0; } - /* Shrink non-tail: return excess blocks to free list under the lock. */ - { - int r; - FF_LOCK(a); - r = slab_push_free_blocks(a->bs, - s.offset + new_n * a->block_size, - old_n - new_n, a->block_size); - FF_UNLOCK(a); - if (r != 0) return -1; - out->allocator = base; out->offset = s.offset; out->len = new_len; - return 0; - } + /* Shrink non-tail: return excess blocks to free list (lock-free under + * atomic via cross_exchange). */ + if (slab_push_free_blocks(a->bs, + s.offset + new_n * a->block_size, + old_n - new_n, a->block_size) != 0) return -1; + out->allocator = base; out->offset = s.offset; out->len = new_len; + return 0; } } @@ -3700,19 +3804,12 @@ slab_bstack_allocator_t *slab_bstack_allocator_new(bstack_t *bs, a = malloc(sizeof *a); if (!a) { errno = ENOMEM; return NULL; } -#ifdef BSTACK_FEATURE_ATOMIC - if (bstack_alloc_lock_init(&a->lock) != 0) { free(a); return NULL; } -#endif - memset(hdr, 0, sizeof hdr); memcpy(hdr + (size_t)SLAB_OFFSET_SIZE, alsl_magic, 8); write_le64(hdr + (size_t)SLAB_BLOCK_SIZE_OFFSET, block_size); /* free_head at SLAB_FREE_HEAD_OFFSET stays 0 (SLAB_SENTINEL) */ if (bstack_push(bs, hdr, sizeof hdr, NULL) != 0) { -#ifdef BSTACK_FEATURE_ATOMIC - bstack_alloc_lock_destroy(a->lock); -#endif free(a); return NULL; } @@ -3768,10 +3865,6 @@ slab_bstack_allocator_t *slab_bstack_allocator_open(bstack_t *bs) return NULL; } -#ifdef BSTACK_FEATURE_ATOMIC - if (bstack_alloc_lock_init(&a->lock) != 0) { free(a); return NULL; } -#endif - a->base.vtbl = &slab_allocator_vtbl; a->base.bulk_vtbl = NULL; a->bs = bs; @@ -3781,18 +3874,12 @@ slab_bstack_allocator_t *slab_bstack_allocator_open(bstack_t *bs) void slab_bstack_allocator_free(slab_bstack_allocator_t *alloc) { -#ifdef BSTACK_FEATURE_ATOMIC - bstack_alloc_lock_destroy(alloc->lock); -#endif free(alloc); } bstack_t *slab_bstack_allocator_into_stack(slab_bstack_allocator_t *alloc) { bstack_t *bs = alloc->bs; -#ifdef BSTACK_FEATURE_ATOMIC - bstack_alloc_lock_destroy(alloc->lock); -#endif free(alloc); return bs; } @@ -3846,6 +3933,9 @@ static int alck_write_overhead(bstack_t *bs, uint64_t block_start, uint64_t valu /* ---- free_head I/O ----------------------------------------------------- */ +/* Only the non-atomic free-list paths read/write free_head directly; the + * atomic paths drive it through process_gen / cross_exchange. */ +#ifndef BSTACK_FEATURE_ATOMIC static int alck_read_free_head(bstack_t *bs, uint64_t *out) { uint8_t buf[8]; @@ -3861,6 +3951,7 @@ static int alck_write_free_head(bstack_t *bs, uint64_t val) write_le64(buf, val); return bstack_set(bs, ALCK_FREE_HEAD_OFFSET, buf, 8); } +#endif /* !BSTACK_FEATURE_ATOMIC */ /* ---- blocks_needed ----------------------------------------------------- */ @@ -3923,7 +4014,11 @@ static uint64_t alck_valid_in_use(uint64_t overhead, uint64_t p, * *out_free and *out_cnt are set on success; caller must free *out_free. * *out_corrupt is set to 1 if the walk was cut short by structural corruption. * Returns 0 on success, -1 on I/O failure. + * + * Only the non-atomic recovery path uses this; the atomic path performs the + * equivalent walk inline inside its process_gen scan. */ +#ifndef BSTACK_FEATURE_ATOMIC static int alck_scan_free_list(bstack_t *bs, uint64_t stack_len, uint64_t block_size, uint64_t **out_free, size_t *out_cnt, @@ -3994,26 +4089,32 @@ static int alck_scan_free_list(bstack_t *bs, uint64_t stack_len, *out_corrupt = corrupt; return 0; } +#endif /* !BSTACK_FEATURE_ATOMIC */ /* ---- write_free_run ---------------------------------------------------- */ /* * Write the block prefixes for a run of `count` contiguous free blocks - * starting at `first_block`, linking them into a chain whose tail points at - * the current free_head. Does NOT update free_head. + * starting at `first_block`, linking them into a chain. Each block's overhead + * is zeroed and data[0..8] holds the next block's offset. All other data + * bytes in the run are zeroed. Does NOT update free_head. * - * Each block's overhead is zeroed and data[0..8] holds the next block's - * offset (or the old free_head for the last block). All other data bytes in - * the run are zeroed. The single bulk bstack_set is crash-safe: until - * free_head is repointed the whole run is simply unreachable. + * Without atomic the last block's data[0..8] holds the current free_head, which + * the caller then writes into free_head directly. With atomic it holds the + * placeholder first_block; alck_push_free_blocks splices the whole run onto + * free_head with bstack_cross_exchange afterwards. Either way the single bulk + * bstack_set is crash-safe: until the run is spliced in the whole run is simply + * unreachable. */ static int alck_write_free_run(bstack_t *bs, uint64_t first_block, uint64_t count, uint64_t block_size) { - uint64_t old_head, total, i; + uint64_t total, i; uint8_t *buf; - +#ifndef BSTACK_FEATURE_ATOMIC + uint64_t old_head; if (alck_read_free_head(bs, &old_head) != 0) return -1; +#endif if (count > UINT64_MAX / block_size) { errno = EINVAL; return -1; } total = count * block_size; @@ -4026,10 +4127,16 @@ static int alck_write_free_run(bstack_t *bs, uint64_t first_block, for (i = 0; i < count; i++) { uint64_t next; /* overhead at buf[i*block_size..+8] stays zero */ - if (i + 1 < count) + if (i + 1 < count) { next = first_block + (i + 1) * block_size; - else + } else { +#ifdef BSTACK_FEATURE_ATOMIC + /* Placeholder: replaced with the old free_head by cross_exchange. */ + next = first_block; +#else next = old_head; +#endif + } write_le64(buf + (size_t)(i * block_size) + 8, next); } @@ -4046,13 +4153,28 @@ static int alck_write_free_run(bstack_t *bs, uint64_t first_block, * Prepend `count` contiguous blocks at `first_block` to the free list. * Clears their overhead bytes as part of the operation (transitions live → * free). Does nothing if count == 0. + * + * With atomic the splice is a single bstack_cross_exchange on the last block's + * next-pointer slot (data[0..8], at last_block + ALCK_OVERHEAD): it atomically + * swaps that slot — currently the placeholder first_block — with free_head, so + * free_head becomes first_block and the last block's next-pointer becomes the + * old head, in one indivisible step. For count == 1 the first and last block + * coincide, mirroring slab_push_free_block. */ static int alck_push_free_blocks(bstack_t *bs, uint64_t first_block, uint64_t count, uint64_t block_size) { if (count == 0) return 0; if (alck_write_free_run(bs, first_block, count, block_size) != 0) return -1; +#ifdef BSTACK_FEATURE_ATOMIC + { + uint64_t last_block = first_block + (count - 1) * block_size; + return bstack_cross_exchange(bs, last_block + ALCK_OVERHEAD, + ALCK_FREE_HEAD_OFFSET, 8); + } +#else return alck_write_free_head(bs, first_block); +#endif } /* ---- pop_and_claim_block ----------------------------------------------- */ @@ -4063,6 +4185,87 @@ static int alck_push_free_blocks(bstack_t *bs, uint64_t first_block, * Sets *out_block_start to ALCK_SENTINEL (0) if the list is empty. * Returns 0 on success, -1 on error. */ +#ifdef BSTACK_FEATURE_ATOMIC +/* + * Atomic pop: the read of free_head, the read of the popped block's overhead + * and next-pointer (the 16-byte prefix), and the write that advances free_head + * run as a single bstack_process_gen sequence under one held write lock — the + * read-read-write is indivisible with respect to any other free-list operation + * (mirrors slab_pop_free_block, plus the overhead-corruption check). The + * claim write (overhead + zeroed data) happens afterwards on the now-detached + * block, which is exclusively owned by this call. + */ +struct alck_pop_ctx { + uint8_t head_buf[8]; + uint8_t prefix_buf[16]; + uint64_t head; + int have_head; + int corrupt; + int step; +}; + +static int alck_pop_gen(bstack_gen_op_t *out_op, void *userctx) +{ + struct alck_pop_ctx *ctx = userctx; + switch (ctx->step++) { + case 0: /* read the current free-list head */ + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = ALCK_FREE_HEAD_OFFSET; + out_op->u.read.buf = ctx->head_buf; + out_op->u.read.len = 8; + return 1; + case 1: { /* empty list ends with no write; else read head's prefix */ + uint64_t head = read_le64(ctx->head_buf); + if (head == ALCK_SENTINEL) return 0; + ctx->head = head; + ctx->have_head = 1; + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = head; + out_op->u.read.buf = ctx->prefix_buf; + out_op->u.read.len = 16; + return 1; + } + case 2: { /* corrupt overhead ends with no write; else advance free_head */ + uint64_t overhead = read_le64(ctx->prefix_buf); + if (overhead != 0) { ctx->corrupt = 1; return 0; } + out_op->kind = BSTACK_GEN_WRITE; + out_op->u.write.offset = ALCK_FREE_HEAD_OFFSET; + out_op->u.write.data = ctx->prefix_buf + 8; + out_op->u.write.len = 8; + return 1; + } + default: + return 0; + } +} + +static int alck_pop_and_claim_block(bstack_t *bs, uint64_t num_blocks, + uint64_t block_size, + uint64_t *out_block_start) +{ + struct alck_pop_ctx ctx; + uint8_t *block_buf; + memset(&ctx, 0, sizeof ctx); + + if (bstack_process_gen(bs, alck_pop_gen, &ctx) != 0) return -1; + if (!ctx.have_head) { *out_block_start = ALCK_SENTINEL; return 0; } + if (ctx.corrupt) { errno = EINVAL; return -1; } + + /* Mark in-use and zero data in one bulk write on the detached block. */ +#if UINT64_MAX > SIZE_MAX + if (block_size > (uint64_t)SIZE_MAX) { errno = EINVAL; return -1; } +#endif + block_buf = calloc(1, (size_t)block_size); + if (!block_buf) return -1; + write_le64(block_buf, ALCK_IN_USE_BIT | num_blocks); + if (bstack_set(bs, ctx.head, block_buf, (size_t)block_size) != 0) { + free(block_buf); return -1; + } + free(block_buf); + *out_block_start = ctx.head; + return 0; +} +#else /* !BSTACK_FEATURE_ATOMIC */ static int alck_pop_and_claim_block(bstack_t *bs, uint64_t num_blocks, uint64_t block_size, uint64_t *out_block_start) @@ -4095,9 +4298,14 @@ static int alck_pop_and_claim_block(bstack_t *bs, uint64_t num_blocks, *out_block_start = head; return 0; } +#endif /* BSTACK_FEATURE_ATOMIC */ /* ---- recovery helpers -------------------------------------------------- */ +/* These classification helpers serve the non-atomic sequential recover; the + * atomic recover inlines the equivalent logic inside its process_gen scan. */ +#ifndef BSTACK_FEATURE_ATOMIC + typedef enum { ALCK_CLASS_FREE, ALCK_CLASS_LEAKED, @@ -4213,9 +4421,403 @@ static int alck_resync_tail(bstack_t *bs, uint64_t p, uint64_t stack_len, *out_outcome = ALCK_RESYNC_DISCARD_TAIL; return 0; } +#endif /* !BSTACK_FEATURE_ATOMIC */ /* ---- checked_slab_bstack_allocator_recover ----------------------------- */ +#ifdef BSTACK_FEATURE_ATOMIC +/* + * Atomic recovery: the free-list walk (with cycle detection), the linear arena + * scan, and the single optional orphaned-tail discard run as ONE + * bstack_process_gen sequence — every read happens under one held bstack write + * lock, so a concurrent alloc/dealloc cannot mutate the free list between the + * moment it is walked and the moment a block is classified against it. The + * scan is expressed as a pull-driven state machine: the gen callback contains + * its own loop and only returns an op when it needs a Read/Len (or the + * terminating discard); pure transitions `continue` internally. After the + * locked scan, each reclaimed leak is spliced onto the free list lock-free + * (a leak is unreachable by alloc/dealloc, so its state is stable across the + * gap, and the splice is itself an atomic cross_exchange). + * + * The allocator Mutex is held for the whole call solely to make recovery + * single-flight: two overlapping runs could each observe the same leak and + * splice it in twice. Ordinary alloc/dealloc/realloc never take it. + */ +enum alck_recover_st { + ALCK_ST_READ_LEN, + ALCK_ST_READ_FREE_HEAD, + ALCK_ST_CONSUME_FREE_HEAD, + ALCK_ST_WALK_HEAD, + ALCK_ST_CONSUME_NODE, + ALCK_ST_ARENA_AT, + ALCK_ST_CONSUME_ARENA, + ALCK_ST_CONSUME_RESYNC, + ALCK_ST_FINISH, + ALCK_ST_DONE +}; + +struct alck_recover_ctx { + /* buffers filled by Len/Read ops */ + uint8_t node_buf[16]; + uint8_t head_buf[8]; + uint8_t oh_buf[8]; + uint64_t len_out; + + /* immutable inputs */ + uint64_t block_size; + + /* scan cursors / authoritative size */ + uint64_t stack_len; + uint64_t walk_head; /* WALK_HEAD / CONSUME_NODE cursor */ + uint64_t p; /* ARENA_AT / CONSUME_ARENA cursor */ + + /* free-list block offsets (sorted once the walk completes) */ + uint64_t *free_arr; + size_t free_cnt, free_cap; + uint64_t max_blocks; /* cycle-guard bound */ + + /* backward reach DP over a suspicious tail region */ + uint8_t *reach; + size_t reach_m; + uint64_t resync_p; + size_t rj; + + /* results consumed after the locked scan */ + uint64_t *reclaim; + size_t reclaim_cnt, reclaim_cap; + uint64_t unsure; + uint64_t discard_from; + + int free_corrupt; + int has_discard; + int st; +}; + +/* Append to a grow-on-demand u64 array; returns -1 (errno=ENOMEM) on failure. */ +static int alck_u64_push(uint64_t **arr, size_t *cnt, size_t *cap, uint64_t v) +{ + if (*cnt == *cap) { + size_t nc = *cap ? *cap * 2 : 16; + uint64_t *tmp = realloc(*arr, nc * sizeof *tmp); + if (!tmp) { errno = ENOMEM; return -1; } + *arr = tmp; *cap = nc; + } + (*arr)[(*cnt)++] = v; + return 0; +} + +/* Sort free_arr ascending (insertion sort — the list is typically short). */ +static void alck_recover_sort_free(struct alck_recover_ctx *c) +{ + size_t i, j; + for (i = 1; i < c->free_cnt; i++) { + uint64_t key = c->free_arr[i]; + j = i; + while (j > 0 && c->free_arr[j - 1] > key) { + c->free_arr[j] = c->free_arr[j - 1]; + j--; + } + c->free_arr[j] = key; + } +} + +/* Binary search: is p present in the sorted free_arr? */ +static int alck_recover_free_contains(const struct alck_recover_ctx *c, uint64_t p) +{ + size_t lo = 0, hi = c->free_cnt; + while (lo < hi) { + size_t mid = lo + (hi - lo) / 2; + if (c->free_arr[mid] < p) lo = mid + 1; + else if (c->free_arr[mid] > p) hi = mid; + else return 1; + } + return 0; +} + +/* First index in sorted free_arr whose offset is > p (== free_cnt if none). */ +static size_t alck_recover_free_upper(const struct alck_recover_ctx *c, uint64_t p) +{ + size_t lo = 0, hi = c->free_cnt, idx = c->free_cnt; + while (lo < hi) { + size_t mid = lo + (hi - lo) / 2; + if (c->free_arr[mid] <= p) lo = mid + 1; + else { idx = mid; hi = mid; } + } + return idx; +} + +static int alck_recover_gen(bstack_gen_op_t *out_op, void *userctx) +{ + struct alck_recover_ctx *c = userctx; + uint64_t bs_sz = c->block_size; + + for (;;) { + switch (c->st) { + /* Read the authoritative payload size first. */ + case ALCK_ST_READ_LEN: + c->st = ALCK_ST_READ_FREE_HEAD; + out_op->kind = BSTACK_GEN_LEN; + out_op->u.len.out = &c->len_out; + return 1; + + case ALCK_ST_READ_FREE_HEAD: + c->stack_len = c->len_out; + if (c->stack_len <= ALCK_ARENA_START) { c->st = ALCK_ST_FINISH; continue; } + c->max_blocks = (c->stack_len - ALCK_ARENA_START) / bs_sz; + c->st = ALCK_ST_CONSUME_FREE_HEAD; + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = ALCK_FREE_HEAD_OFFSET; + out_op->u.read.buf = c->head_buf; + out_op->u.read.len = 8; + return 1; + + case ALCK_ST_CONSUME_FREE_HEAD: + c->walk_head = read_le64(c->head_buf); + c->st = ALCK_ST_WALK_HEAD; + continue; + + /* Free-list walk: validate the head, then read its node. */ + case ALCK_ST_WALK_HEAD: { + uint64_t head = c->walk_head; + if (head == ALCK_SENTINEL) { + alck_recover_sort_free(c); + c->p = ALCK_ARENA_START; + c->st = ALCK_ST_ARENA_AT; + continue; + } + if (head < ALCK_ARENA_START + || (head - ALCK_ARENA_START) % bs_sz != 0 + || head >= c->stack_len + || (uint64_t)c->free_cnt > c->max_blocks /* cycle guard */) { + c->free_corrupt = 1; + alck_recover_sort_free(c); + c->p = ALCK_ARENA_START; + c->st = ALCK_ST_ARENA_AT; + continue; + } + c->st = ALCK_ST_CONSUME_NODE; + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = head; + out_op->u.read.buf = c->node_buf; + out_op->u.read.len = 16; + return 1; + } + + case ALCK_ST_CONSUME_NODE: { + uint64_t overhead = read_le64(c->node_buf); + if (overhead != 0) { + c->free_corrupt = 1; + alck_recover_sort_free(c); + c->p = ALCK_ARENA_START; + c->st = ALCK_ST_ARENA_AT; + continue; + } + if (alck_u64_push(&c->free_arr, &c->free_cnt, &c->free_cap, + c->walk_head) != 0) return -1; + c->walk_head = read_le64(c->node_buf + 8); + c->st = ALCK_ST_WALK_HEAD; + continue; + } + + /* Linear arena scan: read the overhead at the cursor. */ + case ALCK_ST_ARENA_AT: + if (c->p >= c->stack_len) { c->st = ALCK_ST_FINISH; continue; } + c->st = ALCK_ST_CONSUME_ARENA; + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = c->p; + out_op->u.read.buf = c->oh_buf; + out_op->u.read.len = 8; + return 1; + + case ALCK_ST_CONSUME_ARENA: { + uint64_t overhead = read_le64(c->oh_buf); + uint64_t p = c->p; + uint64_t n; + size_t idx; + uint64_t span, m64; + size_t m; + + if (overhead == 0) { + if (alck_recover_free_contains(c, p)) { + /* Free: reachable from free_head. */ + } else if (c->free_corrupt) { + /* A bare zero-overhead block is only trustworthy as a leak + * while the free list walked cleanly. */ + c->unsure += 1; + } else { + if (alck_u64_push(&c->reclaim, &c->reclaim_cnt, + &c->reclaim_cap, p) != 0) return -1; + } + c->p = p + bs_sz; + c->st = ALCK_ST_ARENA_AT; + continue; + } + n = alck_valid_in_use(overhead, p, c->stack_len, bs_sz, + c->free_arr, c->free_cnt); + if (n > 0) { + c->p = p + n * bs_sz; /* valid_in_use bounds p + n*bs <= stack_len */ + c->st = ALCK_ST_ARENA_AT; + continue; + } + /* Suspicious: prefer a known-free block as resync anchor. */ + idx = alck_recover_free_upper(c, p); + if (idx < c->free_cnt) { + uint64_t f = c->free_arr[idx]; + c->unsure += (f - p) / bs_sz; + c->p = f; + c->st = ALCK_ST_ARENA_AT; + continue; + } + /* No anchor follows: set up the backward reach DP over + * [p, stack_len) (see the non-atomic alck_resync_tail). */ + span = c->stack_len - p; + m64 = span / bs_sz; + if (m64 > (uint64_t)ALCK_MAX_RECOVER_REGION + || m64 > (uint64_t)SIZE_MAX) { + /* Region too large to analyse safely: leave leaked. */ + c->unsure += span / bs_sz; + c->st = ALCK_ST_FINISH; + continue; + } + m = (size_t)m64; /* m >= 1 since p < stack_len and both are aligned */ + free(c->reach); + c->reach = calloc(m + 1, 1); + if (!c->reach) { errno = ENOMEM; return -1; } + c->reach[m] = 1; + c->reach_m = m; + c->resync_p = p; + c->rj = m - 1; + c->st = ALCK_ST_CONSUME_RESYNC; + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = c->resync_p + (uint64_t)c->rj * bs_sz; + out_op->u.read.buf = c->oh_buf; + out_op->u.read.len = 8; + return 1; + } + + case ALCK_ST_CONSUME_RESYNC: { + uint64_t overhead = read_le64(c->oh_buf); + uint64_t off = c->resync_p + (uint64_t)c->rj * bs_sz; + size_t m, jr; + int found; + + if (overhead == 0) { + c->reach[c->rj] = c->reach[c->rj + 1]; + } else { + uint64_t n = alck_valid_in_use(overhead, off, c->stack_len, + bs_sz, c->free_arr, c->free_cnt); + if (n > 0 && c->rj + (size_t)n <= c->reach_m) + c->reach[c->rj] = c->reach[c->rj + (size_t)n]; + else + c->reach[c->rj] = 0; + } + if (c->rj != 0) { + c->rj -= 1; + c->st = ALCK_ST_CONSUME_RESYNC; + out_op->kind = BSTACK_GEN_READ; + out_op->u.read.offset = c->resync_p + (uint64_t)c->rj * bs_sz; + out_op->u.read.buf = c->oh_buf; + out_op->u.read.len = 8; + return 1; + } + /* DP complete. The smallest interior boundary that tiles cleanly to + * the tail is a mid-arena gap to resync on; j=0 is excluded (it + * would contradict the Suspicious classification that got us here). */ + m = c->reach_m; + found = 0; jr = 0; + for (jr = 1; jr < m; jr++) { + if (c->reach[jr]) { found = 1; break; } + } + if (found) { + c->unsure += (uint64_t)jr; + c->p = c->resync_p + (uint64_t)jr * bs_sz; + c->st = ALCK_ST_ARENA_AT; + continue; + } + /* Orphaned tail. Only safe to discard when the free list is + * trusted; otherwise leave it leaked. */ + if (c->free_corrupt) { + c->unsure += (c->stack_len - c->resync_p) / bs_sz; + } else { + c->has_discard = 1; + c->discard_from = c->resync_p; + } + c->st = ALCK_ST_FINISH; + continue; + } + + /* Emit the single permitted mutation: the orphaned-tail discard, as a + * NULL-buffer Pop (drops the bytes without reading them back). */ + case ALCK_ST_FINISH: + c->st = ALCK_ST_DONE; + if (!c->has_discard) return 0; /* no write */ + { + uint64_t dlen = c->stack_len - c->discard_from; +#if UINT64_MAX > SIZE_MAX + if (dlen > (uint64_t)SIZE_MAX) { errno = EINVAL; return -1; } +#endif + out_op->kind = BSTACK_GEN_POP; + out_op->u.pop.buf = NULL; + out_op->u.pop.len = (size_t)dlen; + } + return 1; + + case ALCK_ST_DONE: + default: + return 0; + } + } +} + +int checked_slab_bstack_allocator_recover(checked_slab_bstack_allocator_t *alloc, + uint64_t *out_unsure) +{ + bstack_t *bs = alloc->bs; + struct alck_recover_ctx c; + uint64_t stack_len; + size_t i; + int ret = 0; + + /* Held for the entire call. Phase-1 reads are serialised against + * alloc/dealloc by the bstack write lock inside process_gen; this lock + * instead prevents two recover runs from overlapping (which would let both + * reclaim the same leaked block and double-link it). */ + FF_LOCK(alloc); + + /* Cheap early-out hint; the authoritative size is read under the + * process_gen lock below via BSTACK_GEN_LEN. */ + if (bstack_len(bs, &stack_len) != 0) { FF_UNLOCK(alloc); return -1; } + if (stack_len <= ALCK_ARENA_START) { + if (out_unsure) *out_unsure = 0; + FF_UNLOCK(alloc); return 0; + } + + memset(&c, 0, sizeof c); + c.block_size = alloc->block_size; + c.st = ALCK_ST_READ_LEN; + + if (bstack_process_gen(bs, alck_recover_gen, &c) != 0) { + ret = -1; goto done; + } + + /* Phase 2: splice reclaimed leaks onto the free list, lock-free. Each is + * unreachable by alloc/dealloc, so its leaked state is stable across the + * unlocked gap, and alck_push_free_blocks splices atomically. */ + for (i = 0; i < c.reclaim_cnt; i++) { + if (alck_push_free_blocks(bs, c.reclaim[i], 1, alloc->block_size) != 0) { + ret = -1; goto done; + } + } + if (out_unsure) *out_unsure = c.unsure; + +done: + free(c.free_arr); + free(c.reclaim); + free(c.reach); + FF_UNLOCK(alloc); + return ret; +} +#else /* !BSTACK_FEATURE_ATOMIC */ int checked_slab_bstack_allocator_recover(checked_slab_bstack_allocator_t *alloc, uint64_t *out_unsure) { @@ -4344,6 +4946,7 @@ int checked_slab_bstack_allocator_recover(checked_slab_bstack_allocator_t *alloc FF_UNLOCK(alloc); return ret; } +#endif /* BSTACK_FEATURE_ATOMIC */ /* ---- vtable implementations -------------------------------------------- */ @@ -4367,15 +4970,13 @@ static int alck_vt_alloc(bstack_allocator_t *base, uint64_t len, if (num_blocks == UINT64_MAX) { errno = EINVAL; return -1; } /* overflow */ if (num_blocks == 1) { - /* Lock is scoped to the free-list pop only; the tail-extend fallback - * below runs without the lock (bstack_extend is internally serialised - * and returns a distinct region to each concurrent caller). */ + /* Free-list pop is lock-free under atomic (single process_gen + * sequence); single-threaded otherwise. The tail-extend fallback below + * uses bstack_extend, which is internally serialised and returns a + * distinct region to each concurrent caller. */ uint64_t block_start; - int pop_r; - FF_LOCK(a); - pop_r = alck_pop_and_claim_block(a->bs, 1, a->block_size, &block_start); - FF_UNLOCK(a); - if (pop_r != 0) return -1; + if (alck_pop_and_claim_block(a->bs, 1, a->block_size, &block_start) != 0) + return -1; if (block_start != ALCK_SENTINEL) { out->allocator = base; out->offset = block_start + ALCK_OVERHEAD; @@ -4457,14 +5058,9 @@ static int alck_vt_dealloc(bstack_allocator_t *base, bstack_slice_t s) } #endif - /* Not at tail: push to the free list under the allocator lock. */ - { - int r; - FF_LOCK(a); - r = alck_push_free_blocks(a->bs, block_start, num_blocks, a->block_size); - FF_UNLOCK(a); - return r; - } + /* Not at tail: push to the free list (lock-free under atomic via + * cross_exchange; single-threaded otherwise). */ + return alck_push_free_blocks(a->bs, block_start, num_blocks, a->block_size); } static int alck_vt_realloc(bstack_allocator_t *base, bstack_slice_t s, @@ -4570,8 +5166,8 @@ static int alck_vt_realloc(bstack_allocator_t *base, bstack_slice_t s, } /* Not at tail (or tail moved under atomic): grow non-tail. - * alloc and dealloc each acquire the lock for their own free-list - * and tail operations independently. */ + * alloc and dealloc each handle their own free-list and tail + * operations independently. */ { bstack_slice_t new_s; uint8_t *tmp; @@ -4600,28 +5196,23 @@ static int alck_vt_realloc(bstack_allocator_t *base, bstack_slice_t s, } } - /* Shrink path (new_n < old_n). The lock covers the overhead write, - * tail check, and non-tail free-list update so those steps are atomic - * w.r.t. other threads. */ + /* Shrink path (new_n < old_n). Lock-free under atomic: the overhead + * write is the commit point, try_discard is atomic, and the non-tail + * push splices via cross_exchange. Single-threaded otherwise. */ { uint64_t delta = old_backing - new_backing; uint64_t excess_start; - int r; #if UINT64_MAX > SIZE_MAX if (delta > (uint64_t)SIZE_MAX) { errno = EINVAL; return -1; } #endif - FF_LOCK(a); - /* Overhead is the commit point for both tail and non-tail paths: * write it first so a crash after this point leaves an orphaned * (but safely unreferenced) tail region or leaked blocks that * recover() can reclaim, rather than an overhead that claims more * blocks than the file contains. */ if (alck_write_overhead(a->bs, block_start, - ALCK_IN_USE_BIT | new_n) != 0) { - FF_UNLOCK(a); return -1; - } + ALCK_IN_USE_BIT | new_n) != 0) return -1; /* Tail shrink: try_discard atomically checks tail == sentinel and * removes the excess under bstack's write lock, so no other thread @@ -4631,11 +5222,8 @@ static int alck_vt_realloc(bstack_allocator_t *base, bstack_slice_t s, { int ok = 0; if (bstack_try_discard(a->bs, sentinel, (size_t)delta, - &ok) != 0) { - FF_UNLOCK(a); return -1; - } + &ok) != 0) return -1; if (ok) { - FF_UNLOCK(a); out->allocator = base; out->offset = s.offset; out->len = new_len; return 0; @@ -4644,14 +5232,9 @@ static int alck_vt_realloc(bstack_allocator_t *base, bstack_slice_t s, #else { uint64_t cur_tail; - if (bstack_len(a->bs, &cur_tail) != 0) { - FF_UNLOCK(a); return -1; - } + if (bstack_len(a->bs, &cur_tail) != 0) return -1; if (sentinel == cur_tail) { - if (bstack_discard(a->bs, (size_t)delta) != 0) { - FF_UNLOCK(a); return -1; - } - FF_UNLOCK(a); + if (bstack_discard(a->bs, (size_t)delta) != 0) return -1; out->allocator = base; out->offset = s.offset; out->len = new_len; return 0; @@ -4659,19 +5242,13 @@ static int alck_vt_realloc(bstack_allocator_t *base, bstack_slice_t s, } #endif - /* Shrink non-tail: overhead already written (commit point); write - * free-list metadata into the excess blocks and repoint free_head. */ - if (new_n > UINT64_MAX / a->block_size) { - FF_UNLOCK(a); errno = EINVAL; return -1; - } + /* Shrink non-tail: overhead already written (commit point); push the + * excess blocks onto the free list (lock-free under atomic). */ + if (new_n > UINT64_MAX / a->block_size) { errno = EINVAL; return -1; } excess_start = block_start + new_n * a->block_size; - if (alck_write_free_run(a->bs, excess_start, - old_n - new_n, a->block_size) != 0) { - FF_UNLOCK(a); return -1; - } - r = alck_write_free_head(a->bs, excess_start); - FF_UNLOCK(a); - if (r != 0) return -1; + if (alck_push_free_blocks(a->bs, excess_start, + old_n - new_n, a->block_size) != 0) + return -1; out->allocator = base; out->offset = s.offset; out->len = new_len; return 0; } diff --git a/c/bstack_alloc.h b/c/bstack_alloc.h index 7e566d9..e57feeb 100644 --- a/c/bstack_alloc.h +++ b/c/bstack_alloc.h @@ -725,19 +725,25 @@ bstack_t *ghost_tree_bstack_allocator_into_stack(ghost_tree_bstack_allocator_t * * * Reallocation growing: newly-exposed bytes are always zero-initialised. * - * Crash consistency: each free-list update is two bstack calls (write - * next-pointer then update free_head). A crash between the two leaks the - * block being added or removed but leaves the rest of the list intact. + * Crash consistency: without atomic, each free-list update is two bstack calls + * (write next-pointer then update free_head); a crash between the two leaks the + * block being added or removed but leaves the rest of the list intact. With + * atomic the push is a single bstack_cross_exchange and the pop a single + * bstack_process_gen sequence, so the leak window is one bstack call. * * Thread safety: without -DBSTACK_FEATURE_ATOMIC, an allocator handle must be * used from one thread at a time — free-list mutations are a read then a write * of free_head as separate bstack calls, a TOCTOU race under concurrent access * that can result in two callers receiving the same block. With - * -DBSTACK_FEATURE_ATOMIC the handle owns an in-memory mutex (lock) that - * serialises all compound operations spanning multiple bstack calls: free-list - * pop/push and the tail-length checks preceding extend or discard. bstack - * extend / discard are internally serialised by bstack's own write lock; the - * allocator lock is not held during those calls. + * -DBSTACK_FEATURE_ATOMIC the handle carries no allocator-level lock at all: + * free-list pop drives a single bstack_process_gen sequence (read free_head, + * read the popped block's next-pointer, advance free_head — all under one held + * bstack write lock, closing the ABA window a get/cas pair would leave open); + * free-list push splices a single block (or a whole freed run) onto the head + * with one bstack_cross_exchange; and tail grow/shrink use + * bstack_try_extend_zeros / bstack_try_discard, which check-and-act atomically + * under bstack's own write lock. Every concurrent operation is therefore safe + * through bstack's interior synchronisation alone — no mutex. * * Requires -DBSTACK_FEATURE_SET. * ====================================================================== */ @@ -746,11 +752,6 @@ typedef struct { bstack_allocator_t base; /* must be first — safe cast to bstack_allocator_t * */ bstack_t *bs; uint64_t block_size; -#ifdef BSTACK_FEATURE_ATOMIC - /* Opaque platform mutex; serialises free-list pop/push and tail operations - * so a single handle may be shared across threads. */ - void *lock; -#endif } slab_bstack_allocator_t; /* @@ -826,19 +827,28 @@ uint64_t slab_bstack_allocator_block_size(const slab_bstack_allocator_t *alloc); * multi-block at tail → bstack_discard (crash-safe, single call) * all other cases → each block prepended to free list * - * Crash consistency: each free-list mutation writes block payloads before - * updating free_head. A crash leaks at most the block being operated on; - * the rest of the list stays intact. checked_slab_bstack_allocator_recover - * reclaims leaked blocks by a linear arena scan. + * Crash consistency: each free-list mutation writes block payloads before the + * splice (without atomic) or splices atomically with bstack_cross_exchange + * (with atomic). A crash leaks at most the block being operated on; the rest + * of the list stays intact. checked_slab_bstack_allocator_recover reclaims + * leaked blocks by a linear arena scan. * * Thread safety: without -DBSTACK_FEATURE_ATOMIC, an allocator handle must be * used from one thread at a time — free-list mutations read then write free_head * as separate bstack calls, a TOCTOU race under concurrent access. With - * -DBSTACK_FEATURE_ATOMIC the handle owns an in-memory mutex (lock) that - * serialises all compound operations: free-list pop/push, the tail-length checks - * preceding extend or discard, and the recover scan. bstack_try_discard and - * bstack_try_extend_zeros are used for the tail paths — those check-and-act - * atomically under bstack's own write lock without the allocator lock. + * -DBSTACK_FEATURE_ATOMIC, alloc/dealloc/realloc take no allocator-level lock: + * free-list pop drives a single bstack_process_gen sequence (read free_head, + * read the popped block's overhead and next-pointer, advance free_head — all + * under one held bstack write lock, closing the ABA window a get/cas pair would + * leave open), free-list push splices a block or a whole freed run with one + * bstack_cross_exchange, and tail grow/shrink use bstack_try_extend_zeros / + * bstack_try_discard (check-and-act atomically under bstack's own write lock). + * The handle still owns an in-memory mutex (lock), held only by recover to keep + * it single-flight: the recover scan and its one optional tail discard run as a + * single bstack_process_gen sequence (so the bstack write lock, not the mutex, + * serialises the scan against alloc/dealloc), while the mutex only prevents two + * concurrent recover runs from each reclaiming the same leaked block. Ordinary + * alloc/dealloc/realloc never take it. * * Requires -DBSTACK_FEATURE_SET. * ====================================================================== */ @@ -848,8 +858,8 @@ typedef struct { bstack_t *bs; uint64_t block_size; #ifdef BSTACK_FEATURE_ATOMIC - /* Opaque platform mutex; serialises free-list pop/push and tail operations - * so a single handle may be shared across threads. */ + /* Opaque platform mutex; held only by recover() to keep it single-flight + * (alloc/dealloc/realloc are lock-free). Shared across threads. */ void *lock; #endif } checked_slab_bstack_allocator_t; diff --git a/c/test_bstack.c b/c/test_bstack.c index d2097ed..baf7885 100644 --- a/c/test_bstack.c +++ b/c/test_bstack.c @@ -3220,6 +3220,44 @@ static int test_process_gen_pop_removes_and_ends_sequence(void) return 0; } +static int pg_pop_null_buf_gen(bstack_gen_op_t *out_op, void *userctx) +{ + int *calls = userctx; + (*calls)++; + if (*calls == 1) { + /* NULL destination: discard the tail without copying it out. */ + out_op->kind = BSTACK_GEN_POP; + out_op->u.pop.buf = NULL; + out_op->u.pop.len = 5; + } else { + out_op->kind = BSTACK_GEN_WRITE; + out_op->u.write.offset = 0; + out_op->u.write.data = (const uint8_t *)"NOPE!"; + out_op->u.write.len = 5; + } + return 1; +} + +static int test_process_gen_pop_null_buf_discards_and_ends_sequence(void) +{ + char tmp[64]; make_tmp(tmp, sizeof tmp); + bstack_t *bs = bstack_open(tmp); + CHECK(bs != NULL); + + CHECK(bstack_push(bs, (uint8_t *)"helloworld", 10, NULL) == 0); + int calls = 0; + CHECK(bstack_process_gen(bs, pg_pop_null_buf_gen, &calls) == 0); + CHECK(calls == 1); + + uint64_t len; CHECK(bstack_len(bs, &len) == 0); CHECK(len == 5); + uint8_t buf[5]; size_t w; + CHECK(bstack_peek(bs, 0, buf, &w) == 0); + CHECK(memcmp(buf, "hello", 5) == 0); + + bstack_close(bs); unlink(tmp); + return 0; +} + static int pg_pop_zero_gen(bstack_gen_op_t *out_op, void *userctx) { (void)userctx; @@ -4210,6 +4248,7 @@ int main(void) T(test_process_gen_push_appends_and_ends_sequence); T(test_process_gen_push_empty_data_is_noop_and_ends_sequence); T(test_process_gen_pop_removes_and_ends_sequence); + T(test_process_gen_pop_null_buf_discards_and_ends_sequence); T(test_process_gen_pop_zero_is_noop_and_ends_sequence); T(test_process_gen_pop_exceeds_payload_returns_error); T(test_process_gen_pop_below_locked_returns_error); diff --git a/src/alloc/checked_slab.rs b/src/alloc/checked_slab.rs index 0dbadb6..837d1f4 100644 --- a/src/alloc/checked_slab.rs +++ b/src/alloc/checked_slab.rs @@ -9,6 +9,8 @@ use super::{BStackAllocator, BStackSlice}; use crate::BStack; +#[cfg(feature = "atomic")] +use crate::BStackGenOp; #[cfg(not(feature = "atomic"))] use core::cell::Cell; #[cfg(not(feature = "atomic"))] @@ -101,12 +103,22 @@ const ALCK_MAGIC_PREFIX: [u8; 6] = *b"ALCK\x00\x01"; /// then write `free_head` as separate [`BStack`] calls — a TOCTOU race under /// concurrent `&self` access. /// -/// With the `atomic` feature it **is `Sync`**. An internal [`Mutex`] serialises -/// compound allocator operations that span multiple [`BStack`] calls (free-list -/// pop/push and the public [`recover`](Self::recover) scan). Tail grow/shrink -/// paths use [`BStack::try_extend_zeros`] / [`BStack::try_discard`] to perform -/// check-and-act atomically under `BStack`'s write lock without holding the -/// allocator mutex. +/// With the `atomic` feature it **is `Sync`**. Free-list push and pop — +/// [`alloc`](Self::alloc), [`dealloc`](Self::dealloc), and the free-list paths +/// of [`realloc`](Self::realloc) — use [`BStack::cross_exchange`] and +/// [`BStack::process_gen`] exactly as described for +/// [`SlabBStackAllocator`](super::SlabBStackAllocator), and need no +/// allocator-level lock. Tail grow/shrink paths use +/// [`BStack::try_extend_zeros`] / [`BStack::try_discard`] to perform +/// check-and-act atomically under `BStack`'s write lock, also without a lock. +/// +/// An internal [`Mutex`] is retained solely to make [`recover`](Self::recover) +/// (and the automatic `recover` call in [`open`](Self::open)) single-flight: +/// the recovery *scan* itself is serialised against alloc/dealloc by the +/// [`BStack`] write lock it holds across one [`BStack::process_gen`] sequence, +/// while the `Mutex` only prevents two concurrent `recover` runs from each +/// reclaiming the same leaked block. It plays no part in ordinary +/// alloc/dealloc/realloc. /// /// ``` /// fn assert_send() {} @@ -158,8 +170,9 @@ pub struct CheckedSlabBStackAllocator { /// Cached from the on-disk header; fixed for the lifetime of the allocator. /// Covers the full block including the 8-byte overhead; must be `≥ 16`. block_size: u64, - /// Serialises multi-step free-list and tail operations when `atomic` is - /// enabled, making the allocator `Sync`. + /// Serialises [`recover`](Self::recover) against itself when `atomic` is + /// enabled, so two recovery runs cannot both reclaim the same leaked block. + /// Ordinary alloc/dealloc/realloc never take it — they stay lock-free. #[cfg(feature = "atomic")] lock: Mutex<()>, #[cfg(not(feature = "atomic"))] @@ -167,7 +180,10 @@ pub struct CheckedSlabBStackAllocator { } /// How a single block looks to the recovery scan. -#[cfg(feature = "set")] +/// +/// Only used by the non-`atomic` recovery path; the `atomic` path inlines the +/// equivalent classification inside its `process_gen` state machine. +#[cfg(all(feature = "set", not(feature = "atomic")))] enum BlockClass { /// `overhead == 0` and the block is reachable from `free_head`. Free, @@ -181,7 +197,10 @@ enum BlockClass { /// Outcome of attempting to resynchronise the scan after a suspicious block /// when no free-list block remains as an anchor. -#[cfg(feature = "set")] +/// +/// Only used by the non-`atomic` recovery path; the `atomic` path inlines the +/// equivalent decision inside its `process_gen` state machine. +#[cfg(all(feature = "set", not(feature = "atomic")))] enum ResyncOutcome { /// A later block boundary cleanly tiles to the tail; resume there. The gap /// is mid-arena garbage and is left leaked. @@ -393,23 +412,43 @@ impl CheckedSlabBStackAllocator { /// runs it automatically; it is exposed so callers can inspect the residual /// count or re-run it on demand. /// - /// # Phase 1 — scan (read-only) + /// # Phase 1 — scan (single locked pass) + /// + /// The free-list walk (with cycle detection) and the linear arena scan run + /// as **one** [`BStack::process_gen`] sequence: every read happens under one + /// held `BStack` write lock, so a concurrent [`alloc`](Self::alloc) or + /// [`dealloc`](Self::dealloc) cannot mutate the free list between the moment + /// it is walked and the moment a block is classified against it. A valid + /// in-use block advances the cursor by its block count; a zero-overhead + /// block not in the free list is a leak and is queued for reclaim; anything + /// else is *suspicious*. At a suspicious block the scan resynchronises on the + /// next free-list block if one follows (the intervening gap is left leaked); + /// otherwise a backward reachability pass decides between a mid-arena gap + /// (left leaked) and an orphaned tail from a failed `realloc` truncation. The + /// orphaned-tail discard is the single mutation permitted in the sequence, + /// expressed as a terminating [`BStackGenOp::Pop`]; because it is performed + /// under the same lock as the scan that chose it, no concurrent `alloc` can + /// extend the tail into the region being discarded. /// - /// The free list is walked into a sorted set (with cycle detection), then - /// the arena is scanned linearly. A valid in-use block advances the cursor - /// by its block count; a zero-overhead block not in the free list is a leak - /// and is queued for reclaim; anything else is *suspicious*. At a suspicious - /// block the scan resynchronises on the next free-list block if one follows - /// (the intervening gap is left leaked); otherwise a backward reachability - /// pass decides between a mid-arena gap (left leaked) and an orphaned tail - /// from a failed `realloc` truncation (discarded). + /// # Phase 2 — reclaim (one block at a time, lock-free) /// - /// # Phase 2 — apply (one block at a time) + /// After the locked scan, each reclaimed block is prepended to the free + /// list individually with `push_free_blocks` — the list is never + /// rebuilt. This runs **outside** the scan lock and may overlap a + /// concurrent `alloc`/`dealloc`: a reclaimed block is a *leak*, reachable by + /// neither (no live slice points at it and it is absent from the free list), + /// so its state cannot change between the scan and the splice, and each + /// splice is itself an atomic [`BStack::cross_exchange`]. A crash mid-reclaim + /// simply leaves the remaining leaks to be re-found on the next run. /// - /// Any orphaned tail is discarded, then each reclaimed block is prepended to - /// the existing free list individually — the list is never rebuilt. Every - /// step is an atomic [`BStack`] operation, so a crash mid-recovery simply - /// leaves the remaining leaks to be re-found on the next run. + /// # Concurrency + /// + /// The allocator-level [`Mutex`] is held for the whole call. It no longer + /// guards individual free-list reads — Phase 1 does that with the `BStack` + /// write lock — but it serialises `recover` against **itself**: two + /// overlapping runs could each observe the same block as leaked and splice + /// it into the free list twice, corrupting the list. The lock makes recovery + /// single-flight; ordinary `alloc`/`dealloc` never take it. /// /// # Safety of destructive steps /// @@ -423,9 +462,290 @@ impl CheckedSlabBStackAllocator { /// /// * Any [`io::Error`] propagated from the underlying [`BStack`] operations /// during the arena scan or while applying reclaim / tail-discard steps. + #[cfg(feature = "atomic")] pub fn recover(&self) -> io::Result { - #[cfg(feature = "atomic")] + // Held for the entire call. Phase 1's reads are serialised against + // alloc/dealloc by the `BStack` write lock inside `process_gen`; this + // lock instead prevents two `recover` runs from overlapping, which would + // let both reclaim the same leaked block and double-link it. let _guard = self.lock.lock().unwrap(); + + // Cheap early-out for an empty allocator. Only a hint: the authoritative + // payload size is taken under the `process_gen` lock below via `Len`. + if self.stack.len()? <= Self::ARENA_START { + return Ok(0); + } + let bs = self.block_size; + + // Results produced by the locked scan, consumed after it returns. + let mut reclaim: Vec = Vec::new(); + let mut unsure: u64 = 0; + let mut discard_from: Option = None; + + // Resume points for the pull-driven `process_gen` state machine. Each + // variant carries the cursor it operates on; shared working state + // (free set, reach DP) lives in the captured locals below. + #[derive(Clone, Copy)] + enum St { + ReadLen, + ReadFreeHead, + ConsumeFreeHead, + WalkHead(u64), + ConsumeNode(u64), + ArenaAt(u64), + ConsumeArena(u64), + ConsumeResync, + Finish, + Done, + } + let mut st = St::ReadLen; + // Authoritative payload size, read under the lock via `Len`. + let mut stack_len: u64 = 0; + // Sorted free-block offsets and whether the walk was cut short. + let mut free: Vec = Vec::new(); + let mut free_corrupt = false; + let mut seen: HashSet = HashSet::new(); + // Backward reach DP over a suspicious tail region `[resync_p, stack_len)`. + let mut reach: Vec = Vec::new(); + let mut resync_p: u64 = 0; + let mut rj: usize = 0; + + // Buffers filled by `Len`/`Read`; declared here so they outlive the + // `process_gen` borrow. The transmutes detach the borrow from these + // locals (see `pop_and_claim_block` for the same idiom); each is safe + // because the local outlives the call and is never aliased while an op + // holds the reference — `process_gen` resolves each op fully before it + // calls the closure again. + let mut len_out: u64 = 0; + let mut head_buf = [0u8; 8]; + let mut node_buf = [0u8; 16]; + let mut oh_buf = [0u8; 8]; + + self.stack.process_gen(|| { + loop { + match st { + // Read the authoritative payload size first. + St::ReadLen => { + st = St::ReadFreeHead; + return Some(BStackGenOp::Len { + // SAFETY: `len_out` outlives this `process_gen` call. + out: unsafe { + core::mem::transmute::<&mut u64, &mut u64>(&mut len_out) + }, + }); + } + St::ReadFreeHead => { + stack_len = len_out; + if stack_len <= Self::ARENA_START { + st = St::Finish; + continue; + } + st = St::ConsumeFreeHead; + return Some(BStackGenOp::Read { + offset: Self::FREE_HEAD_OFFSET, + // SAFETY: `head_buf` outlives this `process_gen` call. + buf: unsafe { + core::mem::transmute::<&mut [u8], &mut [u8]>(&mut head_buf[..]) + }, + }); + } + St::ConsumeFreeHead => { + st = St::WalkHead(u64::from_le_bytes(head_buf)); + continue; + } + // Free-list walk: validate `head`, then read its node. + St::WalkHead(head) => { + if head == Self::SENTINEL { + free.sort_unstable(); + st = St::ArenaAt(Self::ARENA_START); + continue; + } + if head < Self::ARENA_START + || (head - Self::ARENA_START) % bs != 0 + || head >= stack_len + || !seen.insert(head) + { + free_corrupt = true; + free.sort_unstable(); + st = St::ArenaAt(Self::ARENA_START); + continue; + } + st = St::ConsumeNode(head); + return Some(BStackGenOp::Read { + offset: head, + // SAFETY: `node_buf` outlives this `process_gen` call. + buf: unsafe { + core::mem::transmute::<&mut [u8], &mut [u8]>(&mut node_buf[..]) + }, + }); + } + St::ConsumeNode(head) => { + let overhead = u64::from_le_bytes(node_buf[0..8].try_into().unwrap()); + if overhead != 0 { + free_corrupt = true; + free.sort_unstable(); + st = St::ArenaAt(Self::ARENA_START); + continue; + } + free.push(head); + let next = u64::from_le_bytes(node_buf[8..16].try_into().unwrap()); + st = St::WalkHead(next); + continue; + } + // Linear arena scan: read the overhead at `p`. + St::ArenaAt(p) => { + if p >= stack_len { + st = St::Finish; + continue; + } + st = St::ConsumeArena(p); + return Some(BStackGenOp::Read { + offset: p, + // SAFETY: `oh_buf` outlives this `process_gen` call. + buf: unsafe { + core::mem::transmute::<&mut [u8], &mut [u8]>(&mut oh_buf[..]) + }, + }); + } + St::ConsumeArena(p) => { + let overhead = u64::from_le_bytes(oh_buf); + if overhead == 0 { + if free.binary_search(&p).is_ok() { + // Free: reachable from free_head. + } else if free_corrupt { + // A bare zero-overhead block is only trustworthy + // as a leak while the free list walked cleanly; a + // corrupt list means it might already be linked. + unsure += 1; + } else { + reclaim.push(p); + } + st = St::ArenaAt(p + bs); + continue; + } + if let Some(n) = self.valid_in_use(overhead, p, stack_len, &free) { + // valid_in_use guarantees p + n*bs <= stack_len. + st = St::ArenaAt(p + n * bs); + continue; + } + // Suspicious: prefer a known-free block as resync anchor. + let idx = free.partition_point(|&x| x <= p); + if let Some(&f) = free.get(idx) { + unsure += (f - p) / bs; + st = St::ArenaAt(f); + continue; + } + // No anchor follows: set up the backward reach DP over + // `[p, stack_len)` (see the non-atomic `resync_tail`). + let span = stack_len - p; + match usize::try_from(span / bs) { + Ok(m) if m <= Self::MAX_RECOVER_REGION => { + // m >= 1 since p < stack_len and both are aligned. + reach = vec![false; m + 1]; + reach[m] = true; + resync_p = p; + rj = m - 1; + st = St::ConsumeResync; + return Some(BStackGenOp::Read { + offset: resync_p + (rj as u64) * bs, + // SAFETY: `oh_buf` outlives this call. + buf: unsafe { + core::mem::transmute::<&mut [u8], &mut [u8]>( + &mut oh_buf[..], + ) + }, + }); + } + // Region too large to analyse safely: leave leaked. + _ => { + unsure += span / bs; + st = St::Finish; + continue; + } + } + } + St::ConsumeResync => { + let overhead = u64::from_le_bytes(oh_buf); + let off = resync_p + (rj as u64) * bs; + reach[rj] = if overhead == 0 { + reach[rj + 1] + } else if let Some(n) = self.valid_in_use(overhead, off, stack_len, &free) { + // valid_in_use guarantees rj + n <= m. + reach[rj + n as usize] + } else { + false + }; + if rj != 0 { + rj -= 1; + st = St::ConsumeResync; + return Some(BStackGenOp::Read { + offset: resync_p + (rj as u64) * bs, + // SAFETY: `oh_buf` outlives this call. + buf: unsafe { + core::mem::transmute::<&mut [u8], &mut [u8]>(&mut oh_buf[..]) + }, + }); + } + // DP complete. The smallest interior boundary that tiles + // cleanly to the tail is a mid-arena gap to resync on; + // j=0 is excluded (it would contradict the Suspicious + // classification that brought us here). + let m = reach.len() - 1; + if let Some(jr) = (1..m).find(|&j| reach[j]) { + unsure += jr as u64; + st = St::ArenaAt(resync_p + (jr as u64) * bs); + continue; + } + // Orphaned tail. Only safe to discard when the free list + // is trusted; otherwise leave it leaked. + if free_corrupt { + unsure += (stack_len - resync_p) / bs; + } else { + discard_from = Some(resync_p); + } + st = St::Finish; + continue; + } + // Emit the single permitted mutation: the tail discard. + // `Discard` drops the bytes without reading them, so no + // throwaway buffer is needed. + St::Finish => { + // No tail discard chosen: end the sequence with no write. + let t = discard_from?; + st = St::Done; + return Some(BStackGenOp::Discard { len: stack_len - t }); + } + // `Discard` is terminal, so this is defensive only. + St::Done => return None, + } + } + })?; + + // Phase 2: splice reclaimed leaks onto the free list, lock-free. Each + // block is unreachable by alloc/dealloc, so its leaked state is stable + // across the unlocked gap, and `push_free_blocks` splices atomically. + for b in reclaim { + self.push_free_blocks(b, 1)?; + } + Ok(unsure) + } + + /// Repair the allocator after an unclean shutdown and return the number of + /// blocks that remain leaked or could not be classified with certainty + /// (`0` means the arena is fully accounted for). + /// + /// Single-threaded (non-`atomic`) variant: the free-list walk and arena + /// scan run as ordinary sequential [`BStack`] reads, then any orphaned tail + /// is discarded and each reclaimed block is prepended to the free list. See + /// the `atomic` build for the crash- and concurrency-safety contract; here + /// `&self` access is assumed exclusive. + /// + /// # Errors + /// + /// * Any [`io::Error`] propagated from the underlying [`BStack`] operations + /// during the arena scan or while applying reclaim / tail-discard steps. + #[cfg(not(feature = "atomic"))] + pub fn recover(&self) -> io::Result { let stack_len = self.stack.len()?; if stack_len <= Self::ARENA_START { return Ok(0); @@ -499,6 +819,10 @@ impl CheckedSlabBStackAllocator { /// pointer, a head whose overhead is non-zero, or a cycle (detected with a /// visited set bounded by the arena block count) — and reports it via the /// returned flag (`true` = the walk was cut short by corruption). + /// + /// The `atomic` recovery path performs the equivalent walk inline inside + /// its `process_gen` scan, so this helper is only compiled without `atomic`. + #[cfg(not(feature = "atomic"))] fn scan_free_list(&self, stack_len: u64) -> io::Result<(Vec, bool)> { let mut free = Vec::new(); let mut seen: HashSet = HashSet::new(); @@ -527,6 +851,10 @@ impl CheckedSlabBStackAllocator { } /// Classify the block at `p` for the recovery scan. + /// + /// The `atomic` recovery path inlines this classification inside its + /// `process_gen` scan, so this helper is only compiled without `atomic`. + #[cfg(not(feature = "atomic"))] fn classify(&self, p: u64, stack_len: u64, free: &[u64]) -> io::Result { let overhead = self.read_overhead(p)?; if overhead == 0 { @@ -579,6 +907,10 @@ impl CheckedSlabBStackAllocator { /// clean walk (only free or valid in-use blocks) lands exactly on /// `stack_len`. The smallest such interior boundary is a mid-arena gap to /// resync on; if none exists the region is an orphaned tail to discard. + /// + /// The `atomic` recovery path runs the equivalent reach DP inline inside + /// its `process_gen` scan, so this helper is only compiled without `atomic`. + #[cfg(not(feature = "atomic"))] fn resync_tail(&self, p: u64, stack_len: u64, free: &[u64]) -> io::Result { let bs = self.block_size; let m = match usize::try_from((stack_len - p) / bs) { @@ -641,6 +973,98 @@ impl CheckedSlabBStackAllocator { /// Advances `free_head` then writes the full block in one call: the overhead /// is set to `IN_USE_BIT | num_blocks` and the data bytes are zeroed. A /// crash between the two writes merely leaks the detached block. + /// + /// # `atomic` feature + /// + /// The read of `free_head`, the read of the popped block's overhead and + /// `next` pointer (`data[0..8]`), and the write that advances `free_head` + /// run as a single [`BStack::process_gen`] sequence under one held write + /// lock — see [`SlabBStackAllocator::pop_free_block`](super::SlabBStackAllocator). + /// The final claim write (overhead + zeroed data) happens afterwards on + /// the now-detached block, which is exclusively owned by this call. + #[cfg(feature = "atomic")] + fn pop_and_claim_block(&self, num_blocks: u64) -> io::Result> { + let mut head_buf = [0u8; 8]; + let mut prefix_buf = [0u8; 16]; + let mut step = 0usize; + let mut head_opt: Option = None; + let mut corrupt: Option = None; + + self.stack.process_gen(|| { + let op = match step { + // Step 0: read the current free-list head. + 0 => Some(BStackGenOp::Read { + offset: Self::FREE_HEAD_OFFSET, + // SAFETY: `head_buf` outlives this `process_gen` call. + buf: unsafe { core::mem::transmute::<&mut [u8], &mut [u8]>(&mut head_buf[..]) }, + }), + // Step 1: an empty list ends the sequence with no write; + // otherwise read the head block's overhead and next-pointer. + 1 => { + let head = u64::from_le_bytes(head_buf); + if head == Self::SENTINEL { + None + } else { + head_opt = Some(head); + Some(BStackGenOp::Read { + offset: head, + // SAFETY: `prefix_buf` outlives this `process_gen` call. + buf: unsafe { + core::mem::transmute::<&mut [u8], &mut [u8]>(&mut prefix_buf[..]) + }, + }) + } + } + // Step 2: a non-zero overhead means the free list is corrupt — + // end the sequence with no write and report the error after. + // Otherwise advance free_head to the popped block's next + // pointer, still under the lock acquired for step 0's read. + 2 => { + let overhead = u64::from_le_bytes(prefix_buf[0..8].try_into().unwrap()); + if overhead != 0 { + corrupt = Some(overhead); + None + } else { + Some(BStackGenOp::Write { + offset: Self::FREE_HEAD_OFFSET, + // SAFETY: `prefix_buf` outlives this `process_gen` call. + data: unsafe { + core::mem::transmute::<&[u8], &[u8]>(&prefix_buf[8..16]) + }, + }) + } + } + _ => None, + }; + step += 1; + op + })?; + + let Some(head) = head_opt else { + return Ok(None); + }; + if let Some(overhead) = corrupt { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "free-list block at {head} has non-zero overhead {overhead:#018x}; free list corrupt" + ), + )); + } + // Mark in-use and zero data in one write. + let mut block_buf = vec![0u8; self.block_size as usize]; // safe: validated in new/open + block_buf[..8].copy_from_slice(&(Self::IN_USE_BIT | num_blocks).to_le_bytes()); + self.stack.set(head, block_buf)?; + Ok(Some(head)) + } + + /// Pop the head block off the free list, mark it in use with `num_blocks`, + /// and return its block start offset, or `None` if the list is empty. + /// + /// Advances `free_head` then writes the full block in one call: the overhead + /// is set to `IN_USE_BIT | num_blocks` and the data bytes are zeroed. A + /// crash between the two writes merely leaks the detached block. + #[cfg(not(feature = "atomic"))] fn pop_and_claim_block(&self, num_blocks: u64) -> io::Result> { let head = u64::from_le_bytes(read_bstack!(self.stack, Self::FREE_HEAD_OFFSET => u64)); if head == Self::SENTINEL { @@ -667,16 +1091,28 @@ impl CheckedSlabBStackAllocator { } /// Write the block prefixes for a run of `count` contiguous free blocks - /// starting at `first_block`, linking them into a chain whose tail points at - /// the current `free_head`. Does **not** update `free_head`. + /// starting at `first_block`, linking them into a chain. All blocks' + /// overhead is cleared (transitioning a live allocation into free blocks) + /// and `data[0..8]` is set to the next block's offset. + /// + /// # `atomic` feature + /// + /// The last block's `data[0..8]` is set to the placeholder `first_block`; + /// [`push_free_blocks`](Self::push_free_blocks) splices the whole run onto + /// `free_head` with [`BStack::cross_exchange`] afterwards. Does **not** + /// touch `free_head`. /// - /// Each block's overhead is set to zero and its `data[0..8]` to the next - /// block's offset (or the existing `free_head` for the last block). All - /// other data bytes in the run are zeroed. The single bulk - /// [`BStack::set`] makes this crash-safe: until `free_head` is repointed the - /// whole run is simply unreachable. + /// # non-`atomic` + /// + /// The last block's `data[0..8]` is set to the current `free_head`, whose + /// value the caller then writes into `free_head` directly. Does **not** + /// update `free_head`. + /// + /// The single bulk [`BStack::set`] makes this crash-safe: until the run is + /// spliced in (or `free_head` is repointed), it is simply unreachable. fn write_free_run(&self, first_block: u64, count: u64) -> io::Result<()> { debug_assert!(count > 0); + #[cfg(not(feature = "atomic"))] let old_head = read_bstack!(self.stack, Self::FREE_HEAD_OFFSET => u64); let total = count.checked_mul(self.block_size).ok_or_else(|| { io::Error::new( @@ -720,7 +1156,16 @@ impl CheckedSlabBStackAllocator { })?; next.to_le_bytes() } else { - old_head + #[cfg(feature = "atomic")] + { + // Placeholder: replaced with the old free_head by + // cross_exchange in push_free_blocks. + first_block.to_le_bytes() + } + #[cfg(not(feature = "atomic"))] + { + old_head + } }; // Overhead at buf[base..base+8] stays zero; next pointer at data[0..8]. buf[base + 8..base + 16].copy_from_slice(&next_bytes); @@ -731,13 +1176,46 @@ impl CheckedSlabBStackAllocator { /// Prepend `count` contiguous blocks starting at `first_block` to the free /// list. The blocks' overhead bytes are cleared as part of the operation, /// so this also transitions a live allocation into free blocks. + /// + /// # `atomic` feature + /// + /// [`write_free_run`](Self::write_free_run) writes the chain with the last + /// block's `data[0..8]` set to the placeholder `first_block`. A single + /// [`BStack::cross_exchange`] then atomically swaps that slot with + /// `free_head`: `free_head` becomes `first_block` (the new head) and the + /// last block's next-pointer becomes the old head — splicing the whole run + /// in under one write lock, lock-free. For `count == 1`, the first and last + /// block are the same, exactly mirroring + /// [`SlabBStackAllocator::push_free_block`](super::SlabBStackAllocator). fn push_free_blocks(&self, first_block: u64, count: u64) -> io::Result<()> { if count == 0 { return Ok(()); } self.write_free_run(first_block, count)?; - self.stack - .set(Self::FREE_HEAD_OFFSET, first_block.to_le_bytes()) + + #[cfg(feature = "atomic")] + { + let last_block = first_block + .checked_add((count - 1).checked_mul(self.block_size).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "last free-list offset overflows u64", + ) + })?) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "last block offset overflows u64", + ) + })?; + self.stack + .cross_exchange(last_block + Self::OVERHEAD, Self::FREE_HEAD_OFFSET, 8) + } + #[cfg(not(feature = "atomic"))] + { + self.stack + .set(Self::FREE_HEAD_OFFSET, first_block.to_le_bytes()) + } } } @@ -785,29 +1263,21 @@ impl BStackAllocator for CheckedSlabBStackAllocator { let num_blocks = self.blocks_needed(len)?; - if num_blocks == 1 { - // Lock is scoped to the free-list pop only; the tail-extend fallback - // below runs without the lock (see comment there). - let free_block = { - #[cfg(feature = "atomic")] - let _guard = self.lock.lock().unwrap(); - self.pop_and_claim_block(1)? - // guard dropped here - }; - if let Some(block_start) = free_block { - // SAFETY: - // 1. No overflow: `block_start` is a valid in-bounds arena offset; - // `block_start + OVERHEAD + len ≤ block_start + block_size ≤ stack_len ≤ u64::MAX` - // because `blocks_needed(len) == 1` implies `len ≤ data_size = block_size − OVERHEAD`. - // 2. In bounds: the block was just popped from the free list and - // marked in-use by `pop_and_claim_block`; the full `block_size` - // region is part of the arena and present in the stack payload. - // 3. Alloc origin: the slice spans exactly the data region of this - // allocation and may safely be passed to `dealloc`/`realloc`. - return Ok(unsafe { - BStackSlice::from_raw_parts(self, block_start + Self::OVERHEAD, len) - }); - } + if num_blocks == 1 + && let Some(block_start) = self.pop_and_claim_block(1)? + { + // SAFETY: + // 1. No overflow: `block_start` is a valid in-bounds arena offset; + // `block_start + OVERHEAD + len ≤ block_start + block_size ≤ stack_len ≤ u64::MAX` + // because `blocks_needed(len) == 1` implies `len ≤ data_size = block_size − OVERHEAD`. + // 2. In bounds: the block was just popped from the free list and + // marked in-use by `pop_and_claim_block`; the full `block_size` + // region is part of the arena and present in the stack payload. + // 3. Alloc origin: the slice spans exactly the data region of this + // allocation and may safely be passed to `dealloc`/`realloc`. + return Ok(unsafe { + BStackSlice::from_raw_parts(self, block_start + Self::OVERHEAD, len) + }); } // Tail-extend path (single-block with no free block, and all multi-block @@ -919,9 +1389,7 @@ impl BStackAllocator for CheckedSlabBStackAllocator { return self.stack.discard(backing); } - // Not at tail: push to the free list under the allocator lock. - #[cfg(feature = "atomic")] - let _guard = self.lock.lock().unwrap(); + // Not at tail: push to the free list. self.push_free_blocks(block_start, num_blocks) } @@ -1078,8 +1546,8 @@ impl BStackAllocator for CheckedSlabBStackAllocator { } // Not at tail (or tail moved under atomic): grow non-tail. - // alloc and dealloc each acquire the lock for their own free-list and - // tail operations independently. + // alloc and dealloc each handle their own free-list and tail + // operations independently. let new_slice = self.alloc(new_len)?; let data = slice.read()?; new_slice.write(&data)?; @@ -1087,11 +1555,7 @@ impl BStackAllocator for CheckedSlabBStackAllocator { return Ok(new_slice); } - // Shrink path (new_n < old_n). Lock covers the overhead write, tail - // check, and non-tail free-list update. - #[cfg(feature = "atomic")] - let _guard = self.lock.lock().unwrap(); - + // Shrink path (new_n < old_n). // Overhead is the commit point for both tail and non-tail paths: write // it first so a crash after this point leaves an orphaned (but safely // unreferenced) tail region or leaked blocks that recover() can reclaim, @@ -1151,9 +1615,7 @@ impl BStackAllocator for CheckedSlabBStackAllocator { .ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "free start overflows u64") })?; - self.write_free_run(excess_start, old_n - new_n)?; - self.stack - .set(Self::FREE_HEAD_OFFSET, excess_start.to_le_bytes())?; + self.push_free_blocks(excess_start, old_n - new_n)?; // SAFETY: // 1. No overflow: slice.start() + new_len ≤ block_start + OVERHEAD + new_n * block_size − OVERHEAD ≤ u64::MAX // because new_n * block_size ≤ old_backing ≤ stack_len. diff --git a/src/alloc/mod.rs b/src/alloc/mod.rs index af52d8b..0bd4ac3 100644 --- a/src/alloc/mod.rs +++ b/src/alloc/mod.rs @@ -50,8 +50,11 @@ //! * [`SlabBStackAllocator`] — a fixed-block slab allocator (requires both //! `alloc` **and** `set` features). All blocks are exactly `block_size` //! bytes with no per-block overhead; freed blocks form an intrusive -//! singly-linked free list. O(1) alloc and dealloc. `Send` but not -//! `Sync`. *Experimental.* +//! singly-linked free list. O(1) alloc and dealloc. `Send` without the +//! `atomic` feature (not `Sync`); `Send + Sync` with `atomic`, with no +//! allocator-level lock — free-list pop/push use [`BStack::process_gen`] / +//! [`BStack::cross_exchange`] and tail grow/shrink use the `try_*` ops. +//! *Experimental.* //! //! * [`CheckedSlabBStackAllocator`] — a crash-recoverable variant of //! [`SlabBStackAllocator`] (requires both `alloc` **and** `set` features). @@ -63,7 +66,11 @@ //! on disk is `data_size + 8`. [`open`](CheckedSlabBStackAllocator::open) //! calls [`recover`](CheckedSlabBStackAllocator::recover) automatically to //! reclaim leaked blocks and repair orphaned tails from unclean shutdowns. -//! `Send` but not `Sync`. *Experimental.* +//! `Send` without the `atomic` feature (not `Sync`); `Send + Sync` with +//! `atomic`, where `alloc`/`dealloc`/`realloc` are lock-free (as for +//! [`SlabBStackAllocator`]) and an internal `Mutex` keeps +//! [`recover`](CheckedSlabBStackAllocator::recover) single-flight. +//! *Experimental.* //! //! * [`BStackByteVec`] — a growable byte (`u8`) vector backed by a //! [`BStack`] allocation (requires both `alloc` **and** `set`). Mirrors the diff --git a/src/alloc/slab.rs b/src/alloc/slab.rs index a0e5fd8..c4ac661 100644 --- a/src/alloc/slab.rs +++ b/src/alloc/slab.rs @@ -6,13 +6,13 @@ use super::{BStackAllocator, BStackSlice}; use crate::BStack; +#[cfg(feature = "atomic")] +use crate::BStackGenOp; #[cfg(not(feature = "atomic"))] use core::cell::Cell; #[cfg(not(feature = "atomic"))] use core::marker::PhantomData; use core::num::NonZeroU64; -#[cfg(feature = "atomic")] -use std::sync::Mutex; use std::{fmt, io}; #[cfg(feature = "set")] @@ -62,6 +62,12 @@ const ALSL_MAGIC_PREFIX: [u8; 6] = *b"ALSL\x00\x01"; /// block being operated on; the rest of the free list remains consistent and /// the file can be used without recovery. /// +/// With the `atomic` feature, free-list push and pop instead use +/// [`BStack::cross_exchange`] and [`BStack::process_gen`] (see +/// [Thread safety](#thread-safety)); the call counts in the table below are +/// for the non-`atomic` implementation, but the atomicity class of each +/// operation is unchanged or improved. +/// /// # Method safety /// /// | Method | Atomicity | `BStack` op | Crash effect. | @@ -97,18 +103,22 @@ const ALSL_MAGIC_PREFIX: [u8; 6] = *b"ALSL\x00\x01"; /// TOCTOU race under concurrent `&self` access that can result in two callers /// receiving the same block. /// -/// With the `atomic` feature it **is `Sync`**. An internal [`Mutex`] serialises -/// free-list pop/push operations that require multiple [`BStack`] calls. -/// Tail grow/shrink paths use [`BStack::try_extend_zeros`] / [`BStack::try_discard`] -/// to perform check-and-act atomically under `BStack`'s write lock without holding -/// the allocator mutex. +/// With the `atomic` feature it **is `Sync`** with no allocator-level lock. +/// Free-list push uses [`BStack::cross_exchange`] to splice a block (or a +/// whole freed run) onto the head in one atomic step; free-list pop drives a +/// single [`BStack::process_gen`] sequence that holds `BStack`'s write lock +/// across the read of `free_head`, the read of the popped block's `next` +/// pointer, and the write that advances `free_head`. Tail grow/shrink paths +/// use [`BStack::try_extend_zeros`] / [`BStack::try_discard`] to perform +/// check-and-act atomically under `BStack`'s write lock. Every concurrent +/// `&self` operation is therefore safe without any `Mutex`. /// ``` /// fn assert_send() {} /// assert_send::(); /// ``` /// /// Without `atomic` the type is `!Sync` (this fails to compile); with `atomic` -/// the internal `Mutex` makes it `Sync` (this compiles): +/// `BStack`'s own interior mutability makes it `Sync` (this compiles): /// #[cfg_attr(not(feature = "atomic"), doc = "```compile_fail")] #[cfg_attr(feature = "atomic", doc = "```")] @@ -128,10 +138,6 @@ pub struct SlabBStackAllocator { stack: BStack, /// Cached from the on-disk header; fixed for the lifetime of the allocator. block_size: u64, - /// Serialises multi-step free-list and tail operations when `atomic` is - /// enabled, making the allocator `Sync`. - #[cfg(feature = "atomic")] - lock: Mutex<()>, #[cfg(not(feature = "atomic"))] _not_sync: PhantomData>, } @@ -193,8 +199,6 @@ impl SlabBStackAllocator { Ok(Self { stack, block_size, - #[cfg(feature = "atomic")] - lock: Mutex::new(()), #[cfg(not(feature = "atomic"))] _not_sync: PhantomData, }) @@ -278,8 +282,6 @@ impl SlabBStackAllocator { Ok(Self { stack, block_size: stored_block_size, - #[cfg(feature = "atomic")] - lock: Mutex::new(()), #[cfg(not(feature = "atomic"))] _not_sync: PhantomData, }) @@ -291,6 +293,70 @@ impl SlabBStackAllocator { } /// Pop the head block from the free list. Returns its payload offset, or `None`. + /// + /// # `atomic` feature + /// + /// Drives a single [`BStack::process_gen`] sequence — read `free_head`, + /// read its `next` pointer, write `next` back into `free_head` — under one + /// held write lock, so the read-read-write is indivisible with respect to + /// any other thread's free-list operations. The freed block is zeroed in a + /// separate call afterwards, since by then it is exclusively owned by the + /// caller. + #[cfg(feature = "atomic")] + fn pop_free_block(&self) -> io::Result> { + let mut head_buf = [0u8; 8]; + let mut next_buf = [0u8; 8]; + let mut step = 0usize; + let mut popped: Option = None; + + self.stack.process_gen(|| { + let op = match step { + // Step 0: read the current free-list head. + 0 => Some(BStackGenOp::Read { + offset: Self::FREE_HEAD_OFFSET, + // SAFETY: `head_buf` outlives this `process_gen` call. + buf: unsafe { core::mem::transmute::<&mut [u8], &mut [u8]>(&mut head_buf[..]) }, + }), + // Step 1: an empty list ends the sequence with no write; + // otherwise read the head block's next-pointer. + 1 => { + let head = u64::from_le_bytes(head_buf); + if head == Self::SENTINEL { + None + } else { + popped = Some(head); + Some(BStackGenOp::Read { + offset: head, + // SAFETY: `next_buf` outlives this `process_gen` call. + buf: unsafe { + core::mem::transmute::<&mut [u8], &mut [u8]>(&mut next_buf[..]) + }, + }) + } + } + // Step 2: advance free_head to the popped block's next pointer, + // still under the lock acquired for step 0's read. + 2 => Some(BStackGenOp::Write { + offset: Self::FREE_HEAD_OFFSET, + // SAFETY: `next_buf` outlives this `process_gen` call. + data: unsafe { core::mem::transmute::<&[u8], &[u8]>(&next_buf[..]) }, + }), + _ => None, + }; + step += 1; + op + })?; + + let Some(head) = popped else { + return Ok(None); + }; + self.stack.zero(head, self.block_size)?; + // SAFETY: head is not zero since we checked for the SENTINEL case above, so it is a valid NonZeroU64 + Ok(Some(head.try_into().unwrap())) + } + + /// Pop the head block from the free list. Returns its payload offset, or `None`. + #[cfg(not(feature = "atomic"))] fn pop_free_block(&self) -> io::Result> { let head = u64::from_le_bytes(read_bstack!(self.stack, Self::FREE_HEAD_OFFSET => u64)); if head == Self::SENTINEL { @@ -306,6 +372,24 @@ impl SlabBStackAllocator { } /// Prepend the block at `block_start` to the free list. + /// + /// # `atomic` feature + /// + /// Lock-free splice via [`BStack::cross_exchange`]: `block_start` is first + /// seeded with a self-pointer placeholder, then atomically swapped with + /// `free_head` under one write lock — `free_head` becomes `block_start` and + /// `block_start`'s next-pointer becomes the old head, in a single + /// indivisible step. A crash between the two calls leaks `block_start` + /// rather than corrupting the list. + #[cfg(feature = "atomic")] + fn push_free_block(&self, block_start: u64) -> io::Result<()> { + self.stack.set(block_start, block_start.to_le_bytes())?; + self.stack + .cross_exchange(block_start, Self::FREE_HEAD_OFFSET, 8) + } + + /// Prepend the block at `block_start` to the free list. + #[cfg(not(feature = "atomic"))] fn push_free_block(&self, block_start: u64) -> io::Result<()> { // Write the next-pointer into the block before updating free_head: a // crash after this write but before the header update leaks the block @@ -320,15 +404,22 @@ impl SlabBStackAllocator { /// Prepend `count` contiguous blocks starting at `first_block` to the free list. /// - /// Uses exactly 3 IO calls regardless of `count`: one read of `free_head`, - /// one bulk write of all next-pointers into the freed region, and one write - /// of the new `free_head`. Crash behaviour matches `push_free_block`: a - /// crash after the bulk write but before the `free_head` update leaks the - /// entire batch rather than corrupting the list. - /// /// Requires that count * block_size does not overflow u64 and /// first_block + count * block_size does not overflow u64 and is a valid offset /// on the stack by the caller. + /// + /// # `atomic` feature + /// + /// Generalises [`push_free_block`](Self::push_free_block) to a whole run: + /// the chain `first_block -> first_block + block_size -> ... -> last_block` + /// is built in one buffer, with `last_block`'s next-pointer set to the + /// placeholder `first_block`. A single bulk [`BStack::set`] writes the + /// chain (still unreachable from `free_head`), then + /// [`BStack::cross_exchange`] atomically swaps `last_block`'s next-pointer + /// with `free_head` — `free_head` becomes `first_block` and `last_block`'s + /// next-pointer becomes the old head, splicing the whole run in under one + /// write lock. For `count == 1`, `first_block == last_block` and this is + /// exactly `push_free_block`. fn push_free_blocks(&self, first_block: u64, count: u64) -> io::Result<()> { if count == 0 { return Ok(()); @@ -336,7 +427,6 @@ impl SlabBStackAllocator { if count == 1 { return self.push_free_block(first_block); } - let old_head = read_bstack!(self.stack, Self::FREE_HEAD_OFFSET => u64); let total_bytes = count.checked_mul(self.block_size).ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, @@ -356,20 +446,33 @@ impl SlabBStackAllocator { ) })?; let mut buf = vec![0u8; buf_size]; - for i in 0..count - 1 { - let next = first_block - .checked_add((i + 1).checked_mul(self.block_size).ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "next block index multiplication overflows u64", - ) - })?) - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "next block offset overflows u64", - ) - })?; + for i in 0..count { + let next = if i + 1 < count { + first_block + .checked_add((i + 1).checked_mul(self.block_size).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "next block index multiplication overflows u64", + ) + })?) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "next block offset overflows u64", + ) + })? + } else { + #[cfg(feature = "atomic")] + { + // Placeholder: replaced with the old free_head by + // cross_exchange below. + first_block + } + #[cfg(not(feature = "atomic"))] + { + u64::from_le_bytes(read_bstack!(self.stack, Self::FREE_HEAD_OFFSET => u64)) + } + }; let off = usize::try_from(i.checked_mul(self.block_size).ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, @@ -384,23 +487,31 @@ impl SlabBStackAllocator { })?; buf[off..off + 8].copy_from_slice(&next.to_le_bytes()); } - let last_off = - usize::try_from((count - 1).checked_mul(self.block_size).ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "last free-list offset overflows u64", - ) - })?) - .map_err(|_| { - io::Error::new( - io::ErrorKind::InvalidInput, - "last free-list offset overflows usize", - ) - })?; - buf[last_off..last_off + 8].copy_from_slice(&old_head); self.stack.set(first_block, buf)?; - self.stack - .set(Self::FREE_HEAD_OFFSET, first_block.to_le_bytes()) + + #[cfg(feature = "atomic")] + { + let last_block = first_block + .checked_add((count - 1).checked_mul(self.block_size).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "last free-list offset overflows u64", + ) + })?) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "last block offset overflows u64", + ) + })?; + self.stack + .cross_exchange(last_block, Self::FREE_HEAD_OFFSET, 8) + } + #[cfg(not(feature = "atomic"))] + { + self.stack + .set(Self::FREE_HEAD_OFFSET, first_block.to_le_bytes()) + } } /// Number of `block_size` blocks required to back `len` bytes. @@ -453,8 +564,6 @@ impl BStackAllocator for SlabBStackAllocator { } if len <= self.block_size { - #[cfg(feature = "atomic")] - let _guard = self.lock.lock().unwrap(); if let Some(block) = self.pop_free_block()? { // SAFETY: block is a valid block_size region from pop_free_block return Ok(unsafe { BStackSlice::from_raw_parts(self, block.into(), len) }); @@ -516,9 +625,7 @@ impl BStackAllocator for SlabBStackAllocator { return self.stack.discard(backing_size); } - // Not at tail (or single-block): push to the free list under the lock. - #[cfg(feature = "atomic")] - let _guard = self.lock.lock().unwrap(); + // Not at tail (or single-block): push to the free list. self.push_free_blocks(slice.start(), n_blocks) } @@ -624,8 +731,6 @@ impl BStackAllocator for SlabBStackAllocator { self.stack .get_into(slice.start(), &mut data_buf[..old_visible_len])?; let new_ptr = self.stack.push(data_buf)?; - #[cfg(feature = "atomic")] - let _guard = self.lock.lock().unwrap(); self.push_free_blocks(slice.start(), old_n)?; // SAFETY: new_len fits within the new_n blocks of the newly pushed region return Ok(unsafe { BStackSlice::from_raw_parts(self, new_ptr, new_len) }); @@ -664,8 +769,6 @@ impl BStackAllocator for SlabBStackAllocator { .ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidInput, "free start overflows u64") })?; - #[cfg(feature = "atomic")] - let _guard = self.lock.lock().unwrap(); self.push_free_blocks(free_start, old_n - new_n)?; // SAFETY: new_len fits within the first new_n retained blocks Ok(unsafe { BStackSlice::from_raw_parts(self, slice.start(), new_len) }) diff --git a/src/lib.rs b/src/lib.rs index c48cd7e..8cf0077 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -394,9 +394,9 @@ //! with the `atomic` feature, where an internal `Mutex` serialises AVL tree //! mutations. //! -//! * [`SlabBStackAllocator`] — **Experimental.** Fixed-block slab allocator. All blocks are -//! exactly `block_size` bytes with no per-block header or footer; freed -//! blocks are tracked via an intrusive singly-linked free list stored in +//! * [`SlabBStackAllocator`] — **Experimental.** Fixed-block slab allocator. +//! All blocks are exactly `block_size` bytes with no per-block header or footer; +//! freed blocks are tracked via an intrusive singly-linked free list stored in //! the first 8 bytes of each free block. O(1) alloc and dealloc. //! Use [`SlabBStackAllocator::new`] to initialise an empty stack and //! [`SlabBStackAllocator::open`] to reopen an existing one. @@ -2220,6 +2220,18 @@ pub enum BStackGenOp<'a> { /// popped. buf: &'a mut [u8], }, + /// Remove the last `len` bytes from the end of the file, shrinking the + /// payload by `len` bytes, and ending the sequence. + /// + /// The dropped bytes are **not** read back — the in-sequence equivalent of + /// [`discard`](BStack::discard), and the buffer-free counterpart of + /// [`Pop`](Self::Pop) (mirroring a C `pop` with a `NULL` destination). + /// Useful for truncating a tail whose size is only known once earlier + /// `Read`s have been resolved, without allocating a throwaway buffer. + Discard { + /// Number of bytes to remove from the end of the file. + len: u64, + }, /// Write the current logical payload size, in bytes, into `out`, then /// call `f` again — does not end the sequence. Len { @@ -2644,6 +2656,10 @@ impl BStack { /// - `Some(BStackGenOp::Pop { buf })` removes the last `buf.len()` bytes /// from the end of the file into `buf`, shrinking the payload, and ends /// the sequence — the in-sequence equivalent of [`pop`](Self::pop). + /// - `Some(BStackGenOp::Discard { len })` removes the last `len` bytes from + /// the end of the file without reading them back, shrinking the payload, + /// and ends the sequence — the in-sequence equivalent of + /// [`discard`](Self::discard) and the buffer-free counterpart of `Pop`. /// - `Some(BStackGenOp::Len { out })` writes the current logical payload /// size into `out` and calls `f` again — the in-sequence equivalent of /// [`len`](Self::len), useful when a later step's offset depends on the @@ -2652,17 +2668,17 @@ impl BStack { /// reads alone inform a decision, including the decision to change /// nothing. /// - /// `Write`, `Swap`, `Push`, and `Pop` are the only mutating operations, - /// exactly one is permitted per call, and any one of them ends the - /// sequence immediately — `f` is not called again afterwards. + /// `Write`, `Swap`, `Push`, `Pop`, and `Discard` are the only mutating + /// operations, exactly one is permitted per call, and any one of them ends + /// the sequence immediately — `f` is not called again afterwards. /// /// Holding the write lock across every read and the final mutation means /// no other thread can observe or modify any region of the file in /// between — the guarantee that [`get_batched_gen`](Self::get_batched_gen) /// followed by a separate [`cas`](Self::cas) cannot provide, since the two /// separate lock acquisitions leave an ABA window. The mutated region(s) - /// need not overlap any region that was read. `Push` and `Pop` are the - /// only steps that change the file size. + /// need not overlap any region that was read. `Push`, `Pop`, and + /// `Discard` are the only steps that change the file size. /// /// Reads of the locked region `[0, locked_len())` are permitted, matching /// [`get`](Self::get) — locked bytes are immutable, so observing them @@ -2680,10 +2696,11 @@ impl BStack { /// Returns [`io::ErrorKind::InvalidInput`] if any `offset + len` overflows /// `u64`, if a read, write, or swap range exceeds the current payload /// size, if the two `Swap` regions overlap, if a write or swap range - /// overlaps the locked region `[0, locked_len())`, if a `Pop` removes more - /// bytes than the current payload size, or if a `Pop` would shrink the - /// payload below the locked length. Propagates any I/O error from - /// `read_exact`, `write_all`, `set_len`, or `durable_sync`. + /// overlaps the locked region `[0, locked_len())`, if a `Pop` or `Discard` + /// removes more bytes than the current payload size, or if a `Pop` or + /// `Discard` would shrink the payload below the locked length. Propagates + /// any I/O error from `read_exact`, `write_all`, `set_len`, or + /// `durable_sync`. #[cfg(all(feature = "set", feature = "atomic"))] pub fn process_gen<'a, F>(&self, mut f: F) -> io::Result<()> where @@ -2895,6 +2912,31 @@ impl BStack { } return Ok(()); } + Some(BStackGenOp::Discard { len }) => { + if len > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "process_gen: discard({len}) exceeds payload size ({data_size})" + ), + )); + } + let new_data_len = data_size - len; + if new_data_len < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "process_gen: discard({len}) would shrink payload below locked length ({locked})" + ), + )); + } + if len > 0 { + file.set_len(HEADER_SIZE + new_data_len)?; + write_committed_len(&mut file, new_data_len)?; + durable_sync(&file)?; + } + return Ok(()); + } Some(BStackGenOp::Len { out }) => { *out = data_size; } diff --git a/src/test.rs b/src/test.rs index ee945ed..c5db157 100644 --- a/src/test.rs +++ b/src/test.rs @@ -4670,6 +4670,101 @@ mod atomic_tests { assert_eq!(s.peek(0).unwrap(), b"helloworld"); } + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn process_gen_discard_removes_and_ends_sequence() { + use crate::BStackGenOp; + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + let mut calls = 0usize; + s.process_gen(|| { + calls += 1; + match calls { + 1 => Some(BStackGenOp::Discard { len: 5 }), + _ => Some(BStackGenOp::Write { + offset: 0, + data: b"NOPE!", + }), + } + }) + .unwrap(); + assert_eq!(calls, 1, "Discard must end the sequence, like Pop"); + assert_eq!(s.len().unwrap(), 5); + assert_eq!(s.peek(0).unwrap(), b"hello"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn process_gen_discard_zero_is_noop_and_ends_sequence() { + use crate::BStackGenOp; + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + s.process_gen(|| Some(BStackGenOp::Discard { len: 0 })) + .unwrap(); + assert_eq!(s.len().unwrap(), 5); + assert_eq!(s.peek(0).unwrap(), b"hello"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn process_gen_discard_exceeds_payload_returns_error() { + use crate::BStackGenOp; + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hi").unwrap(); + let err = s + .process_gen(|| Some(BStackGenOp::Discard { len: 10 })) + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + assert_eq!(s.len().unwrap(), 2); + assert_eq!(s.peek(0).unwrap(), b"hi"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn process_gen_discard_below_locked_returns_error() { + use crate::BStackGenOp; + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.lock_up_to(8).unwrap(); + let err = s + .process_gen(|| Some(BStackGenOp::Discard { len: 5 })) + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + assert_eq!(s.len().unwrap(), 10); + assert_eq!(s.peek(0).unwrap(), b"helloworld"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn process_gen_len_informs_discard_size() { + use crate::BStackGenOp; + // Discard a trailing region whose length is only known once `Len` has + // reported the current size — the buffer-free analogue of the Pop case. + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"keepDROP").unwrap(); + let mut size = 0u64; + let mut calls = 0usize; + s.process_gen(|| { + calls += 1; + match calls { + // SAFETY: `size` outlives this whole `process_gen` call. + 1 => Some(BStackGenOp::Len { + out: unsafe { core::mem::transmute::<&mut u64, _>(&mut size) }, + }), + _ => Some(BStackGenOp::Discard { len: size - 4 }), + } + }) + .unwrap(); + assert_eq!(size, 8); + assert_eq!(s.len().unwrap(), 4); + assert_eq!(s.peek(0).unwrap(), b"keep"); + } + #[cfg(all(feature = "set", feature = "atomic"))] #[test] fn process_gen_len_reports_current_size_and_continues() {