Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `BStackGenOp::Len { out }` / `BSTACK_GEN_LEN` (Rust + C, `set` + `atomic`): writes the current logical payload size into `out` and, unlike the mutating variants, does not end the sequence — the in-sequence equivalent of `len`, useful when a later step's offset or length depends on the current payload size.
- `BStackGenOp::Discard { len }` (Rust) / `BSTACK_GEN_POP` with a `NULL` `u.pop.buf` (C) (`set` + `atomic`): removes the last `len` bytes from the end of the file without reading them back, shrinking the payload and ending the sequence — the in-sequence, buffer-free equivalent of `discard` and the counterpart of `Pop`. Useful for truncating a tail whose size is only known once earlier `Read`s/`Len` have resolved, without allocating a throwaway buffer. In Rust this is a dedicated variant (slices cannot be null); in C it is expressed idiomatically as a `Pop` whose destination pointer is `NULL`. Errors on the same conditions as `Pop`.

### Fixed

- **`atrunc`, `splice`, `splice_into`, `replace` (Rust, `atomic`) and `bstack_atrunc`, `bstack_splice` (C, `BSTACK_FEATURE_ATOMIC`) — committed-length write not durably synced**: The header `clen` write that commits the new payload length was the last step of these operations and was never followed by `durable_sync`/`plat_durable_sync`, so a crash could leave the on-disk `clen` update only in the OS page cache. Every commit of a new `clen` — including best-effort rollback writes after a failed commit — is now followed by a sync. Crate-level durability table updated to reflect the additional sync.

### Changed

- **`BStack::len` (Rust) / `bstack_len` (C) and `BStack::is_empty` / `bstack_is_empty` no longer make a syscall**: The committed payload length is now cached in memory and kept in sync by every write-lock-held operation that commits a new length to the header. `len`/`is_empty` read this cache under the read lock instead of calling `File::metadata` (Rust) or `fstat`/`GetFileSizeEx` (C). Behaviour and signatures are unchanged.
- **`SlabBStackAllocator` and `CheckedSlabBStackAllocator` — `alloc` / `dealloc` / `realloc` are now lock-free under the `atomic` feature** (`alloc` + `set` features): The allocator-level `Mutex` that previously serialised free-list push/pop is gone from these paths. Free-list pop now drives a single `BStack::process_gen` sequence (read `free_head`, read the popped block's `next`, advance `free_head` — all under one held `BStack` write lock, closing the ABA window a `get`/`cas` pair would leave open); free-list push splices a single block or a whole freed run onto the head with one `BStack::cross_exchange`; tail grow/shrink use `BStack::try_extend_zeros` / `BStack::try_discard` (atomic check-and-act under `BStack`'s own write lock). `SlabBStackAllocator` drops its allocator-level `Mutex` entirely and is `Sync` purely through `BStack`'s interior mutability. `CheckedSlabBStackAllocator` retains a `Mutex` solely for `recover` (see below); none of `alloc` / `dealloc` / `realloc` take it. The on-disk format is unchanged — no magic-number bump.
- **`CheckedSlabBStackAllocator::recover` runs under its own mutex** (`alloc` + `set` features, `atomic`): the `Mutex` is held for the full call solely to keep recovery single-flight, preventing two concurrent runs from reclaiming the same leaked block twice. The scan itself (free-list walk, arena classification, and its one optional tail discard) runs as a single `BStack::process_gen` sequence, so the `BStack` write lock — not the `Mutex` — serialises it against the lock-free `alloc` / `dealloc` / `realloc`. Ordinary `alloc` / `dealloc` / `realloc` never take the `Mutex`.

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,10 @@ maps to `io::ErrorKind::WouldBlock` in Rust). The lock is released when the

## Thread safety

`BStack` wraps the file in a `RwLock<File>`.
`BStack` wraps the file in a `RwLock`. The committed payload length is also
cached in memory and kept in sync with the on-disk header by every
write-lock-held operation, so `len`/`is_empty` can be answered under the read
lock without any `File::metadata` syscall.

