Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 52 additions & 19 deletions entries/block.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build relic

package entries

import (
Expand All @@ -12,14 +14,23 @@ import (
)

type BlockEntry struct {
BlockHash string `pg:",pk,use_zero"`
PrevBlockHash string
TxnMerkleRoot string
Timestamp time.Time
Height uint64
Nonce uint64
ExtraNonce uint64
BadgerKey []byte `pg:",use_zero"`
BlockHash string `pg:",pk,use_zero"`
PrevBlockHash string
TxnMerkleRoot string
Timestamp time.Time
Height uint64
Nonce uint64
ExtraNonce uint64
BlockVersion uint32
TxnConnectStatusByIndexHash string `pg:",use_zero"`
ProposerPublicKey string `pg:",use_zero"`
ProposerVotingPublicKey string `pg:",use_zero"`
ProposerRandomSeedSignature string `pg:",use_zero"`
ProposedInView uint64
ProposerVotePartialSignature string `pg:",use_zero"`
// TODO: Quorum Certificates. Separate entry.

BadgerKey []byte `pg:",use_zero"`
}

type PGBlockEntry struct {
Expand All @@ -28,18 +39,34 @@ type PGBlockEntry struct {
}

// Convert the UserAssociation DeSo encoder to the PG struct used by bun.
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte) *PGBlockEntry {
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) *PGBlockEntry {
blockHash, _ := block.Hash()
var txnConnectStatusByIndexHash string
if block.Header.TxnConnectStatusByIndexHash != nil {
txnConnectStatusByIndexHash = hex.EncodeToString(block.Header.TxnConnectStatusByIndexHash.ToBytes())
}
var proposerPublicKey string
if block.Header.ProposerPublicKey != nil {
proposerPublicKey = consumer.PublicKeyBytesToBase58Check(
block.Header.ProposerPublicKey.ToBytes(), params)
}
return &PGBlockEntry{
BlockEntry: BlockEntry{
BlockHash: hex.EncodeToString(blockHash[:]),
PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]),
TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]),
Timestamp: consumer.UnixNanoToTime(uint64(block.Header.TstampNanoSecs)),
Height: block.Header.Height,
Nonce: block.Header.Nonce,
ExtraNonce: block.Header.ExtraNonce,
BadgerKey: keyBytes,
BlockHash: hex.EncodeToString(blockHash[:]),
PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]),
TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]),
Timestamp: consumer.UnixNanoToTime(uint64(block.Header.TstampNanoSecs)),
Height: block.Header.Height,
Nonce: block.Header.Nonce,
ExtraNonce: block.Header.ExtraNonce,
BlockVersion: block.Header.Version,
TxnConnectStatusByIndexHash: txnConnectStatusByIndexHash,
ProposerPublicKey: proposerPublicKey,
ProposerVotingPublicKey: block.Header.ProposerVotingPublicKey.ToString(),
ProposerRandomSeedSignature: block.Header.ProposerRandomSeedSignature.ToString(),
ProposedInView: block.Header.ProposedInView,
ProposerVotePartialSignature: block.Header.ProposerVotePartialSignature.ToString(),
BadgerKey: keyBytes,
},
}
}
Expand Down Expand Up @@ -77,10 +104,16 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation

for _, entry := range uniqueBlocks {
block := entry.Encoder.(*lib.MsgDeSoBlock)
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes)
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
pgBlockEntrySlice = append(pgBlockEntrySlice, blockEntry)
for jj, transaction := range block.Txns {
pgTransactionEntry, err := TransactionEncoderToPGStruct(transaction, uint64(jj), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, params)
// Check if the transaction connects or not.
txnConnects := blockEntry.Height < uint64(params.ForkHeights.ProofOfStake2ConsensusCutoverBlockHeight) ||
jj == 0 || block.TxnConnectStatusByIndex.Get(jj-1)
pgTransactionEntry, err := TransactionEncoderToPGStruct(
transaction, uint64(jj), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, txnConnects,
params,
)
if err != nil {
return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Problem converting transaction to PG struct")
}
Expand Down
48 changes: 48 additions & 0 deletions entries/stake_reward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package entries

import (
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/uptrace/bun"
)

type StakeReward struct {
StakerPKID string `bun:",nullzero"`
ValidatorPKID string `bun:",nullzero"`
RewardMethod lib.StakingRewardMethod // TODO: we probably want this to be human readable?
RewardNanos uint64 `pg:",use_zero"`
IsValidatorCommission bool
BlockHash string

UtxoOpIndex uint64 `pg:",use_zero"`
}

type PGStakeReward struct {
bun.BaseModel `bun:"table:stake_reward"`
StakeReward
}

// Convert the StakeRewardStateChangeMetadata DeSo encoder to the PGStakeReward struct used by bun.
func StakeRewardEncoderToPGStruct(
stakeReward *lib.StakeRewardStateChangeMetadata,
params *lib.DeSoParams,
blockHash string,
utxoOpIndex uint64,
) StakeReward {
pgStakeReward := StakeReward{}

if stakeReward.StakerPKID != nil {
pgStakeReward.StakerPKID = consumer.PublicKeyBytesToBase58Check((*stakeReward.StakerPKID)[:], params)
}

if stakeReward.ValidatorPKID != nil {
pgStakeReward.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*stakeReward.ValidatorPKID)[:], params)
}

