Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2ae2782
support subbed dbs
superzordon Oct 9, 2024
73176ac
Fix create sub
superzordon Oct 11, 2024
42022e4
Add cached entries entity to reduce number of inserts from db
superzordon Oct 12, 2024
8ad9de5
Move location of cached entries
superzordon Oct 12, 2024
6da6f88
Empty commit to trigger build
superzordon Oct 12, 2024
f8101e5
Empty commit to trigger build
superzordon Oct 12, 2024
9fba700
Updates to cached entries
superzordon Oct 12, 2024
f50f93f
Add views for explorer
superzordon Oct 14, 2024
f4a5c7f
Implement hashicorp LRU cache
superzordon Oct 14, 2024
9458be9
Fix references
superzordon Oct 16, 2024
a2b87de
Merge pull request #82 from deso-protocol/z/cache-entries
superzordon Oct 16, 2024
d289550
Merge branch 'z/cache-and-pubsub' into z/support-subscribed-dbs
superzordon Oct 16, 2024
b7d884c
Merge pull request #84 from deso-protocol/z/support-subscribed-dbs
superzordon Oct 17, 2024
ca69e07
Update profile entry migration
superzordon Oct 18, 2024
58f5a9f
Updates
superzordon Oct 20, 2024
0d6b878
Ln/merge main into z cache and pubsub (#86)
lazynina Oct 21, 2024
bd5b3f4
Update primary key of utxo operation
superzordon Oct 22, 2024
2ec2a3d
Update utxo op table, update entries
superzordon Oct 22, 2024
4049441
Fix reset db
superzordon Oct 22, 2024
26549c7
Update subscription logic
superzordon Oct 23, 2024
5d1a02e
Fix pdh
superzordon Oct 23, 2024
9217dec
Update refresh subscription
superzordon Oct 23, 2024
84c9ea0
Fix replication slot reuse
superzordon Oct 23, 2024
acdc682
Reuse replication slot
superzordon Oct 23, 2024
11e901a
Run post sync migrations immediately for subscription db
superzordon Oct 23, 2024
8b501aa
Reset order of reset db
superzordon Oct 23, 2024
af39388
Update sync sub routine
superzordon Oct 23, 2024
74259b5
Updates to pdh transactions
superzordon Oct 23, 2024
8973a65
Run migrations on sub sync
superzordon Oct 24, 2024
5dffcad
Additional migration
superzordon Oct 25, 2024
f9f2a82
Fix drop schema function
superzordon Nov 4, 2024
53016c9
Fix string interpolation
superzordon Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions entries/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operation
for _, entry := range uniqueBlocks {
block := entry.Encoder.(*lib.MsgDeSoBlock)
blockEntry, blockSigners := BlockEncoderToPGStruct(block, entry.KeyBytes, params)

pgBlockEntrySlice = append(pgBlockEntrySlice, blockEntry)
pgBlockSignersEntrySlice = append(pgBlockSignersEntrySlice, blockSigners...)
for jj, transaction := range block.Txns {
Expand Down
25 changes: 20 additions & 5 deletions entries/bls_pkid_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"
"github.com/uptrace/bun"
)
Expand Down Expand Up @@ -77,15 +78,15 @@ func BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct(

// BLSPublicKeyPKIDPairBatchOperation is the entry point for processing a batch of BLSPublicKeyPKIDPair entries.
// It determines the appropriate handler based on the operation type and executes it.
func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteBLSPkidPairEntry(entries, db, operationType)
err = bulkDeleteBLSPkidPairEntry(entries, db, operationType, cachedEntries)
} else {
err = bulkInsertBLSPkidPairEntry(entries, db, operationType, params)
err = bulkInsertBLSPkidPairEntry(entries, db, operationType, params, cachedEntries)
}
if err != nil {
return errors.Wrapf(err, "entries.StakeBatchOperation: Problem with operation type %v", operationType)
Expand All @@ -95,10 +96,14 @@ func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db bun.

// bulkInsertBLSPkidPairEntry inserts a batch of stake entries into the database.
func bulkInsertBLSPkidPairEntry(
entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams,
entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte],
) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Filter out any entries that are already tracked in the cache.
uniqueEntries = consumer.FilterCachedEntries(uniqueEntries, cachedEntries)

uniqueBLSPkidPairEntries := consumer.FilterEntriesByPrefix(
uniqueEntries, lib.Prefixes.PrefixValidatorBLSPublicKeyPKIDPairEntry)
uniqueBLSPkidPairSnapshotEntries := consumer.FilterEntriesByPrefix(
Expand Down Expand Up @@ -145,11 +150,16 @@ func bulkInsertBLSPkidPairEntry(
}
}

// Update the cache with the new entries.
for _, entry := range uniqueEntries {
cachedEntries.Add(string(entry.KeyBytes), entry.EncoderBytes)
}

return nil
}

// bulkDeleteBLSPkidPairEntry deletes a batch of stake entries from the database.
func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, cachedEntries *lru.Cache[string, []byte]) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down Expand Up @@ -182,5 +192,10 @@ func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db bun.IDB, ope
}
}