| Operation | Lock (Unix / Windows) | Lock (other) |
|--------------------------------------------------------------|-----------------------|--------------|
Expand Down
157 changes: 102 additions & 55 deletions c/bstack.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ struct bstack {
pthread_rwlock_t lock;
pthread_mutex_t cache_mutex;
#endif
/* Cached copy of the on-disk header's committed payload length (clen).
* Seeded from the validated header at construction time (after recovery)
* and kept in sync by every write-lock-held operation that commits a new
* clen to the header, via write_committed_len. bstack_len and
* bstack_is_empty read it under the same lock used for the on-disk
* state, so no extra synchronisation is needed. */
uint64_t clen;
/* Monotonically growing partition boundary. Bytes in [0, locked) are
* immutable and can be read without the rwlock on supported platforms.
* Not persisted — resets to 0 on every open. */
Expand Down Expand Up @@ -300,9 +307,14 @@ static int write_le64(bstack_fd_t fd, uint64_t file_offset, uint64_t val)
* Header helpers
* ---------------------------------------------------------------------- */

static int write_committed_len(bstack_fd_t fd, uint64_t len)
/* Overwrite the committed-length field at file offset 8 and update the
* in-memory cache (*clen) to match. */
static int write_committed_len(bstack_fd_t fd, uint64_t *clen, uint64_t len)
{
return write_le64(fd, 8, len);
if (write_le64(fd, 8, len) != 0)
return -1;
*clen = len;
return 0;
}

static int init_header(bstack_fd_t fd)
Expand Down Expand Up @@ -399,6 +411,7 @@ bstack_t *bstack_open(const char *path)
return NULL;
}

