Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions entries/epoch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package entries

import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
)

// TODO: when to use nullzero vs use_zero?
type EpochEntry struct {
EpochNumber uint64
InitialBlockHeight uint64
InitialView uint64
FinalBlockHeight uint64
CreatedAtBlockTimestampNanoSecs uint64

BadgerKey []byte `pg:",pk,use_zero"`
}

type PGEpochEntry struct {
bun.BaseModel `bun:"table:epoch_entry"`
EpochEntry
}

// TODO: Do I need this?
type PGEpochUtxoOps struct {
bun.BaseModel `bun:"table:epoch_entry_utxo_ops"`
EpochEntry
UtxoOperation
}

// Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun.
func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry {
return EpochEntry{
EpochNumber: epochEntry.EpochNumber,
InitialBlockHeight: epochEntry.InitialBlockHeight,
InitialView: epochEntry.InitialView,
FinalBlockHeight: epochEntry.FinalBlockHeight,
BadgerKey: keyBytes,
}
}

// EpochEntryBatchOperation is the entry point for processing a batch of Epoch entries.
// It determines the appropriate handler based on the operation type and executes it.
func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteEpochEntry(entries, db, operationType)
} else {
err = bulkInsertEpochEntry(entries, db, operationType, params)
}
if err != nil {
return errors.Wrapf(err, "entries.EpochEntryBatchOperation: Problem with operation type %v", operationType)
}
return nil
}

// bulkInsertEpochEntry inserts a batch of locked stake entries into the database.
func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGEpochEntry, len(uniqueEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGEpochEntry{EpochEntry: EpochEntryEncoderToPGStruct(entry.Encoder.(*lib.EpochEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertEpochEntry: Error inserting entries")
}
return nil
}

// bulkDeleteEpochEntry deletes a batch of locked stake entries from the database.
func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGEpochEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries")
}

return nil
}
117 changes: 117 additions & 0 deletions entries/locked_stake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package entries

import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"github.com/uptrace/bun/extra/bunbig"
)

// TODO: when to use nullzero vs use_zero?
type LockedStakeEntry struct {
StakerPKID string `bun:",nullzero"`
ValidatorPKID string `bun:",nullzero"`
LockedAmountNanos *bunbig.Int `pg:",use_zero"`
LockedAtEpochNumber uint64

ExtraData map[string]string `bun:"type:jsonb"`
BadgerKey []byte `pg:",pk,use_zero"`
}

type PGLockedStakeEntry struct {
bun.BaseModel `bun:"table:locked_stake_entry"`
LockedStakeEntry
}

// TODO: Do I need this?
type PGLockedStakeEntryUtxoOps struct {
bun.BaseModel `bun:"table:locked_stake_entry_utxo_ops"`
LockedStakeEntry
UtxoOperation
}

// Convert the LockedStakeEntry DeSo encoder to the PGLockedStakeEntry struct used by bun.
func LockedStakeEncoderToPGStruct(lockedStakeEntry *lib.LockedStakeEntry, keyBytes []byte, params *lib.DeSoParams) LockedStakeEntry {
pgLockedStakeEntry := LockedStakeEntry{
ExtraData: consumer.ExtraDataBytesToString(lockedStakeEntry.ExtraData),
BadgerKey: keyBytes,
}

if lockedStakeEntry.StakerPKID != nil {
pgLockedStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.StakerPKID)[:], params)
}

if lockedStakeEntry.ValidatorPKID != nil {
pgLockedStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.ValidatorPKID)[:], params)
}

pgLockedStakeEntry.LockedAtEpochNumber = lockedStakeEntry.LockedAtEpochNumber
pgLockedStakeEntry.LockedAmountNanos = bunbig.FromMathBig(lockedStakeEntry.LockedAmountNanos.ToBig())

return pgLockedStakeEntry
}

// LockedStakeBatchOperation is the entry point for processing a batch of LockedStake entries.
// It determines the appropriate handler based on the operation type and executes it.
func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteLockedStakeEntry(entries, db, operationType)
} else {
err = bulkInsertLockedStakeEntry(entries, db, operationType, params)
}
if err != nil {
return errors.Wrapf(err, "entries.LockedStakeBatchOperation: Problem with operation type %v", operationType)
}
return nil
}