// Remove the deleted entries from the cache.
for _, key := range keysToDelete {
cachedEntries.Remove(string(key))
}

return nil
}
7 changes: 4 additions & 3 deletions entries/jailed_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"
"github.com/uptrace/bun"
)
Expand Down Expand Up @@ -39,15 +40,15 @@ func UnjailValidatorStateChangeMetadataEncoderToPGStruct(

// ValidatorBatchOperation is the entry point for processing a batch of Validator entries.
// It determines the appropriate handler based on the operation type and executes it.
func JailedHistoryEventBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
func JailedHistoryEventBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteValidatorEntry(entries, db, operationType)
err = bulkDeleteValidatorEntry(entries, db, operationType, cachedEntries)
} else {
err = bulkInsertValidatorEntry(entries, db, operationType, params)
err = bulkInsertValidatorEntry(entries, db, operationType, params, cachedEntries)
}
if err != nil {
return errors.Wrapf(err, "entries.ValidatorBatchOperation: Problem with operation type %v", operationType)
Expand Down
25 changes: 20 additions & 5 deletions entries/pkid.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/golang/glog"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"
"github.com/uptrace/bun"
)
Expand Down Expand Up @@ -127,15 +128,15 @@ func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationT
return nil
}

func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeletePkid(entries, db, operationType)
err = bulkDeletePkid(entries, db, operationType, cachedEntries)
} else {
err = bulkInsertPkid(entries, db, operationType, params)
err = bulkInsertPkid(entries, db, operationType, params, cachedEntries)
}
if err != nil {
return errors.Wrapf(err, "entries.PostBatchOperation: Problem with operation type %v", operationType)
Expand All @@ -144,12 +145,16 @@ func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib
}

// bulkInsertPkid inserts a batch of PKIDs into the database.
func bulkInsertPkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertPkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

uniqueLeaderScheduleEntries := consumer.FilterEntriesByPrefix(
uniqueEntries, lib.Prefixes.PrefixSnapshotLeaderSchedule)

// Filter out any entries that are already in the cache.
uniqueLeaderScheduleEntries = consumer.FilterCachedEntries(uniqueLeaderScheduleEntries, cachedEntries)

// NOTE: if we need to support parsing other indexes for PKIDs beyond LeaderSchedule,
// we will need to filter the uniqueEntries by the appropriate prefix and then convert
// the entries to the appropriate PG struct.
Expand Down Expand Up @@ -178,11 +183,16 @@ func bulkInsertPkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType l
}
}

// Update the cached entries with the new entries.
for _, entry := range uniqueLeaderScheduleEntries {
cachedEntries.Add(string(entry.KeyBytes), entry.EncoderBytes)
}

return nil
}

// bulkDeletePKID deletes a batch of PKIDs from the database.
func bulkDeletePkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
func bulkDeletePkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, cachedEntries *lru.Cache[string, []byte]) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand All @@ -201,5 +211,10 @@ func bulkDeletePkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType l
}
}

