9 changes: 9 additions & 0 deletions sei-db/ledger_db/parquet/store.go
@@ -26,6 +26,7 @@ const (

defaultBlockFlushInterval uint64 = 1
defaultMaxBlocksPerFile uint64 = 500
defaultReadChannelDepth = 1024
)

var removeFile = os.Remove
@@ -45,6 +46,13 @@ type StoreConfig struct {
// consumed by the v2 store; v1 ignores it. When nil, replay is
// skipped — used by lower-level tests that drive replay manually.
WALConverter WALReceiptConverter

// ReadWorkerPoolSize controls the number of reader goroutines in the
// parquet_v2 coordinator's read pool. Zero means runtime.NumCPU().
ReadWorkerPoolSize int
// ReadChannelDepth is the buffer depth of the coordinator → read pool
// dispatch channel. Zero means defaultReadChannelDepth.
ReadChannelDepth int
}

// WALReceiptConverter decodes a raw WAL receipt blob into the structured
@@ -69,6 +77,7 @@ func DefaultStoreConfig() StoreConfig {
BlockFlushInterval: defaultBlockFlushInterval,
MaxBlocksPerFile: defaultMaxBlocksPerFile,
TxIndexBackend: dbconfig.ReceiptTxIndexBackendPebble,
ReadChannelDepth: defaultReadChannelDepth,
}
}
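
For reference, a minimal sketch of how a caller might wire the two new knobs, assuming the parquet and coordinator packages touched in this diff; the directory value and the literal sizes are illustrative, not taken from the PR:

	// Start from the defaults, then override only what the deployment needs.
	cfg := parquet.DefaultStoreConfig()
	cfg.DBDirectory = "/var/lib/sei/receipts" // hypothetical path
	cfg.ReadWorkerPoolSize = 8                // 0 falls back to runtime.NumCPU(); values above 2x NumCPU are clamped
	cfg.ReadChannelDepth = 2048               // 0 falls back to defaultReadChannelDepth (1024)

	coord, err := coordinator.New(cfg)
	if err != nil {
		// handle the construction/replay error
	}
	_ = coord

Leaving either field at zero keeps the resolved defaults, matching the fallback logic in resolveStoreConfig and resolveReadWorkerCount below.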

66 changes: 66 additions & 0 deletions sei-db/ledger_db/receipt/parquet_v2/coordinator/coordinator.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -63,6 +64,27 @@ type Coordinator struct {

warmupRecords []parquet.ReceiptRecord
replayedBlocks []ReplayedBlock

// Worker pool dispatch channels. Coordinator builds closures and
// sends them here; workers pull and execute. Coordinator-owned state
// is never mutated by workers — completion handshakes flow back via
// controlChan.
readChan chan func()
writerChan chan func()
pruneChan chan func()
controlChan chan controlMsg

readerWG sync.WaitGroup
writerWG sync.WaitGroup
prunerWG sync.WaitGroup
shutdownOnce sync.Once

// Reference tracking for closed parquet files. Incremented on read
// dispatch, decremented on readDoneMsg. Pruning is gated on these.
inFlightReads map[string]int
pendingPrune []closedFile

readWorkerCount int
}

// New constructs a Coordinator and drives WAL replay synchronously before
@@ -108,6 +130,7 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) {
return nil, err
}

readWorkerCount := resolveReadWorkerCount(storeCfg.ReadWorkerPoolSize)
c := &Coordinator{
requests: requests,
done: done,
@@ -121,6 +144,12 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) {
wal: receiptWAL,
latestVersion: 0,
earliestVersion: 0,
readChan: make(chan func(), storeCfg.ReadChannelDepth),
writerChan: make(chan func(), 4),
pruneChan: make(chan func(), 4),
controlChan: make(chan controlMsg, 64),
inFlightReads: make(map[string]int),
readWorkerCount: readWorkerCount,
}
cleanupWriters := true
defer func() {
@@ -156,6 +185,7 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) {
c.replayedBlocks = result.Blocks
}

c.startWorkers()
go c.run()
cleanupReader = false
cleanupWAL = false
@@ -164,6 +194,20 @@ func New(cfg parquet.StoreConfig) (*Coordinator, error) {
return c, nil
}

// startWorkers spawns the reader pool, writer goroutine, and pruner
// goroutine. Each adds itself to the matching wait group so handleClose
// can drain them deterministically.
func (c *Coordinator) startWorkers() {
for i := 0; i < c.readWorkerCount; i++ {
c.readerWG.Add(1)
go c.runReader()
}
c.writerWG.Add(1)
go c.runWriter()
c.prunerWG.Add(1)
go c.runPruner()
}

func resolveStoreConfig(cfg parquet.StoreConfig) parquet.StoreConfig {
resolved := parquet.DefaultStoreConfig()
resolved.DBDirectory = cfg.DBDirectory
@@ -178,9 +222,29 @@ func resolveStoreConfig(cfg parquet.StoreConfig) parquet.StoreConfig {
if cfg.MaxBlocksPerFile > 0 {
resolved.MaxBlocksPerFile = cfg.MaxBlocksPerFile
}
if cfg.ReadWorkerPoolSize > 0 {
resolved.ReadWorkerPoolSize = cfg.ReadWorkerPoolSize
}
if cfg.ReadChannelDepth > 0 {
resolved.ReadChannelDepth = cfg.ReadChannelDepth
}
return resolved
}

// resolveReadWorkerCount derives the reader pool size from the resolved
// config. Zero means runtime.NumCPU(); we cap at numCPU*2 to match the
// DuckDB connection limit set in NewReaderWithMaxBlocksPerFile.
func resolveReadWorkerCount(configured int) int {
numCPU := runtime.NumCPU()
if configured <= 0 {
return numCPU
}
if configured > numCPU*2 {
return numCPU * 2
}
return configured
}
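
As a worked example of the clamping above: on an 8-core host, a configured value of 0 resolves to 8 reader workers, 4 stays at 4, and 32 is clamped to 16.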

func (c *Coordinator) run() {
var pruneTick <-chan time.Time
if c.config.KeepRecent > 0 && c.config.PruneIntervalSeconds > 0 {
@@ -190,6 +254,8 @@ func (c *Coordinator) run() {
}
for {
select {
case msg := <-c.controlChan:
c.handleControl(msg)
case req := <-c.requests:
if req.dispatch(c) {
return
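
To make the dispatch-and-handshake pattern described by the struct comments and the run loop above concrete, here is a standalone sketch of the same idea. Everything in it (the two-worker pool, the controlMsg stand-in, the file name) is illustrative rather than lifted from the PR:

	package main

	import (
		"fmt"
		"sync"
	)

	// controlMsg stands in for the PR's readDoneMsg/pruneDoneMsg handshakes.
	type controlMsg struct{ path string }

	func main() {
		readChan := make(chan func(), 4)        // coordinator -> reader pool dispatch
		controlChan := make(chan controlMsg, 4) // workers -> coordinator handshakes
		var readerWG sync.WaitGroup

		// Reader workers: pull closures and execute them. They never touch
		// coordinator-owned state directly.
		for i := 0; i < 2; i++ {
			readerWG.Add(1)
			go func() {
				defer readerWG.Done()
				for task := range readChan {
					task()
				}
			}()
		}

		// Coordinator side: bump the refcount on dispatch, and have the closure
		// report completion back through controlChan.
		inFlightReads := make(map[string]int)
		path := "receipts-000001.parquet" // illustrative file name
		inFlightReads[path]++
		readChan <- func() {
			// ... perform the read against the closed parquet file ...
			controlChan <- controlMsg{path: path}
		}

		// Only the coordinator loop drains controlChan, so the refcount map is
		// never mutated concurrently.
		msg := <-controlChan
		inFlightReads[msg.path]--
		fmt.Printf("in-flight reads for %s: %d\n", msg.path, inFlightReads[msg.path])

		close(readChan)
		readerWG.Wait()
	}

The real coordinator layers more on top of this: prune requests for a closed file are parked in pendingPrune until that file's refcount drops to zero, which is exactly what the export_test.go helpers below make observable.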
67 changes: 67 additions & 0 deletions sei-db/ledger_db/receipt/parquet_v2/coordinator/export_test.go
@@ -1,5 +1,72 @@
package coordinator

// forcePruneTickForTest synchronously runs a prune tick and then waits for
// any dispatched pruner lambdas to complete so the test can observe the
// post-prune disk state immediately.
func forcePruneTickForTest(c *Coordinator) {
c.handlePruneTick()
quiesceWorkersForTest(c)
}

// bootstrapWorkersForTest initializes worker channels and spawns the
// reader/writer/pruner goroutines on a bare-constructed Coordinator.
// Tests that drive coordinator handlers directly (without going through
// New) must call this so awaitWriter and dispatchPrune don't deadlock on
// a nil channel send.
func bootstrapWorkersForTest(c *Coordinator) {
if c.readChan != nil {
return
}
if c.inFlightReads == nil {
c.inFlightReads = make(map[string]int)
}
c.readChan = make(chan func(), 1024)
c.writerChan = make(chan func(), 4)
c.pruneChan = make(chan func(), 4)
c.controlChan = make(chan controlMsg, 64)
c.readWorkerCount = 2
c.startWorkers()
}

// quiesceWorkersForTest round-trips a sentinel lambda through both the
// writer and the pruner. Because each is single-threaded, the sentinel
// runs only after every previously-dispatched lambda for that worker has
// completed. After the sentinels return, the function drains controlChan
// of any pending readDoneMsg/pruneDoneMsg so coordinator-side bookkeeping
// (refcounts, pendingPrune) reflects the just-completed work.
func quiesceWorkersForTest(c *Coordinator) {
if c.writerChan != nil {
done := make(chan struct{})
c.dispatchWrite(func() { close(done) })
<-done
}
if c.pruneChan != nil {
done := make(chan struct{})
c.dispatchPrune(func() { close(done) })
<-done
}
if c.controlChan == nil {
return
}
for {
select {
case msg := <-c.controlChan:
c.handleControl(msg)
default:
return
}
}
}

// inFlightReadsForTest returns the active reader refcount for path.
// Callers must quiesce the workers first to avoid racing with in-flight
// readDoneMsg processing.
func inFlightReadsForTest(c *Coordinator, path string) int {
return c.inFlightReads[path]
}

// pendingPruneCountForTest returns the number of files awaiting a refcount
// drop before being deleted.
func pendingPruneCountForTest(c *Coordinator) int {
return len(c.pendingPrune)
}
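
Taken together, the helpers above suggest a test shape roughly like the following skeleton (in a coordinator-package _test.go file). The test name, the elided dispatch step, and the file path are hypothetical:

	func TestPruneGatedOnInFlightReads(t *testing.T) {
		c := &Coordinator{} // bare-constructed; bootstrapWorkersForTest fills in channels and workers
		bootstrapWorkersForTest(c)

		// ... dispatch a read against a closed parquet file, then request a prune ...

		quiesceWorkersForTest(c) // wait for dispatched lambdas, then drain controlChan
		if n := inFlightReadsForTest(c, "receipts-000001.parquet"); n != 0 {
			t.Fatalf("expected reader refcount to drop to zero, got %d", n)
		}
		if got := pendingPruneCountForTest(c); got != 0 {
			t.Fatalf("expected pending prune queue to drain, got %d entries", got)
		}
	}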