diff --git a/entries/epoch.go b/entries/epoch.go
index f7faae3..16aca29 100644
--- a/entries/epoch.go
+++ b/entries/epoch.go
@@ -14,9 +14,8 @@ type EpochEntry struct {
 	InitialBlockHeight              uint64
 	InitialView                     uint64
 	FinalBlockHeight                uint64
-	CreatedAtBlockTimestampNanoSecs uint64
-
-	BadgerKey []byte `pg:",pk,use_zero"`
+	CreatedAtBlockTimestampNanoSecs int64
+	SnapshotAtEpochNumber           uint64
 }
 
 type PGEpochEntry struct {
@@ -33,12 +32,19 @@ type PGEpochUtxoOps struct {
 
 // Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun.
 func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry {
+	var snapshotAtEpochNumber uint64
+	// Epochs use data snapshotted from two epochs ago. Epochs 0 and 1 use data from epoch 0;
+	// the guard also keeps the unsigned subtraction below from underflowing for them.
+	if epochEntry.EpochNumber >= 2 {
+		snapshotAtEpochNumber = epochEntry.EpochNumber - 2
+	}
 	return EpochEntry{
-		EpochNumber:        epochEntry.EpochNumber,
-		InitialBlockHeight: epochEntry.InitialBlockHeight,
-		InitialView:        epochEntry.InitialView,
-		FinalBlockHeight:   epochEntry.FinalBlockHeight,
-		BadgerKey:          keyBytes,
+		EpochNumber:                     epochEntry.EpochNumber,
+		InitialBlockHeight:              epochEntry.InitialBlockHeight,
+		InitialView:                     epochEntry.InitialView,
+		FinalBlockHeight:                epochEntry.FinalBlockHeight,
+		CreatedAtBlockTimestampNanoSecs: epochEntry.CreatedAtBlockTimestampNanoSecs,
+		SnapshotAtEpochNumber:           snapshotAtEpochNumber,
 	}
 }
 
@@ -49,8 +55,11 @@ func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param
 	// We also ensure before this that all entries have the same operation type.
 	operationType := entries[0].OperationType
 	var err error
+	// Core only tracks the current epoch entry and never deletes it. To keep
+	// every historical epoch entry, rows are uniquely identified by epoch
+	// number rather than by badger key, and deletes are rejected outright.
 	if operationType == lib.DbOperationTypeDelete {
-		err = bulkDeleteEpochEntry(entries, db, operationType)
+		return errors.New("entries.EpochEntryBatchOperation: Delete operation type not supported")
 	} else {
 		err = bulkInsertEpochEntry(entries, db, operationType, params)
 	}
@@ -76,7 +85,7 @@ func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
 	query := db.NewInsert().Model(&pgEntrySlice)
 
 	if operationType == lib.DbOperationTypeUpsert {
-		query = query.On("CONFLICT (badger_key) DO UPDATE")
+		query = query.On("CONFLICT (epoch_number) DO UPDATE")
 	}
 
 	if _, err := query.Returning("").Exec(context.Background()); err != nil {
@@ -84,23 +93,3 @@ func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
 	}
 	return nil
 }
-
-// bulkDeleteEpochEntry deletes a batch of locked stake entries from the database.
-func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
-	// Track the unique entries we've inserted so we don't insert the same entry twice.
-	uniqueEntries := consumer.UniqueEntries(entries)
-
-	// Transform the entries into a list of keys to delete.
-	keysToDelete := consumer.KeysToDelete(uniqueEntries)
-
-	// Execute the delete query.
-	if _, err := db.NewDelete().
-		Model(&PGEpochEntry{}).
-		Where("badger_key IN (?)", bun.In(keysToDelete)).
-		Returning("").
-		Exec(context.Background()); err != nil {
-		return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries")
-	}
-
-	return nil
-}
diff --git a/entries/pkid.go b/entries/pkid.go
index a085aa2..42c1847 100644
--- a/entries/pkid.go
+++ b/entries/pkid.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"github.com/deso-protocol/core/lib"
 	"github.com/deso-protocol/state-consumer/consumer"
+	"github.com/golang/glog"
 	"github.com/pkg/errors"
 	"github.com/uptrace/bun"
 )
@@ -25,8 +26,20 @@ type PGPkidEntryUtxoOps struct {
 	UtxoOperation
 }
 
+type LeaderScheduleEntry struct {
+	SnapshotAtEpochNumber uint64 `pg:",use_zero"`
+	LeaderIndex           uint16 `pg:",use_zero"`
+	ValidatorPKID         string `pg:",use_zero"`
+	BadgerKey             []byte `pg:",pk,use_zero"`
+}
+
+type PGLeaderScheduleEntry struct {
+	bun.BaseModel `bun:"table:leader_schedule_entry"`
+	LeaderScheduleEntry
+}
+
-// Convert the Diamond DeSo encoder to the PG struct used by bun.
-func PkidEncoderToPGStruct(pkidEntry *lib.PKIDEntry, keyBytes []byte, params *lib.DeSoParams) PkidEntry {
+// Convert the PKIDEntry DeSo encoder to the PG struct used by bun.
+func PkidEntryEncoderToPGStruct(pkidEntry *lib.PKIDEntry, keyBytes []byte, params *lib.DeSoParams) PkidEntry {
 	return PkidEntry{
 		Pkid:      consumer.PublicKeyBytesToBase58Check(pkidEntry.PKID[:], params),
 		PublicKey: consumer.PublicKeyBytesToBase58Check(pkidEntry.PublicKey[:], params),
@@ -34,9 +47,27 @@
 	}
 }
 
+// Convert the leader schedule entry to the PG struct used by bun.
+func LeaderScheduleEncoderToPGStruct(validatorPKID *lib.PKID, keyBytes []byte, params *lib.DeSoParams,
+) *LeaderScheduleEntry {
+	// The badger key is a 1-byte prefix, an 8-byte epoch number, and a 2-byte leader index.
+	// Validate the full length up front so the slice below can never panic.
+	if len(keyBytes) != 11 {
+		glog.Errorf("LeaderScheduleEncoderToPGStruct: Invalid key length: %d", len(keyBytes))
+		return nil
+	}
+	prefixRemovedKeyBytes := keyBytes[1:]
+	epochNumber := lib.DecodeUint64(prefixRemovedKeyBytes[:8])
+	leaderIndex := lib.DecodeUint16(prefixRemovedKeyBytes[8:10])
+	return &LeaderScheduleEntry{
+		ValidatorPKID:         consumer.PublicKeyBytesToBase58Check(validatorPKID[:], params),
+		SnapshotAtEpochNumber: epochNumber,
+		LeaderIndex:           leaderIndex,
+		BadgerKey:             keyBytes,
+	}
+}
+
-// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
-// based on the operation type and executes it.
-func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+// PkidEntryBatchOperation is the entry point for processing a batch of pkid entries. It determines the appropriate
+// handler based on the operation type and executes it.
+func PkidEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
 	// We check before we call this function that there is at least one operation type.
 	// We also ensure before this that all entries have the same operation type.
 	operationType := entries[0].OperationType
@@ -61,7 +92,7 @@ func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT
-	// Loop through the entries and convert them to PGPostEntry.
+	// Loop through the entries and convert them to PGPkidEntry.
 	for ii, entry := range uniqueEntries {
-		pgEntrySlice[ii] = &PGPkidEntry{PkidEntry: PkidEncoderToPGStruct(entry.Encoder.(*lib.PKIDEntry), entry.KeyBytes, params)}
+		pgEntrySlice[ii] = &PGPkidEntry{PkidEntry: PkidEntryEncoderToPGStruct(entry.Encoder.(*lib.PKIDEntry), entry.KeyBytes, params)}
 	}
 
 	query := db.NewInsert().Model(&pgEntrySlice)
@@ -95,3 +126,75 @@ func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT
 
 	return nil
 }
+
+// PkidBatchOperation is the entry point for processing a batch of PKID encoder entries. It determines the
+// appropriate handler based on the operation type and executes it.
+func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+	// We check before we call this function that there is at least one operation type.
+	// We also ensure before this that all entries have the same operation type.
+	operationType := entries[0].OperationType
+	var err error
+	if operationType == lib.DbOperationTypeDelete {
+		err = bulkDeletePkid(entries, db, operationType)
+	} else {
+		err = bulkInsertPkid(entries, db, operationType, params)
+	}
+	if err != nil {
+		return errors.Wrapf(err, "entries.PkidBatchOperation: Problem with operation type %v", operationType)
+	}
+	return nil
+}
+
+// bulkInsertPkid inserts a batch of PKIDs into the database.
+func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
+	// Track the unique entries we've inserted so we don't insert the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	uniqueLeaderScheduleEntries := consumer.FilterEntriesByPrefix(
+		uniqueEntries, lib.Prefixes.PrefixSnapshotLeaderSchedule)
+	// NOTE: if we need to support parsing other indexes for PKIDs beyond LeaderSchedule,
+	// we will need to filter the uniqueEntries by the appropriate prefix and then convert
+	// the entries to the appropriate PG struct.
+	// Create a new slice to hold the bun structs. Entries that fail to convert are
+	// skipped rather than left as nil elements.
+	pgEntrySlice := make([]*PGLeaderScheduleEntry, 0, len(uniqueLeaderScheduleEntries))
+
+	// Loop through the entries and convert them to PGLeaderScheduleEntry.
+	for _, entry := range uniqueLeaderScheduleEntries {
+		leaderScheduleEntry := LeaderScheduleEncoderToPGStruct(entry.Encoder.(*lib.PKID), entry.KeyBytes, params)
+		if leaderScheduleEntry == nil {
+			glog.Errorf("bulkInsertPkid: Error converting LeaderScheduleEntry to PG struct")
+			continue
+		}
+		pgEntrySlice = append(pgEntrySlice, &PGLeaderScheduleEntry{LeaderScheduleEntry: *leaderScheduleEntry})
+	}
+
+	// Nothing to insert if every entry was filtered out or failed to convert.
+	if len(pgEntrySlice) == 0 {
+		return nil
+	}
+
+	query := db.NewInsert().Model(&pgEntrySlice)
+
+	if operationType == lib.DbOperationTypeUpsert {
+		query = query.On("CONFLICT (badger_key) DO UPDATE")
+	}
+
+	if _, err := query.Returning("").Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkInsertPkid: Error inserting entries")
+	}
+	return nil
+}
+
+// bulkDeletePkid deletes a batch of PKIDs from the database.
+func bulkDeletePkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
+	// Dedupe the entries so we don't process the same key twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	// Transform the entries into a list of keys to delete.
+	keysToDelete := consumer.KeysToDelete(uniqueEntries)
+	leaderSchedKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, lib.Prefixes.PrefixSnapshotLeaderSchedule)
+	// Nothing to delete if no keys match the leader schedule prefix.
+	if len(leaderSchedKeysToDelete) == 0 {
+		return nil
+	}
+
+	// Execute the delete query.
+	if _, err := db.NewDelete().
+		Model(&PGLeaderScheduleEntry{}).
+		Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)).
+		Returning("").
+		Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries")
+	}
+
+	return nil
+}
diff --git a/handler/data_handler.go b/handler/data_handler.go
index 2e7c2b5..066ff5d 100644
--- a/handler/data_handler.go
+++ b/handler/data_handler.go
@@ -63,7 +63,7 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
 	case lib.EncoderTypePostAssociationEntry:
 		err = entries.PostAssociationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
 	case lib.EncoderTypePKIDEntry:
-		err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+		err = entries.PkidEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
 	case lib.EncoderTypeDeSoBalanceEntry:
 		err = entries.DesoBalanceBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
 	case lib.EncoderTypeDAOCoinLimitOrderEntry:
@@ -86,6 +86,8 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
 		err = entries.LockupYieldCurvePointBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
 	case lib.EncoderTypeEpochEntry:
 		err = entries.EpochEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+	case lib.EncoderTypePKID:
+		err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
 	}
 
 	if err != nil {
diff --git a/migrations/initial_migrations/20240129000003_create_epoch_entry_table.go b/migrations/initial_migrations/20240129000003_create_epoch_entry_table.go
index 18cf82c..f3ba53d 100644
--- a/migrations/initial_migrations/20240129000003_create_epoch_entry_table.go
+++ b/migrations/initial_migrations/20240129000003_create_epoch_entry_table.go
@@ -12,14 +12,18 @@ import (
 func createEpochEntryTable(db *bun.DB, tableName string) error {
 	_, err := db.Exec(strings.Replace(`
 		CREATE TABLE {tableName} (
-			epoch_number BIGINT NOT NULL,
+			epoch_number BIGINT PRIMARY KEY NOT NULL,
 			initial_block_height BIGINT NOT NULL,
 			initial_view BIGINT NOT NULL,
 			final_block_height BIGINT NOT NULL,
 			created_at_block_timestamp_nano_secs BIGINT NOT NULL,
-
-			badger_key BYTEA PRIMARY KEY
+			snapshot_at_epoch_number BIGINT NOT NULL
 		);
+
+		CREATE INDEX {tableName}_initial_block_height_idx ON {tableName} (initial_block_height);
+		CREATE INDEX {tableName}_final_block_height_idx ON {tableName} (final_block_height);
+		CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
 	`, "{tableName}", tableName, -1))
 
 	// TODO: What other fields do we need indexed?
 	return err
diff --git a/migrations/initial_migrations/20240215000001_create_leader_schedule.go b/migrations/initial_migrations/20240215000001_create_leader_schedule.go
new file mode 100644
index 0000000..d9bd6e7
--- /dev/null
+++ b/migrations/initial_migrations/20240215000001_create_leader_schedule.go
@@ -0,0 +1,37 @@
+package initial_migrations
+
+import (
+	"context"
+	"strings"
+
+	"github.com/uptrace/bun"
+)
+
+func createLeaderScheduleTable(db *bun.DB, tableName string) error {
+	_, err := db.Exec(strings.Replace(`
+		CREATE TABLE {tableName} (
+			validator_pkid VARCHAR NOT NULL,
+			snapshot_at_epoch_number BIGINT NOT NULL,
+			leader_index INTEGER NOT NULL,
+			badger_key BYTEA PRIMARY KEY NOT NULL
+		);
+		CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid);
+		CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
+		CREATE INDEX {tableName}_snapshot_at_epoch_number_leader_index_idx ON {tableName} (snapshot_at_epoch_number, leader_index);
+	`, "{tableName}", tableName, -1))
+	return err
+}
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		return createLeaderScheduleTable(db, "leader_schedule_entry")
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			DROP TABLE IF EXISTS leader_schedule_entry;
+		`)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
diff --git a/migrations/post_sync_migrations/20240213000002_create_pos_fk_comments.go b/migrations/post_sync_migrations/20240213000002_create_pos_fk_comments.go
index d2f0003..ef710d0 100644
--- a/migrations/post_sync_migrations/20240213000002_create_pos_fk_comments.go
+++ b/migrations/post_sync_migrations/20240213000002_create_pos_fk_comments.go
@@ -22,7 +22,6 @@ func init() {
 	comment on column locked_stake_entry.badger_key is E'@omit';
 	comment on column yield_curve_point.badger_key is E'@omit';
 	comment on column locked_balance_entry.badger_key is E'@omit';
-	comment on column epoch_entry.badger_key is E'@omit';
 	comment on table transaction_partition_34 is E'@omit';
 	comment on table transaction_partition_35 is E'@omit';
 	comment on table transaction_partition_36 is E'@omit';
diff --git a/migrations/post_sync_migrations/20240215000002_create_leader_schedule_fk_comments.go b/migrations/post_sync_migrations/20240215000002_create_leader_schedule_fk_comments.go
new file mode 100644
index 0000000..eebde8b
--- /dev/null
+++ b/migrations/post_sync_migrations/20240215000002_create_leader_schedule_fk_comments.go
@@ -0,0 +1,31 @@
+package post_sync_migrations
+
+import (
+	"context"
+
+	"github.com/uptrace/bun"
+)
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			comment on table leader_schedule_entry is E'@foreignKey (validator_pkid) references account (pkid)|@foreignFieldName leaderScheduleEntries|@fieldName leaderAccount\n@foreignKey (validator_pkid) references validator_entry (validator_pkid)|@foreignFieldName leaderScheduleEntries|@fieldName validatorEntry\n@foreignKey (snapshot_at_epoch_number) references epoch_entry (snapshot_at_epoch_number)|@foreignFieldName leaderScheduleEntries|@fieldName epochEntryBySnapshot';
+			comment on column leader_schedule_entry.badger_key is E'@omit';
+		`)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			comment on table leader_schedule_entry is NULL;
+			comment on column leader_schedule_entry.badger_key is NULL;
+		`)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	})
+}
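
The snapshot mapping introduced in EpochEntryEncoderToPGStruct is worth seeing in isolation: each epoch consumes state snapshotted two epochs earlier, and epochs 0 and 1 fall back to snapshot 0. Below is a minimal, self-contained sketch of that rule, assuming nothing beyond what the diff itself shows:

	package main

	import "fmt"

	// snapshotAtEpochNumber mirrors the rule in EpochEntryEncoderToPGStruct:
	// epochs use data snapshotted from two epochs ago; epochs 0 and 1 use epoch 0.
	func snapshotAtEpochNumber(epochNumber uint64) uint64 {
		if epochNumber >= 2 {
			return epochNumber - 2
		}
		return 0
	}

	func main() {
		for _, e := range []uint64{0, 1, 2, 7} {
			fmt.Println(e, "->", snapshotAtEpochNumber(e)) // 0->0, 1->0, 2->0, 7->5
		}
	}

The >= 2 guard matters because epoch numbers are unsigned: without it, epochs 0 and 1 would wrap around to enormous values under uint64 subtraction.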
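
LeaderScheduleEncoderToPGStruct assumes a fixed key layout for snapshot leader schedule entries: a 1-byte PrefixSnapshotLeaderSchedule prefix, then an 8-byte epoch number, then a 2-byte leader index. The sketch below decodes that layout with encoding/binary instead of the lib.DecodeUint64/lib.DecodeUint16 helpers; big-endian byte order is an assumption about those helpers, and the 0x52 prefix byte is a hypothetical stand-in, not the real prefix value:

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// decodeLeaderScheduleKey mirrors the parsing in LeaderScheduleEncoderToPGStruct:
	// a 1-byte prefix, an 8-byte epoch number, and a 2-byte leader index
	// (big-endian assumed), for a total key length of 11 bytes.
	func decodeLeaderScheduleKey(key []byte) (epochNumber uint64, leaderIndex uint16, ok bool) {
		if len(key) != 11 {
			return 0, 0, false
		}
		epochNumber = binary.BigEndian.Uint64(key[1:9])
		leaderIndex = binary.BigEndian.Uint16(key[9:11])
		return epochNumber, leaderIndex, true
	}

	func main() {
		// Hypothetical key: 0x52 stands in for the real prefix byte,
		// followed by epoch number 5 and leader index 3.
		key := make([]byte, 11)
		key[0] = 0x52
		binary.BigEndian.PutUint64(key[1:9], 5)
		binary.BigEndian.PutUint16(key[9:11], 3)

		epoch, leader, ok := decodeLeaderScheduleKey(key)
		fmt.Println(epoch, leader, ok) // 5 3 true
	}

Storing the decoded epoch number and leader index as their own columns is what lets the migration's (snapshot_at_epoch_number, leader_index) index answer "who leads slot N of a given snapshot epoch" without scanning badger keys.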