diff --git a/evmrpc/block.go b/evmrpc/block.go index 2aab45f035..7209278acb 100644 --- a/evmrpc/block.go +++ b/evmrpc/block.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "math/big" - "strings" "sync" "time" @@ -20,6 +19,7 @@ import ( "github.com/sei-protocol/sei-chain/sei-cosmos/client" sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" banktypes "github.com/sei-protocol/sei-chain/sei-cosmos/x/bank/types" + "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt" rpcclient "github.com/sei-protocol/sei-chain/sei-tendermint/rpc/client" "github.com/sei-protocol/sei-chain/sei-tendermint/rpc/coretypes" wasmtypes "github.com/sei-protocol/sei-chain/sei-wasmd/x/wasm/types" @@ -40,9 +40,31 @@ const genesisBlockHashHex = "0xF9D3845DF25B43B1C6926F3CEDA6845C17F5624E12212FD88 var genesisBlockHash = common.HexToHash(genesisBlockHashHex) +// ErrReceiptsPruned signals that a block's receipts have been pruned from this +// node, so receipt-derived fields cannot be served reliably. +var ErrReceiptsPruned = errors.New("block receipts have been pruned from this node") + // genesisBlockTxCount is the transaction count for the synthetic genesis block (eth_getBlockTransactionCountByHash/ByNumber for genesis). var genesisBlockTxCount = func() *hexutil.Uint { u := hexutil.Uint(0); return &u }() +func checkReceiptsAvailable(store receipt.ReceiptStore, block *coretypes.ResultBlock) error { + if store == nil { + return nil + } + earliest := store.EarliestVersion() + if earliest == 0 || block.Block.Height >= earliest { + return nil + } + return fmt.Errorf("%w: requested height %d, earliest retained %d", ErrReceiptsPruned, block.Block.Height, earliest) +} + +func (a *BlockAPI) receiptStore() receipt.ReceiptStore { + if a.keeper == nil { + return nil + } + return a.keeper.ReceiptStore() +} + func encodeGenesisBlock() map[string]any { return map[string]any{ "number": (*hexutil.Big)(big.NewInt(0)), @@ -180,6 +202,9 @@ func (a *BlockAPI) GetBlockTransactionCountByNumber(ctx context.Context, number if err != nil { return nil, err } + if err := checkReceiptsAvailable(a.receiptStore(), block); err != nil { + return nil, err + } return a.getEvmTxCount(block), nil } @@ -195,6 +220,9 @@ func (a *BlockAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHash if err != nil { return nil, err } + if err := checkReceiptsAvailable(a.receiptStore(), block); err != nil { + return nil, err + } return a.getEvmTxCount(block), nil } @@ -230,6 +258,9 @@ func (a *BlockAPI) getBlockByHash(ctx context.Context, blockHash common.Hash, fu return nil, err } + if err := checkReceiptsAvailable(a.receiptStore(), block); err != nil { + return nil, err + } blockRes, err := blockResultsWithRetry(ctx, a.tmClient, &block.Block.Height) if err != nil { return nil, err @@ -277,6 +308,9 @@ func (a *BlockAPI) getBlockByNumber( if err != nil { return nil, err } + if err := checkReceiptsAvailable(a.receiptStore(), block); err != nil { + return nil, err + } blockRes, err := blockResultsWithRetry(ctx, a.tmClient, &block.Block.Height) if err != nil { return nil, err @@ -314,6 +348,9 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block if err != nil { return nil, err } + if err := checkReceiptsAvailable(a.receiptStore(), block); err != nil { + return nil, err + } // Get all tx hashes for the block height := block.Block.Height @@ -329,16 +366,16 @@ func (a *BlockAPI) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc.Block go func(i int, hash typedTxHash) { defer wg.Done() defer recoverAndLog() - receipt, err := getOrSetCachedReceiptErr(a.cacheCreationMutex, a.globalBlockCache, a.ctxProvider(height), a.keeper, block, hash.hash) + rcpt, err := getOrSetCachedReceiptErr(a.cacheCreationMutex, a.globalBlockCache, a.ctxProvider(height), a.keeper, block, hash.hash) if err != nil { - if !strings.Contains(err.Error(), "not found") { + if !errors.Is(err, receipt.ErrNotFound) { mtx.Lock() returnErr = err mtx.Unlock() } return } - encodedReceipt, err := encodeReceipt(a.ctxProvider, a.txConfigProvider, receipt, a.keeper, block, a.includeShellReceipts, a.globalBlockCache, a.cacheCreationMutex) + encodedReceipt, err := encodeReceipt(a.ctxProvider, a.txConfigProvider, rcpt, a.keeper, block, a.includeShellReceipts, a.globalBlockCache, a.cacheCreationMutex) if err != nil { mtx.Lock() returnErr = err diff --git a/evmrpc/block_test.go b/evmrpc/block_test.go index 664c031eb5..49e45fed5c 100644 --- a/evmrpc/block_test.go +++ b/evmrpc/block_test.go @@ -1,6 +1,7 @@ package evmrpc_test import ( + "context" "crypto/sha256" "math/big" "sync" @@ -26,6 +27,61 @@ import ( "github.com/stretchr/testify/require" ) +func TestGetBlockByNumberReturnsReceiptsPrunedBelowReceiptWatermark(t *testing.T) { + k := &testkeeper.EVMTestApp.EvmKeeper + store := k.ReceiptStore() + require.NotNil(t, store) + + oldEarliest := store.EarliestVersion() + require.NoError(t, store.SetEarliestVersion(MockHeight8+1)) + t.Cleanup(func() { + require.NoError(t, store.SetEarliestVersion(oldEarliest)) + }) + + tmClient := NewMockClientWithLatest(MockHeight103) + ctxProvider := func(height int64) sdk.Context { + if height == evmrpc.LatestCtxHeight { + return Ctx + } + return Ctx.WithBlockHeight(height) + } + txConfigProvider := func(int64) client.TxConfig { return TxConfig } + watermarks := evmrpc.NewWatermarkManager(tmClient, ctxProvider, nil, store) + api := evmrpc.NewBlockAPI(tmClient, k, ctxProvider, txConfigProvider, evmrpc.ConnectionTypeHTTP, watermarks, evmrpc.NewBlockCache(3000), &sync.Mutex{}) + + _, err := api.GetBlockByNumber(context.Background(), MockHeight8, false) + require.ErrorIs(t, err, evmrpc.ErrReceiptsPruned) +} + +func TestGetBlockTransactionCountReturnsReceiptsPrunedBelowReceiptWatermark(t *testing.T) { + k := &testkeeper.EVMTestApp.EvmKeeper + store := k.ReceiptStore() + require.NotNil(t, store) + + oldEarliest := store.EarliestVersion() + require.NoError(t, store.SetEarliestVersion(MockHeight8+1)) + t.Cleanup(func() { + require.NoError(t, store.SetEarliestVersion(oldEarliest)) + }) + + tmClient := NewMockClientWithLatest(MockHeight103) + ctxProvider := func(height int64) sdk.Context { + if height == evmrpc.LatestCtxHeight { + return Ctx + } + return Ctx.WithBlockHeight(height) + } + txConfigProvider := func(int64) client.TxConfig { return TxConfig } + watermarks := evmrpc.NewWatermarkManager(tmClient, ctxProvider, nil, store) + api := evmrpc.NewBlockAPI(tmClient, k, ctxProvider, txConfigProvider, evmrpc.ConnectionTypeHTTP, watermarks, evmrpc.NewBlockCache(3000), &sync.Mutex{}) + + _, err := api.GetBlockTransactionCountByNumber(context.Background(), MockHeight8) + require.ErrorIs(t, err, evmrpc.ErrReceiptsPruned) + + _, err = api.GetBlockTransactionCountByHash(context.Background(), common.HexToHash(TestBlockHash)) + require.ErrorIs(t, err, evmrpc.ErrReceiptsPruned) +} + func TestEncodeTmBlock_EmptyTransactions(t *testing.T) { k := &testkeeper.EVMTestApp.EvmKeeper ctx := testkeeper.EVMTestApp.GetContextForDeliverTx([]byte{}).WithBlockTime(time.Now()) diff --git a/evmrpc/tx.go b/evmrpc/tx.go index 9e798d0083..6fc8c4f3de 100644 --- a/evmrpc/tx.go +++ b/evmrpc/tx.go @@ -7,7 +7,6 @@ import ( "fmt" "math" "math/big" - "strings" "sync" "time" @@ -94,6 +93,10 @@ func NewSeiTransactionAPI( return &SeiTransactionAPI{TransactionAPI: baseAPI, isPanicTx: isPanicTx} } +func (t *TransactionAPI) receiptStore() receiptpkg.ReceiptStore { + return t.keeper.ReceiptStore() +} + func (t *SeiTransactionAPI) GetTransactionReceiptExcludeTraceFail(ctx context.Context, hash common.Hash) (result map[string]interface{}, returnErr error) { return getTransactionReceipt(ctx, t.TransactionAPI, hash, true, t.isPanicTx, true) } @@ -128,7 +131,7 @@ func getTransactionReceipt( receipt, err := t.keeper.GetReceipt(sdkctx, hash) if err != nil { - if strings.Contains(err.Error(), "not found") { + if errors.Is(err, receiptpkg.ErrNotFound) { // When the transaction doesn't exist, the RPC method should return JSON null // as per specification. return nil, nil @@ -214,6 +217,9 @@ func (t *TransactionAPI) getTransactionByBlockNumberAndIndex(ctx context.Context if err != nil { return nil, err } + if err := checkReceiptsAvailable(t.receiptStore(), block); err != nil { + return nil, err + } return t.getTransactionWithBlock(block, txIndex, t.includeSynthetic) } @@ -230,6 +236,9 @@ func (t *TransactionAPI) GetTransactionByBlockHashAndIndex(ctx context.Context, if err != nil { return nil, err } + if err := checkReceiptsAvailable(t.receiptStore(), block); err != nil { + return nil, err + } var idx uint32 idx, err = txIndexToUint32(txIndex) if err != nil { @@ -281,7 +290,7 @@ func (t *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.H // then try get from committed receipt, err := t.keeper.GetReceipt(t.ctxProvider(LatestCtxHeight), hash) if err != nil { - if strings.Contains(err.Error(), "not found") { + if errors.Is(err, receiptpkg.ErrNotFound) { return nil, nil } return nil, err @@ -309,7 +318,7 @@ func (t *TransactionAPI) GetTransactionErrorByHash(ctx context.Context, hash com }() receipt, err := t.keeper.GetReceipt(t.ctxProvider(LatestCtxHeight), hash) if err != nil { - if strings.Contains(err.Error(), "not found") { + if errors.Is(err, receiptpkg.ErrNotFound) { return "", nil } return "", err diff --git a/evmrpc/tx_test.go b/evmrpc/tx_test.go index 0d0367bcc5..9e85a47676 100644 --- a/evmrpc/tx_test.go +++ b/evmrpc/tx_test.go @@ -1,6 +1,7 @@ package evmrpc_test import ( + "context" "encoding/json" "fmt" "io" @@ -311,6 +312,35 @@ func TestGetTransactionByBlockHashAndIndexErrors(t *testing.T) { require.Nil(t, resObj["result"]) } +func TestGetTransactionByBlockAndIndexReturnsReceiptsPrunedBelowReceiptWatermark(t *testing.T) { + k := &testkeeper.EVMTestApp.EvmKeeper + store := k.ReceiptStore() + require.NotNil(t, store) + + oldEarliest := store.EarliestVersion() + require.NoError(t, store.SetEarliestVersion(MockHeight8+1)) + t.Cleanup(func() { + require.NoError(t, store.SetEarliestVersion(oldEarliest)) + }) + + tmClient := NewMockClientWithLatest(MockHeight103) + ctxProvider := func(height int64) sdk.Context { + if height == evmrpc.LatestCtxHeight { + return Ctx + } + return Ctx.WithBlockHeight(height) + } + txConfigProvider := func(int64) client.TxConfig { return TxConfig } + watermarks := evmrpc.NewWatermarkManager(tmClient, ctxProvider, nil, store) + api := evmrpc.NewTransactionAPI(tmClient, k, ctxProvider, txConfigProvider, "", evmrpc.ConnectionTypeHTTP, watermarks, evmrpc.NewBlockCache(3000), &sync.Mutex{}) + + _, err := api.GetTransactionByBlockNumberAndIndex(context.Background(), MockHeight8, 0) + require.ErrorIs(t, err, evmrpc.ErrReceiptsPruned) + + _, err = api.GetTransactionByBlockHashAndIndex(context.Background(), common.HexToHash(TestBlockHash), 0) + require.ErrorIs(t, err, evmrpc.ErrReceiptsPruned) +} + func TestGetTransactionByHashNotFound(t *testing.T) { nonExistentHash := "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" body := fmt.Sprintf(`{"jsonrpc": "2.0","method": "eth_getTransactionByHash","params":["%s"],"id":"test"}`, nonExistentHash) diff --git a/evmrpc/watermark_manager.go b/evmrpc/watermark_manager.go index d751d3421d..79e53ce8e6 100644 --- a/evmrpc/watermark_manager.go +++ b/evmrpc/watermark_manager.go @@ -19,6 +19,10 @@ var errNoHeightSource = errors.New("unable to determine height information") // node's safe latest watermark. eth_getBlockByNumber maps this to result null (Ethereum spec). var ErrBlockHeightNotYetAvailable = errors.New("block height not yet available") +// ErrBlockHeightPruned is returned when a concrete block height is below the +// node's earliest available block or state watermark. +var ErrBlockHeightPruned = errors.New("block height pruned") + // WatermarkManager coordinates access to block, state, and receipt stores to // determine queryable block heights for RPC consumers. It ensures read-side // requests only target heights where all backing data sources are fully @@ -224,7 +228,7 @@ func (m *WatermarkManager) ensureWithinWatermarks(height, earliest, latest int64 return fmt.Errorf("requested height %d is not yet available; safe latest is %d: %w", height, latest, ErrBlockHeightNotYetAvailable) } if height < earliest { - return fmt.Errorf("requested height %d has been pruned; earliest available is %d", height, earliest) + return fmt.Errorf("requested height %d has been pruned; earliest available is %d: %w", height, earliest, ErrBlockHeightPruned) } return nil } diff --git a/evmrpc/watermark_manager_test.go b/evmrpc/watermark_manager_test.go index 025ef8ba05..7ff58f9555 100644 --- a/evmrpc/watermark_manager_test.go +++ b/evmrpc/watermark_manager_test.go @@ -71,8 +71,7 @@ func TestResolveHeightGating(t *testing.T) { tooHigh := rpc.BlockNumber(6) _, err := wm.ResolveHeight(context.Background(), rpc.BlockNumberOrHash{BlockNumber: &tooHigh}) - require.Error(t, err) - require.Contains(t, err.Error(), "not yet available") + require.ErrorIs(t, err, ErrBlockHeightNotYetAvailable) within := rpc.BlockNumber(4) height, err := wm.ResolveHeight(context.Background(), rpc.BlockNumberOrHash{BlockNumber: &within}) @@ -100,8 +99,8 @@ func TestEnsureBlockHeightAvailableBounds(t *testing.T) { require.NoError(t, wm.EnsureBlockHeightAvailable(context.Background(), 5)) - require.ErrorContains(t, wm.EnsureBlockHeightAvailable(context.Background(), 7), "not yet available") - require.ErrorContains(t, wm.EnsureBlockHeightAvailable(context.Background(), 2), "has been pruned") + require.ErrorIs(t, wm.EnsureBlockHeightAvailable(context.Background(), 7), ErrBlockHeightNotYetAvailable) + require.ErrorIs(t, wm.EnsureBlockHeightAvailable(context.Background(), 2), ErrBlockHeightPruned) } func TestLatestAndEarliestHeightHelpers(t *testing.T) { @@ -129,8 +128,7 @@ func TestResolveHeightUsesStateEarliest(t *testing.T) { belowState := rpc.BlockNumber(9) _, err := wm.ResolveHeight(context.Background(), rpc.BlockNumberOrHash{BlockNumber: &belowState}) - require.Error(t, err) - require.Contains(t, err.Error(), "has been pruned") + require.ErrorIs(t, err, ErrBlockHeightPruned) within := rpc.BlockNumber(12) resolved, err := wm.ResolveHeight(context.Background(), rpc.BlockNumberOrHash{BlockNumber: &within}) @@ -138,6 +136,19 @@ func TestResolveHeightUsesStateEarliest(t *testing.T) { require.Equal(t, int64(12), resolved) } +func TestResolveHeightByHashUsesStateEarliest(t *testing.T) { + tmClient := &fakeTMClient{ + status: &coretypes.ResultStatus{SyncInfo: coretypes.SyncInfo{LatestBlockHeight: 20, EarliestBlockHeight: 5}}, + blockByHash: makeBlockResult(9), + } + stateStore := &fakeStateStore{latest: 18, earliest: 10} + wm := NewWatermarkManager(tmClient, nil, stateStore, nil) + + h := common.HexToHash("0x9") + _, err := wm.ResolveHeight(context.Background(), rpc.BlockNumberOrHash{BlockHash: &h}) + require.ErrorIs(t, err, ErrBlockHeightPruned) +} + func TestStateWatermarksCanLagBlocks(t *testing.T) { tmClient := &fakeTMClient{ status: &coretypes.ResultStatus{SyncInfo: coretypes.SyncInfo{LatestBlockHeight: 30, EarliestBlockHeight: 12}}, @@ -233,7 +244,8 @@ func (f *fakeStateStore) Prune(_ int64) error func (f *fakeStateStore) Close() error { return nil } type fakeReceiptStore struct { - latest int64 + latest int64 + earliest int64 } func (f *fakeReceiptStore) LatestVersion() int64 { @@ -245,7 +257,14 @@ func (f *fakeReceiptStore) SetLatestVersion(version int64) error { return nil } -func (f *fakeReceiptStore) SetEarliestVersion(_ int64) error { return nil } +func (f *fakeReceiptStore) EarliestVersion() int64 { + return f.earliest +} + +func (f *fakeReceiptStore) SetEarliestVersion(version int64) error { + f.earliest = version + return nil +} func (f *fakeReceiptStore) GetReceipt(sdk.Context, common.Hash) (*evmtypes.Receipt, error) { return nil, errors.New("not found") diff --git a/sei-db/ledger_db/parquet/reader.go b/sei-db/ledger_db/parquet/reader.go index 9ded27e67e..635f26cab2 100644 --- a/sei-db/ledger_db/parquet/reader.go +++ b/sei-db/ledger_db/parquet/reader.go @@ -206,6 +206,17 @@ func (r *Reader) ClosedReceiptFileCount() int { return len(r.closedReceiptFiles) } +// MinReceiptFileStart returns the start block of the earliest tracked receipt +// parquet file. +func (r *Reader) MinReceiptFileStart() (uint64, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + if len(r.closedReceiptFiles) == 0 { + return 0, false + } + return ExtractBlockNumber(r.closedReceiptFiles[0]), true +} + // GetFilesBeforeBlock returns files whose start block is before the given block. // These files contain only data older than the prune threshold. func (r *Reader) GetFilesBeforeBlock(pruneBeforeBlock uint64) []FilePair { diff --git a/sei-db/ledger_db/parquet/store.go b/sei-db/ledger_db/parquet/store.go index d4bfed2fab..02200dcff0 100644 --- a/sei-db/ledger_db/parquet/store.go +++ b/sei-db/ledger_db/parquet/store.go @@ -142,6 +142,13 @@ func NewStore(cfg StoreConfig) (*Store, error) { store.fileStartBlock = maxBlock + 1 } } + if minBlock, ok := reader.MinReceiptFileStart(); ok && minBlock > 0 { + earliest, err := int64FromUint64(minBlock) + if err != nil { + return nil, err + } + store.earliestVersion.Store(earliest) + } store.startPruning(cfg.PruneIntervalSeconds) @@ -170,6 +177,11 @@ func (s *Store) LatestVersion() int64 { return s.latestVersion.Load() } +// EarliestVersion returns the earliest version retained. +func (s *Store) EarliestVersion() int64 { + return s.earliestVersion.Load() +} + // SetLatestVersion sets the latest version. func (s *Store) SetLatestVersion(version int64) { s.latestVersion.Store(version) @@ -500,6 +512,7 @@ func (s *Store) PruneOldFiles(pruneBeforeBlock uint64) int { } prunedCount := 0 + var earliestWatermarkCandidate uint64 for _, filePair := range filesToPrune { // Step 1: Remove from tracking (brief mu.Lock) so new reader // snapshots won't include these files. @@ -544,8 +557,23 @@ func (s *Store) PruneOldFiles(pruneBeforeBlock uint64) int { s.Reader.AddTrackedLogFile(filePair.StartBlock) } + // Track the earliest watermark candidate by the next file's start block. if receiptRemoved && logRemoved { prunedCount++ + candidate := filePair.StartBlock + s.config.MaxBlocksPerFile + if candidate > earliestWatermarkCandidate { + earliestWatermarkCandidate = candidate + } + } + } + + // Update the earliest version if we found a new candidate. + if earliestWatermarkCandidate > 0 { + earliestVersion, err := int64FromUint64(earliestWatermarkCandidate) + if err != nil { + logger.Error("failed to update earliest parquet version after pruning", "block", earliestWatermarkCandidate, "err", err) + } else if earliestVersion > s.earliestVersion.Load() { + s.earliestVersion.Store(earliestVersion) } } diff --git a/sei-db/ledger_db/parquet/store_config_test.go b/sei-db/ledger_db/parquet/store_config_test.go index f57e5776f6..ddb1c44ace 100644 --- a/sei-db/ledger_db/parquet/store_config_test.go +++ b/sei-db/ledger_db/parquet/store_config_test.go @@ -84,6 +84,19 @@ func TestNewStorePreservesKeepRecentAndPruneIntervalSettings(t *testing.T) { require.Equal(t, "none", store.config.TxIndexBackend) } +func TestStoreEarliestVersionAccessors(t *testing.T) { + store, err := NewStore(StoreConfig{ + DBDirectory: t.TempDir(), + TxIndexBackend: "none", + }) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + require.Equal(t, int64(0), store.EarliestVersion()) + store.SetEarliestVersion(42) + require.Equal(t, int64(42), store.EarliestVersion()) +} + func TestNewStoreSucceedsWithTxIndexLookupEnabled(t *testing.T) { store, err := NewStore(StoreConfig{ DBDirectory: t.TempDir(), diff --git a/sei-db/ledger_db/receipt/cached_receipt_store.go b/sei-db/ledger_db/receipt/cached_receipt_store.go index 1be9d3ca5d..84ae24e216 100644 --- a/sei-db/ledger_db/receipt/cached_receipt_store.go +++ b/sei-db/ledger_db/receipt/cached_receipt_store.go @@ -85,6 +85,10 @@ func (s *cachedReceiptStore) SetLatestVersion(version int64) error { return s.backend.SetLatestVersion(version) } +func (s *cachedReceiptStore) EarliestVersion() int64 { + return s.backend.EarliestVersion() +} + func (s *cachedReceiptStore) SetEarliestVersion(version int64) error { return s.backend.SetEarliestVersion(version) } diff --git a/sei-db/ledger_db/receipt/cached_receipt_store_test.go b/sei-db/ledger_db/receipt/cached_receipt_store_test.go index cacc526c0d..5d5922d0a2 100644 --- a/sei-db/ledger_db/receipt/cached_receipt_store_test.go +++ b/sei-db/ledger_db/receipt/cached_receipt_store_test.go @@ -18,6 +18,7 @@ type fakeReceiptBackend struct { receipts map[common.Hash]*types.Receipt logs []*ethtypes.Log latestVersion int64 + earliestVersion int64 rotateInterval uint64 getReceiptCalls int filterLogCalls int @@ -71,7 +72,12 @@ func (f *fakeReceiptBackend) SetLatestVersion(int64) error { return nil } -func (f *fakeReceiptBackend) SetEarliestVersion(int64) error { +func (f *fakeReceiptBackend) EarliestVersion() int64 { + return f.earliestVersion +} + +func (f *fakeReceiptBackend) SetEarliestVersion(version int64) error { + f.earliestVersion = version return nil } @@ -123,6 +129,16 @@ func (f *fakeReceiptBackend) Close() error { return nil } +func TestCachedReceiptStoreDelegatesEarliestVersion(t *testing.T) { + backend := newFakeReceiptBackend() + store := newCachedReceiptStore(backend, nil) + + require.Equal(t, int64(0), store.EarliestVersion()) + require.NoError(t, store.SetEarliestVersion(17)) + require.Equal(t, int64(17), store.EarliestVersion()) + require.Equal(t, int64(17), backend.earliestVersion) +} + func TestCachedReceiptStoreUsesCacheForReceipt(t *testing.T) { ctx, _ := newTestContext() backend := newFakeReceiptBackend() diff --git a/sei-db/ledger_db/receipt/parquet_store.go b/sei-db/ledger_db/receipt/parquet_store.go index 616f5dfd82..7d57803700 100644 --- a/sei-db/ledger_db/receipt/parquet_store.go +++ b/sei-db/ledger_db/receipt/parquet_store.go @@ -86,6 +86,10 @@ func (s *parquetReceiptStore) SetLatestVersion(version int64) error { return nil } +func (s *parquetReceiptStore) EarliestVersion() int64 { + return s.store.EarliestVersion() +} + func (s *parquetReceiptStore) SetEarliestVersion(version int64) error { s.store.SetEarliestVersion(version) return nil diff --git a/sei-db/ledger_db/receipt/parquet_store_test.go b/sei-db/ledger_db/receipt/parquet_store_test.go index c9faa5a615..c581523624 100644 --- a/sei-db/ledger_db/receipt/parquet_store_test.go +++ b/sei-db/ledger_db/receipt/parquet_store_test.go @@ -317,6 +317,7 @@ func TestParquetFilePruning(t *testing.T) { pqStore := store.(*cachedReceiptStore).backend.(*parquetReceiptStore) pruned := pqStore.store.PruneOldFiles(900) require.Greater(t, pruned, 0, "should have pruned at least one file pair") + require.Equal(t, int64(500), store.EarliestVersion()) // Verify files were actually removed from disk receiptFilesAfter, err := filepath.Glob(filepath.Join(cfg.DBDirectory, "receipts_*.parquet")) @@ -331,6 +332,11 @@ func TestParquetFilePruning(t *testing.T) { txHash := common.BigToHash(new(big.Int).SetUint64(1400)) _, err = store.GetReceiptFromStore(ctx, txHash) require.NoError(t, err) + + require.NoError(t, store.Close()) + store, err = NewReceiptStore(cfg, storeKey) + require.NoError(t, err) + require.Equal(t, int64(500), store.EarliestVersion()) } func TestParquetReaderGetFilesBeforeBlock(t *testing.T) { diff --git a/sei-db/ledger_db/receipt/receipt_store.go b/sei-db/ledger_db/receipt/receipt_store.go index c2ee7f2742..ebc213071d 100644 --- a/sei-db/ledger_db/receipt/receipt_store.go +++ b/sei-db/ledger_db/receipt/receipt_store.go @@ -44,6 +44,9 @@ var ( type ReceiptStore interface { LatestVersion() int64 SetLatestVersion(version int64) error + // EarliestVersion returns the lowest block height whose receipts are still + // retained on this node. Returns 0 when no receipt data has been pruned. + EarliestVersion() int64 SetEarliestVersion(version int64) error GetReceipt(ctx sdk.Context, txHash common.Hash) (*types.Receipt, error) GetReceiptFromStore(ctx sdk.Context, txHash common.Hash) (*types.Receipt, error) @@ -180,6 +183,10 @@ func (s *receiptStore) SetLatestVersion(version int64) error { return s.db.SetLatestVersion(version) } +func (s *receiptStore) EarliestVersion() int64 { + return s.db.GetEarliestVersion() +} + func (s *receiptStore) SetEarliestVersion(version int64) error { return s.db.SetEarliestVersion(version, true) } diff --git a/sei-db/ledger_db/receipt/receipt_store_test.go b/sei-db/ledger_db/receipt/receipt_store_test.go index e7aa1faf41..f0fbf3d6c9 100644 --- a/sei-db/ledger_db/receipt/receipt_store_test.go +++ b/sei-db/ledger_db/receipt/receipt_store_test.go @@ -121,7 +121,9 @@ func TestSetReceiptsAndGet(t *testing.T) { require.GreaterOrEqual(t, store.LatestVersion(), int64(1)) require.NoError(t, store.SetLatestVersion(10)) require.Equal(t, int64(10), store.LatestVersion()) + require.Equal(t, int64(0), store.EarliestVersion()) require.NoError(t, store.SetEarliestVersion(1)) + require.Equal(t, int64(1), store.EarliestVersion()) } func TestReceiptStoreLegacyFallback(t *testing.T) {