diff --git a/.env b/.env index 10c4cf6..73fbbfd 100644 --- a/.env +++ b/.env @@ -1,11 +1,11 @@ -STATE_CHANGE_DIR=/tmp/state-changes-test -CONSUMER_PROGRESS_DIR=/tmp/consumer-progress-test +STATE_CHANGE_DIR=/tmp/state-change-files/state-changes +CONSUMER_PROGRESS_DIR=/tmp/consumer-progress DB_HOST=localhost -DB_PORT=5432 +DB_PORT=5430 DB_USERNAME=postgres DB_PASSWORD=postgres READONLY_USER_PASSWORD=postgres -LOG_QUERIES=true +LOG_QUERIES=false THREAD_LIMIT=10 BATCH_BYTES=500000 -CALCULATE_EXPLORER_STATISTICS=true +CALCULATE_EXPLORER_STATISTICS=false diff --git a/Dockerfile b/Dockerfile index fa5c931..46d272c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM alpine:latest AS daodao +FROM alpine:latest AS builder RUN apk update RUN apk upgrade @@ -10,6 +10,8 @@ COPY postgres-data-handler/go.mod postgres-data-handler/ COPY postgres-data-handler/go.sum postgres-data-handler/ COPY core/go.mod core/ COPY core/go.sum core/ +COPY state-consumer/go.mod state-consumer/ +COPY state-consumer/go.sum state-consumer/ WORKDIR /postgres-data-handler/src/postgres-data-handler @@ -27,16 +29,22 @@ COPY core/cmd ../core/cmd COPY core/lib ../core/lib COPY core/migrate ../core/migrate +COPY state-consumer/consumer ../state-consumer/consumer + RUN go mod tidy +# Install Delve debugger, specifying the installation path explicitly +ENV GOPATH=/root/go +RUN go install github.com/go-delve/delve/cmd/dlv@latest + ## build postgres data handler backend RUN GOOS=linux go build -mod=mod -a -installsuffix cgo -o bin/postgres-data-handler main.go -# -## create tiny image -#FROM alpine:latest -## -#RUN apk add --update vips-dev -## -#COPY --from=daodao /daodao/src/daodao-backend/bin/daodao-backend /daodao/bin/daodao-backend -#ENTRYPOINT ["/daodao/bin/daodao-backend"] -ENTRYPOINT ["/postgres-data-handler/src/postgres-data-handler/bin/postgres-data-handler"] + +# Install runtime dependencies +RUN apk add --no-cache vips-dev + +# Expose the port Delve will listen on +EXPOSE 2345 + +# Set the 
entry point to start the application under Delve's control +ENTRYPOINT ["/root/go/bin/dlv", "--listen=:2345", "--headless=true", "--api-version=2", "--accept-multiclient", "exec", "/postgres-data-handler/src/postgres-data-handler/bin/postgres-data-handler"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..89e6719 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +dev: + go run . + +dev-env: + docker compose -f local.docker-compose.yml build && docker compose -f local.docker-compose.yml up + +dev-env-down: + docker compose -f local.docker-compose.yml down --volumes \ No newline at end of file diff --git a/entries/access_group.go b/entries/access_group.go index fe43564..2051d2f 100644 --- a/entries/access_group.go +++ b/entries/access_group.go @@ -52,7 +52,7 @@ func AccessGroupEncoderToPGStruct(accessGroupEntry *lib.AccessGroupEntry, keyByt // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -69,7 +69,7 @@ func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, para } // bulkInsertAccessGroupEntry inserts a batch of access_group entries into the database. 
-func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -94,7 +94,7 @@ func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope } // bulkDeletePostEntry deletes a batch of access_group entries from the database. -func bulkDeleteAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteAccessGroupEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/access_group_member.go b/entries/access_group_member.go index 78f8338..1722f14 100644 --- a/entries/access_group_member.go +++ b/entries/access_group_member.go @@ -57,7 +57,7 @@ func AccessGroupMemberEncoderToPGStruct(accessGroupMemberEntry *lib.AccessGroupM // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. 
operationType := entries[0].OperationType @@ -74,7 +74,7 @@ func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB } // bulkInsertAccessGroupMemberEntry inserts a batch of access_group_member entries into the database. -func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -99,7 +99,7 @@ func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.D } // bulkDeletePostEntry deletes a batch of access_group_member entries from the database. -func bulkDeleteAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/balance.go b/entries/balance.go index 4f8c2a0..6c6de0a 100644 --- a/entries/balance.go +++ b/entries/balance.go @@ -46,7 +46,7 @@ func BalanceEntryEncoderToPGStruct(balanceEntry *lib.BalanceEntry, keyBytes []by // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. 
-func BalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func BalanceBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -63,7 +63,7 @@ func BalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertBalanceEntry inserts a batch of balance entries into the database. -func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -88,7 +88,7 @@ func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of balance entries from the database. -func bulkDeleteBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/block.go b/entries/block.go index 2b28d35..0fe34cf 100644 --- a/entries/block.go +++ b/entries/block.go @@ -45,7 +45,7 @@ func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte) *PGBlockEn // PostBatchOperation is the entry point for processing a batch of post entries. 
It determines the appropriate handler // based on the operation type and executes it. -func BlockBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func BlockBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -62,7 +62,7 @@ func BlockBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *li } // bulkInsertUtxoOperationsEntry inserts a batch of user_association entries into the database. -func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // If this block is a part of the initial sync, skip it - it will be handled by the utxo operations. if operationType == lib.DbOperationTypeInsert { return nil @@ -106,7 +106,7 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation } // bulkDeleteBlockEntry deletes a batch of block entries from the database. -func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/dao_coin_limit_order.go b/entries/dao_coin_limit_order.go index eb77740..5657b74 100644 --- a/entries/dao_coin_limit_order.go +++ b/entries/dao_coin_limit_order.go @@ -19,7 +19,7 @@ type DaoCoinLimitOrderEntry struct { OperationType uint8 `bun:",nullzero"` FillType uint8 `bun:",nullzero"` BlockHeight uint32 `bun:",nullzero"` - IsDaoCoinConst bool `bun:",nullzero"` + IsDaoCoinConst bool BadgerKey []byte `pg:",pk,use_zero"` } @@ -48,7 +48,7 @@ func DaoCoinLimitOrderEncoderToPGStruct(daoCoinLimitOrder *lib.DAOCoinLimitOrder // DaoCoinLimitOrderBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -65,7 +65,7 @@ func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB } // bulkInsertDaoCoinLimitOrderEntry inserts a batch of post_association entries into the database. -func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. 
@@ -89,7 +89,7 @@ func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.D } // bulkDeletePostEntry deletes a batch of post_association entries from the database. -func bulkDeleteDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/derived_key.go b/entries/derived_key.go index b14a7f7..2af1204 100644 --- a/entries/derived_key.go +++ b/entries/derived_key.go @@ -56,7 +56,7 @@ func DerivedKeyEncoderToPGStruct(derivedKeyEntry *lib.DerivedKeyEntry, keyBytes // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -73,7 +73,7 @@ func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param } // bulkInsertDerivedKeyEntry inserts a batch of derived_key entries into the database. 
-func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -101,7 +101,7 @@ func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, oper } // bulkDeletePostEntry deletes a batch of derived_key entries from the database. -func bulkDeleteDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDerivedKeyEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/deso_balance.go b/entries/deso_balance.go index 3eebff2..2621a00 100644 --- a/entries/deso_balance.go +++ b/entries/deso_balance.go @@ -36,7 +36,7 @@ func DesoBalanceEncoderToPGStruct(desoBalanceEntry *lib.DeSoBalanceEntry, keyByt // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. 
operationType := entries[0].OperationType @@ -53,7 +53,7 @@ func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, para } // bulkInsertDiamondEntry inserts a batch of diamond entries into the database. -func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -77,7 +77,7 @@ func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope } // bulkDeletePostEntry deletes a batch of diamond entries from the database. -func bulkDeleteDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDesoBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/diamond.go b/entries/diamond.go index dd07bf3..15ac3f8 100644 --- a/entries/diamond.go +++ b/entries/diamond.go @@ -41,7 +41,7 @@ func DiamondEncoderToPGStruct(diamondEntry *lib.DiamondEntry, keyBytes []byte, p // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func DiamondBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DiamondBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. 
// We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -58,7 +58,7 @@ func DiamondBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertDiamondEntry inserts a batch of diamond entries into the database. -func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -82,7 +82,7 @@ func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of diamond entries from the database. -func bulkDeleteDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDiamondEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/follow.go b/entries/follow.go index e808d9b..6179353 100644 --- a/entries/follow.go +++ b/entries/follow.go @@ -36,7 +36,7 @@ func FollowEncoderToPGStruct(followEntry *lib.FollowEntry, keyBytes []byte, para // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. 
-func FollowBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func FollowBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -53,7 +53,7 @@ func FollowBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *l } // bulkInsertFollowEntry inserts a batch of follow entries into the database. -func bulkInsertFollowEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertFollowEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -78,7 +78,7 @@ func bulkInsertFollowEntry(entries []*lib.StateChangeEntry, db *bun.DB, operatio } // bulkDeletePostEntry deletes a batch of follow entries from the database. -func bulkDeleteFollowEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteFollowEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/helpers.go b/entries/helpers.go new file mode 100644 index 0000000..ea56c5f --- /dev/null +++ b/entries/helpers.go @@ -0,0 +1,44 @@ +package entries + +import ( + "fmt" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/uptrace/bun" +) + +// GetDbHandle returns the correct interface to use for database operations. 
+// If a transaction is open, it returns the transaction handle, otherwise it returns the db handle. +func GetDbHandle(tx *bun.Tx, db *bun.DB) bun.IDB { + if tx != nil { + return tx + } + return db +} + +// CreateSavepoint creates a savepoint in the current transaction. If no transaction is open, it returns an empty string. +// The randomly generated savepoint name is returned if the savepoint is created successfully. +func CreateSavepoint(tx *bun.Tx) (string, error) { + if tx == nil { + return "", nil + } + savepointName := uuid.New().String() + + _, err := tx.Exec(fmt.Sprintf("SAVEPOINT \"%s\"", savepointName)) + if err != nil { + return "", errors.Wrapf(err, "PostgresDataHandler.CreateSavepoint: Error creating savepoint") + } + + return savepointName, nil +} + +func RollbackToSavepoint(tx *bun.Tx, savepointName string) error { + if tx == nil || savepointName == "" { + return nil + } + _, err := tx.Exec(fmt.Sprintf("ROLLBACK TO SAVEPOINT \"%s\"", savepointName)) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.RollbackToSavepoint: Error reverting to savepoint") + } + return nil +} diff --git a/entries/like.go b/entries/like.go index 8c6d890..18ba75f 100644 --- a/entries/like.go +++ b/entries/like.go @@ -37,7 +37,7 @@ func LikeEncoderToPGStruct(likeEntry *lib.LikeEntry, keyBytes []byte, params *li // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func LikeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func LikeBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType @@ -54,7 +54,7 @@ func LikeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib } // bulkInsertLikeEntry inserts a batch of like entries into the database. -func bulkInsertLikeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertLikeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -79,7 +79,7 @@ func bulkInsertLikeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT } // bulkDeletePostEntry deletes a batch of like entries from the database. -func bulkDeleteLikeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteLikeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/message.go b/entries/message.go index 10ebf77..1a29d2a 100644 --- a/entries/message.go +++ b/entries/message.go @@ -55,7 +55,7 @@ func MessageEncoderToPGStruct(messageEntry *lib.MessageEntry, keyBytes []byte, p // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func MessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func MessageBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. 
// We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -72,7 +72,7 @@ func MessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertMessageEntry inserts a batch of message entries into the database. -func bulkInsertMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -96,7 +96,7 @@ func bulkInsertMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of message entries from the database. -func bulkDeleteMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/new_message.go b/entries/new_message.go index eecba19..96a856c 100644 --- a/entries/new_message.go +++ b/entries/new_message.go @@ -3,6 +3,7 @@ package entries import ( "bytes" "context" + "encoding/hex" "github.com/deso-protocol/core/lib" "github.com/deso-protocol/state-consumer/consumer" "github.com/pkg/errors" @@ -11,14 +12,14 @@ import ( ) type NewMessageEntry struct { - SenderAccessGroupOwnerPublicKey string `bun:",nullzero"` - SenderAccessGroupKeyName string `bun:",nullzero"` - SenderAccessGroupPublicKey string `bun:",nullzero"` - RecipientAccessGroupOwnerPublicKey string `bun:",nullzero"` - RecipientAccessGroupKeyName string `bun:",nullzero"` - RecipientAccessGroupPublicKey string `bun:",nullzero"` - EncryptedText string `pg:",use_zero"` - IsGroupChatMessage bool `bun:",nullzero"` + SenderAccessGroupOwnerPublicKey string `bun:",nullzero"` + SenderAccessGroupKeyName string `bun:",nullzero"` + SenderAccessGroupPublicKey string `bun:",nullzero"` + RecipientAccessGroupOwnerPublicKey string `bun:",nullzero"` + RecipientAccessGroupKeyName string `bun:",nullzero"` + RecipientAccessGroupPublicKey string `bun:",nullzero"` + EncryptedText string `pg:",use_zero"` + IsGroupChatMessage bool Timestamp time.Time `pg:",use_zero"` ExtraData map[string]string `bun:"type:jsonb"` @@ -45,7 +46,7 @@ func NewMessageEncoderToPGStruct(newMessageEntry *lib.NewMessageEntry, keyBytes } pgNewMessageEntry := NewMessageEntry{ - EncryptedText: string(newMessageEntry.EncryptedText[:]), + EncryptedText: hex.EncodeToString(newMessageEntry.EncryptedText[:]), Timestamp: consumer.UnixNanoToTime(newMessageEntry.TimestampNanos), ExtraData: consumer.ExtraDataBytesToString(newMessageEntry.ExtraData), IsGroupChatMessage: isGroupChatMessage, @@ -81,7 +82,7 @@ func NewMessageEncoderToPGStruct(newMessageEntry *lib.NewMessageEntry, keyBytes // PostBatchOperation is the entry point for processing a batch of post entries. 
It determines the appropriate handler // based on the operation type and executes it. -func NewMessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func NewMessageBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -98,7 +99,7 @@ func NewMessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param } // bulkInsertNewMessageEntry inserts a batch of new_message entries into the database. -func bulkInsertNewMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertNewMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -123,7 +124,7 @@ func bulkInsertNewMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, oper } // bulkDeletePostEntry deletes a batch of new_message entries from the database. -func bulkDeleteNewMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteNewMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/nft.go b/entries/nft.go index 02647f2..59a68c8 100644 --- a/entries/nft.go +++ b/entries/nft.go @@ -62,7 +62,7 @@ func NftEncoderToPGStruct(nftEntry *lib.NFTEntry, keyBytes []byte, params *lib.D // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func NftBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func NftBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -79,7 +79,7 @@ func NftBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib. } // bulkInsertNftEntry inserts a batch of nft entries into the database. -func bulkInsertNftEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertNftEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -104,7 +104,7 @@ func bulkInsertNftEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationTy } // bulkDeletePostEntry deletes a batch of nft entries from the database. -func bulkDeleteNftEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteNftEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/nft_bid.go b/entries/nft_bid.go index 9ea5fef..9648305 100644 --- a/entries/nft_bid.go +++ b/entries/nft_bid.go @@ -47,7 +47,7 @@ func NftBidEncoderToPGStruct(nftBidEntry *lib.NFTBidEntry, keyBytes []byte, para // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func NftBidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func NftBidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -64,7 +64,7 @@ func NftBidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *l } // bulkInsertNftBidEntry inserts a batch of nft_bid entries into the database. -func bulkInsertNftBidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertNftBidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -88,7 +88,7 @@ func bulkInsertNftBidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operatio } // bulkDeletePostEntry deletes a batch of nft_bid entries from the database. 
-func bulkDeleteNftBidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteNftBidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/pkid.go b/entries/pkid.go index a085aa2..8b664cd 100644 --- a/entries/pkid.go +++ b/entries/pkid.go @@ -36,7 +36,7 @@ func PkidEncoderToPGStruct(pkidEntry *lib.PKIDEntry, keyBytes []byte, params *li // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -53,7 +53,7 @@ func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib } // bulkInsertDiamondEntry inserts a batch of diamond entries into the database. -func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. 
@@ -77,7 +77,7 @@ func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT } // bulkDeletePostEntry deletes a batch of diamond entries from the database. -func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/post.go b/entries/post.go index e64a816..51a6e56 100644 --- a/entries/post.go +++ b/entries/post.go @@ -60,7 +60,7 @@ func PostEntryEncoderToPGStruct(postEntry *lib.PostEntry, keyBytes []byte, param IsNFT: postEntry.IsNFT, NumNFTCopies: postEntry.NumNFTCopies, NumNFTCopiesForSale: postEntry.NumNFTCopiesForSale, - NumNFTCopiesBurned: postEntry.NumNFTCopiesBurned, + NumNFTCopiesBurned: postEntry.NumNFTCopiesBurned, HasUnlockable: postEntry.HasUnlockable, NFTRoyaltyToCreatorBasisPoints: postEntry.NFTRoyaltyToCreatorBasisPoints, NFTRoyaltyToCoinBasisPoints: postEntry.NFTRoyaltyToCoinBasisPoints, @@ -90,7 +90,7 @@ func PostEntryEncoderToPGStruct(postEntry *lib.PostEntry, keyBytes []byte, param // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func PostBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func PostBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. 
operationType := entries[0].OperationType @@ -107,7 +107,7 @@ func PostBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib } // bulkInsertPostEntry inserts a batch of post entries into the database. -func bulkInsertPostEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertPostEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -135,7 +135,7 @@ func bulkInsertPostEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT } // bulkDeletePostEntry deletes a batch of post entries from the database. -func bulkDeletePostEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeletePostEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/post_association.go b/entries/post_association.go index d6882fd..bd5bae2 100644 --- a/entries/post_association.go +++ b/entries/post_association.go @@ -38,6 +38,7 @@ func PostAssociationEncoderToPGStruct(postAssociationEntry *lib.PostAssociationE pgEntry := PostAssociationEntry{ AssociationType: string(postAssociationEntry.AssociationType[:]), AssociationValue: string(postAssociationEntry.AssociationValue[:]), + BlockHeight: postAssociationEntry.BlockHeight, ExtraData: consumer.ExtraDataBytesToString(postAssociationEntry.ExtraData), BadgerKey: keyBytes, } @@ -60,7 +61,7 @@ func PostAssociationEncoderToPGStruct(postAssociationEntry *lib.PostAssociationE // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func PostAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func PostAssociationBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -77,7 +78,7 @@ func PostAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkInsertPostAssociationEntry inserts a batch of post_association entries into the database. -func bulkInsertPostAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertPostAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -101,7 +102,7 @@ func bulkInsertPostAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkDeletePostEntry deletes a batch of post_association entries from the database. -func bulkDeletePostAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeletePostAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/profile.go b/entries/profile.go index 436f84f..a8b2024 100644 --- a/entries/profile.go +++ b/entries/profile.go @@ -60,7 +60,7 @@ func ProfileEntryEncoderToPGStruct(profileEntry *lib.ProfileEntry, keyBytes []by // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func ProfileBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func ProfileBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -77,7 +77,7 @@ func ProfileBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertPostEntry inserts a batch of post entries into the database. 
-func bulkInsertProfileEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertProfileEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -100,7 +100,7 @@ func bulkInsertProfileEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of profile entries from the database. -func bulkDeleteProfileEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteProfileEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/transaction.go b/entries/transaction.go index c5fe312..59b75c0 100644 --- a/entries/transaction.go +++ b/entries/transaction.go @@ -103,7 +103,7 @@ func TransactionEncoderToPGStruct(transaction *lib.MsgDeSoTxn, blockIndex uint64 // TransactionBatchOperation is the entry point for processing a batch of transaction entries. It determines the appropriate handler // based on the operation type and executes it. -func TransactionBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func TransactionBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. 
operationType := entries[0].OperationType @@ -136,7 +136,7 @@ func transformTransactionEntry(entries []*lib.StateChangeEntry, params *lib.DeSo return pgTransactionEntrySlice, nil } -func bulkInsertTransactionEntry(entries []*PGTransactionEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkInsertTransactionEntry(entries []*PGTransactionEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Bulk insert the entries. transactionQuery := db.NewInsert().Model(&entries) @@ -151,7 +151,7 @@ func bulkInsertTransactionEntry(entries []*PGTransactionEntry, db *bun.DB, opera } // transformAndBulkInsertTransactionEntry inserts a batch of user_association entries into the database. -func transformAndBulkInsertTransactionEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func transformAndBulkInsertTransactionEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { pgTransactionEntrySlice, err := transformTransactionEntry(entries, params) if err != nil { return errors.Wrapf(err, "entries.transformAndBulkInsertTransactionEntry: Problem transforming transaction entries") @@ -166,7 +166,7 @@ func transformAndBulkInsertTransactionEntry(entries []*lib.StateChangeEntry, db } // bulkDeleteTransactionEntry deletes a batch of transaction entries from the database. -func bulkDeleteTransactionEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteTransactionEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/user_association.go b/entries/user_association.go index 6f02be2..029399d 100644 --- a/entries/user_association.go +++ b/entries/user_association.go @@ -38,6 +38,7 @@ func UserAssociationEncoderToPGStruct(userAssociationEntry *lib.UserAssociationE pgEntry := UserAssociationEntry{ AssociationType: string(userAssociationEntry.AssociationType[:]), AssociationValue: string(userAssociationEntry.AssociationValue[:]), + BlockHeight: userAssociationEntry.BlockHeight, ExtraData: consumer.ExtraDataBytesToString(userAssociationEntry.ExtraData), BadgerKey: keyBytes, } @@ -60,7 +61,7 @@ func UserAssociationEncoderToPGStruct(userAssociationEntry *lib.UserAssociationE // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func UserAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func UserAssociationBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -77,7 +78,7 @@ func UserAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkInsertUserAssociationEntry inserts a batch of user_association entries into the database. -func bulkInsertUserAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertUserAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -101,7 +102,7 @@ func bulkInsertUserAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkDeletePostEntry deletes a batch of user_association entries from the database. -func bulkDeleteUserAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteUserAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/utxo_operation.go b/entries/utxo_operation.go index 6234e84..78b3497 100644 --- a/entries/utxo_operation.go +++ b/entries/utxo_operation.go @@ -66,7 +66,7 @@ func ConvertUtxoOperationKeyToBlockHashHex(keyBytes []byte) string { // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -83,7 +83,7 @@ func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, pa } // bulkInsertUtxoOperationsEntry inserts a batch of utxo operation entries into the database. 
-func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) @@ -93,10 +93,6 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, affectedPublicKeys := make([]*PGAffectedPublicKeyEntry, 0) blockEntries := make([]*PGBlockEntry, 0) - // Start timer to track how long it takes to insert the entries. - start := time.Now() - - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserting %v entries\n", len(uniqueEntries)) transactionCount := 0 // Whether we are inserting transactions for the first time, or just updating them. @@ -220,9 +216,6 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, } // Print how long it took to insert the entries. 
} - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Processed %v txns in %v s\n", transactionCount, time.Since(start)) - - start = time.Now() if len(transactionUpdates) > 0 { @@ -260,10 +253,6 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, } } - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Updated %v txns in %v s\n", len(transactionUpdates), time.Since(start)) - - start = time.Now() - // Insert affected public keys into db if len(affectedPublicKeys) > 0 { _, err := db.NewInsert().Model(&affectedPublicKeys).On("CONFLICT (public_key, transaction_hash, metadata) DO UPDATE").Exec(context.Background()) @@ -272,12 +261,11 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, } } - fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start)) return nil } // bulkDeletePostEntry deletes a batch of utxo_operation entries from the database. -func bulkDeleteUtxoOperationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteUtxoOperationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/go.mod b/go.mod index 56f0715..051a1b9 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,17 @@ -module PostgresDataHandler +module github.com/deso-protocol/postgres-data-handler go 1.18 replace github.com/deso-protocol/core => ../core/ +replace github.com/deso-protocol/state-consumer => ../state-consumer/ + require ( - github.com/deso-protocol/backend v1.2.10-0.20230727205436-dba653dc043c + github.com/deso-protocol/backend v1.2.10-0.20240301035823-448ec0478e01 github.com/deso-protocol/core v1.2.10-0.20230314161821-4069c3e417d3 - github.com/deso-protocol/state-consumer v1.0.4-0.20240117002702-0f75e8691905 + github.com/deso-protocol/state-consumer v0.0.0-00010101000000-000000000000 github.com/golang/glog v1.0.0 + github.com/google/uuid v1.2.0 github.com/pkg/errors v0.9.1 github.com/spf13/viper v1.7.1 github.com/uptrace/bun v1.1.14 @@ -56,7 +59,6 @@ require ( github.com/golang/snappy v0.0.3 // indirect github.com/google/flatbuffers v2.0.0+incompatible // indirect github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5 // indirect - github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/h2non/bimg v1.1.5 // indirect diff --git a/go.sum b/go.sum index 458a324..05b189c 100644 --- a/go.sum +++ b/go.sum @@ -143,18 +143,12 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeC github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/decred/dcrd/lru v1.1.1 h1:kWFDaW0OWx6AD6Ki342c+JPmHbiVdE6rK81pT3fuo/Y= github.com/decred/dcrd/lru v1.1.1/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= -github.com/deso-protocol/backend v1.2.10-0.20230727205436-dba653dc043c h1:OK+OYVDLhaP0wUqOpWO0Q11ijc3kwnkEpi4EqqdV9m8= -github.com/deso-protocol/backend v1.2.10-0.20230727205436-dba653dc043c/go.mod h1:BnxywYGwEBoGuJRVfq6QIJKfRNRhydeFm9EQ1/ibUak= 
+github.com/deso-protocol/backend v1.2.10-0.20240301035823-448ec0478e01 h1:p+UY2EWLXhc2R/zKO18Tn/JMxkcl1wDAZU0u+urX3b8= +github.com/deso-protocol/backend v1.2.10-0.20240301035823-448ec0478e01/go.mod h1:BnxywYGwEBoGuJRVfq6QIJKfRNRhydeFm9EQ1/ibUak= github.com/deso-protocol/go-deadlock v1.0.0 h1:mw0pHy/19zgC+JFBStuQt1+1Ehv5OKA5NxXqecnL5ic= github.com/deso-protocol/go-deadlock v1.0.0/go.mod h1:K0Wd2OV2x7ck7SMYDraWerpKjFKUeBqaFcwz21tmkb8= github.com/deso-protocol/go-merkle-tree v1.0.0 h1:9zkI5dQsITYy77s4kbTGPQmZnhQ+LsH/kRdL5l/Yzvg= github.com/deso-protocol/go-merkle-tree v1.0.0/go.mod h1:V/vbg/maaNv6G7zf9VVs645nLFx/jsO2L/awFB/S/ZU= -github.com/deso-protocol/state-consumer v1.0.4-0.20230915153810-81be6a7ec25f h1:MMqFppip/KN14upXJ8rkq8CCvz3PghkHqj5TfCYaXUo= -github.com/deso-protocol/state-consumer v1.0.4-0.20230915153810-81be6a7ec25f/go.mod h1:ivi9/WBRWK/AG/cgAcGpA6GdodBAaEWh9p8PfQT3r5I= -github.com/deso-protocol/state-consumer v1.0.4-0.20240107220224-c9c924b0d69e h1:MuYxzO6stsmbJADFz2433O/9DaDy95Dks+x4Hfd85qc= -github.com/deso-protocol/state-consumer v1.0.4-0.20240107220224-c9c924b0d69e/go.mod h1:ivi9/WBRWK/AG/cgAcGpA6GdodBAaEWh9p8PfQT3r5I= -github.com/deso-protocol/state-consumer v1.0.4-0.20240117002702-0f75e8691905 h1:CntCnCV7LiH/CjQvicwRVR5iOcaIlFBffiTDwtT4BEs= -github.com/deso-protocol/state-consumer v1.0.4-0.20240117002702-0f75e8691905/go.mod h1:ivi9/WBRWK/AG/cgAcGpA6GdodBAaEWh9p8PfQT3r5I= github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= diff --git a/handler/data_handler.go b/handler/data_handler.go index b630db5..d740125 100644 --- a/handler/data_handler.go +++ b/handler/data_handler.go @@ -1,10 +1,14 @@ package handler import ( - "PostgresDataHandler/entries" - "PostgresDataHandler/migrations/post_sync_migrations" + "context" + "crypto/rand" + 
"database/sql" + "encoding/hex" "fmt" "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/postgres-data-handler/entries" + "github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations" "github.com/deso-protocol/state-consumer/consumer" "github.com/pkg/errors" "github.com/uptrace/bun" @@ -15,6 +19,8 @@ import ( type PostgresDataHandler struct { // A Postgres DB used for the storage of chain data. DB *bun.DB + // A bun transaction used for executing multiple operations in a single transaction. + Txn *bun.Tx // Params is a struct containing the current blockchain parameters. // It is used to determine which prefix to use for public keys. Params *lib.DeSoParams @@ -31,54 +37,74 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries var err error + // Get the correct db handle. + dbHandle := postgresDataHandler.GetDbHandle() + // Create a savepoint in the current transaction, if the transaction exists. + savepointName, err := postgresDataHandler.CreateSavepoint() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.HandleEntryBatch: Error creating savepoint") + } + switch encoderType { case lib.EncoderTypePostEntry: - err = entries.PostBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.PostBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeProfileEntry: - err = entries.ProfileBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.ProfileBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeLikeEntry: - err = entries.LikeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.LikeBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDiamondEntry: - err = entries.DiamondBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = 
entries.DiamondBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeFollowEntry: - err = entries.FollowBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.FollowBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeMessageEntry: - err = entries.MessageBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.MessageBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeBalanceEntry: - err = entries.BalanceBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.BalanceBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeNFTEntry: - err = entries.NftBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.NftBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeNFTBidEntry: - err = entries.NftBidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.NftBidBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDerivedKeyEntry: - err = entries.DerivedKeyBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.DerivedKeyBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeAccessGroupEntry: - err = entries.AccessGroupBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.AccessGroupBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeAccessGroupMemberEntry: - err = entries.AccessGroupMemberBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.AccessGroupMemberBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case 
lib.EncoderTypeNewMessageEntry: - err = entries.NewMessageBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.NewMessageBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeUserAssociationEntry: - err = entries.UserAssociationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.UserAssociationBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypePostAssociationEntry: - err = entries.PostAssociationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.PostAssociationBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypePKIDEntry: - err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.PkidBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDeSoBalanceEntry: - err = entries.DesoBalanceBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.DesoBalanceBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDAOCoinLimitOrderEntry: - err = entries.DaoCoinLimitOrderBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.DaoCoinLimitOrderBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeUtxoOperationBundle: - err = entries.UtxoOperationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.UtxoOperationBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeBlock: - err = entries.BlockBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.BlockBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeTxn: - err = 
entries.TransactionBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.TransactionBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) } if err != nil { + // If an error occurs, revert to the savepoint and return the error. + rollbackErr := postgresDataHandler.RevertToSavepoint(savepointName) + if rollbackErr != nil { + return errors.Wrapf(rollbackErr, "PostgresDataHandler.HandleEntryBatch: Error reverting to savepoint") + } return errors.Wrapf(err, "PostgresDataHandler.CallBatchOperationForEncoderType") } + + // Release the savepoint. + err = postgresDataHandler.ReleaseSavepoint(savepointName) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.HandleEntryBatch: Error releasing savepoint") + } + return nil } @@ -102,3 +128,107 @@ func (postgresDataHandler *PostgresDataHandler) HandleSyncEvent(syncEvent consum return nil } + +func (postgresDataHandler *PostgresDataHandler) InitiateTransaction() error { + fmt.Printf("Initiating Txn\n") + // If a transaction is already open, rollback the current transaction. 
+ if postgresDataHandler.Txn != nil { + err := postgresDataHandler.Txn.Rollback() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.InitiateTransaction: Error rolling back current transaction") + } + } + tx, err := postgresDataHandler.DB.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.InitiateTransaction: Error beginning transaction") + } + postgresDataHandler.Txn = &tx + return nil +} + +func (postgresDataHandler *PostgresDataHandler) CommitTransaction() error { + fmt.Printf("Committing Txn\n") + if postgresDataHandler.Txn == nil { + return fmt.Errorf("PostgresDataHandler.CommitTransaction: No transaction to commit") + } + err := postgresDataHandler.Txn.Commit() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.CommitTransaction: Error committing transaction") + } + postgresDataHandler.Txn = nil + return nil +} + +func (postgresDataHandler *PostgresDataHandler) RollbackTransaction() error { + fmt.Printf("Rolling back Txn\n") + if postgresDataHandler.Txn == nil { + return fmt.Errorf("PostgresDataHandler.RollbackTransaction: No transaction to rollback") + } + err := postgresDataHandler.Txn.Rollback() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.RollbackTransaction: Error rolling back transaction") + } + postgresDataHandler.Txn = nil + return nil +} + +// GetDbHandle returns the correct interface to use for database operations. +// If a transaction is open, it returns the transaction handle, otherwise it returns the db handle. +func (postgresDataHandler *PostgresDataHandler) GetDbHandle() bun.IDB { + if postgresDataHandler.Txn != nil { + return postgresDataHandler.Txn + } + return postgresDataHandler.DB +} + +// CreateSavepoint creates a savepoint in the current transaction. If no transaction is open, it returns an empty string. +// The randomly generated savepoint name is returned if the savepoint is created successfully. 
+func (postgresDataHandler *PostgresDataHandler) CreateSavepoint() (string, error) {
+	if postgresDataHandler.Txn == nil {
+		return "", nil
+	}
+	savepointName := generateSavepointName()
+
+	_, err := postgresDataHandler.Txn.Exec(fmt.Sprintf("SAVEPOINT %s", savepointName))
+	if err != nil {
+		return "", errors.Wrapf(err, "PostgresDataHandler.CreateSavepoint: Error creating savepoint")
+	}
+
+	return savepointName, nil
+}
+
+// RevertToSavepoint reverts the current transaction to the savepoint with the given name.
+func (postgresDataHandler *PostgresDataHandler) RevertToSavepoint(savepointName string) error {
+	if postgresDataHandler.Txn == nil {
+		return nil
+	}
+	_, err := postgresDataHandler.Txn.Exec(fmt.Sprintf("ROLLBACK TO SAVEPOINT %s", savepointName))
+	if err != nil {
+		return errors.Wrapf(err, "PostgresDataHandler.RevertToSavepoint: Error reverting to savepoint")
+	}
+	return nil
+}
+
+// ReleaseSavepoint releases the savepoint with the given name.
+func (postgresDataHandler *PostgresDataHandler) ReleaseSavepoint(savepointName string) error {
+	if postgresDataHandler.Txn == nil {
+		return nil
+	}
+	_, err := postgresDataHandler.Txn.Exec(fmt.Sprintf("RELEASE SAVEPOINT %s", savepointName))
+	if err != nil {
+		return errors.Wrapf(err, "PostgresDataHandler.ReleaseSavepoint: Error releasing savepoint")
+	}
+	return nil
+}
+
+func generateSavepointName() string {
+	// Create a byte slice of length 8 for a 64-bit random value
+	randomBytes := make([]byte, 8)
+	_, err := rand.Read(randomBytes)
+	if err != nil {
+		// A failed read from the system randomness source is unrecoverable here.
+		panic(err) // Savepoint names must be unique within the transaction; do not proceed.
+	}
+	// Convert the byte slice to a hexadecimal string
+	return "savepoint_" + hex.EncodeToString(randomBytes)
+}
diff --git a/handler/db_utils.go b/handler/db_utils.go
index d1fcc2f..51a38ab 100644
--- a/handler/db_utils.go
+++ b/handler/db_utils.go
@@ -1,9 +1,9 @@
 package handler
 
 import (
-	"PostgresDataHandler/migrations/initial_migrations"
-	"PostgresDataHandler/migrations/post_sync_migrations"
"context" + "github.com/deso-protocol/postgres-data-handler/migrations/initial_migrations" + "github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations" "github.com/golang/glog" "github.com/uptrace/bun" "github.com/uptrace/bun/migrate" diff --git a/local.docker-compose.yml b/local.docker-compose.yml new file mode 100644 index 0000000..d7a1c88 --- /dev/null +++ b/local.docker-compose.yml @@ -0,0 +1,142 @@ +version: '3.8' +services: + deso: +# image: docker.io/desoprotocol/backend-dev:451a0a65e27da4ad68cb0705c63b1c964a1ce011 + build: + context: .. + dockerfile: backend/Dockerfile + environment: + - ADD_IPS=localhost:19000 + - PRIVATE_MODE=true + - RUN_HOT_FEED_ROUTINE=false + - API_PORT=18001 + - PROTOCOL_PORT=18000 + - TXINDEX=true + - DATA_DIR=/pd/n0_00001 + - ACCESS_CONTROL_ALLOW_ORIGINS=* + - SECURE_HEADER_ALLOW_HOSTS=localhost:4200 + - SECURE_HEADER_DEVELOPMENT=true + - BLOCK_CYPHER_API_KEY=092dae962ea44b02809a4c74408b42a1 + - MIN_SATOSHIS_FOR_PROFILE=0 + - EXPOSE_GLOBAL_STATE=false + - SHOW_PROCESSING_SPINNERS=true + - COMP_PROFILE_CREATION=false + - STATE_CHANGE_DIR=/ss/state-changes + +# Hypersync Settings: + - SYNC_TYPE=hypersync + - HYPERSYNC=true +# Blocksync Settings: +# - SYNC_TYPE=blocksync +# - HYPERSYNC=false + +# Mainnet Settings: +# - REGTEST=false +# - TESTNET=false +# - CONNECT_IPS=35.232.92.5:17000 + +# Testnet Settings: +# - REGTEST=false +# - TESTNET=true +# - CONNECT_IPS=35.192.117.201:18000 + +# Regtest Settings: + - REGTEST=true + - TESTNET=true + - ADMIN_PUBLIC_KEYS=* + - SUPER_ADMIN_PUBLIC_KEYS=* + - NUM_MINING_THREADS=1 + - MINER_PUBLIC_KEYS=BC1YLg7Bk5sq9iNY17bAwoAYiChLYpmWEi6nY6q5gnA1UQV6xixHjfV + - BLOCK_PRODUCER_SEED=essence camp ghost remove document vault ladder swim pupil index apart ring + - STARTER_DESO_SEED=road congress client market couple bid risk escape artwork rookie artwork food + ports: + - '18000:18000' + - '18001:18001' + - '19000:19000' + volumes: + - pd_volume:/pd +# - /tmp/state-change-files:/ss + 
- ss_volume:/ss + healthcheck: + test: [ "CMD-SHELL", "wget --quiet --tries=1 --spider http://deso:18001/api/v0/health-check || exit 1" ] + interval: 30s + timeout: 10s + retries: 20 + start_period: 10s + command: ["run"] + entrypoint: /deso/bin/backend + pdh: +# image: docker.io/desoprotocol/postgres-data-handler:c2ff0e2921911d2581685e1794a1324724997c64 + build: + context: .. + dockerfile: postgres-data-handler/Dockerfile + environment: + - DB_HOST=db-ss + - DB_NAME=postgres + - DB_PASSWORD=postgres + - DB_PORT=5432 + - DB_USERNAME=postgres + - READONLY_USER_PASSWORD=postgres + - STATE_CHANGE_DIR=/ss/state-changes + - CONSUMER_PROGRESS_DIR=/ss/consumer-progress + - BATCH_BYTES=500000 + - THREAD_LIMIT=10 + - CALCULATE_EXPLORER_STATISTICS=false + - LOG_QUERIES=false +# Mainnet Settings: +# - IS_TESTNET=false +# Testnet Settings: + - IS_TESTNET=true + command: ["run"] + entrypoint: /postgres-data-handler/src/postgres-data-handler/bin/postgres-data-handler + volumes: + - ss_volume:/ss + depends_on: + db-ss: + condition: service_healthy +# gql: +# image: docker.io/desoprotocol/graphql-api:4255d8c3c5be7911ed7817ef7b1baf979a6d3818 +# environment: +# - DB_HOST=db-ss +# - DB_NAME=postgres +# - DB_PASSWORD=postgres +# - DB_PORT=5432 +# - DB_USERNAME=postgres +# - DB_OWNER_USERNAME=postgres +# - DB_OWNER_PASSWORD=postgres +# - READONLY_USER_PASSWORD=postgres +# - LOG_SQL=true +# - DESO_NODE_URI=http://deso:18001 +# ports: +# - '4000:4000' +# depends_on: +# db-ss: +# condition: service_healthy +# healthcheck: +# test: [ "CMD-SHELL", "wget --quiet --tries=1 --spider --header='Content-Type: application/json' --post-data='{\"query\":\"query {__typename}\"}' http://gql:4000/graphql || exit 1" ] +# interval: 10s +# timeout: 15s +# retries: 30 +# start_period: 10s + db-ss: + image: postgres:14 + # restart: always + environment: + - PGUSER=postgres + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=postgres + - POSTGRES_LOG_STATEMENTS=all + ports: + - 
'5430:5432' + volumes: + - db_ss_volume:/var/lib/postgresql/data + healthcheck: + test: [ "CMD-SHELL", "pg_isready" ] + interval: 10s + timeout: 5s + retries: 100 +volumes: + db_ss_volume: + pd_volume: + ss_volume: diff --git a/main.go b/main.go index aaaea1c..057bee0 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,13 @@ package main import ( - "PostgresDataHandler/handler" - "PostgresDataHandler/migrations/initial_migrations" - "PostgresDataHandler/migrations/post_sync_migrations" "database/sql" "flag" "fmt" "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/postgres-data-handler/handler" + "github.com/deso-protocol/postgres-data-handler/migrations/initial_migrations" + "github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations" "github.com/deso-protocol/state-consumer/consumer" "github.com/golang/glog" "github.com/spf13/viper" @@ -108,6 +108,8 @@ func getConfigValues() (pgURI string, stateChangeDir string, consumerProgressDir if stateChangeDir == "" { stateChangeDir = "/tmp/state-changes" } + // Set the state change dir flag that core uses, so DeSoEncoders properly encode and decode state change metadata. 
+ viper.Set("state-change-dir", stateChangeDir) consumerProgressDir = viper.GetString("CONSUMER_PROGRESS_DIR") if consumerProgressDir == "" { diff --git a/migrations/initial_migrations/20230414000002_create_new_message_entry_table.go b/migrations/initial_migrations/20230414000002_create_new_message_entry_table.go index 9ac4ebf..6506c12 100644 --- a/migrations/initial_migrations/20230414000002_create_new_message_entry_table.go +++ b/migrations/initial_migrations/20230414000002_create_new_message_entry_table.go @@ -17,7 +17,7 @@ func createNewMessageEntryTable(db *bun.DB, tableName string) error { recipient_access_group_key_name VARCHAR, recipient_access_group_public_key VARCHAR, encrypted_text VARCHAR NOT NULL, - is_group_chat_message BOOLEAN NOT NULL, + is_group_chat_message BOOLEAN, timestamp TIMESTAMP NOT NULL, extra_data JSONB, badger_key BYTEA PRIMARY KEY