
WIP: feat(storage): add transaction support with journal, undo, and crash recovery#431

Open
qin-ctx wants to merge 4 commits into main from feature/transaction-support

Conversation


@qin-ctx qin-ctx commented Mar 5, 2026

Description

Implement a full transaction mechanism for VikingFS storage operations, including write-ahead journal (WAL), undo/rollback, path locking, context manager API, and crash recovery. Core write operations (rm, mv, add_resource, session.commit) now have atomicity guarantees — automatic rollback on failure and automatic recovery on restart after process crashes.

cc @r266-tech

Related Issue

Closes #390

RFC Discussion: #115

Type of Change

  • Bug fix
  • New feature (non-breaking change that adds functionality)
  • Documentation update
  • Test update

Scope & Limitations

Important

This design is for single-node deployments only and does not support multi-node/distributed scenarios.

| Component | Current Approach | Limitation | Multi-node Requirements |
|---|---|---|---|
| PathLock | File-based locks (fencing token written to `.path.ovlock`) | Requires strong read-write consistency from the AGFS backend; effective within a single process only | Distributed locks (etcd txn+lease / ZooKeeper ephemeral nodes) with atomic CAS + session-bound leases + monotonically increasing fencing tokens. See `path_lock.py:21-29` |
| TransactionJournal | Journal files persisted to local AGFS path `/local/_system/transactions/` | Only visible on the local machine; crash recovery only works when the same node restarts | Persist journals to a distributed consistent store, or bind to the same coordination service as the distributed lock |
| QueueFS (SQLite) | SQLite + WAL mode, single `queue_messages` table, RecoverStale resets crashed processing messages on startup | SQLite does not support cross-process concurrent writes | Replace with a distributed message queue or use the existing TiDB/MySQL backend |
| QueueFS (TiDB/MySQL) | `FOR UPDATE SKIP LOCKED` for concurrent dequeue, one table per queue, MySQL protocol driver | Dequeue soft-deletes directly (`deleted=1`), Ack is a no-op, RecoverStale not implemented | Complete at-least-once semantics (Ack with real deletion + RecoverStale reset), or introduce deduplication |
| QueueFS (Memory) | Pure in-memory, no persistence | Lost on restart, RecoverStale is a no-op | Development/testing use only |

Architecture

Transaction Core

TransactionContext (async context manager — user-facing entry point)
  ├── TransactionManager (global singleton, transaction lifecycle)
  │     ├── TransactionJournal (AGFS-persisted WAL at /local/_system/transactions/{tx_id}/journal.json)
  │     ├── PathLock (fencing-token-based path locks)
  │     └── Crash Recovery (journal replay on startup)
  ├── UndoLog (ordered reversible operation log, replayed in reverse on rollback)
  └── PostActions (deferred post-commit tasks, e.g. enqueue_semantic)
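The user-facing shape of this architecture can be illustrated with a minimal, self-contained sketch. This is not the real VikingFS API — the class below only mimics the behavior described above (undo log replayed in reverse on failure, post-actions run only after commit):

```python
# Minimal sketch (illustrative, not the actual TransactionContext) of the
# async-context-manager contract described above: an uncommitted exit
# triggers rollback; a committed exit runs deferred post-actions.
import asyncio

class TransactionContext:
    def __init__(self):
        self.committed = False
        self.undo_log = []       # reversible operations, replayed in reverse
        self.post_actions = []   # deferred tasks, run only after commit

    async def __aenter__(self):
        return self

    async def commit(self):
        self.committed = True

    async def __aexit__(self, exc_type, exc, tb):
        if self.committed:
            for action in self.post_actions:
                action()
        else:
            for undo in reversed(self.undo_log):  # rollback in reverse order
                undo()
        return False  # never swallow the caller's exception

async def demo():
    rolled_back = []
    try:
        async with TransactionContext() as tx:
            tx.undo_log.append(lambda: rolled_back.append("undo-write"))
            raise RuntimeError("simulated failure before commit")
    except RuntimeError:
        pass
    return rolled_back

print(asyncio.run(demo()))  # ['undo-write']
```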

Transaction State Machine

         ┌──────────────────────── Normal Flow ────────────────────────┐
         │                                                              │
      INIT ──→ ACQUIRE ──→ EXEC ──→ COMMIT ──→ RELEASING ──→ RELEASED
         │         │         │                                          ↑
         │         │         │    ┌──── Error Flow ────┐                │
         │         │         └──→ FAIL ──→ RELEASING ──→ RELEASED       │
         │         └─────────────→ FAIL ──→ RELEASING ──────────────────┘
         │                        (lock failed)
         │
    [journal write point]
| Status | Meaning | Trigger | Journal Persistence |
|---|---|---|---|
| INIT | Transaction created, waiting for lock | `TransactionContext.__aenter__` creates record | Journal written before lock acquisition (ensures orphan locks can be located on crash) |
| ACQUIRE | Acquiring lock resources | `acquire_lock_point/subtree/mv` begins | Journal updated (records actual lock paths) |
| EXEC | Lock acquired, business operations in progress | Automatically entered after successful lock | `undo_log` updated in journal after each sub-operation |
| COMMIT | Business operations complete, preparing to release | User calls `tx.commit()` | Journal updated to COMMIT (post_actions begin execution) |
| FAIL | Lock failed or abnormal exit | Lock failure / `__aexit__` without commit | Journal updated to FAIL → undo rollback executed |
| RELEASING | Releasing lock resources | After commit or rollback | — |
| RELEASED | Locks released, transaction ended | All locks released | Journal deleted |
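A hypothetical sketch of the status enum and the legal transitions implied by the diagram above (names are illustrative; the spelling ACQUIRE is used here, though the code under review may spell it differently):

```python
# Illustrative status enum and forward-transition table mirroring the
# state machine above; not the actual VikingFS implementation.
from enum import Enum

class TransactionStatus(str, Enum):
    INIT = "INIT"            # journal written, waiting for lock
    ACQUIRE = "ACQUIRE"      # acquiring lock resources
    EXEC = "EXEC"            # business operations in progress
    COMMIT = "COMMIT"        # preparing to release
    FAIL = "FAIL"            # lock failure or abnormal exit
    RELEASING = "RELEASING"  # releasing lock resources
    RELEASED = "RELEASED"    # journal deleted, transaction ended

# Legal forward transitions from the diagram above:
TRANSITIONS = {
    TransactionStatus.INIT: {TransactionStatus.ACQUIRE},
    TransactionStatus.ACQUIRE: {TransactionStatus.EXEC, TransactionStatus.FAIL},
    TransactionStatus.EXEC: {TransactionStatus.COMMIT, TransactionStatus.FAIL},
    TransactionStatus.COMMIT: {TransactionStatus.RELEASING},
    TransactionStatus.FAIL: {TransactionStatus.RELEASING},
    TransactionStatus.RELEASING: {TransactionStatus.RELEASED},
    TransactionStatus.RELEASED: set(),
}
```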

Key design: Journal is written before lock acquisition (context_manager.py:66-68), and init_info records lock_paths. This way, even if a crash occurs after successful locking but before the journal's locks list is updated, the recovery process can still find and clean up orphan lock files via init_info.lock_paths.
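The journal-before-lock ordering can be sketched as follows. File layout and function names here are illustrative stand-ins, not the actual `context_manager.py` code:

```python
# Sketch of the write ordering described above: the journal (including the
# intended lock paths in init_info) is persisted *before* any lock file is
# created, so a crash between the two steps still leaves a record of where
# orphan locks may exist.
import json, os, tempfile

def begin_transaction(tx_dir, tx_id, lock_paths):
    journal = {
        "tx_id": tx_id,
        "status": "INIT",
        "init_info": {"lock_paths": lock_paths},  # consulted by recovery
    }
    path = os.path.join(tx_dir, f"{tx_id}.journal.json")
    with open(path, "w") as f:
        json.dump(journal, f)   # step 1: journal first
    # step 2: only now would the lock files themselves be created
    return path

tmp = tempfile.mkdtemp()
p = begin_transaction(tmp, "tx-1", ["/docs/a"])
with open(p) as f:
    print(json.load(f)["init_info"]["lock_paths"])  # ['/docs/a']
```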

Lock Design

Path locks based on lock files (.path.ovlock), with fencing token content in format {tx_id}:{time_ns}:{lock_type}.
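Helpers matching this token format might look like the sketch below (the real `_make_fencing_token` / `_parse_fencing_token` may differ; `rsplit` is used here so a `tx_id` containing colons would still parse):

```python
# Illustrative fencing-token helpers for the {tx_id}:{time_ns}:{lock_type}
# format described above; an assumption, not the actual path_lock.py code.
import time

def make_fencing_token(tx_id: str, lock_type: str) -> str:
    return f"{tx_id}:{time.time_ns()}:{lock_type}"

def parse_fencing_token(token: str):
    tx_id, ts, lock_type = token.rsplit(":", 2)
    return tx_id, int(ts), lock_type

tok = make_fencing_token("tx-42", "P")
owner, ts, lt = parse_fencing_token(tok)
print(owner, lt)  # tx-42 P
```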

Two lock types & conflict detection:

| Lock Type | ID | Purpose | Conflict Detection |
|---|---|---|---|
| POINT | P | Locks a single directory (write / semantic operations) | Walks up ancestors checking for SUBTREE locks held by other transactions |
| SUBTREE | S | Locks an entire subtree (rm / mv-source operations) | Recursively scans all descendants for any locks held by other transactions |
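The POINT-lock conflict rule (walking up ancestors looking for a foreign SUBTREE lock) can be sketched like this. Lock storage is modeled as a plain dict purely for illustration:

```python
# Sketch of POINT-lock conflict detection: a POINT lock on `path` conflicts
# if any ancestor directory carries a SUBTREE ("S") lock owned by another
# transaction. Locks are modeled as {dir_path: (owner_tx, lock_type)}.
import posixpath

def point_lock_conflicts(path, tx_id, locks):
    cur = path
    while True:
        held = locks.get(cur)
        if held and held[1] == "S" and held[0] != tx_id:
            return True  # an ancestor subtree is locked by someone else
        parent = posixpath.dirname(cur)
        if parent == cur:   # reached filesystem root
            return False
        cur = parent

locks = {"/a": ("tx-other", "S")}
print(point_lock_conflicts("/a/b/c", "tx-me", locks))  # True
print(point_lock_conflicts("/x/y", "tx-me", locks))    # False
```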

Three locking modes (TransactionContext):

| Mode | Method | Lock Combination | Used By |
|---|---|---|---|
| point | `acquire_lock_point` | POINT lock on each path | `add_resource`, `session.commit` |
| subtree | `acquire_lock_subtree` | SUBTREE lock on each path | `rm` (directories) |
| mv | `acquire_lock_mv` | SUBTREE on source + POINT on destination parent | `mv` |

Livelock prevention: When two transactions simultaneously write lock files and detect a conflict, the one with the larger (timestamp, tx_id) backs off and retries.

Stale lock cleanup: Lock files contain nanosecond timestamps. Locks held longer than lock_expire (default 300s) are considered remnants of crashed processes and can be force-released.
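Both token-based safeguards above reduce to comparisons on the parsed token. A minimal sketch, assuming the `{tx_id}:{time_ns}:{lock_type}` layout (function names are illustrative):

```python
# Sketch of the livelock tiebreak and stale-lock check described above.
import time

LOCK_EXPIRE_NS = 300 * 1_000_000_000  # default lock_expire of 300 s

def should_back_off(my_token: str, other_token: str) -> bool:
    # Livelock prevention: of two conflicting writers, the one with the
    # larger (timestamp, tx_id) yields and retries.
    def key(tok):
        tx_id, ts, _ = tok.rsplit(":", 2)
        return (int(ts), tx_id)
    return key(my_token) > key(other_token)

def is_stale(token: str, now_ns=None) -> bool:
    # Locks older than lock_expire are treated as crash remnants and
    # may be force-released.
    _, ts, _ = token.rsplit(":", 2)
    now = now_ns if now_ns is not None else time.time_ns()
    return now - int(ts) > LOCK_EXPIRE_NS

print(should_back_off("b:200:P", "a:100:P"))                  # True: later writer yields
print(is_stale("a:100:P", now_ns=100 + LOCK_EXPIRE_NS + 1))   # True
```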

Per-Command Consistency Design

| Command | Lock Mode | Operation Order | Rollback Strategy | Design Rationale |
|---|---|---|---|---|
| `rm()` | Dir: subtree(target)<br>File: point(parent) | 1. Snapshot VectorDB records<br>2. Delete VectorDB<br>3. Delete FS | Restore VectorDB from snapshot<br>(FS delete marked non-reversible, skip) | Delete index before files to avoid search hitting deleted content; FS delete being non-reversible is a known tradeoff |
| `mv()` | mv: SUBTREE(src) + POINT(dst parent) | 1. FS move<br>2. VectorDB URI batch update | Reverse FS move + reverse URI mapping | SUBTREE on source prevents concurrent subtree modifications; POINT on destination parent prevents directory structure conflicts |
| `add_resource` (finalize) | point(parent dir) | 1. FS move (staging → final)<br>2. post_action(enqueue_semantic) | Delete final directory | Semantic generation registered as post_action, replayable after crash |
| `session.commit` | point(session_path) × 2 independent transactions | Two-phase commit:<br>Phase 1: Archive messages → clear → checkpoint(archived)<br>LLM call (no transaction, safe to retry)<br>Phase 2: Write memories → checkpoint(completed) → post_action(enqueue_semantic) | Each phase rolls back independently; checkpoint file records progress | LLM call placed between two transactions to avoid holding locks during long calls; checkpoint enables independent recovery of each phase |
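The two-phase `session.commit` flow can be condensed into a sketch. The checkpoint and LLM helpers below are stand-ins (the real implementation persists checkpoints and runs each phase inside its own transaction):

```python
# Sketch of the two-phase commit flow described above, with the LLM call
# held outside any transaction so no locks are held during the long call.
checkpoints = []   # stand-in for the persisted checkpoint file

def checkpoint(stage):
    checkpoints.append(stage)  # progress marker consulted by recovery

def commit_session(messages):
    # Phase 1 (transaction 1): archive and clear under a point lock.
    archived = list(messages)
    messages.clear()
    checkpoint("archived")
    # Between the phases: LLM call, no transaction, safe to retry.
    memories = [m.upper() for m in archived]   # stand-in for the LLM
    # Phase 2 (transaction 2): persist memories, defer semantic indexing.
    checkpoint("completed")
    return memories

msgs = ["hello", "world"]
print(commit_session(msgs), checkpoints)
# ['HELLO', 'WORLD'] ['archived', 'completed']
```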

Crash Recovery

On TransactionManager.start(), scans /local/_system/transactions/ for residual journals and decides recovery strategy by status:

| Journal Status | Meaning | Recovery Strategy |
|---|---|---|
| COMMITTED + has post_actions | Committed but post_actions incomplete | Replay post_actions → release locks → delete journal |
| COMMITTED / RELEASED (no post_actions) | Committed, locks may not be released | Release locks → delete journal |
| EXEC / FAIL / RELEASING | Crashed during execution or rollback | Execute rollback (`recover_all=True`, includes incomplete ops) → release locks → delete journal |
| INIT / ACQUIRE | No business operations executed | Find orphan lock files via `init_info.lock_paths` → release locks → delete journal |
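This status dispatch can be sketched as a pure function. Statuses and action names mirror the table; the function itself is illustrative, not the actual recovery code:

```python
# Sketch of the status-based recovery dispatch described above.
def recovery_plan(status, has_post_actions=False):
    if status == "COMMITTED" and has_post_actions:
        return ["replay_post_actions", "release_locks", "delete_journal"]
    if status in ("COMMITTED", "RELEASED"):
        return ["release_locks", "delete_journal"]
    if status in ("EXEC", "FAIL", "RELEASING"):
        # recover_all=True: rollback includes incomplete operations
        return ["rollback(recover_all=True)", "release_locks", "delete_journal"]
    if status in ("INIT", "ACQUIRE"):
        # no business ops ran; only orphan lock files may exist
        return ["release_init_info_lock_paths", "delete_journal"]
    return []

print(recovery_plan("EXEC"))
# ['rollback(recover_all=True)', 'release_locks', 'delete_journal']
```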

Best-effort rollback: Each undo step runs in its own try/except; a single step's failure does not prevent subsequent steps from executing.
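A minimal sketch of this best-effort loop (illustrative, not the actual rollback executor):

```python
# Best-effort rollback sketch: each undo step is isolated in its own
# try/except so one failure cannot block the remaining steps.
def rollback(undo_steps):
    failures = []
    for step in reversed(undo_steps):   # undo log replayed in reverse
        try:
            step()
        except Exception as exc:
            failures.append(exc)        # record the error, keep going
    return failures

done = []
steps = [lambda: done.append(1),
         lambda: 1 / 0,                 # this step fails
         lambda: done.append(3)]
errs = rollback(steps)
print(done, len(errs))  # [3, 1] 1
```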

Undo Operation Types

| op_type | Rollback Behavior |
|---|---|
| `fs_mv` | Reverse move (dst → src) |
| `fs_rm` | Non-reversible, skip |
| `fs_mkdir` | Delete directory |
| `fs_write_new` | Recursively delete written path |
| `vectordb_upsert` | Delete inserted record |
| `vectordb_delete` | Restore records from snapshot |
| `vectordb_update_uri` | Reverse URI mapping update |
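Dispatching an undo entry by `op_type` might look like the sketch below. The executors only record what they would do; the real ones call the FS and VectorDB backends:

```python
# Sketch of op_type-based undo dispatch mirroring the table above
# (entry field names such as "src"/"dst"/"snapshot" are assumptions).
def undo_entry(entry, fs_ops, vdb_ops):
    op = entry["op_type"]
    if op == "fs_mv":
        fs_ops.append(("mv", entry["dst"], entry["src"]))  # reverse move
    elif op == "fs_rm":
        pass                                               # non-reversible, skip
    elif op in ("fs_mkdir", "fs_write_new"):
        fs_ops.append(("rm", entry["path"]))               # delete what was created
    elif op == "vectordb_upsert":
        vdb_ops.append(("delete", entry["id"]))
    elif op == "vectordb_delete":
        vdb_ops.append(("restore", entry["snapshot"]))
    elif op == "vectordb_update_uri":
        vdb_ops.append(("update_uri", entry["new"], entry["old"]))

fs_ops, vdb_ops = [], []
undo_entry({"op_type": "fs_mv", "src": "/a", "dst": "/b"}, fs_ops, vdb_ops)
undo_entry({"op_type": "fs_rm"}, fs_ops, vdb_ops)
print(fs_ops)  # [('mv', '/b', '/a')]
```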

Changes Made

  • Transaction core: TransactionContext, TransactionManager (state machine + timeout cleanup), TransactionJournal (AGFS persistence), UndoLog + rollback executor
  • Path locking: PathLock with POINT / SUBTREE / MV locking modes, fencing tokens to prevent ABA, livelock backoff
  • Crash recovery: Journal written before lock acquisition → orphan locks locatable via init_info
  • VikingFS: rm() / mv() wrapped in transactions
  • Session: commit() internally uses two-phase transactions (bridged via run_async), external sync signature unchanged
  • QueueFS: New SQLite backend (WAL + RecoverStale at-least-once), ack mechanism, SemanticDAG optimizations
  • Documentation: Transaction mechanism docs (en/zh) + configuration guide updates
  • Tests: 11 new test files covering undo, journal, path_lock, context_manager, crash recovery, concurrent locks, e2e, and more

Testing

  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • I have tested this on the following platforms:
    • Linux
    • macOS
    • Windows

Checklist

  • My code follows the project's coding style
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

Additional Notes

  • 41 files changed, +5061/-446 lines

qin-ctx and others added 3 commits March 5, 2026 15:10
…recovery

Implement a full transaction system for VikingFS storage operations including
write-ahead journal, path locking, undo/rollback, context manager API, and
crash recovery. Includes comprehensive tests and documentation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tions

Add end-to-end tests covering rollback scenarios that were missing:
- mv rollback: file moved back to original location on failure
- mv commit: file persists at new location
- Multi-step rollback: mkdir + write + mkdir all reversed in order
- Partial step rollback: only completed entries are reversed
- Nested directory rollback: child removed before parent
- Best-effort rollback: single step failure does not block others

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

@qin-ctx
Collaborator Author

qin-ctx commented Mar 5, 2026

/review

@github-actions

github-actions bot commented Mar 5, 2026

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis 🔶

390 - Partially compliant

Compliant requirements:

  • Transaction mechanism with journal and undo log implemented
  • Rollback and crash recovery added
  • Path locking with fencing tokens implemented
  • Tests added for various scenarios

Non-compliant requirements:

  • (Cannot fully assess integration with all core operations without full diff)

Requires further human verification:

  • Integration with core operations (rm, mv, add_resource, session.commit)
  • Multi-node deployment limitations and fallback behavior
⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Potential Typo in Status Enum

Verify that TransactionStatus.AQUIRE is the intended enum value (check for possible typo: ACQUIRE).

elif tx.status in (TransactionStatus.INIT, TransactionStatus.AQUIRE):
Async Method Calling Sync I/O

Ensure that AGFS client calls (cat, write, ls, rm, stat) are properly awaited if they are async, or document that they are synchronous to avoid event loop blocking.

        content = self._agfs.cat(lock_path)
        if isinstance(content, bytes):
            return content.decode("utf-8").strip()
        return str(content).strip()
    except Exception:
        return None

async def _is_locked_by_other(self, lock_path: str, transaction_id: str) -> bool:
    """Check if path is locked by another transaction (any lock type)."""
    token = self._read_token(lock_path)
    if token is None:
        return False
    lock_owner, _, _ = _parse_fencing_token(token)
    return lock_owner != transaction_id

async def _create_lock_file(
    self, lock_path: str, transaction_id: str, lock_type: str = LOCK_TYPE_POINT
) -> None:
    """Create lock file with fencing token."""
    token = _make_fencing_token(transaction_id, lock_type)
    self._agfs.write(lock_path, token.encode("utf-8"))

Successfully merging this pull request may close these issues.

[Feature]: Transaction mechanism for atomic multi-subsystem operations

2 participants