diff --git a/entries/block.go b/entries/block.go index e685d09..322474c 100644 --- a/entries/block.go +++ b/entries/block.go @@ -3,6 +3,7 @@ package entries import ( "context" "encoding/hex" + "reflect" "time" "github.com/deso-protocol/core/lib" @@ -34,12 +35,43 @@ type PGBlockEntry struct { BlockEntry } +type BlockSigner struct { + BlockHash string + SignerIndex uint64 +} + +type PGBlockSigner struct { + bun.BaseModel `bun:"table:block_signer"` + BlockSigner +} + // Convert the UserAssociation DeSo encoder to the PG struct used by bun. -func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) *PGBlockEntry { +func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) (*PGBlockEntry, []*PGBlockSigner) { blockHash, _ := block.Hash() + blockHashHex := hex.EncodeToString(blockHash[:]) + qc := block.Header.GetQC() + blockSigners := []*PGBlockSigner{} + if !isInterfaceNil(qc) { + aggSig := qc.GetAggregatedSignature() + if !isInterfaceNil(aggSig) { + signersList := aggSig.GetSignersList() + for ii := 0; ii < signersList.Size(); ii++ { + // Skip signers that didn't sign. + if !signersList.Get(ii) { + continue + } + blockSigners = append(blockSigners, &PGBlockSigner{ + BlockSigner: BlockSigner{ + BlockHash: blockHashHex, + SignerIndex: uint64(ii), + }, + }) + } + } + } return &PGBlockEntry{ BlockEntry: BlockEntry{ - BlockHash: hex.EncodeToString(blockHash[:]), + BlockHash: blockHashHex, PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]), TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]), Timestamp: consumer.UnixNanoToTime(uint64(block.Header.TstampNanoSecs)), @@ -53,7 +85,7 @@ func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *li ProposerVotePartialSignature: block.Header.ProposerVotePartialSignature.ToString(), BadgerKey: keyBytes, }, - } + }, blockSigners } // PostBatchOperation is the entry point for processing a batch of post entries. 
It determines the appropriate handler @@ -120,11 +152,13 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation // Create a new array to hold the bun struct. pgBlockEntrySlice := make([]*PGBlockEntry, 0) pgTransactionEntrySlice := make([]*PGTransactionEntry, 0) + pgBlockSignersEntrySlice := make([]*PGBlockSigner, 0) for _, entry := range uniqueBlocks { block := entry.Encoder.(*lib.MsgDeSoBlock) - blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params) + blockEntry, blockSigners := BlockEncoderToPGStruct(block, entry.KeyBytes, params) pgBlockEntrySlice = append(pgBlockEntrySlice, blockEntry) + pgBlockSignersEntrySlice = append(pgBlockSignersEntrySlice, blockSigners...) for jj, transaction := range block.Txns { indexInBlock := uint64(jj) pgTransactionEntry, err := TransactionEncoderToPGStruct( @@ -166,6 +200,19 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation return errors.Wrapf(err, "entries.bulkInsertBlock: Error inserting transaction entries") } + if len(pgBlockSignersEntrySlice) > 0 { + // Execute the insert query. + query := db.NewInsert().Model(&pgBlockSignersEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (block_hash, signer_index) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Error inserting block signers") + } + } + return nil } @@ -214,5 +261,26 @@ func bulkDeleteBlockEntriesFromKeysToDelete(db *bun.DB, keysToDelete [][]byte) e Exec(context.Background()); err != nil { return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting utxo operation entries") } + + // Delete any signers associated with the block. + if _, err := db.NewDelete(). + Model(&PGBlockSigner{}). + Where("block_hash IN (?)", bun.In(blockHashHexesToDelete)). + Returning(""). 
+ Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting block signers") + } return nil } + +// golang interface types are stored as a tuple of (type, value). A single i==nil check is not enough to +// determine if a pointer that implements an interface is nil. This function checks if the interface is nil +// by checking if the pointer itself is nil. +func isInterfaceNil(i interface{}) bool { + if i == nil { + return true + } + + value := reflect.ValueOf(i) + return value.Kind() == reflect.Ptr && value.IsNil() +} diff --git a/entries/utxo_operation.go b/entries/utxo_operation.go index 74f4ab6..b68f01c 100644 --- a/entries/utxo_operation.go +++ b/entries/utxo_operation.go @@ -93,13 +93,14 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, transactionUpdates := make([]*PGTransactionEntry, 0) affectedPublicKeys := make([]*PGAffectedPublicKeyEntry, 0) blockEntries := make([]*PGBlockEntry, 0) + pgBlockSigners := make([]*PGBlockSigner, 0) stakeRewardEntries := make([]*PGStakeReward, 0) jailedHistoryEntries := make([]*PGJailedHistoryEvent, 0) // Start timer to track how long it takes to insert the entries. start := time.Now() - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserting %v entries\n", len(uniqueEntries)) + glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserting %v entries\n", len(uniqueEntries)) transactionCount := 0 // Whether we are inserting transactions for the first time, or just updating them. @@ -126,8 +127,9 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, if entry.Block != nil { insertTransactions = true block := entry.Block - blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params) + blockEntry, blockSigners := BlockEncoderToPGStruct(block, entry.KeyBytes, params) blockEntries = append(blockEntries, blockEntry) + pgBlockSigners = append(pgBlockSigners, blockSigners...) 
for ii, txn := range block.Txns { indexInBlock := uint64(ii) pgTxn, err := TransactionEncoderToPGStruct( @@ -277,7 +279,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, transactionCount += len(innerTransactionsUtxoOperations) // Print how long it took to insert the entries. } - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Processed %v txns in %v s\n", transactionCount, time.Since(start)) + glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Processed %v txns in %v s\n", transactionCount, time.Since(start)) start = time.Now() @@ -299,6 +301,16 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, return errors.Wrapf(err, "entries.bulkInsertBlock: Error inserting entries") } + if len(pgBlockSigners) > 0 { + blockSignerQuery := db.NewInsert().Model(&pgBlockSigners) + if operationType == lib.DbOperationTypeUpsert { + blockSignerQuery = blockSignerQuery.On("CONFLICT (block_hash, signer_index) DO UPDATE") + } + if _, err := blockSignerQuery.Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertBlockSigners: Error inserting block signer entries") + } + } + } else { values := db.NewValues(&transactionUpdates) _, err := db.NewUpdate(). 
@@ -317,7 +329,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, } } - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Updated %v txns in %v s\n", len(transactionUpdates), time.Since(start)) + glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Updated %v txns in %v s\n", len(transactionUpdates), time.Since(start)) start = time.Now() @@ -329,7 +341,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, } } - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start)) + glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start)) start = time.Now() @@ -340,7 +352,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, return errors.Wrapf(err, "InsertStakeRewards: Problem inserting stake rewards") } } - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start)) + glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start)) if len(jailedHistoryEntries) > 0 { _, err := db.NewInsert().Model(&jailedHistoryEntries).On("CONFLICT (validator_pkid, jailed_at_epoch_number, unjailed_at_epoch_number) DO NOTHING").Exec(context.Background()) @@ -410,6 +422,9 @@ func parseUtxoOperationBundle( } txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHashHex, params, transaction.TxnFeeNanos, uint64(jj), utxoOps) if err != nil { + glog.Errorf("parseUtxoOperationBundle: Problem computing transaction metadata for "+ + "entry %+v at block height %v: %v", entry, entry.BlockHeight, err) + // TODO: swallow error and continue. 
return nil, nil, nil, diff --git a/entries/validator.go b/entries/validator.go index f06cd18..1d41230 100644 --- a/entries/validator.go +++ b/entries/validator.go @@ -38,6 +38,28 @@ type PGValidatorEntryUtxoOps struct { UtxoOperation } +type SnapshotValidatorEntry struct { + ValidatorPKID string `bun:",nullzero"` + Domains []string `bun:",array"` + DisableDelegatedStake bool + DelegatedStakeCommissionBasisPoints uint64 + VotingPublicKey string `bun:",nullzero"` + VotingAuthorization string `bun:",nullzero"` + // Use bunbig.Int to store the balance as a numeric in the pg database. + TotalStakeAmountNanos *bunbig.Int `pg:",use_zero"` + LastActiveAtEpochNumber uint64 + JailedAtEpochNumber uint64 + SnapshotAtEpochNumber uint64 `pg:",use_zero"` + + ExtraData map[string]string `bun:"type:jsonb"` + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGSnapshotValidatorEntry struct { + bun.BaseModel `bun:"table:snapshot_validator_entry"` + SnapshotValidatorEntry +} + // Convert the ValidatorEntry DeSo encoder to the PGValidatorEntry struct used by bun. func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) ValidatorEntry { pgValidatorEntry := ValidatorEntry{ @@ -96,23 +118,43 @@ func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) + uniqueValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID) + uniqueSnapshotValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID) // Create a new array to hold the bun struct. 
- pgEntrySlice := make([]*PGValidatorEntry, len(uniqueEntries)) + pgEntrySlice := make([]*PGValidatorEntry, len(uniqueValidatorEntries)) + pgSnapshotEntrySlice := make([]*PGSnapshotValidatorEntry, len(uniqueSnapshotValidatorEntries)) // Loop through the entries and convert them to PGEntry. - for ii, entry := range uniqueEntries { + for ii, entry := range uniqueValidatorEntries { pgEntrySlice[ii] = &PGValidatorEntry{ValidatorEntry: ValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)} } + for ii, entry := range uniqueSnapshotValidatorEntries { + pgSnapshotEntrySlice[ii] = &PGSnapshotValidatorEntry{SnapshotValidatorEntry: SnapshotValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)} + } // Execute the insert query. - query := db.NewInsert().Model(&pgEntrySlice) + if len(pgEntrySlice) > 0 { + query := db.NewInsert().Model(&pgEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } - if operationType == lib.DbOperationTypeUpsert { - query = query.On("CONFLICT (badger_key) DO UPDATE") + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting validator entries") + } } - if _, err := query.Returning("").Exec(context.Background()); err != nil { - return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting entries") + if len(pgSnapshotEntrySlice) > 0 { + query := db.NewInsert().Model(&pgSnapshotEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting snapshot validator entries") + } } return nil } @@ -123,16 +165,64 @@ func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, opera uniqueEntries := 
consumer.UniqueEntries(entries) // Transform the entries into a list of keys to delete. - keysToDelete := consumer.KeysToDelete(uniqueEntries) + validatorEntriesToDelete := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID) - // Execute the delete query. + snapshotValidatorEntriesToDelete := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID) + + // Execute the delete query for validator entries. if _, err := db.NewDelete(). Model(&PGValidatorEntry{}). - Where("badger_key IN (?)", bun.In(keysToDelete)). + Where("badger_key IN (?)", bun.In(consumer.KeysToDelete(validatorEntriesToDelete))). Returning(""). Exec(context.Background()); err != nil { return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries") } + // Execute the delete query for snapshot validator entries. + if _, err := db.NewDelete(). + Model(&PGSnapshotValidatorEntry{}). + Where("badger_key IN (?)", bun.In(consumer.KeysToDelete(snapshotValidatorEntriesToDelete))). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteSnapshotValidatorEntry: Error deleting entries") + } + return nil } + +// Convert the SnapshotValidatorEntry DeSo encoder to the PGSnapshotValidatorEntry struct used by bun.
+func SnapshotValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) SnapshotValidatorEntry { + pgValidatorEntry := SnapshotValidatorEntry{ + ExtraData: consumer.ExtraDataBytesToString(validatorEntry.ExtraData), + BadgerKey: keyBytes, + } + + if validatorEntry.ValidatorPKID != nil { + pgValidatorEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*validatorEntry.ValidatorPKID)[:], params) + } + + if validatorEntry.Domains != nil { + pgValidatorEntry.Domains = make([]string, len(validatorEntry.Domains)) + for ii, domain := range validatorEntry.Domains { + pgValidatorEntry.Domains[ii] = string(domain) + } + } + + pgValidatorEntry.DisableDelegatedStake = validatorEntry.DisableDelegatedStake + pgValidatorEntry.DelegatedStakeCommissionBasisPoints = validatorEntry.DelegatedStakeCommissionBasisPoints + + if validatorEntry.VotingPublicKey != nil { + pgValidatorEntry.VotingPublicKey = validatorEntry.VotingPublicKey.ToString() + } + + if validatorEntry.VotingAuthorization != nil { + pgValidatorEntry.VotingAuthorization = validatorEntry.VotingAuthorization.ToString() + } + + pgValidatorEntry.TotalStakeAmountNanos = bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig()) + pgValidatorEntry.LastActiveAtEpochNumber = validatorEntry.LastActiveAtEpochNumber + pgValidatorEntry.JailedAtEpochNumber = validatorEntry.JailedAtEpochNumber + keyBytesWithoutPrefix := keyBytes[1:] + pgValidatorEntry.SnapshotAtEpochNumber = lib.DecodeUint64(keyBytesWithoutPrefix[:8]) + return pgValidatorEntry +} diff --git a/migrations/initial_migrations/20240425000001_create_snapshot_validator_entry_table.go b/migrations/initial_migrations/20240425000001_create_snapshot_validator_entry_table.go new file mode 100644 index 0000000..8997cf9 --- /dev/null +++ b/migrations/initial_migrations/20240425000001_create_snapshot_validator_entry_table.go @@ -0,0 +1,48 @@ +package initial_migrations + +import ( + "context" + "strings" + + 
"github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createSnapshotValidatorEntryTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + validator_pkid VARCHAR NOT NULL, + domains VARCHAR ARRAY, + disable_delegated_stake BOOLEAN, + delegated_stake_commission_basis_points BIGINT, + voting_public_key VARCHAR, + voting_authorization VARCHAR, + total_stake_amount_nanos NUMERIC(78, 0) NOT NULL, + last_active_at_epoch_number BIGINT, + jailed_at_epoch_number BIGINT, + extra_data JSONB, + snapshot_at_epoch_number BIGINT NOT NULL, + badger_key BYTEA PRIMARY KEY + ); + CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid); + CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number); + CREATE INDEX {tableName}_total_stake_amount_nanos on {tableName} (total_stake_amount_nanos); + CREATE INDEX {tableName}_badger_key ON {tableName} (badger_key); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? 
+ return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createSnapshotValidatorEntryTable(db, "snapshot_validator_entry") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS snapshot_validator_entry; + `) + if err != nil { + return err + } + return nil + }) +} diff --git a/migrations/initial_migrations/20240425000002_create_block_signer_table.go b/migrations/initial_migrations/20240425000002_create_block_signer_table.go new file mode 100644 index 0000000..f6726e9 --- /dev/null +++ b/migrations/initial_migrations/20240425000002_create_block_signer_table.go @@ -0,0 +1,39 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createBlockSignerTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + block_hash VARCHAR NOT NULL, + signer_index BIGINT NOT NULL, + PRIMARY KEY(block_hash, signer_index) + ); + CREATE INDEX {tableName}_block_hash_idx ON {tableName} (block_hash); + CREATE INDEX {tableName}_block_hash_signer_index_idx ON {tableName} (block_hash, signer_index); + CREATE INDEX {tableName}_signer_index_idx ON {tableName} (signer_index); + create index block_proposer_voting_public_key on block (proposer_voting_public_key); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createBlockSignerTable(db, "block_signer") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS block_signer; + `) + if err != nil { + return err + } + return nil + }) +}