// bulkInsertLockedStakeEntry inserts a batch of locked stake entries into the database.
func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGLockedStakeEntry, len(uniqueEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGLockedStakeEntry{LockedStakeEntry: LockedStakeEncoderToPGStruct(entry.Encoder.(*lib.LockedStakeEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertLockedStakeEntry: Error inserting entries")
}
return nil
}

// bulkDeleteLockedStakeEntry deletes a batch of locked stake entries from the database.
func bulkDeleteLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGLockedStakeEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteLockedStakeEntry: Error deleting entries")
}

return nil
}
134 changes: 134 additions & 0 deletions entries/lockup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package entries

import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"github.com/uptrace/bun/extra/bunbig"
)

type LockedBalanceEntry struct {
HODLerPKID string `pg:",use_zero"`
ProfilePKID string `pg:",use_zero"`
UnlockTimestampNanoSecs int64 `pg:",null_zero"`
VestingEndTimestampNanoSecs int64 `pg:",null_zero"`
BalanceBaseUnits *bunbig.Int `pg:",use_zero"`

BadgerKey []byte `pg:",pk,use_zero"`
}

type PGLockedBalanceEntry struct {
bun.BaseModel `bun:"table:locked_balance_entry"`
LockedBalanceEntry
}

// Convert the LockedBalanceEntry DeSo encoder to the PGLockedBalnceEntry struct used by bun.
func LockedBalanceEntryEncoderToPGStruct(
lockedBalanceEntry *lib.LockedBalanceEntry,
keyBytes []byte,
params *lib.DeSoParams,
) LockedBalanceEntry {
pgLockedBalanceEntry := LockedBalanceEntry{
BadgerKey: keyBytes,
}

if lockedBalanceEntry.HODLerPKID != nil {
pgLockedBalanceEntry.HODLerPKID =
consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.HODLerPKID)[:], params)
}

if lockedBalanceEntry.ProfilePKID != nil {
pgLockedBalanceEntry.ProfilePKID =
consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.ProfilePKID)[:], params)
}

pgLockedBalanceEntry.UnlockTimestampNanoSecs = lockedBalanceEntry.UnlockTimestampNanoSecs
pgLockedBalanceEntry.VestingEndTimestampNanoSecs = lockedBalanceEntry.VestingEndTimestampNanoSecs
pgLockedBalanceEntry.BalanceBaseUnits = bunbig.FromMathBig(lockedBalanceEntry.BalanceBaseUnits.ToBig())

return pgLockedBalanceEntry
}

// LockedBalanceEntryBatchOperation is the entry point for processing a batch of LockedBalance entries.
// It determines the appropriate handler based on the operation type and executes it.
func LockedBalanceEntryBatchOperation(
entries []*lib.StateChangeEntry,
db *bun.DB,
params *lib.DeSoParams,
) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteLockedBalanceEntry(entries, db, operationType)
} else {
err = bulkInsertLockedBalanceEntry(entries, db, operationType, params)
}
if err != nil {
return errors.Wrapf(err,
"entries.LockedBalanceEntryBatchOperation: Problem with operation type %v", operationType)
}
return nil
}

// bulkInsertLockedBalanceEntry inserts a batch of locked stake entries into the database.
func bulkInsertLockedBalanceEntry(
entries []*lib.StateChangeEntry,
db *bun.DB,
operationType lib.StateSyncerOperationType,
params *lib.DeSoParams,
) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGLockedBalanceEntry, len(uniqueEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGLockedBalanceEntry{
LockedBalanceEntry: LockedBalanceEntryEncoderToPGStruct(
entry.Encoder.(*lib.LockedBalanceEntry),
entry.KeyBytes,
params),
}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertLockedBalanceEntry: Error inserting entries")
}
return nil
}

// bulkDeleteLockedBalanceEntry deletes a batch of locked stake entries from the database.
func bulkDeleteLockedBalanceEntry(
entries []*lib.StateChangeEntry,
db *bun.DB,
operationType lib.StateSyncerOperationType,
) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGLockedBalanceEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteLockedBalanceEntry: Error deleting entries")
}

return nil
}
Loading