From ab9ce3ba918e292fbc7d419c66aead4d767dc62d Mon Sep 17 00:00:00 2001 From: Jeremy Wei Date: Thu, 30 Apr 2026 12:42:23 -0400 Subject: [PATCH 1/4] parquet v2 refactor reader worker pool --- sei-db/ledger_db/parquet/store.go | 9 + .../parquet_v2/coordinator/coordinator.go | 66 +++ .../parquet_v2/coordinator/export_test.go | 72 ++++ .../parquet_v2/coordinator/handlers.go | 380 ++++++++++++++---- .../parquet_v2/coordinator/helpers_test.go | 6 +- .../receipt/parquet_v2/coordinator/prune.go | 33 -- .../parquet_v2/coordinator/prune_test.go | 12 +- .../receipt/parquet_v2/coordinator/workers.go | 104 +++++ .../parquet_v2/coordinator/workers_test.go | 189 +++++++++ 9 files changed, 755 insertions(+), 116 deletions(-) create mode 100644 sei-db/ledger_db/receipt/parquet_v2/coordinator/workers.go create mode 100644 sei-db/ledger_db/receipt/parquet_v2/coordinator/workers_test.go diff --git a/sei-db/ledger_db/parquet/store.go b/sei-db/ledger_db/parquet/store.go index dd84c07154..f93b3c3850 100644 --- a/sei-db/ledger_db/parquet/store.go +++ b/sei-db/ledger_db/parquet/store.go @@ -26,6 +26,7 @@ const ( defaultBlockFlushInterval uint64 = 1 defaultMaxBlocksPerFile uint64 = 500 + defaultReadChannelDepth = 1024 ) var removeFile = os.Remove @@ -45,6 +46,13 @@ type StoreConfig struct { // consumed by the v2 store; v1 ignores it. When nil, replay is // skipped — used by lower-level tests that drive replay manually. WALConverter WALReceiptConverter + + // ReadWorkerPoolSize controls the number of reader goroutines in the + // parquet_v2 coordinator's read pool. Zero means runtime.NumCPU(). + ReadWorkerPoolSize int + // ReadChannelDepth is the buffer depth of the coordinator → read pool + // dispatch channel. Zero means defaultReadChannelDepth. + ReadChannelDepth int } // WALReceiptConverter decodes a raw WAL receipt blob into the structured @@ -69,6 +77,7 @@ func DefaultStoreConfig() StoreConfig { BlockFlushInterval: defaultBlockFlushInterval, MaxBlocksPerFile: defaultMaxBlocksPerFile, TxIndexBackend: dbconfig.ReceiptTxIndexBackendPebble, + ReadChannelDepth: defaultReadChannelDepth, } } diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go index a368749609..27e1aeb45e 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "sync" "sync/atomic" "time" @@ -63,6 +64,27 @@ type Coordinator struct { warmupRecords []parquet.ReceiptRecord replayedBlocks []ReplayedBlock + + // Worker pool dispatch channels. Coordinator builds closures and + // sends them here; workers pull and execute. Coordinator-owned state + // is never mutated by workers — completion handshakes flow back via + // controlChan. + readChan chan func() + writerChan chan func() + pruneChan chan func() + controlChan chan controlMsg + + readerWG sync.WaitGroup + writerWG sync.WaitGroup + prunerWG sync.WaitGroup + shutdownOnce sync.Once + + // Reference tracking for closed parquet files. Incremented on read + // dispatch, decremented on readDoneMsg. Pruning is gated on these. 
+ inFlightReads map[string]int + pendingPrune []closedFile + + readWorkerCount int } // New constructs a Coordinator and drives WAL replay synchronously before @@ -108,6 +130,7 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) { return nil, err } + readWorkerCount := resolveReadWorkerCount(storeCfg.ReadWorkerPoolSize) c := &Coordinator{ requests: requests, done: done, @@ -121,6 +144,12 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) { wal: receiptWAL, latestVersion: 0, earliestVersion: 0, + readChan: make(chan func(), storeCfg.ReadChannelDepth), + writerChan: make(chan func(), 4), + pruneChan: make(chan func(), 4), + controlChan: make(chan controlMsg, 64), + inFlightReads: make(map[string]int), + readWorkerCount: readWorkerCount, } cleanupWriters := true defer func() { @@ -156,6 +185,7 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) { c.replayedBlocks = result.Blocks } + c.startWorkers() go c.run() cleanupReader = false cleanupWAL = false @@ -164,6 +194,20 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) { return c, nil } +// startWorkers spawns the reader pool, writer goroutine, and pruner +// goroutine. Each adds itself to the matching wait group so handleClose +// can drain them deterministically. +func (c *Coordinator) startWorkers() { + for i := 0; i < c.readWorkerCount; i++ { + c.readerWG.Add(1) + go c.runReader() + } + c.writerWG.Add(1) + go c.runWriter() + c.prunerWG.Add(1) + go c.runPruner() +} + func resolveStoreConfig(cfg parquet.StoreConfig) parquet.StoreConfig { resolved := parquet.DefaultStoreConfig() resolved.DBDirectory = cfg.DBDirectory @@ -178,9 +222,29 @@ func resolveStoreConfig(cfg parquet.StoreConfig) parquet.StoreConfig { if cfg.MaxBlocksPerFile > 0 { resolved.MaxBlocksPerFile = cfg.MaxBlocksPerFile } + if cfg.ReadWorkerPoolSize > 0 { + resolved.ReadWorkerPoolSize = cfg.ReadWorkerPoolSize + } + if cfg.ReadChannelDepth > 0 { + resolved.ReadChannelDepth = cfg.ReadChannelDepth + } return resolved } +// resolveReadWorkerCount derives the reader pool size from the resolved +// config. Zero means runtime.NumCPU(); we cap at numCPU*2 to match the +// DuckDB connection limit set in NewReaderWithMaxBlocksPerFile. +func resolveReadWorkerCount(configured int) int { + numCPU := runtime.NumCPU() + if configured <= 0 { + return numCPU + } + if configured > numCPU*2 { + return numCPU * 2 + } + return configured +} + func (c *Coordinator) run() { var pruneTick <-chan time.Time if c.config.KeepRecent > 0 && c.config.PruneIntervalSeconds > 0 { @@ -190,6 +254,8 @@ func (c *Coordinator) run() { } for { select { + case msg := <-c.controlChan: + c.handleControl(msg) case req := <-c.requests: if req.dispatch(c) { return diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go index 29067570b8..7684319dc4 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go @@ -1,5 +1,77 @@ package coordinator +// forcePruneTickForTest synchronously runs a prune tick and then waits for +// any dispatched pruner lambdas to complete so the test can observe the +// post-prune disk state immediately. func forcePruneTickForTest(c *Coordinator) { c.handlePruneTick() + quiesceWorkersForTest(c) +} + +// bootstrapWorkersForTest initializes worker channels and spawns the +// reader/writer/pruner goroutines on a bare-constructed Coordinator. 
+// Tests that drive coordinator handlers directly (without going through
+// New) must call this so awaitWriter and dispatchPrune don't deadlock on
+// a nil channel send.
+func bootstrapWorkersForTest(c *Coordinator) {
+	if c.readChan != nil {
+		return
+	}
+	if c.inFlightReads == nil {
+		c.inFlightReads = make(map[string]int)
+	}
+	c.readChan = make(chan func(), 1024)
+	c.writerChan = make(chan func(), 4)
+	c.pruneChan = make(chan func(), 4)
+	c.controlChan = make(chan controlMsg, 64)
+	c.readWorkerCount = 2
+	c.startWorkers()
+}
+
+// quiesceWorkersForTest waits for every previously-dispatched worker
+// lambda to complete and then applies the resulting completion messages.
+// The reader pool is parked behind a channel barrier: one sentinel per
+// worker, all held until every worker has checked in, so a reader cannot
+// check in until it has finished its current job (including the
+// readDoneMsg send). Without the barrier, draining controlChan could race
+// with a reader that has already delivered its response but not yet sent
+// readDoneMsg. The writer and pruner are single-threaded, so one sentinel
+// round-trip each suffices. Finally the function drains controlChan of
+// any pending readDoneMsg/pruneDoneMsg so coordinator-side bookkeeping
+// (refcounts, pendingPrune) reflects the just-completed work.
+func quiesceWorkersForTest(c *Coordinator) {
+	if c.readChan != nil && c.readWorkerCount > 0 {
+		arrived := make(chan struct{}, c.readWorkerCount)
+		release := make(chan struct{})
+		for i := 0; i < c.readWorkerCount; i++ {
+			c.dispatchRead(func() {
+				arrived <- struct{}{}
+				<-release
+			})
+		}
+		// A worker blocked in a sentinel cannot pick up another one, so
+		// readWorkerCount arrivals means every reader is parked and every
+		// earlier read lambda (and its readDoneMsg send) has completed.
+		for i := 0; i < c.readWorkerCount; i++ {
+			<-arrived
+		}
+		close(release)
+	}
+	if c.writerChan != nil {
+		done := make(chan struct{})
+		c.dispatchWrite(func() { close(done) })
+		<-done
+	}
+	if c.pruneChan != nil {
+		done := make(chan struct{})
+		c.dispatchPrune(func() { close(done) })
+		<-done
+	}
+	if c.controlChan == nil {
+		return
+	}
+	for {
+		select {
+		case msg := <-c.controlChan:
+			c.handleControl(msg)
+		default:
+			return
+		}
+	}
+}
+
+// inFlightReadsForTest returns the active reader refcount for path.
+// Callers must quiesce the workers first to avoid racing with in-flight
+// readDoneMsg processing.
+func inFlightReadsForTest(c *Coordinator, path string) int {
+	return c.inFlightReads[path]
+}
+
+// pendingPruneCountForTest returns the number of files awaiting a refcount
+// drop before being deleted.
+func pendingPruneCountForTest(c *Coordinator) int {
+	return len(c.pendingPrune)
+}
+
+// setReadWorkerCountForTest must be called BEFORE bootstrapWorkersForTest.
+func setReadWorkerCountForTest(c *Coordinator, n int) { + c.readWorkerCount = n } diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go index 1513b9fd71..5e08136a88 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go @@ -30,8 +30,19 @@ func (c *Coordinator) handleReadByTxHash(req readByTxHashReq) { return } - result, err := c.reader.QueryReceiptByTxHash(req.ctx, c.receiptFilesSnapshot(), req.txHash) - req.resp <- readReceiptResp{result: result, err: err} + snapshot := c.receiptFilesSnapshot() + if len(snapshot) == 0 { + req.resp <- readReceiptResp{} + return + } + c.acquireReadRefs(snapshot) + reader := c.reader + control := c.controlChan + c.dispatchRead(func() { + result, err := reader.QueryReceiptByTxHash(req.ctx, snapshot, req.txHash) + req.resp <- readReceiptResp{result: result, err: err} + control <- readDoneMsg{paths: snapshot} + }) } // handleReadByTxHashInBlock serves a readByTxHashInBlockReq, narrowing the @@ -47,8 +58,19 @@ func (c *Coordinator) handleReadByTxHashInBlock(req readByTxHashInBlockReq) { return } - result, err := c.reader.QueryReceiptByTxHashInBlock(req.ctx, c.receiptFileSnapshotForBlock(req.blockNumber), req.txHash, req.blockNumber) - req.resp <- readReceiptResp{result: result, err: err} + snapshot := c.receiptFileSnapshotForBlock(req.blockNumber) + if len(snapshot) == 0 { + req.resp <- readReceiptResp{} + return + } + c.acquireReadRefs(snapshot) + reader := c.reader + control := c.controlChan + c.dispatchRead(func() { + result, err := reader.QueryReceiptByTxHashInBlock(req.ctx, snapshot, req.txHash, req.blockNumber) + req.resp <- readReceiptResp{result: result, err: err} + control <- readDoneMsg{paths: snapshot} + }) } // handleGetLogs serves a getLogsReq by querying logs across the closed log @@ -59,8 +81,93 @@ func (c *Coordinator) handleGetLogs(req getLogsReq) { return } - results, err := c.reader.QueryLogs(req.ctx, c.logFilesSnapshot(), req.filter) - req.resp <- getLogsResp{results: results, err: err} + snapshot := c.filteredLogFilesSnapshot(req.filter) + if len(snapshot) == 0 { + req.resp <- getLogsResp{} + return + } + c.acquireReadRefs(snapshot) + reader := c.reader + control := c.controlChan + c.dispatchRead(func() { + results, err := reader.QueryLogs(req.ctx, snapshot, req.filter) + req.resp <- getLogsResp{results: results, err: err} + control <- readDoneMsg{paths: snapshot} + }) +} + +// acquireReadRefs increments inFlightReads for each path in the snapshot. +// Must run on the coordinator goroutine — it mutates coordinator-owned +// state that workers never touch. +func (c *Coordinator) acquireReadRefs(paths []string) { + for _, p := range paths { + c.inFlightReads[p]++ + } +} + +// releaseReadRefs decrements inFlightReads and dispatches any deferred +// prunes whose files have just dropped to zero. Must run on the +// coordinator goroutine. +func (c *Coordinator) releaseReadRefs(paths []string) { + for _, p := range paths { + n := c.inFlightReads[p] + if n <= 1 { + delete(c.inFlightReads, p) + } else { + c.inFlightReads[p] = n - 1 + } + } + c.flushPendingPrune() +} + +// flushPendingPrune dispatches prune jobs for any pendingPrune entries +// whose receipt and log paths both have zero refcount. Must run on the +// coordinator goroutine. 
+func (c *Coordinator) flushPendingPrune() { + if len(c.pendingPrune) == 0 { + return + } + kept := c.pendingPrune[:0] + for _, f := range c.pendingPrune { + if c.inFlightReads[f.receiptPath] > 0 || c.inFlightReads[f.logPath] > 0 { + kept = append(kept, f) + continue + } + c.dispatchPruneJob(f) + } + c.pendingPrune = kept +} + +// handleControl processes a worker completion message. Always runs on +// the coordinator goroutine. +func (c *Coordinator) handleControl(msg controlMsg) { + switch m := msg.(type) { + case readDoneMsg: + c.releaseReadRefs(m.paths) + case pruneDoneMsg: + if !m.ok { + c.reinsertFailedPrune(m.paths) + } + case writerDoneMsg: + // Reserved for future async writer paths; awaitWriter uses its + // own per-call result channel. + } +} + +// reinsertFailedPrune re-adds a file to closedFiles after a prune lambda +// failed to delete it from disk. Best-effort — preserves the prior +// behavior of pruneOldFiles which kept the entry on remove failure. +func (c *Coordinator) reinsertFailedPrune(paths []string) { + if len(paths) != 2 { + return + } + receiptPath, logPath := paths[0], paths[1] + startBlock := parquet.ExtractBlockNumber(receiptPath) + c.closedFiles = append(c.closedFiles, closedFile{ + startBlock: startBlock, + receiptPath: receiptPath, + logPath: logPath, + }) } // handleFlush serves a flushReq by flushing buffered receipts/logs for the @@ -138,9 +245,11 @@ func (c *Coordinator) handleSetFaultHooks(req setFaultHooksReq) { // handlePruneTick fires on the prune ticker and removes closed parquet pairs // whose end block falls below latestVersion - KeepRecent. func (c *Coordinator) handlePruneTick() { - // TODO(future-async): if read I/O moves to a worker pool, gate prune on - // map[fileID]int reference counts that the coordinator increments on - // dispatch and decrements on completion. + // Eligibility walk: for each closedFile that should age out, remove + // from closedFiles (so no future snapshot can include it) and either + // dispatch a prune job immediately (refcount==0) or defer it via + // pendingPrune (refcount>0). The deferred prune fires from + // flushPendingPrune when readDoneMsg drives the refcount to zero. if c.config.KeepRecent <= 0 { return } @@ -148,7 +257,43 @@ func (c *Coordinator) handlePruneTick() { if pruneBeforeBlock <= 0 { return } - c.pruneOldFiles(uint64(pruneBeforeBlock)) + c.pruneEligibleFiles(uint64(pruneBeforeBlock)) +} + +// pruneEligibleFiles walks closedFiles and removes any file whose chunk +// has fully aged out. Each removed file is either dispatched to the +// pruner immediately or deferred to pendingPrune if there are active +// readers. +func (c *Coordinator) pruneEligibleFiles(pruneBeforeBlock uint64) { + if len(c.closedFiles) == 0 { + return + } + kept := c.closedFiles[:0] + for _, f := range c.closedFiles { + if !c.shouldPruneClosedFile(f, pruneBeforeBlock) { + kept = append(kept, f) + continue + } + if c.inFlightReads[f.receiptPath] > 0 || c.inFlightReads[f.logPath] > 0 { + c.pendingPrune = append(c.pendingPrune, f) + continue + } + c.dispatchPruneJob(f) + } + c.closedFiles = kept +} + +// dispatchPruneJob hands a closure to the pruner that deletes both the +// receipt and log file for f, then reports back via controlChan. Must +// run on the coordinator goroutine. 
+func (c *Coordinator) dispatchPruneJob(f closedFile) { + receiptPath := f.receiptPath + logPath := f.logPath + control := c.controlChan + c.dispatchPrune(func() { + ok := removePrunedFile(receiptPath) && removePrunedFile(logPath) + control <- pruneDoneMsg{paths: []string{receiptPath, logPath}, ok: ok} + }) } // handleClose performs a graceful shutdown: flush and close the open writers, @@ -164,6 +309,7 @@ func (c *Coordinator) handleClose(req closeReq) { if err := c.closeWriters(); err != nil { errs = append(errs, err) } + c.shutdownWorkers() if c.wal != nil { if err := c.wal.Close(); err != nil { errs = append(errs, fmt.Errorf("wal close: %w", err)) @@ -193,6 +339,7 @@ func (c *Coordinator) handleSimulateCrash(req simulateCrashReq) { } c.receiptWriter = nil c.logWriter = nil + c.shutdownWorkers() if c.wal != nil { _ = c.wal.Close() } @@ -202,6 +349,29 @@ func (c *Coordinator) handleSimulateCrash(req simulateCrashReq) { req.resp <- struct{}{} } +// shutdownWorkers closes the dispatch channels in dependency order and +// waits for each pool to drain. Coordinator-side state has already been +// updated by the caller; workers only execute remaining queued lambdas +// before exiting. Idempotent via shutdownOnce so retry paths in +// handleClose do not double-close the channels. Tolerates nil channels +// for bare-Coordinator constructions used in unit tests. +func (c *Coordinator) shutdownWorkers() { + c.shutdownOnce.Do(func() { + if c.readChan != nil { + close(c.readChan) + c.readerWG.Wait() + } + if c.writerChan != nil { + close(c.writerChan) + c.writerWG.Wait() + } + if c.pruneChan != nil { + close(c.pruneChan) + c.prunerWG.Wait() + } + }) +} + // writeReceipts records a committed block at height. When inputs is empty it // degenerates to the rotation/cursor-advance path (formerly ObserveEmptyBlock): // no WAL entry is written, but if height lands on a rotation boundary the @@ -364,6 +534,24 @@ func (c *Coordinator) logFilesSnapshot() []string { return files } +// filteredLogFilesSnapshot builds a snapshot of log file paths whose +// block range overlaps the filter's [FromBlock, ToBlock] window. Filtering +// happens here on the coordinator so refcounts only get incremented on +// files that workers will actually touch. +func (c *Coordinator) filteredLogFilesSnapshot(filter parquet.LogFilter) []string { + files := make([]string, 0, len(c.closedFiles)) + for _, f := range c.closedFiles { + if filter.ToBlock != nil && f.startBlock > *filter.ToBlock { + continue + } + if filter.FromBlock != nil && f.startBlock+c.config.MaxBlocksPerFile <= *filter.FromBlock { + continue + } + files = append(files, f.logPath) + } + return files +} + // isRotationBoundary reports whether blockNumber lands on a MaxBlocksPerFile // boundary, in which case the open parquet file should rotate before this // block's receipts are written. @@ -387,36 +575,52 @@ func (c *Coordinator) initWriters() error { receiptPath := filepath.Join(c.basePath, fmt.Sprintf("receipts_%d.parquet", c.fileStartBlock)) logPath := filepath.Join(c.basePath, fmt.Sprintf("logs_%d.parquet", c.fileStartBlock)) - // #nosec G304 -- paths are constructed from configured base directory. 
- receiptFile, err := os.Create(receiptPath) - if err != nil { - return fmt.Errorf("failed to create receipt parquet file: %w", err) - } + var ( + receiptFile *os.File + logFile *os.File + receiptWriter *parquetgo.GenericWriter[parquet.ReceiptRecord] + logWriter *parquetgo.GenericWriter[parquet.LogRecord] + ) + err := c.awaitWriter(func() error { + // #nosec G304 -- paths are constructed from configured base directory. + rf, err := os.Create(receiptPath) + if err != nil { + return fmt.Errorf("failed to create receipt parquet file: %w", err) + } - // #nosec G304 -- paths are constructed from configured base directory. - logFile, err := os.Create(logPath) - if err != nil { - if closeErr := receiptFile.Close(); closeErr != nil { - return fmt.Errorf("failed to create log parquet file: %w; close receipt file error: %v", err, closeErr) + // #nosec G304 -- paths are constructed from configured base directory. + lf, err := os.Create(logPath) + if err != nil { + if closeErr := rf.Close(); closeErr != nil { + return fmt.Errorf("failed to create log parquet file: %w; close receipt file error: %v", err, closeErr) + } + return fmt.Errorf("failed to create log parquet file: %w", err) } - return fmt.Errorf("failed to create log parquet file: %w", err) - } - blockNumberSorting := parquetgo.SortingWriterConfig( - parquetgo.SortingColumns(parquetgo.Ascending("block_number")), - ) + blockNumberSorting := parquetgo.SortingWriterConfig( + parquetgo.SortingColumns(parquetgo.Ascending("block_number")), + ) + + receiptFile = rf + logFile = lf + receiptWriter = parquetgo.NewGenericWriter[parquet.ReceiptRecord](rf, + parquetgo.Compression(&parquetgo.Snappy), + blockNumberSorting, + ) + logWriter = parquetgo.NewGenericWriter[parquet.LogRecord](lf, + parquetgo.Compression(&parquetgo.Snappy), + blockNumberSorting, + ) + return nil + }) + if err != nil { + return err + } c.receiptFile = receiptFile c.logFile = logFile - c.receiptWriter = parquetgo.NewGenericWriter[parquet.ReceiptRecord](receiptFile, - parquetgo.Compression(&parquetgo.Snappy), - blockNumberSorting, - ) - c.logWriter = parquetgo.NewGenericWriter[parquet.LogRecord](logFile, - parquetgo.Compression(&parquetgo.Snappy), - blockNumberSorting, - ) - + c.receiptWriter = receiptWriter + c.logWriter = logWriter return nil } @@ -531,23 +735,33 @@ func (c *Coordinator) flushOpenFile() error { } } - if _, err := c.receiptWriter.Write(c.receiptsBuffer); err != nil { - return fmt.Errorf("failed to write receipts to parquet: %w", err) - } - if err := c.receiptWriter.Flush(); err != nil { - return fmt.Errorf("failed to flush receipt parquet writer: %w", err) - } - - if len(c.logsBuffer) > 0 { - if c.logWriter == nil { - return fmt.Errorf("cannot flush logs: log writer is not initialized") + receiptWriter := c.receiptWriter + logWriter := c.logWriter + receiptsBuf := c.receiptsBuffer + logsBuf := c.logsBuffer + err := c.awaitWriter(func() error { + if _, err := receiptWriter.Write(receiptsBuf); err != nil { + return fmt.Errorf("failed to write receipts to parquet: %w", err) } - if _, err := c.logWriter.Write(c.logsBuffer); err != nil { - return fmt.Errorf("failed to write logs to parquet: %w", err) + if err := receiptWriter.Flush(); err != nil { + return fmt.Errorf("failed to flush receipt parquet writer: %w", err) } - if err := c.logWriter.Flush(); err != nil { - return fmt.Errorf("failed to flush log parquet writer: %w", err) + + if len(logsBuf) > 0 { + if logWriter == nil { + return fmt.Errorf("cannot flush logs: log writer is not initialized") + } + if _, err := 
logWriter.Write(logsBuf); err != nil { + return fmt.Errorf("failed to write logs to parquet: %w", err) + } + if err := logWriter.Flush(); err != nil { + return fmt.Errorf("failed to flush log parquet writer: %w", err) + } } + return nil + }) + if err != nil { + return err } if h := c.faultHooks; h != nil && h.AfterFlush != nil { @@ -565,41 +779,49 @@ func (c *Coordinator) flushOpenFile() error { // and fsync+closes the underlying files. All errors encountered are // collected and returned together so partial cleanup still happens. func (c *Coordinator) closeWriters() error { - var errs []error - - if c.receiptWriter != nil { - if err := c.receiptWriter.Close(); err != nil { - errs = append(errs, fmt.Errorf("receipt writer: %w", err)) - } - c.receiptWriter = nil + receiptWriter := c.receiptWriter + logWriter := c.logWriter + receiptFile := c.receiptFile + logFile := c.logFile + if receiptWriter == nil && logWriter == nil && receiptFile == nil && logFile == nil { + return nil } - if c.logWriter != nil { - if err := c.logWriter.Close(); err != nil { - errs = append(errs, fmt.Errorf("log writer: %w", err)) + c.receiptWriter = nil + c.logWriter = nil + c.receiptFile = nil + c.logFile = nil + + return c.awaitWriter(func() error { + var errs []error + if receiptWriter != nil { + if err := receiptWriter.Close(); err != nil { + errs = append(errs, fmt.Errorf("receipt writer: %w", err)) + } } - c.logWriter = nil - } - if c.receiptFile != nil { - if err := c.receiptFile.Sync(); err != nil { - errs = append(errs, fmt.Errorf("receipt file sync: %w", err)) + if logWriter != nil { + if err := logWriter.Close(); err != nil { + errs = append(errs, fmt.Errorf("log writer: %w", err)) + } } - if err := c.receiptFile.Close(); err != nil { - errs = append(errs, fmt.Errorf("receipt file: %w", err)) + if receiptFile != nil { + if err := receiptFile.Sync(); err != nil { + errs = append(errs, fmt.Errorf("receipt file sync: %w", err)) + } + if err := receiptFile.Close(); err != nil { + errs = append(errs, fmt.Errorf("receipt file: %w", err)) + } } - c.receiptFile = nil - } - if c.logFile != nil { - if err := c.logFile.Sync(); err != nil { - errs = append(errs, fmt.Errorf("log file sync: %w", err)) + if logFile != nil { + if err := logFile.Sync(); err != nil { + errs = append(errs, fmt.Errorf("log file sync: %w", err)) + } + if err := logFile.Close(); err != nil { + errs = append(errs, fmt.Errorf("log file: %w", err)) + } } - if err := c.logFile.Close(); err != nil { - errs = append(errs, fmt.Errorf("log file: %w", err)) + if len(errs) > 0 { + return fmt.Errorf("close errors: %v", errs) } - c.logFile = nil - } - - if len(errs) > 0 { - return fmt.Errorf("close errors: %v", errs) - } - return nil + return nil + }) } diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/helpers_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/helpers_test.go index 7ac9445f86..b483e608f1 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/helpers_test.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/helpers_test.go @@ -41,7 +41,7 @@ func newWriteCoordinator(t *testing.T, wal *recordingWAL) *Coordinator { cfg.MaxBlocksPerFile = 500 cfg.BlockFlushInterval = 0 - return &Coordinator{ + c := &Coordinator{ config: cfg, basePath: cfg.DBDirectory, receiptsBuffer: make([]parquet.ReceiptRecord, 0, 1000), @@ -49,6 +49,9 @@ func newWriteCoordinator(t *testing.T, wal *recordingWAL) *Coordinator { tempWriteCache: make(map[common.Hash][]tempReceipt), wal: wal, } + bootstrapWorkersForTest(c) + t.Cleanup(func() { 
c.shutdownWorkers() }) + return c } func newReplayCoordinator(t *testing.T, wal *recordingWAL) *Coordinator { @@ -151,6 +154,7 @@ func readClosedReceiptForTest(t *testing.T, coord *Coordinator, txHash common.Ha }) result := <-resp require.NoError(t, result.err) + quiesceWorkersForTest(coord) return result.result } diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune.go index 47ade32ac5..d061974ac9 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune.go @@ -11,39 +11,6 @@ var ( logger = seilog.NewLogger("db", "ledger-db", "parquet-v2") ) -// pruneOldFiles deletes closed parquet pairs whose entire block range falls -// below pruneBeforeBlock. A pair stays in the list if either of its files -// fails to delete, so a transient error doesn't desync c.closedFiles from -// disk. Returns the number of pairs successfully removed. -func (c *Coordinator) pruneOldFiles(pruneBeforeBlock uint64) int { - if len(c.closedFiles) == 0 { - return 0 - } - - prunedCount := 0 - kept := c.closedFiles[:0] - for _, f := range c.closedFiles { - if !c.shouldPruneClosedFile(f, pruneBeforeBlock) { - kept = append(kept, f) - continue - } - - receiptRemoved := removePrunedFile(f.receiptPath) - if !receiptRemoved { - kept = append(kept, f) - continue - } - logRemoved := removePrunedFile(f.logPath) - if logRemoved { - prunedCount++ - continue - } - kept = append(kept, f) - } - c.closedFiles = kept - return prunedCount -} - // shouldPruneClosedFile reports whether the file's full block range // (startBlock + MaxBlocksPerFile) lies entirely below pruneBeforeBlock. // Saturates on overflow rather than wrapping. diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go index 033a2e5940..435d7bcef8 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go @@ -29,6 +29,8 @@ func TestPruneTickDeletesEligibleClosedFiles(t *testing.T) { latestVersion: 12, reader: reader, } + bootstrapWorkersForTest(coord) + t.Cleanup(func() { coord.shutdownWorkers() }) forcePruneTickForTest(coord) @@ -64,11 +66,15 @@ func TestPruneKeepsFilePairTrackedWhenDeleteFails(t *testing.T) { } coord := &Coordinator{ - config: parquet.StoreConfig{MaxBlocksPerFile: 4}, - closedFiles: closedFiles, + config: parquet.StoreConfig{KeepRecent: 4, MaxBlocksPerFile: 4}, + closedFiles: closedFiles, + latestVersion: 8, } + bootstrapWorkersForTest(coord) + t.Cleanup(func() { coord.shutdownWorkers() }) + + forcePruneTickForTest(coord) - require.Zero(t, coord.pruneOldFiles(4)) require.Len(t, coord.closedFiles, 1) require.Equal(t, uint64(0), coord.closedFiles[0].startBlock) require.FileExists(t, failPath) diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/workers.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/workers.go new file mode 100644 index 0000000000..df360dcffb --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/workers.go @@ -0,0 +1,104 @@ +package coordinator + +// Worker pool design: the coordinator owns all mutable state and builds +// closures (lambdas) that capture exactly what each worker needs. Workers +// are dumb runners that pull a job from their channel and call it. 
All
+// completion handshakes flow back through controlMsg on controlChan, so
+// the coordinator goroutine remains the single owner of refcounts,
+// closedFiles, pendingPrune, etc.
+
+// controlMsg is the union of completion messages workers send back to the
+// coordinator. Implemented as a sealed interface via the unexported
+// isControlMsg method.
+type controlMsg interface {
+	isControlMsg()
+}
+
+// readDoneMsg signals that a reader finished a job. The coordinator
+// decrements inFlightReads for each path and dispatches any pendingPrune
+// entries that just dropped to refcount zero.
+type readDoneMsg struct {
+	paths []string
+}
+
+// writerDoneMsg is reserved for an asynchronous fire-and-forget writer
+// path. The synchronous awaitWriter handshake uses its own per-call
+// result channel and does not flow through controlChan.
+type writerDoneMsg struct{}
+
+// pruneDoneMsg signals a pruner finished. On failure the coordinator
+// reinserts the file pair into closedFiles so a later prune tick can
+// retry; on success there is nothing left to do, because the entry was
+// already removed from closedFiles and pendingPrune when the job was
+// dispatched.
+type pruneDoneMsg struct {
+	paths []string
+	ok    bool
+}
+
+func (readDoneMsg) isControlMsg()   {}
+func (writerDoneMsg) isControlMsg() {}
+func (pruneDoneMsg) isControlMsg()  {}
+
+// runReader is the reader worker loop. Identical shape to runWriter and
+// runPruner — pulls a closure from its channel and executes it. Exits when
+// the channel is closed.
+func (c *Coordinator) runReader() {
+	defer c.readerWG.Done()
+	for job := range c.readChan {
+		job()
+	}
+}
+
+// runWriter is the writer worker loop. Single goroutine; the coordinator
+// guarantees only one writer job is in flight at a time via awaitWriter.
+func (c *Coordinator) runWriter() {
+	defer c.writerWG.Done()
+	for job := range c.writerChan {
+		job()
+	}
+}
+
+// runPruner is the pruner worker loop. Single goroutine; deletions are
+// fire-and-forget from the coordinator's perspective.
+func (c *Coordinator) runPruner() {
+	defer c.prunerWG.Done()
+	for job := range c.pruneChan {
+		job()
+	}
+}
+
+// dispatchRead enqueues a read closure. This may block when readChan is
+// full, which in turn blocks the coordinator's run loop and therefore the
+// inbound requests channel — preserving backpressure.
+func (c *Coordinator) dispatchRead(job func()) {
+	c.readChan <- job
+}
+
+// dispatchWrite enqueues a writer closure. Used by awaitWriter; do not
+// call directly from handlers.
+func (c *Coordinator) dispatchWrite(job func()) {
+	c.writerChan <- job
+}
+
+// dispatchPrune enqueues a pruner closure.
+func (c *Coordinator) dispatchPrune(job func()) {
+	c.pruneChan <- job
+}
+
+// awaitWriter dispatches a writer job and blocks the coordinator until it
+// completes. The coordinator continues to service controlChan messages
+// (read/prune completions) while waiting so refcounts and pendingPrune
+// stay live, but it does NOT pull new requests off c.requests — preserving
+// the single-owner invariant for write state.
+func (c *Coordinator) awaitWriter(job func() error) error { + result := make(chan error, 1) + c.dispatchWrite(func() { + result <- job() + }) + for { + select { + case err := <-result: + return err + case msg := <-c.controlChan: + c.handleControl(msg) + } + } +} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/workers_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/workers_test.go new file mode 100644 index 0000000000..a42f23ab8b --- /dev/null +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/workers_test.go @@ -0,0 +1,189 @@ +package coordinator + +import ( + "context" + "math/big" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/parquet" + "github.com/stretchr/testify/require" +) + +// newReadCoordinator builds a coordinator with closed parquet files on +// disk and a real reader, ready to dispatch reads through the worker +// pool. Returns the coordinator and the directory. +func newReadCoordinator(t *testing.T, starts ...uint64) (*Coordinator, string) { + t.Helper() + dir := t.TempDir() + closedFiles := writeClosedFileSet(t, dir, starts...) + reader, err := NewReaderWithMaxBlocksPerFile(dir, 4) + require.NoError(t, err) + t.Cleanup(func() { _ = reader.Close() }) + + coord := &Coordinator{ + config: parquet.StoreConfig{ + KeepRecent: 4, + MaxBlocksPerFile: 4, + }, + basePath: dir, + closedFiles: closedFiles, + latestVersion: int64(starts[len(starts)-1]) + 4, + reader: reader, + tempWriteCache: make(map[common.Hash][]tempReceipt), + } + bootstrapWorkersForTest(coord) + t.Cleanup(func() { coord.shutdownWorkers() }) + return coord, dir +} + +// dispatchReadInBlock issues a read through the handler and returns the +// response channel without blocking on it. +func dispatchReadInBlock(coord *Coordinator, txHash common.Hash, blockNumber uint64) chan readReceiptResp { + resp := make(chan readReceiptResp, 1) + coord.handleReadByTxHashInBlock(readByTxHashInBlockReq{ + ctx: context.Background(), + txHash: txHash, + blockNumber: blockNumber, + resp: resp, + }) + return resp +} + +func TestReadDispatchIncrementsAndDecrementsRefcount(t *testing.T) { + coord, dir := newReadCoordinator(t, 0, 4) + + // Pre-acquire a refcount on file_0 by holding a reader gate via + // dispatching a read against block 1 (file_0). The handler increments + // the refcount synchronously before dispatching the lambda. + resp := dispatchReadInBlock(coord, common.BigToHash(big.NewInt(1)), 1) + target := filepath.Join(dir, "receipts_0.parquet") + + // Refcount becomes visible immediately because acquireReadRefs runs + // on the same goroutine as the handler call (the test goroutine + // here, not the coordinator's run loop — we're driving the handler + // directly). + require.Equal(t, 1, inFlightReadsForTest(coord, target)) + + // Wait for the read to complete and the lambda to send readDoneMsg, + // then drain it through handleControl. + <-resp + quiesceWorkersForTest(coord) + + require.Equal(t, 0, inFlightReadsForTest(coord, target)) +} + +func TestPruneWhileReadingDefersDeletion(t *testing.T) { + coord, dir := newReadCoordinator(t, 0, 4) + receiptPath := filepath.Join(dir, "receipts_0.parquet") + logPath := filepath.Join(dir, "logs_0.parquet") + + // Manually pre-bump the refcount to simulate an in-flight read on + // file_0 that the test will later release. This avoids racing on + // when the read lambda actually completes. + coord.acquireReadRefs([]string{receiptPath, logPath}) + + // Force a prune tick. 
file_0 is eligible (latestVersion=8, + // keepRecent=4 → pruneBefore=4 → file_0 ages out). With a refcount + // >0 the file must move to pendingPrune, not be deleted yet. + coord.handlePruneTick() + quiesceWorkersForTest(coord) + + require.Equal(t, 1, pendingPruneCountForTest(coord)) + require.FileExists(t, receiptPath) + require.FileExists(t, logPath) + + // closedFiles must already exclude the pruned file so subsequent + // snapshots don't pick it up. + for _, f := range coord.closedFiles { + require.NotEqual(t, uint64(0), f.startBlock, "closedFiles must not include the pending-prune file") + } + + // Releasing the refs triggers the deferred prune via flushPendingPrune. + coord.releaseReadRefs([]string{receiptPath, logPath}) + quiesceWorkersForTest(coord) + + require.Equal(t, 0, pendingPruneCountForTest(coord)) + require.NoFileExists(t, receiptPath) + require.NoFileExists(t, logPath) +} + +func TestNewReadAfterPruneTickExcludesPendingFile(t *testing.T) { + coord, dir := newReadCoordinator(t, 0, 4) + receiptPath := filepath.Join(dir, "receipts_0.parquet") + logPath := filepath.Join(dir, "logs_0.parquet") + + coord.acquireReadRefs([]string{receiptPath, logPath}) + coord.handlePruneTick() + quiesceWorkersForTest(coord) + require.Equal(t, 1, pendingPruneCountForTest(coord)) + + // A new GetLogs against the full window must not see the pending- + // prune file in its snapshot. + from := uint64(0) + to := uint64(100) + snapshot := coord.filteredLogFilesSnapshot(parquet.LogFilter{FromBlock: &from, ToBlock: &to}) + for _, p := range snapshot { + require.NotEqual(t, logPath, p, "new read snapshot must not include pending-prune file") + } + + coord.releaseReadRefs([]string{receiptPath, logPath}) + quiesceWorkersForTest(coord) +} + +func TestReadPoolDispatchesReadsConcurrently(t *testing.T) { + coord, _ := newReadCoordinator(t, 0, 4) + + // In production handlers run serially on the coordinator goroutine. + // We dispatch from a single goroutine here for the same reason — + // what we're proving is that the WORKER POOL services them in + // parallel, not that the handler is reentrant. With a deep readChan + // the handler returns immediately, so dispatching N reads back-to- + // back enqueues all N before any worker has a chance to drain the + // channel. + const n = 8 + resps := make([]chan readReceiptResp, n) + for i := 0; i < n; i++ { + resps[i] = dispatchReadInBlock(coord, common.BigToHash(big.NewInt(int64(1+(i%4)))), uint64(1+(i%4))) + } + for i := 0; i < n; i++ { + select { + case <-resps[i]: + case <-time.After(5 * time.Second): + t.Fatalf("read %d did not complete", i) + } + } + quiesceWorkersForTest(coord) +} + +func TestShutdownDrainsActiveReads(t *testing.T) { + coord, _ := newReadCoordinator(t, 0, 4) + + const n = 4 + resps := make([]chan readReceiptResp, n) + for i := 0; i < n; i++ { + resps[i] = dispatchReadInBlock(coord, common.BigToHash(big.NewInt(int64(1+(i%4)))), uint64(1+(i%4))) + } + + // Trigger shutdown immediately. shutdownWorkers must drain in-flight + // lambdas — every response should still arrive. 
+ done := make(chan struct{}) + go func() { + coord.shutdownWorkers() + close(done) + }() + for i := 0; i < n; i++ { + select { + case <-resps[i]: + case <-time.After(5 * time.Second): + t.Fatalf("read %d not drained on shutdown", i) + } + } + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("shutdownWorkers did not return") + } +} From 891971168a8ff9409ef31c382f941b82281b272b Mon Sep 17 00:00:00 2001 From: Jeremy Wei Date: Thu, 30 Apr 2026 13:00:06 -0400 Subject: [PATCH 2/4] fix --- .../parquet_v2/coordinator/handlers.go | 11 ++++- .../parquet_v2/coordinator/prune_test.go | 43 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go index 5e08136a88..518d7c83ec 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sort" "github.com/ethereum/go-ethereum/common" parquetgo "github.com/parquet-go/parquet-go" @@ -157,17 +158,25 @@ func (c *Coordinator) handleControl(msg controlMsg) { // reinsertFailedPrune re-adds a file to closedFiles after a prune lambda // failed to delete it from disk. Best-effort — preserves the prior // behavior of pruneOldFiles which kept the entry on remove failure. +// Inserts in sorted position by startBlock because receiptFileSnapshotForBlock +// relies on closedFiles being ascending to early-break correctly. func (c *Coordinator) reinsertFailedPrune(paths []string) { if len(paths) != 2 { return } receiptPath, logPath := paths[0], paths[1] startBlock := parquet.ExtractBlockNumber(receiptPath) - c.closedFiles = append(c.closedFiles, closedFile{ + entry := closedFile{ startBlock: startBlock, receiptPath: receiptPath, logPath: logPath, + } + idx := sort.Search(len(c.closedFiles), func(i int) bool { + return c.closedFiles[i].startBlock >= startBlock }) + c.closedFiles = append(c.closedFiles, closedFile{}) + copy(c.closedFiles[idx+1:], c.closedFiles[idx:]) + c.closedFiles[idx] = entry } // handleFlush serves a flushReq by flushing buffered receipts/logs for the diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go index 435d7bcef8..5819171972 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/prune_test.go @@ -80,3 +80,46 @@ func TestPruneKeepsFilePairTrackedWhenDeleteFails(t *testing.T) { require.FileExists(t, failPath) require.FileExists(t, filepath.Join(dir, "logs_0.parquet")) } + +// Regression: a failed prune for an older file must be reinserted in sorted +// position so receiptFileSnapshotForBlock's early-break still finds it. 
+func TestPruneFailureReinsertsInSortedOrder(t *testing.T) { + dir := t.TempDir() + closedFiles := writeClosedFileSet(t, dir, 0, 4, 8) + failPath := filepath.Join(dir, "receipts_0.parquet") + + originalRemoveFile := removeFile + t.Cleanup(func() { removeFile = originalRemoveFile }) + removeFile = func(path string) error { + if path == failPath { + return errors.New("delete failed") + } + return os.Remove(path) + } + + reader, err := NewReaderWithMaxBlocksPerFile(dir, 4) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, reader.Close()) }) + + coord := &Coordinator{ + config: parquet.StoreConfig{KeepRecent: 4, MaxBlocksPerFile: 4}, + closedFiles: closedFiles, + latestVersion: 12, + reader: reader, + } + bootstrapWorkersForTest(coord) + t.Cleanup(func() { coord.shutdownWorkers() }) + + forcePruneTickForTest(coord) + + // File 4 deleted; file 0 reinserted after delete failure; file 8 retained. + // Ordering matters: file 0 must come before file 8 so a query for block 1 + // can still resolve to receipts_0.parquet. + require.Len(t, coord.closedFiles, 2) + require.Equal(t, uint64(0), coord.closedFiles[0].startBlock) + require.Equal(t, uint64(8), coord.closedFiles[1].startBlock) + + result := readClosedReceiptForTest(t, coord, common.BigToHash(new(big.Int).SetUint64(1)), 1) + require.NotNil(t, result) + require.Equal(t, uint64(1), result.BlockNumber) +} From ccfa6bd3484604b54fb678306367af30fecb9719 Mon Sep 17 00:00:00 2001 From: Jeremy Wei Date: Thu, 30 Apr 2026 13:13:41 -0400 Subject: [PATCH 3/4] remove some dead code --- .../receipt/parquet_v2/coordinator/export_test.go | 5 ----- .../receipt/parquet_v2/coordinator/handlers.go | 11 ----------- 2 files changed, 16 deletions(-) diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go index 7684319dc4..70f5772243 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go @@ -70,8 +70,3 @@ func inFlightReadsForTest(c *Coordinator, path string) int { func pendingPruneCountForTest(c *Coordinator) int { return len(c.pendingPrune) } - -// setReadWorkerCountForTest must be called BEFORE bootstrapWorkersForTest. -func setReadWorkerCountForTest(c *Coordinator, n int) { - c.readWorkerCount = n -} diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go index 518d7c83ec..361c43e45f 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go @@ -532,17 +532,6 @@ func (c *Coordinator) receiptFileSnapshotForBlock(blockNumber uint64) []string { return []string{best} } -// logFilesSnapshot returns the log parquet paths for all closed files. Log -// queries use this list as the file set, which the Reader further narrows -// by from/to-block range. -func (c *Coordinator) logFilesSnapshot() []string { - files := make([]string, 0, len(c.closedFiles)) - for _, f := range c.closedFiles { - files = append(files, f.logPath) - } - return files -} - // filteredLogFilesSnapshot builds a snapshot of log file paths whose // block range overlaps the filter's [FromBlock, ToBlock] window. 
Filtering // happens here on the coordinator so refcounts only get incremented on From 59e172a44306b9047ff271583e58e0091aac638d Mon Sep 17 00:00:00 2001 From: Jeremy Wei Date: Fri, 1 May 2026 10:21:45 -0400 Subject: [PATCH 4/4] drain channel during shutdown --- .../parquet_v2/coordinator/handlers.go | 53 +++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go index 361c43e45f..8e893e25b8 100644 --- a/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go +++ b/sei-db/ledger_db/receipt/parquet_v2/coordinator/handlers.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "sort" + "sync" "github.com/ethereum/go-ethereum/common" parquetgo "github.com/parquet-go/parquet-go" @@ -364,23 +365,69 @@ func (c *Coordinator) handleSimulateCrash(req simulateCrashReq) { // before exiting. Idempotent via shutdownOnce so retry paths in // handleClose do not double-close the channels. Tolerates nil channels // for bare-Coordinator constructions used in unit tests. +// +// IMPORTANT: this runs on the coordinator goroutine — the same goroutine +// that normally drains controlChan in c.run(). Each in-flight reader and +// pruner lambda finishes with a blocking send on controlChan +// (handlers.go readDoneMsg/pruneDoneMsg sites). controlChan is buffered +// to 64, but the reader pool can have up to 2*NumCPU workers and readChan +// can hold up to 1024 queued jobs. If we naively call wg.Wait() here, +// nothing is reading controlChan; once the buffer fills, additional +// workers block on the send forever, never call wg.Done(), and shutdown +// hangs. drainControlUntilDone keeps controlChan moving in parallel with +// the wait so workers can always complete their send and exit. +// +// We deliberately drop the drained messages instead of routing them +// through handleControl: the coordinator state they update (inFlightReads, +// pendingPrune) is about to be discarded, and dispatching a fresh prune +// from a late readDoneMsg would race with the close(c.pruneChan) that +// happens later in this same function. +// +// This deadlock is hard to reproduce in unit tests (existing tests fire +// 4–8 reads, the threshold is 64+ in-flight reads at the moment of +// shutdown), so the fix is documented here rather than guarded by a +// regression test. func (c *Coordinator) shutdownWorkers() { c.shutdownOnce.Do(func() { if c.readChan != nil { close(c.readChan) - c.readerWG.Wait() + c.drainControlUntilDone(&c.readerWG) } if c.writerChan != nil { close(c.writerChan) - c.writerWG.Wait() + c.drainControlUntilDone(&c.writerWG) } if c.pruneChan != nil { close(c.pruneChan) - c.prunerWG.Wait() + c.drainControlUntilDone(&c.prunerWG) } }) } +// drainControlUntilDone blocks until wg signals done, while concurrently +// receiving (and discarding) anything queued on controlChan. See +// shutdownWorkers for why discarding is correct during teardown. Spawns +// one short-lived goroutine that closes a sentinel when the wait group +// drains; the loop in this function is the only consumer of controlChan +// during shutdown, replacing c.run()'s usual select. +func (c *Coordinator) drainControlUntilDone(wg *sync.WaitGroup) { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + for { + select { + case <-done: + return + case <-c.controlChan: + // Drop. 
Coordinator state is being torn down; refcounts and + // pendingPrune entries do not need to stay consistent past + // this point. + } + } +} + // writeReceipts records a committed block at height. When inputs is empty it // degenerates to the rotation/cursor-advance path (formerly ObserveEmptyBlock): // no WAL entry is written, but if height lands on a rotation boundary the