pgStakeReward.RewardMethod = stakeReward.StakingRewardMethod
pgStakeReward.RewardNanos = stakeReward.RewardNanos
pgStakeReward.IsValidatorCommission = stakeReward.IsValidatorCommission
pgStakeReward.BlockHash = blockHash
pgStakeReward.UtxoOpIndex = utxoOpIndex
return pgStakeReward
}
18 changes: 15 additions & 3 deletions entries/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,24 @@ type TransactionEntry struct {
IndexInBlock uint64
BlockHeight uint64
Timestamp time.Time `pg:",use_zero"`
BadgerKey []byte `pg:",use_zero"`
Connects bool
BadgerKey []byte `pg:",use_zero"`
}

type PGTransactionEntry struct {
bun.BaseModel `bun:"table:transaction_partitioned"`
TransactionEntry
}

func TransactionEncoderToPGStruct(transaction *lib.MsgDeSoTxn, blockIndex uint64, blockHash string, blockHeight uint64, timestamp time.Time, params *lib.DeSoParams) (*PGTransactionEntry, error) {
func TransactionEncoderToPGStruct(
transaction *lib.MsgDeSoTxn,
blockIndex uint64,
blockHash string,
blockHeight uint64,
timestamp time.Time,
connects bool,
params *lib.DeSoParams,
) (*PGTransactionEntry, error) {

var txInputs []map[string]string
for _, input := range transaction.TxInputs {
Expand Down Expand Up @@ -86,6 +95,7 @@ func TransactionEncoderToPGStruct(transaction *lib.MsgDeSoTxn, blockIndex uint64
IndexInBlock: blockIndex,
BlockHeight: blockHeight,
Timestamp: timestamp,
Connects: connects,
BadgerKey: transaction.Hash()[:],
},
}
Expand Down Expand Up @@ -127,7 +137,9 @@ func transformTransactionEntry(entries []*lib.StateChangeEntry, params *lib.DeSo

for _, entry := range uniqueTransactions {
transaction := entry.Encoder.(*lib.MsgDeSoTxn)
transactionEntry, err := TransactionEncoderToPGStruct(transaction, 0, "", 0, time.Now(), params)
// Assuming transactions connect when using this function. We can only
// tell if a transaction connects or not if we have the block.
transactionEntry, err := TransactionEncoderToPGStruct(transaction, 0, "", 0, time.Now(), true, params)
if err != nil {
return nil, errors.Wrapf(err, "entries.transformAndBulkInsertTransactionEntry: Problem converting transaction to PG struct")
}
Expand Down
49 changes: 44 additions & 5 deletions entries/utxo_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"time"
Expand Down Expand Up @@ -64,7 +65,7 @@ func ConvertUtxoOperationKeyToBlockHashHex(keyBytes []byte) string {
return hex.EncodeToString(keyBytes[1:])
}

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// UtxoOperationBatchOperation is the entry point for processing a batch of utxo operations. It determines the appropriate handler
// based on the operation type and executes it.
func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
Expand Down Expand Up @@ -92,6 +93,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
transactionUpdates := make([]*PGTransactionEntry, 0)
affectedPublicKeys := make([]*PGAffectedPublicKeyEntry, 0)
blockEntries := make([]*PGBlockEntry, 0)
stakeRewardEntries := make([]*PGStakeReward, 0)

// Start timer to track how long it takes to insert the entries.
start := time.Now()
Expand All @@ -113,14 +115,18 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
blockHash := ConvertUtxoOperationKeyToBlockHashHex(entry.KeyBytes)

// Check to see if the state change entry has an attached block.
// Note that this only happens during the iniltial sync, in order to speed up the sync process.
// Note that this only happens during the initial sync, in order to speed up the sync process.
if entry.Block != nil {
insertTransactions = true
block := entry.Block
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes)
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
blockEntries = append(blockEntries, blockEntry)
for ii, txn := range block.Txns {
pgTxn, err := TransactionEncoderToPGStruct(txn, uint64(ii), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, params)
// Check if the transaction connects or not.
txnConnects := blockEntry.Height < uint64(params.ForkHeights.ProofOfStake2ConsensusCutoverBlockHeight) ||
ii == 0 || block.TxnConnectStatusByIndex.Get(ii-1)
pgTxn, err := TransactionEncoderToPGStruct(
txn, uint64(ii), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, txnConnects, params)
if err != nil {
return errors.Wrapf(err, "entries.bulkInsertUtxoOperationsEntry: Problem converting transaction to PG struct")
}
Expand Down Expand Up @@ -169,7 +175,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
if err != nil {
return fmt.Errorf("entries.bulkInsertUtxoOperationsEntry: Problem decoding transaction for entry %+v at block height %v", entry, entry.BlockHeight)
}
txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHash, &lib.DeSoMainnetParams, transaction.TxnFeeNanos, uint64(jj), utxoOps)
txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHash, params, transaction.TxnFeeNanos, uint64(jj), utxoOps)
if err != nil {
return fmt.Errorf("entries.bulkInsertUtxoOperationsEntry: Problem computing transaction metadata for entry %+v at block height %v", entry, entry.BlockHeight)
}
Expand Down Expand Up @@ -216,6 +222,27 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
affectedPublicKeys = append(affectedPublicKeys, affectedPublicKeyEntry)
}
transactionUpdates = append(transactionUpdates, transactions[jj])
} else if jj == len(transactions) {
// TODO: parse utxo operations for the block level index.
// Examples: deletion of expired nonces, staking rewards (restaked
// + payed to balance), validator jailing, updating validator's
// last active at epoch.
for ii, utxoOp := range utxoOps {
switch utxoOp.Type {
case lib.OperationTypeStakeDistributionRestake, lib.OperationTypeStakeDistributionPayToBalance:
stateChangeMetadata, ok := utxoOp.StateChangeMetadata.(*lib.StakeRewardStateChangeMetadata)
if !ok {
glog.Error("bulkInsertUtxoOperationsEntry: Problem with state change metadata for " +
"stake rewards")
continue
}
stakeReward := PGStakeReward{
StakeReward: StakeRewardEncoderToPGStruct(stateChangeMetadata, params, blockHash, uint64(ii)),
}
stakeRewardEntries = append(stakeRewardEntries, &stakeReward)
}
}

}
}
// Print how long it took to insert the entries.
Expand Down Expand Up @@ -273,6 +300,18 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
}

fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start))

start = time.Now()

// Insert stake rewards into db
if len(stakeRewardEntries) > 0 {
_, err := db.NewInsert().Model(&stakeRewardEntries).On("CONFLICT (block_hash, utxo_op_index) DO UPDATE").Exec(context.Background())
if err != nil {
return errors.Wrapf(err, "InsertStakeRewards: Problem inserting stake rewards")
}
}
fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start))

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func getConfigValues() (pgURI string, stateChangeDir string, consumerProgressDir
if stateChangeDir == "" {
stateChangeDir = "/tmp/state-changes"
}
// Set the state change dir flag that core uses, so DeSoEncoders properly encode and decode state change metadata.
viper.Set("state-change-dir", stateChangeDir)

consumerProgressDir = viper.GetString("CONSUMER_PROGRESS_DIR")
if consumerProgressDir == "" {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package initial_migrations

import (
"context"
"github.com/uptrace/bun"
)

// TODO: Not nullable fields
func updateBlockTableWithPoSFields(db *bun.DB, tableName string) error {
_, err := db.Exec(`
ALTER TABLE block
ADD COLUMN block_version BIGINT,
ADD COLUMN txn_connect_status_by_index_hash VARCHAR,
ADD COLUMN proposer_public_key VARCHAR,
ADD COLUMN proposer_voting_public_key VARCHAR,
ADD COLUMN proposer_random_seed_signature VARCHAR,
ADD COLUMN proposed_in_view BIGINT,
ADD COLUMN proposer_vote_partial_signature VARCHAR;
`)
// TODO: What other fields do we need indexed?
return err
}

func init() {
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
return updateBlockTableWithPoSFields(db, "block")
}, func(ctx context.Context, db *bun.DB) error {
_, err := db.Exec(`
ALTER TABLE block
DROP COLUMN block_version,
DROP COLUMN txn_connect_status_by_index_hash,
DROP COLUMN proposer_public_key,
DROP COLUMN proposer_voting_public_key,
DROP COLUMN proposer_random_seed_signature,
DROP COLUMN proposed_in_view,
DROP COLUMN proposer_vote_partial_signature;
`)
if err != nil {
return err
}
return nil
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package initial_migrations

import (
"context"
"github.com/uptrace/bun"
)

func updateTransactionTableWithPoSFields(db *bun.DB) error {
_, err := db.Exec(`
ALTER TABLE transaction_partitioned
ADD COLUMN connects BOOLEAN DEFAULT TRUE NOT NULL;
`)
// TODO: What other fields do we need indexed?
return err
}

func init() {
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
return updateTransactionTableWithPoSFields(db)
}, func(ctx context.Context, db *bun.DB) error {
_, err := db.Exec(`
ALTER TABLE transaction_partitioned
DROP COLUMN connects;
`)
if err != nil {
return err
}
return nil
})
}
Loading