From 3257aa20841c5cbde0918e358f790b3efc4c3ebb Mon Sep 17 00:00:00 2001 From: Lazy Nina <> Date: Wed, 13 Dec 2023 17:23:29 -0500 Subject: [PATCH 1/3] Add StakeEntry, LockedStakeEntry, and ValidatorEntry. Update Dockerfile for pos --- entries/epoch.go | 106 ++++++++++++++ entries/locked_stake.go | 117 +++++++++++++++ entries/lockup.go | 117 +++++++++++++++ entries/stake.go | 117 +++++++++++++++ entries/validator.go | 138 ++++++++++++++++++ entries/yield_curve_point.go | 109 ++++++++++++++ handler/data_handler.go | 12 ++ ...3000000_create_locked_stake_entry_table.go | 41 ++++++ ...20231213000000_create_stake_entry_table.go | 41 ++++++ ...1213000000_create_validator_entry_table.go | 44 ++++++ ...20240129000000_create_epoch_entry_table.go | 39 +++++ ...00001_create_locked_balance_entry_table.go | 41 ++++++ ...2_create_lockup_yield_curve_point_table.go | 38 +++++ 13 files changed, 960 insertions(+) create mode 100644 entries/epoch.go create mode 100644 entries/locked_stake.go create mode 100644 entries/lockup.go create mode 100644 entries/stake.go create mode 100644 entries/validator.go create mode 100644 entries/yield_curve_point.go create mode 100644 migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go create mode 100644 migrations/initial_migrations/20231213000000_create_stake_entry_table.go create mode 100644 migrations/initial_migrations/20231213000000_create_validator_entry_table.go create mode 100644 migrations/initial_migrations/20240129000000_create_epoch_entry_table.go create mode 100644 migrations/initial_migrations/20240129000001_create_locked_balance_entry_table.go create mode 100644 migrations/initial_migrations/20240129000002_create_lockup_yield_curve_point_table.go diff --git a/entries/epoch.go b/entries/epoch.go new file mode 100644 index 0000000..f7faae3 --- /dev/null +++ b/entries/epoch.go @@ -0,0 +1,106 @@ +package entries + +import ( + "context" + "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/state-consumer/consumer" + "github.com/pkg/errors" + "github.com/uptrace/bun" +) + +// TODO: when to use nullzero vs use_zero? +type EpochEntry struct { + EpochNumber uint64 + InitialBlockHeight uint64 + InitialView uint64 + FinalBlockHeight uint64 + CreatedAtBlockTimestampNanoSecs uint64 + + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGEpochEntry struct { + bun.BaseModel `bun:"table:epoch_entry"` + EpochEntry +} + +// TODO: Do I need this? +type PGEpochUtxoOps struct { + bun.BaseModel `bun:"table:epoch_entry_utxo_ops"` + EpochEntry + UtxoOperation +} + +// Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun. +func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry { + return EpochEntry{ + EpochNumber: epochEntry.EpochNumber, + InitialBlockHeight: epochEntry.InitialBlockHeight, + InitialView: epochEntry.InitialView, + FinalBlockHeight: epochEntry.FinalBlockHeight, + BadgerKey: keyBytes, + } +} + +// EpochEntryBatchOperation is the entry point for processing a batch of Epoch entries. +// It determines the appropriate handler based on the operation type and executes it. +func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { + // We check before we call this function that there is at least one operation type. + // We also ensure before this that all entries have the same operation type. + operationType := entries[0].OperationType + var err error + if operationType == lib.DbOperationTypeDelete { + err = bulkDeleteEpochEntry(entries, db, operationType) + } else { + err = bulkInsertEpochEntry(entries, db, operationType, params) + } + if err != nil { + return errors.Wrapf(err, "entries.EpochEntryBatchOperation: Problem with operation type %v", operationType) + } + return nil +} + +// bulkInsertEpochEntry inserts a batch of locked stake entries into the database. +func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + // Create a new array to hold the bun struct. + pgEntrySlice := make([]*PGEpochEntry, len(uniqueEntries)) + + // Loop through the entries and convert them to PGEntry. + for ii, entry := range uniqueEntries { + pgEntrySlice[ii] = &PGEpochEntry{EpochEntry: EpochEntryEncoderToPGStruct(entry.Encoder.(*lib.EpochEntry), entry.KeyBytes, params)} + } + + // Execute the insert query. + query := db.NewInsert().Model(&pgEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertEpochEntry: Error inserting entries") + } + return nil +} + +// bulkDeleteEpochEntry deletes a batch of locked stake entries from the database. +func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + + // Transform the entries into a list of keys to delete. + keysToDelete := consumer.KeysToDelete(uniqueEntries) + + // Execute the delete query. + if _, err := db.NewDelete(). + Model(&PGEpochEntry{}). + Where("badger_key IN (?)", bun.In(keysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries") + } + + return nil +} diff --git a/entries/locked_stake.go b/entries/locked_stake.go new file mode 100644 index 0000000..d8adf88 --- /dev/null +++ b/entries/locked_stake.go @@ -0,0 +1,117 @@ +package entries + +import ( + "context" + "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/state-consumer/consumer" + "github.com/pkg/errors" + "github.com/uptrace/bun" + "github.com/uptrace/bun/extra/bunbig" +) + +// TODO: when to use nullzero vs use_zero? +type LockedStakeEntry struct { + StakerPKID string `bun:",nullzero"` + ValidatorPKID string `bun:",nullzero"` + LockedAmountNanos *bunbig.Int `pg:",use_zero"` + LockedAtEpochNumber uint64 + + ExtraData map[string]string `bun:"type:jsonb"` + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGLockedStakeEntry struct { + bun.BaseModel `bun:"table:locked_stake_entry"` + LockedStakeEntry +} + +// TODO: Do I need this? +type PGLockedStakeEntryUtxoOps struct { + bun.BaseModel `bun:"table:locked_stake_entry_utxo_ops"` + LockedStakeEntry + UtxoOperation +} + +// Convert the LockedStakeEntry DeSo encoder to the PGLockedStakeEntry struct used by bun. +func LockedStakeEncoderToPGStruct(lockedStakeEntry *lib.LockedStakeEntry, keyBytes []byte, params *lib.DeSoParams) LockedStakeEntry { + pgLockedStakeEntry := LockedStakeEntry{ + ExtraData: consumer.ExtraDataBytesToString(lockedStakeEntry.ExtraData), + BadgerKey: keyBytes, + } + + if lockedStakeEntry.StakerPKID != nil { + pgLockedStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.StakerPKID)[:], params) + } + + if lockedStakeEntry.ValidatorPKID != nil { + pgLockedStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.ValidatorPKID)[:], params) + } + + pgLockedStakeEntry.LockedAtEpochNumber = lockedStakeEntry.LockedAtEpochNumber + pgLockedStakeEntry.LockedAmountNanos = bunbig.FromMathBig(lockedStakeEntry.LockedAmountNanos.ToBig()) + + return pgLockedStakeEntry +} + +// LockedStakeBatchOperation is the entry point for processing a batch of LockedStake entries. +// It determines the appropriate handler based on the operation type and executes it. +func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { + // We check before we call this function that there is at least one operation type. + // We also ensure before this that all entries have the same operation type. + operationType := entries[0].OperationType + var err error + if operationType == lib.DbOperationTypeDelete { + err = bulkDeleteLockedStakeEntry(entries, db, operationType) + } else { + err = bulkInsertLockedStakeEntry(entries, db, operationType, params) + } + if err != nil { + return errors.Wrapf(err, "entries.LockedStakeBatchOperation: Problem with operation type %v", operationType) + } + return nil +} + +// bulkInsertLockedStakeEntry inserts a batch of locked stake entries into the database. +func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + // Create a new array to hold the bun struct. + pgEntrySlice := make([]*PGLockedStakeEntry, len(uniqueEntries)) + + // Loop through the entries and convert them to PGEntry. + for ii, entry := range uniqueEntries { + pgEntrySlice[ii] = &PGLockedStakeEntry{LockedStakeEntry: LockedStakeEncoderToPGStruct(entry.Encoder.(*lib.LockedStakeEntry), entry.KeyBytes, params)} + } + + // Execute the insert query. + query := db.NewInsert().Model(&pgEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertLockedStakeEntry: Error inserting entries") + } + return nil +} + +// bulkDeleteLockedStakeEntry deletes a batch of locked stake entries from the database. +func bulkDeleteLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + + // Transform the entries into a list of keys to delete. + keysToDelete := consumer.KeysToDelete(uniqueEntries) + + // Execute the delete query. + if _, err := db.NewDelete(). + Model(&PGLockedStakeEntry{}). + Where("badger_key IN (?)", bun.In(keysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteLockedStakeEntry: Error deleting entries") + } + + return nil +} diff --git a/entries/lockup.go b/entries/lockup.go new file mode 100644 index 0000000..1531f70 --- /dev/null +++ b/entries/lockup.go @@ -0,0 +1,117 @@ +package entries + +import ( + "context" + "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/state-consumer/consumer" + "github.com/pkg/errors" + "github.com/uptrace/bun" + "github.com/uptrace/bun/extra/bunbig" +) + +// TODO: when to use nullzero vs use_zero? +type LockedBalanceEntry struct { + HODLerPKID string `bun:",nullzero"` + ProfilePKID string `bun:",nullzero"` + UnlockTimestampNanoSecs int64 + VestingEndTimestampNanoSecs int64 + BalanceBaseUnits *bunbig.Int `pg:",use_zero"` + + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGLockedBalanceEntry struct { + bun.BaseModel `bun:"table:locked_balance_entry"` + LockedBalanceEntry +} + +// TODO: Do I need this? +type PGLockedBalanceEntryUtxoOps struct { + bun.BaseModel `bun:"table:locked_balance_entry_utxo_ops"` + LockedBalanceEntry + UtxoOperation +} + +// Convert the LockedBalanceEntry DeSo encoder to the PGLockedBalnceEntry struct used by bun. +func LockedBalanceEntryEncoderToPGStruct(lockedBalanceEntry *lib.LockedBalanceEntry, keyBytes []byte, params *lib.DeSoParams) LockedBalanceEntry { + pgLockedBalanceEntry := LockedBalanceEntry{ + BadgerKey: keyBytes, + } + + if lockedBalanceEntry.HODLerPKID != nil { + pgLockedBalanceEntry.HODLerPKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.HODLerPKID)[:], params) + } + + if lockedBalanceEntry.ProfilePKID != nil { + pgLockedBalanceEntry.ProfilePKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.ProfilePKID)[:], params) + } + + pgLockedBalanceEntry.UnlockTimestampNanoSecs = lockedBalanceEntry.UnlockTimestampNanoSecs + pgLockedBalanceEntry.VestingEndTimestampNanoSecs = lockedBalanceEntry.VestingEndTimestampNanoSecs + pgLockedBalanceEntry.BalanceBaseUnits = bunbig.FromMathBig(lockedBalanceEntry.BalanceBaseUnits.ToBig()) + + return pgLockedBalanceEntry +} + +// LockedBalanceEntryBatchOperation is the entry point for processing a batch of LockedBalance entries. +// It determines the appropriate handler based on the operation type and executes it. +func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { + // We check before we call this function that there is at least one operation type. + // We also ensure before this that all entries have the same operation type. + operationType := entries[0].OperationType + var err error + if operationType == lib.DbOperationTypeDelete { + err = bulkDeleteLockedBalanceEntry(entries, db, operationType) + } else { + err = bulkInsertLockedBalanceEntry(entries, db, operationType, params) + } + if err != nil { + return errors.Wrapf(err, "entries.LockedBalanceEntryBatchOperation: Problem with operation type %v", operationType) + } + return nil +} + +// bulkInsertLockedBalanceEntry inserts a batch of locked stake entries into the database. +func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + // Create a new array to hold the bun struct. + pgEntrySlice := make([]*PGLockedBalanceEntry, len(uniqueEntries)) + + // Loop through the entries and convert them to PGEntry. + for ii, entry := range uniqueEntries { + pgEntrySlice[ii] = &PGLockedBalanceEntry{LockedBalanceEntry: LockedBalanceEntryEncoderToPGStruct(entry.Encoder.(*lib.LockedBalanceEntry), entry.KeyBytes, params)} + } + + // Execute the insert query. + query := db.NewInsert().Model(&pgEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertLockedBalanceEntry: Error inserting entries") + } + return nil +} + +// bulkDeleteLockedBalanceEntry deletes a batch of locked stake entries from the database. +func bulkDeleteLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + + // Transform the entries into a list of keys to delete. + keysToDelete := consumer.KeysToDelete(uniqueEntries) + + // Execute the delete query. + if _, err := db.NewDelete(). + Model(&PGLockedBalanceEntry{}). + Where("badger_key IN (?)", bun.In(keysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteLockedBalanceEntry: Error deleting entries") + } + + return nil +} diff --git a/entries/stake.go b/entries/stake.go new file mode 100644 index 0000000..2c47fed --- /dev/null +++ b/entries/stake.go @@ -0,0 +1,117 @@ +package entries + +import ( + "context" + "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/state-consumer/consumer" + "github.com/pkg/errors" + "github.com/uptrace/bun" + "github.com/uptrace/bun/extra/bunbig" +) + +// TODO: when to use nullzero vs use_zero? +type StakeEntry struct { + StakerPKID string `bun:",nullzero"` + ValidatorPKID string `bun:",nullzero"` + RewardMethod lib.StakingRewardMethod // TODO: we probably want this to be human readable? + StakeAmountNanos *bunbig.Int `pg:",use_zero"` + + ExtraData map[string]string `bun:"type:jsonb"` + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGStakeEntry struct { + bun.BaseModel `bun:"table:stake_entry"` + StakeEntry +} + +// TODO: Do I need this? +type PGStakeEntryUtxoOps struct { + bun.BaseModel `bun:"table:stake_entry_utxo_ops"` + StakeEntry + UtxoOperation +} + +// Convert the StakeEntry DeSo encoder to the PGStakeEntry struct used by bun. +func StakeEncoderToPGStruct(stakeEntry *lib.StakeEntry, keyBytes []byte, params *lib.DeSoParams) StakeEntry { + pgStakeEntry := StakeEntry{ + ExtraData: consumer.ExtraDataBytesToString(stakeEntry.ExtraData), + BadgerKey: keyBytes, + } + + if stakeEntry.StakerPKID != nil { + pgStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*stakeEntry.StakerPKID)[:], params) + } + + if stakeEntry.ValidatorPKID != nil { + pgStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*stakeEntry.ValidatorPKID)[:], params) + } + + pgStakeEntry.RewardMethod = stakeEntry.RewardMethod + pgStakeEntry.StakeAmountNanos = bunbig.FromMathBig(stakeEntry.StakeAmountNanos.ToBig()) + + return pgStakeEntry +} + +// StakeBatchOperation is the entry point for processing a batch of Stake entries. +// It determines the appropriate handler based on the operation type and executes it. +func StakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { + // We check before we call this function that there is at least one operation type. + // We also ensure before this that all entries have the same operation type. + operationType := entries[0].OperationType + var err error + if operationType == lib.DbOperationTypeDelete { + err = bulkDeleteStakeEntry(entries, db, operationType) + } else { + err = bulkInsertStakeEntry(entries, db, operationType, params) + } + if err != nil { + return errors.Wrapf(err, "entries.StakeBatchOperation: Problem with operation type %v", operationType) + } + return nil +} + +// bulkInsertStakeEntry inserts a batch of stake entries into the database. +func bulkInsertStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + // Create a new array to hold the bun struct. + pgEntrySlice := make([]*PGStakeEntry, len(uniqueEntries)) + + // Loop through the entries and convert them to PGEntry. + for ii, entry := range uniqueEntries { + pgEntrySlice[ii] = &PGStakeEntry{StakeEntry: StakeEncoderToPGStruct(entry.Encoder.(*lib.StakeEntry), entry.KeyBytes, params)} + } + + // Execute the insert query. + query := db.NewInsert().Model(&pgEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertStakeEntry: Error inserting entries") + } + return nil +} + +// bulkDeleteStakeEntry deletes a batch of stake entries from the database. +func bulkDeleteStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + + // Transform the entries into a list of keys to delete. + keysToDelete := consumer.KeysToDelete(uniqueEntries) + + // Execute the delete query. + if _, err := db.NewDelete(). + Model(&PGStakeEntry{}). + Where("badger_key IN (?)", bun.In(keysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteStakeEntry: Error deleting entries") + } + + return nil +} diff --git a/entries/validator.go b/entries/validator.go new file mode 100644 index 0000000..30e598e --- /dev/null +++ b/entries/validator.go @@ -0,0 +1,138 @@ +package entries + +import ( + "context" + "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/state-consumer/consumer" + "github.com/pkg/errors" + "github.com/uptrace/bun" + "github.com/uptrace/bun/extra/bunbig" +) + +// TODO: when to use nullzero vs use_zero? +type ValidatorEntry struct { + ValidatorPKID string `bun:",nullzero"` + Domains []string `bun:",nullzero"` + DisableDelegatedStake bool + DelegatedStakeCommissionBasisPoints uint64 + VotingPublicKey string `bun:",nullzero"` + VotingAuthorization string `bun:",nullzero"` + // Use bunbig.Int to store the balance as a numeric in the pg database. + TotalStakeAmountNanos *bunbig.Int `pg:",use_zero"` + LastActiveAtEpochNumber uint64 + JailedAtEpochNumber uint64 + + ExtraData map[string]string `bun:"type:jsonb"` + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGValidatorEntry struct { + bun.BaseModel `bun:"table:validator_entry"` + ValidatorEntry +} + +// TODO: Do I need this? +type PGValidatorEntryUtxoOps struct { + bun.BaseModel `bun:"table:validator_entry_utxo_ops"` + ValidatorEntry + UtxoOperation +} + +// Convert the ValidatorEntry DeSo encoder to the PGValidatorEntry struct used by bun. +func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) ValidatorEntry { + pgValidatorEntry := ValidatorEntry{ + ExtraData: consumer.ExtraDataBytesToString(validatorEntry.ExtraData), + BadgerKey: keyBytes, + } + + if validatorEntry.ValidatorPKID != nil { + pgValidatorEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*validatorEntry.ValidatorPKID)[:], params) + } + + if validatorEntry.Domains != nil { + pgValidatorEntry.Domains = make([]string, len(validatorEntry.Domains)) + for ii, domain := range validatorEntry.Domains { + pgValidatorEntry.Domains[ii] = string(domain) + } + } + + pgValidatorEntry.DisableDelegatedStake = validatorEntry.DisableDelegatedStake + pgValidatorEntry.DelegatedStakeCommissionBasisPoints = validatorEntry.DelegatedStakeCommissionBasisPoints + + if validatorEntry.VotingPublicKey != nil { + pgValidatorEntry.VotingPublicKey = validatorEntry.VotingPublicKey.ToString() + } + + if validatorEntry.VotingAuthorization != nil { + pgValidatorEntry.VotingAuthorization = validatorEntry.VotingAuthorization.ToString() + } + + pgValidatorEntry.TotalStakeAmountNanos = bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig()) + pgValidatorEntry.LastActiveAtEpochNumber = validatorEntry.LastActiveAtEpochNumber + pgValidatorEntry.JailedAtEpochNumber = validatorEntry.JailedAtEpochNumber + + return pgValidatorEntry +} + +// ValidatorBatchOperation is the entry point for processing a batch of Validator entries. +// It determines the appropriate handler based on the operation type and executes it. +func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { + // We check before we call this function that there is at least one operation type. + // We also ensure before this that all entries have the same operation type. + operationType := entries[0].OperationType + var err error + if operationType == lib.DbOperationTypeDelete { + err = bulkDeleteValidatorEntry(entries, db, operationType) + } else { + err = bulkInsertValidatorEntry(entries, db, operationType, params) + } + if err != nil { + return errors.Wrapf(err, "entries.ValidatorBatchOperation: Problem with operation type %v", operationType) + } + return nil +} + +// bulkInsertValidatorEntry inserts a batch of validator entries into the database. +func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + // Create a new array to hold the bun struct. + pgEntrySlice := make([]*PGValidatorEntry, len(uniqueEntries)) + + // Loop through the entries and convert them to PGEntry. + for ii, entry := range uniqueEntries { + pgEntrySlice[ii] = &PGValidatorEntry{ValidatorEntry: ValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)} + } + + // Execute the insert query. + query := db.NewInsert().Model(&pgEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting entries") + } + return nil +} + +// bulkDeleteValidatorEntry deletes a batch of validator entries from the database. +func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + + // Transform the entries into a list of keys to delete. + keysToDelete := consumer.KeysToDelete(uniqueEntries) + + // Execute the delete query. + if _, err := db.NewDelete(). + Model(&PGValidatorEntry{}). + Where("badger_key IN (?)", bun.In(keysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries") + } + + return nil +} diff --git a/entries/yield_curve_point.go b/entries/yield_curve_point.go new file mode 100644 index 0000000..bd81314 --- /dev/null +++ b/entries/yield_curve_point.go @@ -0,0 +1,109 @@ +package entries + +import ( + "context" + "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/state-consumer/consumer" + "github.com/pkg/errors" + "github.com/uptrace/bun" +) + +// TODO: when to use nullzero vs use_zero? +type LockupYieldCurvePoint struct { + ProfilePKID string `bun:",nullzero"` + LockupDurationNanoSecs int64 + LockupYieldAPYBasisPoints uint64 + + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGLockupYieldCurvePoint struct { + bun.BaseModel `bun:"table:locked_balance_entry"` + LockupYieldCurvePoint +} + +// TODO: Do I need this? +type PGLockupYieldCurvePointUtxoOps struct { + bun.BaseModel `bun:"table:locked_balance_entry_utxo_ops"` + LockupYieldCurvePoint + UtxoOperation +} + +// Convert the LockupYieldCurvePoint DeSo encoder to the PGLockedBalnceEntry struct used by bun. +func LockupYieldCurvePointEncoderToPGStruct(lockupYieldCurvePoint *lib.LockupYieldCurvePoint, keyBytes []byte, params *lib.DeSoParams) LockupYieldCurvePoint { + pgLockupYieldCurvePoint := LockupYieldCurvePoint{ + BadgerKey: keyBytes, + } + + if lockupYieldCurvePoint.ProfilePKID != nil { + pgLockupYieldCurvePoint.ProfilePKID = consumer.PublicKeyBytesToBase58Check((*lockupYieldCurvePoint.ProfilePKID)[:], params) + } + + pgLockupYieldCurvePoint.LockupDurationNanoSecs = lockupYieldCurvePoint.LockupDurationNanoSecs + pgLockupYieldCurvePoint.LockupYieldAPYBasisPoints = lockupYieldCurvePoint.LockupYieldAPYBasisPoints + + return pgLockupYieldCurvePoint +} + +// LockupYieldCurvePointBatchOperation is the entry point for processing a batch of LockedBalance entries. +// It determines the appropriate handler based on the operation type and executes it. +func LockupYieldCurvePointBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { + // We check before we call this function that there is at least one operation type. + // We also ensure before this that all entries have the same operation type. + operationType := entries[0].OperationType + var err error + if operationType == lib.DbOperationTypeDelete { + err = bulkDeleteLockupYieldCurvePoint(entries, db, operationType) + } else { + err = bulkInsertLockupYieldCurvePoint(entries, db, operationType, params) + } + if err != nil { + return errors.Wrapf(err, "entries.LockupYieldCurvePointBatchOperation: Problem with operation type %v", operationType) + } + return nil +} + +// bulkInsertLockupYieldCurvePoint inserts a batch of locked stake entries into the database. +func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + // Create a new array to hold the bun struct. + pgEntrySlice := make([]*PGLockupYieldCurvePoint, len(uniqueEntries)) + + // Loop through the entries and convert them to PGEntry. + for ii, entry := range uniqueEntries { + pgEntrySlice[ii] = &PGLockupYieldCurvePoint{LockupYieldCurvePoint: LockupYieldCurvePointEncoderToPGStruct(entry.Encoder.(*lib.LockupYieldCurvePoint), entry.KeyBytes, params)} + } + + // Execute the insert query. + query := db.NewInsert().Model(&pgEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertLockupYieldCurvePoint: Error inserting entries") + } + return nil +} + +// bulkDeleteLockupYieldCurvePoint deletes a batch of locked stake entries from the database. +func bulkDeleteLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { + // Track the unique entries we've inserted so we don't insert the same entry twice. + uniqueEntries := consumer.UniqueEntries(entries) + + // Transform the entries into a list of keys to delete. + keysToDelete := consumer.KeysToDelete(uniqueEntries) + + // Execute the delete query. + if _, err := db.NewDelete(). + Model(&PGLockupYieldCurvePoint{}). + Where("badger_key IN (?)", bun.In(keysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteLockupYieldCurvePoint: Error deleting entries") + } + + return nil +} diff --git a/handler/data_handler.go b/handler/data_handler.go index b630db5..2e7c2b5 100644 --- a/handler/data_handler.go +++ b/handler/data_handler.go @@ -74,6 +74,18 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries err = entries.BlockBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) case lib.EncoderTypeTxn: err = entries.TransactionBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + case lib.EncoderTypeStakeEntry: + err = entries.StakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + case lib.EncoderTypeValidatorEntry: + err = entries.ValidatorBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + case lib.EncoderTypeLockedStakeEntry: + err = entries.LockedStakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + case lib.EncoderTypeLockedBalanceEntry: + err = entries.LockedBalanceEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + case lib.EncoderTypeLockupYieldCurvePoint: + err = entries.LockupYieldCurvePointBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + case lib.EncoderTypeEpochEntry: + err = entries.EpochEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) } if err != nil { diff --git a/migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go b/migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go new file mode 100644 index 0000000..4d13124 --- /dev/null +++ b/migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go @@ -0,0 +1,41 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createLockedStakeEntryTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + staker_pkid VARCHAR NOT NULL, + validator_pkid VARCHAR NOT NULL, + locked_amount_nanos NUMERIC(78, 0) NOT NULL, + locked_at_epoch_number BIGINT NOT NULL, + + extra_data JSONB, + badger_key BYTEA PRIMARY KEY + ); + CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid); + CREATE INDEX {tableName}_staker_pkid_idx ON {tableName} (staker_pkid); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createLockedStakeEntryTable(db, "locked_stake_entry") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS locked_stake_entry; + `) + if err != nil { + return err + } + return nil + }) +} diff --git a/migrations/initial_migrations/20231213000000_create_stake_entry_table.go b/migrations/initial_migrations/20231213000000_create_stake_entry_table.go new file mode 100644 index 0000000..ae37317 --- /dev/null +++ b/migrations/initial_migrations/20231213000000_create_stake_entry_table.go @@ -0,0 +1,41 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createStakeEntryTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + staker_pkid VARCHAR NOT NULL, + validator_pkid VARCHAR NOT NULL, + reward_method SMALLINT NOT NULL, + stake_amount_nanos NUMERIC(78, 0) NOT NULL, + + extra_data JSONB, + badger_key BYTEA PRIMARY KEY + ); + CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid); + CREATE INDEX {tableName}_staker_pkid_idx ON {tableName} (staker_pkid); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createStakeEntryTable(db, "stake_entry") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS stake_entry; + `) + if err != nil { + return err + } + return nil + }) +} diff --git a/migrations/initial_migrations/20231213000000_create_validator_entry_table.go b/migrations/initial_migrations/20231213000000_create_validator_entry_table.go new file mode 100644 index 0000000..2f34b57 --- /dev/null +++ b/migrations/initial_migrations/20231213000000_create_validator_entry_table.go @@ -0,0 +1,44 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createValidatorEntryTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + validator_pkid VARCHAR NOT NULL, + domains VARCHAR[], + disable_delegated_stake BOOLEAN, + delegated_stake_commission_basis_points BIGINT, + voting_public_key VARCHAR, + voting_authorization VARCHAR, + total_stake_amount_nanos NUMERIC(78, 0) NOT NULL, + last_active_at_epoch_number BIGINT, + jailed_at_epoch_number BIGINT, + extra_data JSONB, + badger_key BYTEA PRIMARY KEY + ); + CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createValidatorEntryTable(db, "validator_entry") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS validator_entry; + `) + if err != nil { + return err + } + return nil + }) +} diff --git a/migrations/initial_migrations/20240129000000_create_epoch_entry_table.go b/migrations/initial_migrations/20240129000000_create_epoch_entry_table.go new file mode 100644 index 0000000..1dd2ed7 --- /dev/null +++ b/migrations/initial_migrations/20240129000000_create_epoch_entry_table.go @@ -0,0 +1,39 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +// TODO: indexes +func createEpochEntryTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + epoch_number BIGINT NOT NULL, + initial_block_height BIGINT NOT NULL, + initial_view BIGINT NOT NULL, + final_block_height BIGINT NOT NULL, + + badger_key BYTEA PRIMARY KEY + ); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createEpochEntryTable(db, "epoch") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS epoch; + `) + if err != nil { + return err + } + return nil + }) +} diff --git a/migrations/initial_migrations/20240129000001_create_locked_balance_entry_table.go b/migrations/initial_migrations/20240129000001_create_locked_balance_entry_table.go new file mode 100644 index 0000000..678f156 --- /dev/null +++ b/migrations/initial_migrations/20240129000001_create_locked_balance_entry_table.go @@ -0,0 +1,41 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createLockedBalanceEntryTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + hodler_pkid VARCHAR NOT NULL, + profile_pkid VARCHAR NOT NULL, + unlock_timestamp_nano_secs BIGINT NOT NULL, + vesting_end_timestamp_nano_secs BIGINT NOT NULL, + balance_base_units NUMERIC(78, 0) NOT NULL, + + badger_key BYTEA PRIMARY KEY + ); + CREATE INDEX {tableName}_hodler_pkid_idx ON {tableName} (hodler_pkid); + CREATE INDEX {tableName}_profile_pkid_idx ON {tableName} (profile_pkid); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createLockedBalanceEntryTable(db, "locked_balance_entry") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS locked_balance_entry; + `) + if err != nil { + return err + } + return nil + }) +} diff --git a/migrations/initial_migrations/20240129000002_create_lockup_yield_curve_point_table.go b/migrations/initial_migrations/20240129000002_create_lockup_yield_curve_point_table.go new file mode 100644 index 0000000..d957f6a --- /dev/null +++ b/migrations/initial_migrations/20240129000002_create_lockup_yield_curve_point_table.go @@ -0,0 +1,38 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createYieldCurvePointTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + profile_pkid VARCHAR NOT NULL, + lockup_duration_nano_secs BIGINT NOT NULL, + lockup_yield_api_basis_points BIGINT NOT NULL, + + badger_key BYTEA PRIMARY KEY + ); + CREATE INDEX {tableName}_profile_pkid_idx ON {tableName} (profile_pkid); + `, "{tableName}", tableName, -1)) + // TODO: What other fields do we need indexed? + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createYieldCurvePointTable(db, "yield_curve_point") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS yield_curve_point; + `) + if err != nil { + return err + } + return nil + }) +} From f6951c400f50bb30f53450df3fcc17b3003f6746 Mon Sep 17 00:00:00 2001 From: Jon Pollock Date: Tue, 6 Feb 2024 11:36:17 -0800 Subject: [PATCH 2/3] Finalize LockedBalanceEntry pg annotations and clean code. --- entries/lockup.go | 51 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/entries/lockup.go b/entries/lockup.go index 1531f70..4dad5dd 100644 --- a/entries/lockup.go +++ b/entries/lockup.go @@ -9,12 +9,11 @@ import ( "github.com/uptrace/bun/extra/bunbig" ) -// TODO: when to use nullzero vs use_zero? type LockedBalanceEntry struct { - HODLerPKID string `bun:",nullzero"` - ProfilePKID string `bun:",nullzero"` - UnlockTimestampNanoSecs int64 - VestingEndTimestampNanoSecs int64 + HODLerPKID string `pg:",use_zero"` + ProfilePKID string `pg:",use_zero"` + UnlockTimestampNanoSecs int64 `pg:",null_zero"` + VestingEndTimestampNanoSecs int64 `pg:",null_zero"` BalanceBaseUnits *bunbig.Int `pg:",use_zero"` BadgerKey []byte `pg:",pk,use_zero"` @@ -25,7 +24,6 @@ type PGLockedBalanceEntry struct { LockedBalanceEntry } -// TODO: Do I need this? type PGLockedBalanceEntryUtxoOps struct { bun.BaseModel `bun:"table:locked_balance_entry_utxo_ops"` LockedBalanceEntry @@ -33,17 +31,23 @@ type PGLockedBalanceEntryUtxoOps struct { } // Convert the LockedBalanceEntry DeSo encoder to the PGLockedBalnceEntry struct used by bun. -func LockedBalanceEntryEncoderToPGStruct(lockedBalanceEntry *lib.LockedBalanceEntry, keyBytes []byte, params *lib.DeSoParams) LockedBalanceEntry { +func LockedBalanceEntryEncoderToPGStruct( + lockedBalanceEntry *lib.LockedBalanceEntry, + keyBytes []byte, + params *lib.DeSoParams, +) LockedBalanceEntry { pgLockedBalanceEntry := LockedBalanceEntry{ BadgerKey: keyBytes, } if lockedBalanceEntry.HODLerPKID != nil { - pgLockedBalanceEntry.HODLerPKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.HODLerPKID)[:], params) + pgLockedBalanceEntry.HODLerPKID = + consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.HODLerPKID)[:], params) } if lockedBalanceEntry.ProfilePKID != nil { - pgLockedBalanceEntry.ProfilePKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.ProfilePKID)[:], params) + pgLockedBalanceEntry.ProfilePKID = + consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.ProfilePKID)[:], params) } pgLockedBalanceEntry.UnlockTimestampNanoSecs = lockedBalanceEntry.UnlockTimestampNanoSecs @@ -55,7 +59,11 @@ func LockedBalanceEntryEncoderToPGStruct(lockedBalanceEntry *lib.LockedBalanceEn // LockedBalanceEntryBatchOperation is the entry point for processing a batch of LockedBalance entries. // It determines the appropriate handler based on the operation type and executes it. -func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func LockedBalanceEntryBatchOperation( + entries []*lib.StateChangeEntry, + db *bun.DB, + params *lib.DeSoParams, +) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -66,13 +74,19 @@ func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.D err = bulkInsertLockedBalanceEntry(entries, db, operationType, params) } if err != nil { - return errors.Wrapf(err, "entries.LockedBalanceEntryBatchOperation: Problem with operation type %v", operationType) + return errors.Wrapf(err, + "entries.LockedBalanceEntryBatchOperation: Problem with operation type %v", operationType) } return nil } // bulkInsertLockedBalanceEntry inserts a batch of locked stake entries into the database. -func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertLockedBalanceEntry( + entries []*lib.StateChangeEntry, + db *bun.DB, + operationType lib.StateSyncerOperationType, + params *lib.DeSoParams, +) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -80,7 +94,12 @@ func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, o // Loop through the entries and convert them to PGEntry. for ii, entry := range uniqueEntries { - pgEntrySlice[ii] = &PGLockedBalanceEntry{LockedBalanceEntry: LockedBalanceEntryEncoderToPGStruct(entry.Encoder.(*lib.LockedBalanceEntry), entry.KeyBytes, params)} + pgEntrySlice[ii] = &PGLockedBalanceEntry{ + LockedBalanceEntry: LockedBalanceEntryEncoderToPGStruct( + entry.Encoder.(*lib.LockedBalanceEntry), + entry.KeyBytes, + params), + } } // Execute the insert query. @@ -97,7 +116,11 @@ func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, o } // bulkDeleteLockedBalanceEntry deletes a batch of locked stake entries from the database. -func bulkDeleteLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteLockedBalanceEntry( + entries []*lib.StateChangeEntry, + db *bun.DB, + operationType lib.StateSyncerOperationType, +) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) From 864be1763ecf10cd3da1b0f5682a86aec5444b23 Mon Sep 17 00:00:00 2001 From: Jon Pollock Date: Tue, 6 Feb 2024 12:45:03 -0800 Subject: [PATCH 3/3] Revise yield curve point entry, remove UtxoOp structs. --- entries/lockup.go | 6 ----- entries/yield_curve_point.go | 51 +++++++++++++++++++++++------------- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/entries/lockup.go b/entries/lockup.go index 4dad5dd..858b8a8 100644 --- a/entries/lockup.go +++ b/entries/lockup.go @@ -24,12 +24,6 @@ type PGLockedBalanceEntry struct { LockedBalanceEntry } -type PGLockedBalanceEntryUtxoOps struct { - bun.BaseModel `bun:"table:locked_balance_entry_utxo_ops"` - LockedBalanceEntry - UtxoOperation -} - // Convert the LockedBalanceEntry DeSo encoder to the PGLockedBalnceEntry struct used by bun. func LockedBalanceEntryEncoderToPGStruct( lockedBalanceEntry *lib.LockedBalanceEntry, diff --git a/entries/yield_curve_point.go b/entries/yield_curve_point.go index bd81314..f6dcc8e 100644 --- a/entries/yield_curve_point.go +++ b/entries/yield_curve_point.go @@ -8,35 +8,32 @@ import ( "github.com/uptrace/bun" ) -// TODO: when to use nullzero vs use_zero? type LockupYieldCurvePoint struct { - ProfilePKID string `bun:",nullzero"` - LockupDurationNanoSecs int64 - LockupYieldAPYBasisPoints uint64 + ProfilePKID string `pg:",null_zero"` + LockupDurationNanoSecs int64 `pg:",null_zero"` + LockupYieldAPYBasisPoints uint64 `pg:",null_zero"` BadgerKey []byte `pg:",pk,use_zero"` } type PGLockupYieldCurvePoint struct { - bun.BaseModel `bun:"table:locked_balance_entry"` + bun.BaseModel `bun:"table:lockup_yield_curve_points"` LockupYieldCurvePoint } -// TODO: Do I need this? -type PGLockupYieldCurvePointUtxoOps struct { - bun.BaseModel `bun:"table:locked_balance_entry_utxo_ops"` - LockupYieldCurvePoint - UtxoOperation -} - // Convert the LockupYieldCurvePoint DeSo encoder to the PGLockedBalnceEntry struct used by bun. -func LockupYieldCurvePointEncoderToPGStruct(lockupYieldCurvePoint *lib.LockupYieldCurvePoint, keyBytes []byte, params *lib.DeSoParams) LockupYieldCurvePoint { +func LockupYieldCurvePointEncoderToPGStruct( + lockupYieldCurvePoint *lib.LockupYieldCurvePoint, + keyBytes []byte, + params *lib.DeSoParams, +) LockupYieldCurvePoint { pgLockupYieldCurvePoint := LockupYieldCurvePoint{ BadgerKey: keyBytes, } if lockupYieldCurvePoint.ProfilePKID != nil { - pgLockupYieldCurvePoint.ProfilePKID = consumer.PublicKeyBytesToBase58Check((*lockupYieldCurvePoint.ProfilePKID)[:], params) + pgLockupYieldCurvePoint.ProfilePKID = + consumer.PublicKeyBytesToBase58Check((*lockupYieldCurvePoint.ProfilePKID)[:], params) } pgLockupYieldCurvePoint.LockupDurationNanoSecs = lockupYieldCurvePoint.LockupDurationNanoSecs @@ -47,7 +44,11 @@ func LockupYieldCurvePointEncoderToPGStruct(lockupYieldCurvePoint *lib.LockupYie // LockupYieldCurvePointBatchOperation is the entry point for processing a batch of LockedBalance entries. // It determines the appropriate handler based on the operation type and executes it. -func LockupYieldCurvePointBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func LockupYieldCurvePointBatchOperation( + entries []*lib.StateChangeEntry, + db *bun.DB, + params *lib.DeSoParams, +) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -64,7 +65,12 @@ func LockupYieldCurvePointBatchOperation(entries []*lib.StateChangeEntry, db *bu } // bulkInsertLockupYieldCurvePoint inserts a batch of locked stake entries into the database. -func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertLockupYieldCurvePoint( + entries []*lib.StateChangeEntry, + db *bun.DB, + operationType lib.StateSyncerOperationType, + params *lib.DeSoParams, +) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -72,7 +78,12 @@ func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB // Loop through the entries and convert them to PGEntry. for ii, entry := range uniqueEntries { - pgEntrySlice[ii] = &PGLockupYieldCurvePoint{LockupYieldCurvePoint: LockupYieldCurvePointEncoderToPGStruct(entry.Encoder.(*lib.LockupYieldCurvePoint), entry.KeyBytes, params)} + pgEntrySlice[ii] = &PGLockupYieldCurvePoint{ + LockupYieldCurvePoint: LockupYieldCurvePointEncoderToPGStruct( + entry.Encoder.(*lib.LockupYieldCurvePoint), + entry.KeyBytes, + params), + } } // Execute the insert query. @@ -89,7 +100,11 @@ func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB } // bulkDeleteLockupYieldCurvePoint deletes a batch of locked stake entries from the database. -func bulkDeleteLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteLockupYieldCurvePoint( + entries []*lib.StateChangeEntry, + db *bun.DB, + operationType lib.StateSyncerOperationType, +) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries)