Merged
76 changes: 72 additions & 4 deletions entries/block.go
@@ -3,6 +3,7 @@ package entries
import (
"context"
"encoding/hex"
"reflect"
"time"

"github.com/deso-protocol/core/lib"
@@ -34,12 +35,43 @@ type PGBlockEntry struct {
BlockEntry
}

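// BlockSigner associates a block hash with the index of a signer in the
// block's aggregated signature signers list.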
type BlockSigner struct {
BlockHash string
SignerIndex uint64
}

type PGBlockSigner struct {
bun.BaseModel `bun:"table:block_signer"`
BlockSigner
}

// Convert the MsgDeSoBlock DeSo encoder to the PG structs used by bun.
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) *PGBlockEntry {
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) (*PGBlockEntry, []*PGBlockSigner) {
blockHash, _ := block.Hash()
blockHashHex := hex.EncodeToString(blockHash[:])
qc := block.Header.GetQC()
blockSigners := []*PGBlockSigner{}
if !isInterfaceNil(qc) {
Contributor:

Is this safe? Please make sure this works for a PoW block where the header has neither a QC nor an aggregate QC.

Member Author:

Yeah, this is safe. The isInterfaceNil check returns true for PoW blocks.

aggSig := qc.GetAggregatedSignature()
if !isInterfaceNil(aggSig) {
signersList := aggSig.GetSignersList()
for ii := 0; ii < signersList.Size(); ii++ {
// Skip signers that didn't sign.
if !signersList.Get(ii) {
continue
}
blockSigners = append(blockSigners, &PGBlockSigner{
BlockSigner: BlockSigner{
BlockHash: blockHashHex,
SignerIndex: uint64(ii),
},
})
}
}
}
return &PGBlockEntry{
BlockEntry: BlockEntry{
BlockHash: hex.EncodeToString(blockHash[:]),
BlockHash: blockHashHex,
PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]),
TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]),
Timestamp: consumer.UnixNanoToTime(uint64(block.Header.TstampNanoSecs)),
@@ -53,7 +85,7 @@ func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *li
ProposerVotePartialSignature: block.Header.ProposerVotePartialSignature.ToString(),
BadgerKey: keyBytes,
},
}
}, blockSigners
}
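For context, the loop above treats the aggregated signature's signers list as a bitset and records one row per set bit. A standalone sketch of that pattern, using a hypothetical bitset type in place of the real signers list:

package main

import "fmt"

// bitset is a hypothetical stand-in for the structure returned by
// GetSignersList; only Size and Get are assumed.
type bitset []bool

func (b bitset) Size() int      { return len(b) }
func (b bitset) Get(i int) bool { return b[i] }

func main() {
	signers := bitset{true, false, true, true}
	signerIndexes := []uint64{}
	for ii := 0; ii < signers.Size(); ii++ {
		// Skip validators whose bit is unset, i.e. that did not sign.
		if !signers.Get(ii) {
			continue
		}
		signerIndexes = append(signerIndexes, uint64(ii))
	}
	fmt.Println(signerIndexes) // [0 2 3]
}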

// BlockBatchOperation is the entry point for processing a batch of block entries. It determines the appropriate handler
Expand Down Expand Up @@ -120,11 +152,13 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
// Create a new array to hold the bun struct.
pgBlockEntrySlice := make([]*PGBlockEntry, 0)
pgTransactionEntrySlice := make([]*PGTransactionEntry, 0)
pgBlockSignersEntrySlice := make([]*PGBlockSigner, 0)

for _, entry := range uniqueBlocks {
block := entry.Encoder.(*lib.MsgDeSoBlock)
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
blockEntry, blockSigners := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
pgBlockEntrySlice = append(pgBlockEntrySlice, blockEntry)
pgBlockSignersEntrySlice = append(pgBlockSignersEntrySlice, blockSigners...)
for jj, transaction := range block.Txns {
indexInBlock := uint64(jj)
pgTransactionEntry, err := TransactionEncoderToPGStruct(
@@ -166,6 +200,19 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
return errors.Wrapf(err, "entries.bulkInsertBlock: Error inserting transaction entries")
}

if len(pgBlockSignersEntrySlice) > 0 {
// Execute the insert query.
query := db.NewInsert().Model(&pgBlockSignersEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (block_hash, signer_index) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Error inserting block signers")
}
}

return nil
}
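Note: the ON CONFLICT (block_hash, signer_index) DO UPDATE clause above presumes a unique constraint on that column pair. A hypothetical migration sketch of the assumed block_signer schema (the real migration lives elsewhere in the repo and may differ in naming and column types):

package migrations

import (
	"context"

	"github.com/uptrace/bun"
)

// createBlockSignerTable is a hypothetical sketch; the composite primary key
// supplies the (block_hash, signer_index) conflict target used by the upsert.
func createBlockSignerTable(ctx context.Context, db *bun.DB) error {
	_, err := db.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS block_signer (
			block_hash   TEXT   NOT NULL,
			signer_index BIGINT NOT NULL,
			PRIMARY KEY (block_hash, signer_index)
		)`)
	return err
}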

@@ -214,5 +261,26 @@ func bulkDeleteBlockEntriesFromKeysToDelete(db *bun.DB, keysToDelete [][]byte) e
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting utxo operation entries")
}

// Delete any signers associated with the block.
if _, err := db.NewDelete().
Model(&PGBlockSigner{}).
Where("block_hash IN (?)", bun.In(blockHashHexesToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting block signers")
}
return nil
}

// Go interface values are stored as a (type, value) tuple, so a single i == nil check is not
// enough to determine whether a pointer that implements an interface is nil: a typed-nil
// pointer yields a non-nil interface value. This function additionally checks whether the
// underlying pointer itself is nil.
func isInterfaceNil(i interface{}) bool {
if i == nil {
return true
}

value := reflect.ValueOf(i)
return value.Kind() == reflect.Ptr && value.IsNil()
}
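The comment above is the crux of the PoW-safety discussion in the review thread. A minimal, self-contained demonstration of the typed-nil pitfall, with hypothetical QC types standing in for the real interfaces:

package main

import (
	"fmt"
	"reflect"
)

// quorumCertificate and pointerQC are hypothetical types for illustration.
type quorumCertificate interface{ GetView() uint64 }

type pointerQC struct{ view uint64 }

func (q *pointerQC) GetView() uint64 { return q.view }

func isInterfaceNil(i interface{}) bool {
	if i == nil {
		return true
	}
	value := reflect.ValueOf(i)
	return value.Kind() == reflect.Ptr && value.IsNil()
}

func main() {
	var p *pointerQC                // typed-nil pointer
	var qc quorumCertificate = p    // interface holds (type=*pointerQC, value=nil)
	fmt.Println(qc == nil)          // false: the naive check misses the typed nil
	fmt.Println(isInterfaceNil(qc)) // true: reflect sees the nil pointer
}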
27 changes: 21 additions & 6 deletions entries/utxo_operation.go
@@ -93,13 +93,14 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
transactionUpdates := make([]*PGTransactionEntry, 0)
affectedPublicKeys := make([]*PGAffectedPublicKeyEntry, 0)
blockEntries := make([]*PGBlockEntry, 0)
pgBlockSigners := make([]*PGBlockSigner, 0)
stakeRewardEntries := make([]*PGStakeReward, 0)
jailedHistoryEntries := make([]*PGJailedHistoryEvent, 0)

// Start timer to track how long it takes to insert the entries.
start := time.Now()

fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserting %v entries\n", len(uniqueEntries))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserting %v entries\n", len(uniqueEntries))
transactionCount := 0

// Whether we are inserting transactions for the first time, or just updating them.
@@ -126,8 +127,9 @@
if entry.Block != nil {
insertTransactions = true
block := entry.Block
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
blockEntry, blockSigners := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
blockEntries = append(blockEntries, blockEntry)
pgBlockSigners = append(pgBlockSigners, blockSigners...)
for ii, txn := range block.Txns {
indexInBlock := uint64(ii)
pgTxn, err := TransactionEncoderToPGStruct(
@@ -277,7 +279,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
transactionCount += len(innerTransactionsUtxoOperations)
// Print how long it took to insert the entries.
}
fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Processed %v txns in %v s\n", transactionCount, time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Processed %v txns in %v s\n", transactionCount, time.Since(start))

start = time.Now()

@@ -299,6 +301,16 @@
return errors.Wrapf(err, "entries.bulkInsertBlock: Error inserting entries")
}

// Only insert block signers when there are rows to insert, mirroring the
// len check in bulkInsertBlockEntry; PoW-only batches produce no signer rows.
if len(pgBlockSigners) > 0 {
	blockSignerQuery := db.NewInsert().Model(&pgBlockSigners)

	if operationType == lib.DbOperationTypeUpsert {
		blockSignerQuery = blockSignerQuery.On("CONFLICT (block_hash, signer_index) DO UPDATE")
	}

	if _, err := blockSignerQuery.Exec(context.Background()); err != nil {
		return errors.Wrapf(err, "entries.bulkInsertBlockSigners: Error inserting block signer entries")
	}
}

} else {
values := db.NewValues(&transactionUpdates)
_, err := db.NewUpdate().
@@ -317,7 +329,7 @@
}
}

fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Updated %v txns in %v s\n", len(transactionUpdates), time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Updated %v txns in %v s\n", len(transactionUpdates), time.Since(start))

start = time.Now()

@@ -329,7 +341,7 @@
}
}

fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start))

start = time.Now()

@@ -340,7 +352,7 @@
return errors.Wrapf(err, "InsertStakeRewards: Problem inserting stake rewards")
}
}
fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start))

if len(jailedHistoryEntries) > 0 {
_, err := db.NewInsert().Model(&jailedHistoryEntries).On("CONFLICT (validator_pkid, jailed_at_epoch_number, unjailed_at_epoch_number) DO NOTHING").Exec(context.Background())
@@ -410,6 +422,9 @@ func parseUtxoOperationBundle(
}
txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHashHex, params, transaction.TxnFeeNanos, uint64(jj), utxoOps)
if err != nil {
glog.Errorf("parseUtxoOperationBundle: Problem computing transaction metadata for "+
"entry %+v at block height %v: %v", entry, entry.BlockHeight, err)
// TODO: swallow error and continue.
return nil,
nil,
nil,
110 changes: 100 additions & 10 deletions entries/validator.go
@@ -38,6 +38,28 @@ type PGValidatorEntryUtxoOps struct {
UtxoOperation
}

type SnapshotValidatorEntry struct {
ValidatorPKID string `bun:",nullzero"`
Domains []string `bun:",array"`
DisableDelegatedStake bool
DelegatedStakeCommissionBasisPoints uint64
VotingPublicKey string `bun:",nullzero"`
VotingAuthorization string `bun:",nullzero"`
// Use bunbig.Int to store the balance as a numeric in the pg database.
TotalStakeAmountNanos *bunbig.Int `pg:",use_zero"`
LastActiveAtEpochNumber uint64
JailedAtEpochNumber uint64
SnapshotAtEpochNumber uint64 `pg:",use_zero"`

ExtraData map[string]string `bun:"type:jsonb"`
BadgerKey []byte `pg:",pk,use_zero"`
}

type PGSnapshotValidatorEntry struct {
bun.BaseModel `bun:"table:snapshot_validator_entry"`
SnapshotValidatorEntry
}

// Convert the ValidatorEntry DeSo encoder to the PGValidatorEntry struct used by bun.
func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) ValidatorEntry {
pgValidatorEntry := ValidatorEntry{
@@ -96,23 +118,43 @@ func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
uniqueValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID)
uniqueSnapshotValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGValidatorEntry, len(uniqueEntries))
pgEntrySlice := make([]*PGValidatorEntry, len(uniqueValidatorEntries))
pgSnapshotEntrySlice := make([]*PGSnapshotValidatorEntry, len(uniqueSnapshotValidatorEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
for ii, entry := range uniqueValidatorEntries {
pgEntrySlice[ii] = &PGValidatorEntry{ValidatorEntry: ValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)}
}
for ii, entry := range uniqueSnapshotValidatorEntries {
pgSnapshotEntrySlice[ii] = &PGSnapshotValidatorEntry{SnapshotValidatorEntry: SnapshotValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)
if len(pgEntrySlice) > 0 {
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting validator entries")
}
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting entries")
if len(pgSnapshotEntrySlice) > 0 {
query := db.NewInsert().Model(&pgSnapshotEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting snapshot validator entries")
}
}
return nil
}
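The prefix split above relies on consumer.FilterEntriesByPrefix to partition one batch of state-change entries by badger key prefix. A hypothetical sketch of what such a helper does (the real implementation lives in the consumer package and may differ):

package entries

import (
	"bytes"

	"github.com/deso-protocol/core/lib"
)

// filterEntriesByPrefix keeps only the entries whose badger key starts with
// the given prefix. Hypothetical; shown for illustration only.
func filterEntriesByPrefix(entries []*lib.StateChangeEntry, prefix []byte) []*lib.StateChangeEntry {
	filtered := make([]*lib.StateChangeEntry, 0, len(entries))
	for _, entry := range entries {
		if bytes.HasPrefix(entry.KeyBytes, prefix) {
			filtered = append(filtered, entry)
		}
	}
	return filtered
}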
Expand All @@ -123,16 +165,64 @@ func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, opera
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)
validatorKeysToDelete := consumer.KeysToDelete(consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID))

// Execute the delete query.
snapshotValidatorKeysToDelete := consumer.KeysToDelete(consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID))

// Execute the delete query for validator entries.
if _, err := db.NewDelete().
Model(&PGValidatorEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Where("badger_key IN (?)", bun.In(validatorEntriesToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries")
}

// Execute the delete query for snapshot validator entries.
if _, err := db.NewDelete().
Model(&PGSnapshotValidatorEntry{}).
Where("badger_key IN (?)", bun.In(snapshotValidatorEntriesToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteSnapshotValidatorEntry: Error deleting entries")
}

return nil
}

// Convert the SnapshotValidatorEntry DeSo encoder to the PGSnapshotValidatorEntry struct used by bun.
func SnapshotValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) SnapshotValidatorEntry {
pgValidatorEntry := SnapshotValidatorEntry{
ExtraData: consumer.ExtraDataBytesToString(validatorEntry.ExtraData),
BadgerKey: keyBytes,
}

if validatorEntry.ValidatorPKID != nil {
pgValidatorEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*validatorEntry.ValidatorPKID)[:], params)
}

if validatorEntry.Domains != nil {
pgValidatorEntry.Domains = make([]string, len(validatorEntry.Domains))
for ii, domain := range validatorEntry.Domains {
pgValidatorEntry.Domains[ii] = string(domain)
}
}

pgValidatorEntry.DisableDelegatedStake = validatorEntry.DisableDelegatedStake
pgValidatorEntry.DelegatedStakeCommissionBasisPoints = validatorEntry.DelegatedStakeCommissionBasisPoints

if validatorEntry.VotingPublicKey != nil {
pgValidatorEntry.VotingPublicKey = validatorEntry.VotingPublicKey.ToString()
}

if validatorEntry.VotingAuthorization != nil {
pgValidatorEntry.VotingAuthorization = validatorEntry.VotingAuthorization.ToString()
}

pgValidatorEntry.TotalStakeAmountNanos = bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig())
pgValidatorEntry.LastActiveAtEpochNumber = validatorEntry.LastActiveAtEpochNumber
pgValidatorEntry.JailedAtEpochNumber = validatorEntry.JailedAtEpochNumber
keyBytesWithoutPrefix := keyBytes[1:]
pgValidatorEntry.SnapshotAtEpochNumber = lib.DecodeUint64(keyBytesWithoutPrefix[:8])
return pgValidatorEntry
}
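SnapshotAtEpochNumber is recovered from the badger key rather than from the encoder. This assumes the key layout is a 1-byte prefix, then an 8-byte big-endian epoch number, then the validator PKID, and that lib.DecodeUint64 is a big-endian decode. A small sketch under those assumptions, with a bounds check the code above omits:

package entries

import (
	"encoding/binary"
	"fmt"
)

// snapshotEpochFromKey is a hypothetical sketch of the assumed key layout:
// [1-byte prefix][8-byte big-endian epoch number][validator PKID bytes].
func snapshotEpochFromKey(keyBytes []byte) (uint64, error) {
	if len(keyBytes) < 9 {
		return 0, fmt.Errorf("snapshotEpochFromKey: key too short: %d bytes", len(keyBytes))
	}
	keyBytesWithoutPrefix := keyBytes[1:]
	return binary.BigEndian.Uint64(keyBytesWithoutPrefix[:8]), nil
}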