uint64_t clen = 0;
if (raw_size == 0) {
/* New file — write header and sync. */
if (init_header(fd) != 0 || plat_durable_sync(fd) != 0) {
Expand All @@ -413,26 +426,28 @@ bstack_t *bstack_open(const char *path)
return NULL;
} else {
/* Existing file — validate header and crash-recover if needed. */
uint64_t clen;
if (read_header(fd, &clen) != 0) {
uint64_t committed_len;
if (read_header(fd, &committed_len) != 0) {
int saved = errno;
close_fd(fd);
errno = saved;
return NULL;
}

uint64_t actual = raw_size - HEADER_SIZE;
if (actual != clen) {
uint64_t correct = (clen < actual) ? clen : actual;
if (actual != committed_len) {
uint64_t correct = (committed_len < actual) ? committed_len : actual;
if (plat_ftruncate(fd, HEADER_SIZE + correct) != 0 ||
write_committed_len(fd, correct) != 0 ||
write_committed_len(fd, &clen, correct) != 0 ||
plat_durable_sync(fd) != 0)
{
int saved = errno;
close_fd(fd);
errno = saved;
return NULL;
}
} else {
clen = committed_len;
}
}

Expand All @@ -442,6 +457,7 @@ bstack_t *bstack_open(const char *path)
return NULL;
}
bs->fd = fd;
bs->clen = clen;
bs->locked = ATOMIC_INIT(0);
bs->cache_enabled = 0;
bs->cache_buf = NULL;
Expand Down Expand Up @@ -516,12 +532,16 @@ int bstack_push(bstack_t *bs, const uint8_t *data, size_t len,
}

uint64_t new_len = logical_offset + (uint64_t)len;
if (write_committed_len(bs->fd, new_len) != 0 ||
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
{
/* Rollback: remove written data and reset committed length. */
/* Rollback: remove written data and reset committed length. The
* cache is reset up front so it reflects the rolled-back file even
* if the best-effort header rewrite below fails. */
plat_ftruncate(bs->fd, raw_size);
write_committed_len(bs->fd, logical_offset);
bs->clen = logical_offset;
write_committed_len(bs->fd, &bs->clen, logical_offset);
plat_durable_sync(bs->fd);
Comment thread
williamwutq marked this conversation as resolved.
goto fail_unlock;
}

Expand Down Expand Up @@ -566,12 +586,16 @@ int bstack_extend(bstack_t *bs, size_t n, uint64_t *out_offset)
goto fail_unlock;

uint64_t new_len = logical_offset + (uint64_t)n;
if (write_committed_len(bs->fd, new_len) != 0 ||
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
{
/* Rollback: truncate and reset committed length. */
/* Rollback: truncate and reset committed length. The cache is reset
* up front so it reflects the rolled-back file even if the
* best-effort header rewrite below fails. */
plat_ftruncate(bs->fd, raw_size);
write_committed_len(bs->fd, logical_offset);
bs->clen = logical_offset;
write_committed_len(bs->fd, &bs->clen, logical_offset);
plat_durable_sync(bs->fd);
Comment thread
williamwutq marked this conversation as resolved.
goto fail_unlock;
}

Expand Down Expand Up @@ -627,8 +651,13 @@ int bstack_pop(bstack_t *bs, size_t n,
goto fail_unlock;
}

if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 ||
write_committed_len(bs->fd, new_len) != 0 ||
if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0)
goto fail_unlock;
/* The truncation is the commit point: the tail bytes are gone and
* recovery would adopt the smaller file size, so update the cache now —
* before the header write, which can fail and skip it. */
bs->clen = new_len;
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
goto fail_unlock;

Expand Down Expand Up @@ -782,8 +811,13 @@ int bstack_discard(bstack_t *bs, size_t n)
return -1;
}

if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 ||
write_committed_len(bs->fd, new_len) != 0 ||
if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0)
goto fail_unlock;
/* The truncation is the commit point: the tail bytes are gone and
* recovery would adopt the smaller file size, so update the cache now —
* before the header write, which can fail and skip it. */
bs->clen = new_len;
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
goto fail_unlock;

Expand All @@ -806,17 +840,8 @@ int bstack_discard(bstack_t *bs, size_t n)
int bstack_len(bstack_t *bs, uint64_t *out_len)
{
BS_RDLOCK(bs);

uint64_t raw_size;
if (file_size(bs->fd, &raw_size) != 0) {
int saved = errno;
BS_RDUNLOCK(bs);
errno = saved;
return -1;
}

*out_len = bs->clen;
BS_RDUNLOCK(bs);
*out_len = raw_size - HEADER_SIZE;
return 0;
}

Expand All @@ -827,17 +852,8 @@ int bstack_len(bstack_t *bs, uint64_t *out_len)
int bstack_is_empty(bstack_t *bs, int *out_empty)
{
BS_RDLOCK(bs);

uint64_t raw_size;
if (file_size(bs->fd, &raw_size) != 0) {
int saved = errno;
BS_RDUNLOCK(bs);
errno = saved;
return -1;
}

*out_empty = (bs->clen == 0) ? 1 : 0;
BS_RDUNLOCK(bs);
*out_empty = (raw_size == HEADER_SIZE) ? 1 : 0;
return 0;
}

Expand Down Expand Up @@ -1122,13 +1138,14 @@ int bstack_zero(bstack_t *bs, uint64_t offset, size_t n)
/* Shared body for atrunc and splice (after the removed bytes are read).
* Caller already holds the write lock. raw_size / data_size / tail_offset /
* final_data_len are pre-computed by the caller. */
static int atomic_write_tail(bstack_fd_t fd,
static int atomic_write_tail(bstack_t *bs,
uint64_t raw_size,
uint64_t tail_offset,
uint64_t final_data_len,
const uint8_t *buf, size_t buf_len,
size_t n)
{
bstack_fd_t fd = bs->fd;
if (buf_len > n) {
/* Net extension: extend first so crashes roll back cleanly, then
* write buf over the old tail + the new space, sync, commit clen. */
Expand All @@ -1143,7 +1160,9 @@ static int atomic_write_tail(bstack_fd_t fd,
plat_ftruncate(fd, raw_size); /* best-effort rollback */
return -1;
}
return write_committed_len(fd, final_data_len);
if (write_committed_len(fd, &bs->clen, final_data_len) != 0)
return -1;
return plat_durable_sync(fd);
Comment thread
williamwutq marked this conversation as resolved.
} else {
/* Net truncation or same size: write buf into old tail, truncate,
* sync, commit clen. A crash after truncate is committed by
Expand All @@ -1153,9 +1172,15 @@ static int atomic_write_tail(bstack_fd_t fd,
return -1;
if (plat_ftruncate(fd, HEADER_SIZE + final_data_len) != 0)
return -1;
/* The truncation is the commit point (recovery adopts the smaller
* file size), so update the cache now — before the sync and header
* write, which can fail and skip it. */
bs->clen = final_data_len;
if (plat_durable_sync(fd) != 0)
return -1;
return write_committed_len(fd, final_data_len);
if (write_committed_len(fd, &bs->clen, final_data_len) != 0)
return -1;
return plat_durable_sync(fd);
}
}

Expand Down Expand Up @@ -1189,7 +1214,7 @@ int bstack_atrunc(bstack_t *bs, size_t n,
uint64_t tail_offset = HEADER_SIZE + new_tail_start;
uint64_t final_data_len = new_tail_start + (uint64_t)buf_len;

if (atomic_write_tail(bs->fd, raw_size, tail_offset,
if (atomic_write_tail(bs, raw_size, tail_offset,
final_data_len, buf, buf_len, n) != 0)
goto fail_unlock;

Expand Down Expand Up @@ -1238,7 +1263,7 @@ int bstack_splice(bstack_t *bs,
goto fail_unlock;
}

if (atomic_write_tail(bs->fd, raw_size, tail_offset,
if (atomic_write_tail(bs, raw_size, tail_offset,
final_data_len, new_buf, new_len, n) != 0)
goto fail_unlock;

Expand Down Expand Up @@ -1277,10 +1302,14 @@ int bstack_try_extend(bstack_t *bs, uint64_t s,
goto fail_unlock;
}
uint64_t new_len = data_size + (uint64_t)buf_len;
if (write_committed_len(bs->fd, new_len) != 0 ||
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0) {
/* The cache is reset up front so it reflects the rolled-back file
* even if the best-effort header rewrite below fails. */
plat_ftruncate(bs->fd, raw_size);
write_committed_len(bs->fd, data_size);
bs->clen = data_size;
write_committed_len(bs->fd, &bs->clen, data_size);
plat_durable_sync(bs->fd);
Comment thread
williamwutq marked this conversation as resolved.
goto fail_unlock;
}

Expand Down Expand Up @@ -1338,8 +1367,13 @@ int bstack_try_discard(bstack_t *bs, uint64_t s, size_t n, int *ok)
return -1;
}

if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 ||
write_committed_len(bs->fd, new_len) != 0 ||
if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0)
goto fail_unlock;
/* The truncation is the commit point: the tail bytes are gone and
* recovery would adopt the smaller file size, so update the cache now —
* before the header write, which can fail and skip it. */
bs->clen = new_len;
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
goto fail_unlock;

Expand Down Expand Up @@ -1410,7 +1444,7 @@ int bstack_replace(bstack_t *bs, size_t n,

uint64_t final_data_len = new_tail_start + (uint64_t)new_len;

if (atomic_write_tail(bs->fd, raw_size, tail_offset,
if (atomic_write_tail(bs, raw_size, tail_offset,
final_data_len, new_buf, new_len, n) != 0) {
free(new_buf);
goto fail_unlock;
Expand Down Expand Up @@ -1447,12 +1481,15 @@ int bstack_try_extend_zeros(bstack_t *bs, uint64_t s, size_t n, int *ok)

uint64_t new_len = data_size + (uint64_t)n;
if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 ||
write_committed_len(bs->fd, new_len) != 0 ||
write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
{
/* Best-effort rollback. */
/* Best-effort rollback. The cache is reset up front so it reflects
* the rolled-back file even if the header rewrite below fails. */
plat_ftruncate(bs->fd, raw_size);
write_committed_len(bs->fd, data_size);
bs->clen = data_size;
write_committed_len(bs->fd, &bs->clen, data_size);
plat_durable_sync(bs->fd);
goto fail_unlock;
}

Expand Down Expand Up @@ -1878,13 +1915,17 @@ int bstack_process_gen(bstack_t *bs,
goto fail_unlock;
}
uint64_t new_len = data_size + (uint64_t)len;
if (write_committed_len(bs->fd, new_len) != 0 ||
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
{
/* Rollback: remove written data and reset committed
* length. */
* length. The cache is reset up front so it reflects
* the rolled-back file even if the header rewrite below
* fails. */
plat_ftruncate(bs->fd, raw_size_now);
write_committed_len(bs->fd, data_size);
bs->clen = data_size;
write_committed_len(bs->fd, &bs->clen, data_size);
plat_durable_sync(bs->fd);
Comment thread
williamwutq marked this conversation as resolved.
goto fail_unlock;
}
}
Expand All @@ -1909,8 +1950,14 @@ int bstack_process_gen(bstack_t *bs,
if (plat_pread(bs->fd, buf, len, read_offset) != 0)
goto fail_unlock;
}
if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 ||
write_committed_len(bs->fd, new_len) != 0 ||
if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0)
goto fail_unlock;
/* The truncation is the commit point: the tail bytes are
* gone and recovery would adopt the smaller file size, so
* update the cache now — before the header write, which can
* fail and skip it. */
bs->clen = new_len;
if (write_committed_len(bs->fd, &bs->clen, new_len) != 0 ||
plat_durable_sync(bs->fd) != 0)
goto fail_unlock;
}
Expand Down
7 changes: 5 additions & 2 deletions c/bstack.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,16 @@ int bstack_discard(bstack_t *bs, size_t n);

/*
* Write the current logical payload size (excluding the 16-byte header)
* into *out_len. Takes the read lock; concurrent calls are allowed.
* into *out_len. This value is cached in memory, so no syscall is made;
* it takes the read lock, so it can run concurrently with other readers
* but blocks while a writer is in progress.
*/
int bstack_len(bstack_t *bs, uint64_t *out_len);

/*
* Write 1 into *out_empty if the payload is empty (len == 0), else 0.
* Takes the read lock; concurrent calls are allowed.
* Like bstack_len, this is a cached read under the read lock and makes
* no syscall.
*/
int bstack_is_empty(bstack_t *bs, int *out_empty);

Expand Down
Loading
Loading