Skip to content

Commit 856dc78

Browse files
committed
use receipt indexer to fetch receipts
1 parent 4a695d4 commit 856dc78

File tree

6 files changed

+350
-14
lines changed

6 files changed

+350
-14
lines changed

action/receipt.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,64 @@ func (receipt *Receipt) TransferLogs(accountContractAddr string, logIndex uint32
219219
return logs, nil
220220
}
221221

222+
func (receipt *Receipt) CloneFixed() *Receipt {
223+
var cls []*Log
224+
if receipt.logs != nil {
225+
cls = make([]*Log, len(receipt.logs))
226+
for i, l := range receipt.logs {
227+
data := make([]byte, len(l.Data))
228+
copy(data, l.Data)
229+
topics := make([]hash.Hash256, len(l.Topics))
230+
for j, topic := range l.Topics {
231+
copy(topics[j][:], topic[:])
232+
}
233+
cls[i] = &Log{
234+
Address: l.Address,
235+
Topics: topics,
236+
Data: data,
237+
BlockHeight: l.BlockHeight,
238+
ActionHash: l.ActionHash,
239+
Index: l.Index,
240+
TxIndex: l.TxIndex,
241+
}
242+
}
243+
}
244+
var ctls []*TransactionLog
245+
if receipt.transactionLogs != nil {
246+
ctls = make([]*TransactionLog, len(receipt.transactionLogs))
247+
for i, tl := range receipt.transactionLogs {
248+
ctls[i] = &TransactionLog{
249+
Type: tl.Type,
250+
Amount: new(big.Int).Set(tl.Amount),
251+
Sender: tl.Sender,
252+
Recipient: tl.Recipient,
253+
}
254+
}
255+
}
256+
var blobGasPrice *big.Int
257+
if receipt.BlobGasPrice != nil {
258+
blobGasPrice = new(big.Int).Set(receipt.BlobGasPrice)
259+
}
260+
var effectiveGasPrice *big.Int
261+
if receipt.EffectiveGasPrice != nil {
262+
effectiveGasPrice = new(big.Int).Set(receipt.EffectiveGasPrice)
263+
}
264+
return &Receipt{
265+
Status: receipt.Status,
266+
BlockHeight: receipt.BlockHeight,
267+
ActionHash: receipt.ActionHash,
268+
GasConsumed: receipt.GasConsumed,
269+
BlobGasUsed: receipt.BlobGasUsed,
270+
BlobGasPrice: blobGasPrice,
271+
ContractAddress: receipt.ContractAddress,
272+
TxIndex: receipt.TxIndex,
273+
EffectiveGasPrice: effectiveGasPrice,
274+
logs: cls,
275+
transactionLogs: ctls,
276+
executionRevertMsg: receipt.executionRevertMsg,
277+
}
278+
}
279+
222280
// ConvertToLogPb converts a Log to protobuf's Log
223281
func (log *Log) ConvertToLogPb() *iotextypes.Log {
224282
l := &iotextypes.Log{}

blockchain/blockdao/blockdao.go

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,18 @@ type (
6363
}
6464

6565
blockDAO struct {
66-
blockStore BlockStore
67-
blobStore BlobStore
68-
indexers []BlockIndexer
69-
timerFactory *prometheustimer.TimerFactory
70-
lifecycle lifecycle.Lifecycle
71-
headerCache cache.LRUCache
72-
footerCache cache.LRUCache
73-
receiptCache cache.LRUCache
74-
blockCache cache.LRUCache
75-
txLogCache cache.LRUCache
76-
tipHeight uint64
66+
blockStore BlockStore
67+
blobStore BlobStore
68+
receiptIndexer *ReceiptIndexer
69+
indexers []BlockIndexer
70+
timerFactory *prometheustimer.TimerFactory
71+
lifecycle lifecycle.Lifecycle
72+
headerCache cache.LRUCache
73+
footerCache cache.LRUCache
74+
receiptCache cache.LRUCache
75+
blockCache cache.LRUCache
76+
txLogCache cache.LRUCache
77+
tipHeight uint64
7778
}
7879
)
7980

@@ -85,6 +86,13 @@ func WithBlobStore(bs BlobStore) Option {
8586
}
8687
}
8788

89+
// WithReceiptIndexer adds receipt indexer to block DAO
90+
func WithReceiptIndexer(ri *ReceiptIndexer) Option {
91+
return func(dao *blockDAO) {
92+
dao.receiptIndexer = ri
93+
}
94+
}
95+
8896
// NewBlockDAOWithIndexersAndCache returns a BlockDAO with indexers which will consume blocks appended, and
8997
// caches which will speed up reading
9098
func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexer, cacheSize int, opts ...Option) BlockDAO {
@@ -103,6 +111,9 @@ func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexe
103111
if blockDAO.blobStore != nil {
104112
blockDAO.lifecycle.Add(blockDAO.blobStore)
105113
}
114+
if blockDAO.receiptIndexer != nil {
115+
blockDAO.lifecycle.Add(blockDAO.receiptIndexer)
116+
}
106117
for _, indexer := range indexers {
107118
blockDAO.lifecycle.Add(indexer)
108119
}
@@ -307,9 +318,23 @@ func (dao *blockDAO) GetReceipts(height uint64) ([]*action.Receipt, error) {
307318
_cacheMtc.WithLabelValues("miss_receipts").Inc()
308319
timer := dao.timerFactory.NewTimer("get_receipt")
309320
defer timer.End()
310-
receipts, err := dao.blockStore.GetReceipts(height)
311-
if err != nil {
312-
return nil, err
321+
var receipts []*action.Receipt
322+
var err error
323+
if dao.receiptIndexer != nil {
324+
receipts, err = dao.receiptIndexer.Receipts(height)
325+
switch errors.Cause(err) {
326+
case nil:
327+
case ErrIndexOutOfRange, db.ErrNotExist, db.ErrBucketNotExist:
328+
receipts = nil
329+
default:
330+
return nil, err
331+
}
332+
}
333+
if receipts == nil {
334+
receipts, err = dao.blockStore.GetReceipts(height)
335+
if err != nil {
336+
return nil, err
337+
}
313338
}
314339
tlogs, err := dao.blockStore.TransactionLogs(height)
315340
if err != nil {
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright (c) 2024 IoTeX Foundation
2+
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
3+
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
4+
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.
5+
6+
package blockdao
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"sync"
12+
13+
"github.com/iotexproject/iotex-core/v2/action"
14+
"github.com/iotexproject/iotex-core/v2/blockchain/block"
15+
"github.com/iotexproject/iotex-core/v2/db"
16+
"github.com/iotexproject/iotex-core/v2/db/batch"
17+
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
18+
"github.com/iotexproject/iotex-proto/golang/iotextypes"
19+
"github.com/pkg/errors"
20+
"google.golang.org/protobuf/proto"
21+
)
22+
23+
type (
24+
// ReceiptIndexer defines a struct to store correct receipts for poluted blocks
25+
ReceiptIndexer struct {
26+
mu sync.RWMutex
27+
height uint64
28+
endHeight uint64
29+
kvstore db.KVStore
30+
}
31+
)
32+
33+
const (
34+
_receiptsNS = "receipts"
35+
_receiptMetaNS = "meta"
36+
)
37+
38+
var _heightKey = []byte("height")
39+
40+
// ErrIndexOutOfRange indicates the receipt does not exist in the indexer
41+
var ErrIndexOutOfRange = errors.New("index out of range")
42+
43+
// NewReceiptIndexer creates a new receipt indexer
44+
func NewReceiptIndexer(kvstore db.KVStore, endHeight uint64) *ReceiptIndexer {
45+
return &ReceiptIndexer{
46+
endHeight: endHeight,
47+
kvstore: kvstore,
48+
}
49+
}
50+
51+
// Start starts the receipt indexer
52+
func (ri *ReceiptIndexer) Start(ctx context.Context) error {
53+
ri.mu.Lock()
54+
defer ri.mu.Unlock()
55+
if err := ri.kvstore.Start(ctx); err != nil {
56+
return err
57+
}
58+
value, err := ri.kvstore.Get(_receiptMetaNS, _heightKey)
59+
switch errors.Cause(err) {
60+
case nil:
61+
case db.ErrNotExist, db.ErrBucketNotExist:
62+
return nil
63+
default:
64+
return err
65+
}
66+
ri.height = byteutil.BytesToUint64(value)
67+
return nil
68+
}
69+
70+
// Stop stops the receipt indexer
71+
func (ri *ReceiptIndexer) Stop(ctx context.Context) error {
72+
ri.mu.Lock()
73+
defer ri.mu.Unlock()
74+
75+
return ri.kvstore.Stop(ctx)
76+
}
77+
78+
// Height returns the end height of the receipt indexer
79+
func (ri *ReceiptIndexer) Height() (uint64, error) {
80+
ri.mu.RLock()
81+
defer ri.mu.RUnlock()
82+
return ri.height, nil
83+
}
84+
85+
// PutBlock puts the receipts of the block into kvstore
86+
func (ri *ReceiptIndexer) PutBlock(ctx context.Context, blk *block.Block) error {
87+
height := blk.Height()
88+
if height > ri.endHeight {
89+
return nil
90+
}
91+
key := byteutil.Uint64ToBytes(height)
92+
if blk.Receipts == nil {
93+
return errors.Errorf("receipts of block %d is nil", height)
94+
}
95+
ri.mu.Lock()
96+
defer ri.mu.Unlock()
97+
logIndex := uint32(0)
98+
receipts := iotextypes.Receipts{}
99+
for i, receipt := range blk.Receipts {
100+
cr := receipt.CloneFixed()
101+
logIndex = cr.UpdateIndex(uint32(i), logIndex)
102+
receipts.Receipts = append(receipts.Receipts, cr.ConvertToReceiptPb())
103+
}
104+
receiptsBytes, err := proto.Marshal(&receipts)
105+
if err != nil {
106+
return err
107+
}
108+
109+
b := batch.NewBatch()
110+
b.Put(_receiptsNS, key, receiptsBytes, fmt.Sprintf("failed to write receipts for block %d", height))
111+
if height > ri.height {
112+
ri.height = height
113+
b.Put(_receiptMetaNS, _heightKey, key, "failed to write receipt indexer height")
114+
}
115+
return ri.kvstore.WriteBatch(b)
116+
}
117+
118+
// Receipts returns the receipts of the block at the given height
119+
func (ri *ReceiptIndexer) Receipts(height uint64) ([]*action.Receipt, error) {
120+
key := byteutil.Uint64ToBytes(height)
121+
ri.mu.RLock()
122+
defer ri.mu.RUnlock()
123+
if height > ri.endHeight {
124+
return nil, errors.Wrapf(ErrIndexOutOfRange, "height %d > endHeight %d", height, ri.endHeight)
125+
}
126+
value, err := ri.kvstore.Get(_receiptsNS, key)
127+
if err != nil {
128+
return nil, err
129+
}
130+
receiptsPb := &iotextypes.Receipts{}
131+
if err := proto.Unmarshal(value, receiptsPb); err != nil {
132+
return nil, errors.Wrap(err, "failed to unmarshal block receipts")
133+
}
134+
135+
var receipts []*action.Receipt
136+
for _, receiptPb := range receiptsPb.Receipts {
137+
receipt := &action.Receipt{}
138+
receipt.ConvertFromReceiptPb(receiptPb)
139+
receipts = append(receipts, receipt)
140+
}
141+
return receipts, nil
142+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright (c) 2024 IoTeX Foundation
2+
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
3+
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
4+
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.
5+
6+
package blockdao
7+
8+
import (
9+
"context"
10+
"math/big"
11+
"testing"
12+
13+
"github.com/stretchr/testify/require"
14+
"google.golang.org/protobuf/types/known/timestamppb"
15+
16+
"github.com/iotexproject/go-pkgs/hash"
17+
"github.com/iotexproject/iotex-core/v2/action"
18+
"github.com/iotexproject/iotex-core/v2/blockchain/block"
19+
"github.com/iotexproject/iotex-core/v2/db"
20+
"github.com/iotexproject/iotex-core/v2/test/identityset"
21+
"github.com/iotexproject/iotex-core/v2/testutil"
22+
"github.com/iotexproject/iotex-proto/golang/iotextypes"
23+
)
24+
25+
func TestReceiptIndexer(t *testing.T) {
26+
r := require.New(t)
27+
ctx := context.Background()
28+
testPath, err := testutil.PathOfTempFile("test-receipt-indexer")
29+
r.NoError(err)
30+
defer func() {
31+
testutil.CleanupPath(testPath)
32+
}()
33+
cfg := db.DefaultConfig
34+
cfg.DbPath = testPath
35+
kvs := db.NewBoltDB(cfg)
36+
ri := NewReceiptIndexer(kvs, 3)
37+
r.NoError(ri.Start(ctx))
38+
for i := uint64(1); i <= 3; i++ {
39+
blk1 := createTestBlock(r, i, int(i))
40+
r.NoError(ri.PutBlock(ctx, blk1))
41+
height, err := ri.Height()
42+
r.NoError(err)
43+
r.EqualValues(i, height)
44+
receipts, err := ri.Receipts(i)
45+
r.NoError(err)
46+
r.Equal(int(i), len(receipts))
47+
for j, rcpt := range receipts {
48+
r.Equal(len(blk1.Receipts[j].Logs()), len(rcpt.Logs()))
49+
for k, lg := range rcpt.Logs() {
50+
r.NotEqual(blk1.Receipts[j].Logs()[k].NotFixTopicCopyBug, lg.NotFixTopicCopyBug)
51+
for m, n := range blk1.Receipts[j].Logs()[k].Topics {
52+
r.Equal(n, lg.Topics[m])
53+
}
54+
r.Equal(blk1.Receipts[j].Logs()[k].Data, lg.Data)
55+
}
56+
}
57+
}
58+
r.NoError(ri.PutBlock(ctx, createTestBlock(r, 4, 4)))
59+
r.NoError(ri.Stop(ctx))
60+
r.NoError(ri.Start(ctx))
61+
r.EqualValues(3, ri.height)
62+
r.NoError(ri.Stop(ctx))
63+
}
64+
65+
func createTestBlock(require *require.Assertions, height uint64, numOfReceipts int) *block.Block {
66+
pb := &iotextypes.BlockHeader{
67+
Core: &iotextypes.BlockHeaderCore{
68+
Height: height,
69+
Timestamp: timestamppb.Now(),
70+
},
71+
ProducerPubkey: identityset.PrivateKey(1).PublicKey().Bytes(),
72+
}
73+
blk := &block.Block{}
74+
require.NoError(blk.LoadFromBlockHeaderProto(pb))
75+
receipts := make([]*action.Receipt, numOfReceipts)
76+
for i := 0; i < numOfReceipts; i++ {
77+
actionHash := hash.BytesToHash256([]byte{byte(i)})
78+
receipts[i] = &action.Receipt{
79+
Status: uint64(iotextypes.ReceiptStatus_Success),
80+
BlockHeight: height,
81+
ActionHash: actionHash,
82+
GasConsumed: uint64(10000 + i),
83+
BlobGasUsed: 0,
84+
BlobGasPrice: big.NewInt(0),
85+
ContractAddress: "io",
86+
TxIndex: uint32(i),
87+
EffectiveGasPrice: big.NewInt(int64(i)),
88+
}
89+
topics := make([]hash.Hash256, i+1)
90+
for j := 0; j <= i; j++ {
91+
topics[j] = hash.BytesToHash256([]byte{byte(10*i + j)})
92+
}
93+
receipts[i].AddLogs(&action.Log{
94+
Address: "io",
95+
Topics: topics,
96+
Data: []byte{byte(i)},
97+
BlockHeight: height,
98+
ActionHash: actionHash,
99+
Index: uint32(i),
100+
NotFixTopicCopyBug: true,
101+
})
102+
}
103+
blk.Receipts = receipts
104+
return blk
105+
}

0 commit comments

Comments
 (0)