// Remove the entries from the cache.
for _, entry := range uniqueEntries {
cachedEntries.Remove(string(entry.KeyBytes))
}

return nil
}
27 changes: 22 additions & 5 deletions entries/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"github.com/uptrace/bun/extra/bunbig"
Expand Down Expand Up @@ -98,15 +99,15 @@ func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []b

// ValidatorBatchOperation is the entry point for processing a batch of Validator entries.
// It determines the appropriate handler based on the operation type and executes it.
func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteValidatorEntry(entries, db, operationType)
err = bulkDeleteValidatorEntry(entries, db, operationType, cachedEntries)
} else {
err = bulkInsertValidatorEntry(entries, db, operationType, params)
err = bulkInsertValidatorEntry(entries, db, operationType, params, cachedEntries)
}
if err != nil {
return errors.Wrapf(err, "entries.ValidatorBatchOperation: Problem with operation type %v", operationType)
Expand All @@ -115,9 +116,13 @@ func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params
}

// bulkInsertValidatorEntry inserts a batch of validator entries into the database.
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Filter out any entries that are already tracked in the cache.
uniqueEntries = consumer.FilterCachedEntries(uniqueEntries, cachedEntries)

uniqueValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID)
uniqueSnapshotValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID)
// Create a new array to hold the bun struct.
Expand Down Expand Up @@ -156,11 +161,17 @@ func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, opera
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting snapshot validator entries")
}
}

// Add any new entries to the cache.
for _, entry := range uniqueEntries {
cachedEntries.Add(string(entry.KeyBytes), entry.EncoderBytes)
}

return nil
}

// bulkDeleteValidatorEntry deletes a batch of validator entries from the database.
func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, cachedEntries *lru.Cache[string, []byte]) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
uniqueKeys := consumer.KeysToDelete(uniqueEntries)
Expand All @@ -182,6 +193,12 @@ func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, opera
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries")
}

// Delete cached validator entries.
for _, key := range validatorKeysToDelete {
keyStr := string(key)
cachedEntries.Remove(keyStr)
}
}

// Execute the delete query for snapshot validator entries.
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/deso-protocol/state-consumer v1.0.3
github.com/golang/glog v1.2.2
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/pkg/errors v0.9.1
github.com/spf13/viper v1.19.0
github.com/uptrace/bun v1.2.3
Expand Down Expand Up @@ -58,7 +59,7 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
github.com/btcsuite/btcd/btcutil v1.1.6 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c // indirect
github.com/btcsuite/btclog v0.0.0-20241017175713-3428138b75c7 // indirect
github.com/bwesterb/go-ristretto v1.2.3 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -79,7 +80,7 @@ require (
github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect
github.com/ebitengine/purego v0.8.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/envoyproxy/go-control-plane v0.13.0 // indirect
github.com/envoyproxy/go-control-plane v0.13.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/ethereum/go-ethereum v1.14.11 // indirect
github.com/fatih/color v1.17.0 // indirect
Expand Down Expand Up @@ -131,7 +132,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/nyaruka/phonenumbers v1.4.0 // indirect
github.com/nyaruka/phonenumbers v1.4.1 // indirect
github.com/oleiade/lane v1.0.1 // indirect
github.com/onflow/crypto v0.25.2 // indirect
github.com/outcaste-io/ristretto v0.2.3 // indirect
Expand All @@ -142,7 +143,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 // indirect
github.com/robinjoseph08/go-pg-migrations/v3 v3.0.0 // indirect
github.com/robinjoseph08/go-pg-migrations/v3 v3.1.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
Expand Down Expand Up @@ -204,7 +205,7 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/grpc/stats/opentelemetry v0.0.0-20241016173057-569c8eb0af32 // indirect
google.golang.org/grpc/stats/opentelemetry v0.0.0-20241017163036-56df169480cd // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading