From 7e6d43b875ed0b6e90f0add47fdaab03ee6f16d0 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Tue, 20 Aug 2024 13:11:45 +0530 Subject: [PATCH 1/7] feat: implemented runner when spawning go routine --- src/rpc/service.go | 4 ++-- src/rpc/service_eth.go | 23 ++++++++++++++++++----- src/rpc/service_eth_test.go | 7 ++++--- src/server/server.go | 33 +++++++++++++++++++-------------- 4 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/rpc/service.go b/src/rpc/service.go index 98e47e8..836a143 100644 --- a/src/rpc/service.go +++ b/src/rpc/service.go @@ -12,6 +12,7 @@ import ( "github.com/shutter-network/encrypting-rpc-server/db" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/url" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/metricsserver" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) type Processor struct { @@ -40,10 +41,9 @@ type Config struct { type RPCService interface { Name() string - NewTimeEvent(ctx context.Context, newTime int64) SendRawTransaction(ctx context.Context, s string) (*common.Hash, error) Init(processor Processor, config Config) - SendTimeEvents(ctx context.Context, delayInSeconds int) + Start(ctx context.Context, group service.Runner) error } type EthereumClient interface { diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index f3d6459..17e8cf4 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -22,6 +22,7 @@ import ( "github.com/shutter-network/encrypting-rpc-server/metrics" "github.com/shutter-network/encrypting-rpc-server/utils" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/identitypreimage" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/shutter/shlib/shcrypto" ) @@ -61,14 +62,14 @@ func (s *EthService) Name() string { return "eth" } -func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) { +func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) error { timer := time.NewTicker(time.Duration(delayInSeconds) * time.Second) for { select { case <-ctx.Done(): utils.Logger.Info().Msg("Stopping because context is done.") - return + return nil case tickTime := <-timer.C: newTime := tickTime.Unix() @@ -361,17 +362,17 @@ func (s *EthService) WaitTillMined(ctx context.Context, cancelFunc context.Cance } } -func (p *Processor) MonitorBalance(ctx context.Context, delayInSeconds int) { +func (s *EthService) monitorBalance(ctx context.Context, delayInSeconds int) error { timer := time.NewTicker(time.Duration(delayInSeconds) * time.Second) for { select { case <-ctx.Done(): utils.Logger.Info().Msg("Stopping because context is done.") - return + return ctx.Err() case <-timer.C: - balance, err := p.Client.BalanceAt(ctx, *p.SigningAddress, nil) + balance, err := s.Processor.Client.BalanceAt(ctx, *s.Processor.SigningAddress, nil) if err != nil { utils.Logger.Err(err).Msg("Failed to get balance") continue @@ -393,3 +394,15 @@ func returnError(status int, msg error) *EncodingError { Err: msg, } } + +func (s *EthService) Start(ctx context.Context, group service.Runner) error { + group.Go(func() error { + return s.SendTimeEvents(ctx, s.Config.DelayInSeconds) + }) + if s.Processor.MetricsConfig.Enabled { + group.Go(func() error { + return s.monitorBalance(ctx, s.Config.FetchBalanceDelay) + }) + } + return nil +} diff --git a/src/rpc/service_eth_test.go b/src/rpc/service_eth_test.go index 4bff6f5..245ffee 100644 --- a/src/rpc/service_eth_test.go +++ b/src/rpc/service_eth_test.go @@ -2,15 +2,16 @@ package rpc_test import ( "context" + "math/big" + "testing" + "time" + "github.com/ethereum/go-ethereum/core/types" "github.com/shutter-network/encrypting-rpc-server/cache" "github.com/shutter-network/encrypting-rpc-server/rpc" "github.com/shutter-network/encrypting-rpc-server/testdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "math/big" - "testing" - "time" ) func initTest(t *testing.T) *rpc.EthService { diff --git a/src/server/server.go b/src/server/server.go index 04b232d..85aecc5 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -88,7 +88,7 @@ func NewRPCService(processor rpc.Processor, config rpc.Config, pgDb *db.Postgres } } -func (srv *server) rpcHandler(ctx context.Context) (http.Handler, error) { +func (srv *server) rpcHandler(ctx context.Context) (http.Handler, *[]rpc.RPCService, error) { rpcServices := []rpc.RPCService{ &rpc.EthService{}, } @@ -96,13 +96,9 @@ func (srv *server) rpcHandler(ctx context.Context) (http.Handler, error) { rpcServer := ethrpc.NewServer() for _, service := range rpcServices { service.Init(srv.processor, srv.config) - go service.SendTimeEvents(ctx, srv.config.DelayInSeconds) - if srv.processor.MetricsConfig.Enabled { - go srv.processor.MonitorBalance(ctx, srv.config.FetchBalanceDelay) - } err := rpcServer.RegisterName(service.Name(), service) if err != nil { - return nil, errors.Wrap(err, "error while trying to register RPCService") + return nil, nil, errors.Wrap(err, "error while trying to register RPCService") } } @@ -110,23 +106,23 @@ func (srv *server) rpcHandler(ctx context.Context) (http.Handler, error) { backend: NewReverseProxy(srv.config.BackendURL), processor: rpcServer, } - return p, nil + return p, &rpcServices, nil } -func (srv *server) setupRouter(ctx context.Context) (*chi.Mux, error) { +func (srv *server) setupRouter(ctx context.Context) (*chi.Mux, *[]rpc.RPCService, error) { router := chi.NewRouter() router.Use(middleware.Logger) router.Use(middleware.Recoverer) - handler, err := srv.rpcHandler(ctx) + handler, services, err := srv.rpcHandler(ctx) if err != nil { - return nil, err + return nil, nil, err } router.Mount("/", handler) - return router, nil + return router, services, nil } func (srv *server) Start(ctx context.Context, runner medleyService.Runner) error { - handler, err := srv.setupRouter(ctx) + handler, services, err := srv.setupRouter(ctx) if err != nil { return err @@ -136,8 +132,17 @@ func (srv *server) Start(ctx context.Context, runner medleyService.Runner) error Handler: handler, ReadHeaderTimeout: 5 * time.Second, } - - go srv.postgresDatabase.Start(ctx) + runner.Go(func() error { + srv.postgresDatabase.Start(ctx) + return nil + }) + + for _, service := range *services { + if err := runner.StartService(service); err != nil { + return err + } + } + if srv.processor.MetricsConfig.Enabled { if err := runner.StartService(srv.processor.MetricsServer); err != nil { return err From 90b0a24cdf0e3e4ca6c640441940065a32ca08e5 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Tue, 20 Aug 2024 14:40:21 +0530 Subject: [PATCH 2/7] fix: merge retry and wait until mined go routines --- src/cache/cache.go | 1 + src/rpc/service_eth.go | 67 ++++++++++++++++++++++++++++++++---------- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/src/cache/cache.go b/src/cache/cache.go index 96b114c..21c8c09 100644 --- a/src/cache/cache.go +++ b/src/cache/cache.go @@ -45,6 +45,7 @@ func (c *Cache) Key(tx *types.Transaction) (string, error) { func (c *Cache) UpdateEntry(key string, tx *types.Transaction, cachedTime int64) { txInfo := TransactionInfo{Tx: tx, CachedTime: cachedTime} c.Data[key] = txInfo + c.WaitingForReceiptCache[key] = true utils.Logger.Debug().Msgf("Cache entry at key [%s] updated to: Tx = [%s] and CachedTime = [%d]", key, c.Data[key].Tx.Hash().Hex(), c.Data[key].CachedTime) } diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index 17e8cf4..73daa1d 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -81,6 +81,7 @@ func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) err } } +// TODO: this needs to be tested func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { utils.Logger.Info().Msg(fmt.Sprintf("Received new time event: %d", newTime)) for key, info := range s.Cache.Data { @@ -89,22 +90,56 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { delete(s.Cache.Data, key) if info.Tx != nil { - utils.Logger.Debug().Msgf("Sending transaction [%s]", info.Tx.Hash().Hex()) - rawTxBytes, err := info.Tx.MarshalBinary() - if err != nil { - utils.Logger.Error().Err(err).Msg("Failed to marshal data") - } + if s.Cache.WaitingForReceiptCache[key] { + // utils.Logger.Debug().Msgf("Sending transaction [%s]", info.Tx.Hash().Hex()) + utils.Logger.Debug().Msgf("Checking for tx inclusion [%s]", info.Tx.Hash().Hex()) + receipt, err := s.Processor.Client.TransactionReceipt(ctx, info.Tx.Hash()) + if err == nil { + delete(s.Cache.WaitingForReceiptCache, key) + block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash) + if err != nil { + utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String()) + continue + } + s.Processor.Db.FinaliseTx(db.TransactionDetails{ + TxHash: info.Tx.Hash().String(), + InclusionTime: block.Time(), + }) + continue + } + + if errors.Is(err, ethereum.NotFound) { + utils.Logger.Debug().Msgf("Transaction not yet mined | txHash: %s", info.Tx.Hash().String()) + rawTxBytes, err := info.Tx.MarshalBinary() + if err != nil { + utils.Logger.Error().Err(err).Msg("Failed to marshal data") + } + + rawTx := "0x" + common.Bytes2Hex(rawTxBytes) + + txHash, err := s.SendRawTransaction(ctx, rawTx) + + if err != nil { + metrics.ErrorReturnedGauge.Dec() + utils.Logger.Error().Err(err).Msgf("Failed to send transaction.") + continue + } + utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex()) - rawTx := "0x" + common.Bytes2Hex(rawTxBytes) - txHash, err := s.SendRawTransaction(ctx, rawTx) + } else { + delete(s.Cache.WaitingForReceiptCache, key) + utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", info.Tx.Hash().String(), err) + } - if err != nil { - metrics.ErrorReturnedGauge.Dec() - utils.Logger.Error().Err(err).Msgf("Failed to send transaction.") + } else { + // continue if tx is explicitely set to not check from now on + s.Processor.Db.FinaliseTx(db.TransactionDetails{ + TxHash: info.Tx.Hash().String(), + InclusionTime: uint64(time.Now().Unix()), + IsCancelled: true, + }) continue } - - utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex()) } } } @@ -198,8 +233,10 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c utils.Logger.Info().Msg("Transaction forwarded with hash: " + txHash.Hex()) key, err := service.Cache.Key(tx) if err != nil { - utils.Logger.Debug().Msgf("WaitTillMined | error in generating key | err: %v", err) + utils.Logger.Debug().Msgf("SendRawTransaction | error in generating key | err: %v", err) } else { + + // TODO: cancellation tx will not be handled correctly here service.Cache.WaitingForReceiptCache[key] = false } return &txHash, nil @@ -237,8 +274,8 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c SubmissionTime: time.Now().Unix(), }) - _ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(service.Config.WaitMinedInterval)*10*time.Second) - go service.WaitTillMined(_ctx, cancelFunc, tx, service.Config.WaitMinedInterval) + // _ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(service.Config.WaitMinedInterval)*10*time.Second) + // go service.WaitTillMined(_ctx, cancelFunc, tx, service.Config.WaitMinedInterval) metrics.RequestedGasLimit.WithLabelValues(submitTx.Hash().String()).Observe(float64(tx.Gas())) metrics.TotalRequestDuration.WithLabelValues(submitTx.Hash().String(), txHash.String()).Observe(float64(time.Since(timeBefore).Seconds())) From da70bd0c5c6c96fb8da03056cc2f3e6282cfe60d Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Tue, 20 Aug 2024 17:37:50 +0530 Subject: [PATCH 3/7] chore: resolved todos --- src/rpc/service_eth.go | 71 +++--------------------------------------- 1 file changed, 4 insertions(+), 67 deletions(-) diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index 73daa1d..cf521f8 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -14,7 +14,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" txtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" "github.com/shutter-network/encrypting-rpc-server/cache" @@ -81,7 +80,6 @@ func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) err } } -// TODO: this needs to be tested func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { utils.Logger.Info().Msg(fmt.Sprintf("Received new time event: %d", newTime)) for key, info := range s.Cache.Data { @@ -91,7 +89,6 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { if info.Tx != nil { if s.Cache.WaitingForReceiptCache[key] { - // utils.Logger.Debug().Msgf("Sending transaction [%s]", info.Tx.Hash().Hex()) utils.Logger.Debug().Msgf("Checking for tx inclusion [%s]", info.Tx.Hash().Hex()) receipt, err := s.Processor.Client.TransactionReceipt(ctx, info.Tx.Hash()) if err == nil { @@ -109,16 +106,15 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { } if errors.Is(err, ethereum.NotFound) { - utils.Logger.Debug().Msgf("Transaction not yet mined | txHash: %s", info.Tx.Hash().String()) + utils.Logger.Debug().Msgf("Transaction not yet mined | re-sending | txHash: %s", info.Tx.Hash().String()) rawTxBytes, err := info.Tx.MarshalBinary() if err != nil { utils.Logger.Error().Err(err).Msg("Failed to marshal data") + continue } - rawTx := "0x" + common.Bytes2Hex(rawTxBytes) txHash, err := s.SendRawTransaction(ctx, rawTx) - if err != nil { metrics.ErrorReturnedGauge.Dec() utils.Logger.Error().Err(err).Msgf("Failed to send transaction.") @@ -129,10 +125,12 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { } else { delete(s.Cache.WaitingForReceiptCache, key) utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", info.Tx.Hash().String(), err) + continue } } else { // continue if tx is explicitely set to not check from now on + utils.Logger.Debug().Msgf("tx is cancelled | txhash [%s]", info.Tx.Hash().Hex()) s.Processor.Db.FinaliseTx(db.TransactionDetails{ TxHash: info.Tx.Hash().String(), InclusionTime: uint64(time.Now().Unix()), @@ -274,9 +272,6 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c SubmissionTime: time.Now().Unix(), }) - // _ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(service.Config.WaitMinedInterval)*10*time.Second) - // go service.WaitTillMined(_ctx, cancelFunc, tx, service.Config.WaitMinedInterval) - metrics.RequestedGasLimit.WithLabelValues(submitTx.Hash().String()).Observe(float64(tx.Gas())) metrics.TotalRequestDuration.WithLabelValues(submitTx.Hash().String(), txHash.String()).Observe(float64(time.Since(timeBefore).Seconds())) @@ -341,64 +336,6 @@ var DefaultProcessTransaction = func(tx *txtypes.Transaction, ctx context.Contex return submitTx, nil } -func (s *EthService) WaitTillMined(ctx context.Context, cancelFunc context.CancelFunc, tx *types.Transaction, waitMinedInterval int) { - key, err := s.Cache.Key(tx) - if err != nil { - utils.Logger.Debug().Msgf("WaitTillMined | error in generating key | err: %v", err) - return - } - value := s.Cache.WaitingForReceiptCache[key] - - if !value { - queryTicker := time.NewTicker(time.Duration(waitMinedInterval) * time.Second) - defer queryTicker.Stop() - s.Cache.WaitingForReceiptCache[key] = true - utils.Logger.Info().Msgf("New tx recorded to check for inclusion | txHash: %s", tx.Hash().String()) - - for { - if s.Cache.WaitingForReceiptCache[key] { - receipt, err := s.Processor.Client.TransactionReceipt(ctx, tx.Hash()) - if err == nil { - delete(s.Cache.WaitingForReceiptCache, key) - block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash) - if err != nil { - utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String()) - return - } - s.Processor.Db.FinaliseTx(db.TransactionDetails{ - TxHash: tx.Hash().String(), - InclusionTime: block.Time(), - }) - return - } - - if errors.Is(err, ethereum.NotFound) { - utils.Logger.Debug().Msgf("Transaction not yet mined | txHash: %s", tx.Hash().String()) - } else { - delete(s.Cache.WaitingForReceiptCache, key) - utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", tx.Hash().String(), err) - cancelFunc() - } - - } else { - // return if tx is explicitely set to not check now - s.Processor.Db.FinaliseTx(db.TransactionDetails{ - TxHash: tx.Hash().String(), - InclusionTime: uint64(time.Now().Unix()), - IsCancelled: true, - }) - return - } - // Wait for the next round. - select { - case <-ctx.Done(): - return - case <-queryTicker.C: - } - } - } -} - func (s *EthService) monitorBalance(ctx context.Context, delayInSeconds int) error { timer := time.NewTicker(time.Duration(delayInSeconds) * time.Second) From 94c85655aec07f11c9dae51fe568957bfee13300 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Thu, 5 Sep 2024 18:09:21 +0530 Subject: [PATCH 4/7] feat: updated cancellation logic --- src/cache/cache.go | 15 ++++----- src/db/migration.go | 2 +- src/db/queries.go | 1 - src/rpc/service_eth.go | 74 ++++++++++++++++++++++-------------------- 4 files changed, 46 insertions(+), 46 deletions(-) diff --git a/src/cache/cache.go b/src/cache/cache.go index 21c8c09..78f4a69 100644 --- a/src/cache/cache.go +++ b/src/cache/cache.go @@ -15,9 +15,8 @@ type TransactionInfo struct { type Cache struct { sync.RWMutex - Data map[string]TransactionInfo - WaitingForReceiptCache map[string]bool - DelayFactor int64 + Data map[string]TransactionInfo + DelayFactor int64 } type ProcessTxEntryResp struct { @@ -27,9 +26,8 @@ type ProcessTxEntryResp struct { func NewCache(delayFactor int64) *Cache { return &Cache{ - Data: make(map[string]TransactionInfo), - DelayFactor: delayFactor, - WaitingForReceiptCache: make(map[string]bool), + Data: make(map[string]TransactionInfo), + DelayFactor: delayFactor, } } @@ -38,14 +36,13 @@ func (c *Cache) Key(tx *types.Transaction) (string, error) { if err != nil { return "", err } - - return fmt.Sprintf("%s-%d", fromAddress.Hex(), tx.Nonce()), nil + isCancellation := utils.IsCancellationTransaction(tx, fromAddress) + return fmt.Sprintf("%s-%d-%t", fromAddress.Hex(), tx.Nonce(), isCancellation), nil } func (c *Cache) UpdateEntry(key string, tx *types.Transaction, cachedTime int64) { txInfo := TransactionInfo{Tx: tx, CachedTime: cachedTime} c.Data[key] = txInfo - c.WaitingForReceiptCache[key] = true utils.Logger.Debug().Msgf("Cache entry at key [%s] updated to: Tx = [%s] and CachedTime = [%d]", key, c.Data[key].Tx.Hash().Hex(), c.Data[key].CachedTime) } diff --git a/src/db/migration.go b/src/db/migration.go index ca29456..a8f11ee 100644 --- a/src/db/migration.go +++ b/src/db/migration.go @@ -25,7 +25,7 @@ type TransactionDetails struct { SubmissionTime int64 InclusionTime uint64 Retries uint64 - IsCancelled bool + IsCancellation bool } func InitialMigration(dbUrl string) (*PostgresDb, error) { diff --git a/src/db/queries.go b/src/db/queries.go index 40254e3..85a46f4 100644 --- a/src/db/queries.go +++ b/src/db/queries.go @@ -17,7 +17,6 @@ func (db *PostgresDb) updateInclusion(txDetails TransactionDetails) error { Updates(map[string]interface{}{ "inclusion_time": txDetails.InclusionTime, "retries": gorm.Expr("(?)", subQuery), - "is_cancelled": txDetails.IsCancelled, }).Error; err != nil { // Return any error will rollback the transaction return err diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index cf521f8..f75d356 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -88,24 +88,28 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { delete(s.Cache.Data, key) if info.Tx != nil { - if s.Cache.WaitingForReceiptCache[key] { - utils.Logger.Debug().Msgf("Checking for tx inclusion [%s]", info.Tx.Hash().Hex()) - receipt, err := s.Processor.Client.TransactionReceipt(ctx, info.Tx.Hash()) - if err == nil { - delete(s.Cache.WaitingForReceiptCache, key) - block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash) - if err != nil { - utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String()) - continue - } - s.Processor.Db.FinaliseTx(db.TransactionDetails{ - TxHash: info.Tx.Hash().String(), - InclusionTime: block.Time(), - }) + utils.Logger.Debug().Msgf("Checking for tx inclusion [%s]", info.Tx.Hash().Hex()) + receipt, err := s.Processor.Client.TransactionReceipt(ctx, info.Tx.Hash()) + if err == nil { + block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash) + if err != nil { + utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String()) continue } + s.Processor.Db.FinaliseTx(db.TransactionDetails{ + TxHash: info.Tx.Hash().String(), + InclusionTime: block.Time(), + }) + continue + } + fromAddress, err := utils.SenderAddress(info.Tx) + if err != nil { + utils.Logger.Error().Err(err).Msg("Failed to get sender address from tx in cache") - if errors.Is(err, ethereum.NotFound) { + } + + if errors.Is(err, ethereum.NotFound) { + if !utils.IsCancellationTransaction(info.Tx, fromAddress) { utils.Logger.Debug().Msgf("Transaction not yet mined | re-sending | txHash: %s", info.Tx.Hash().String()) rawTxBytes, err := info.Tx.MarshalBinary() if err != nil { @@ -121,23 +125,13 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { continue } utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex()) - - } else { - delete(s.Cache.WaitingForReceiptCache, key) - utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", info.Tx.Hash().String(), err) - continue } } else { - // continue if tx is explicitely set to not check from now on - utils.Logger.Debug().Msgf("tx is cancelled | txhash [%s]", info.Tx.Hash().Hex()) - s.Processor.Db.FinaliseTx(db.TransactionDetails{ - TxHash: info.Tx.Hash().String(), - InclusionTime: uint64(time.Now().Unix()), - IsCancelled: true, - }) + utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", info.Tx.Hash().String(), err) continue } + } } } @@ -229,14 +223,22 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c metrics.CancellationTxGauge.WithLabelValues(txHash.String()).Inc() utils.Logger.Info().Msg("Transaction forwarded with hash: " + txHash.Hex()) - key, err := service.Cache.Key(tx) + + statuses, err := service.Cache.ProcessTxEntry(tx, time.Now().Unix()) if err != nil { - utils.Logger.Debug().Msgf("SendRawTransaction | error in generating key | err: %v", err) - } else { + utils.Logger.Err(err).Msg("Failed to update the cache.") + return nil, returnError(-32603, err) + } - // TODO: cancellation tx will not be handled correctly here - service.Cache.WaitingForReceiptCache[key] = false + if statuses.UpdateStatus { + service.Processor.Db.InsertNewTx(db.TransactionDetails{ + Address: fromAddress.String(), + Nonce: tx.Nonce(), + TxHash: txHash.String(), + IsCancellation: true, + }) } + return &txHash, nil } @@ -250,9 +252,10 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c utils.Logger.Info().Hex("Tx hash", txHash.Bytes()).Msg("Transaction delayed") if statuses.UpdateStatus { // this is the same tx, just requested more than once so we do not add it to db service.Processor.Db.InsertNewTx(db.TransactionDetails{ - Address: fromAddress.String(), - Nonce: tx.Nonce(), - TxHash: txHash.String(), + Address: fromAddress.String(), + Nonce: tx.Nonce(), + TxHash: txHash.String(), + IsCancellation: false, }) } return &txHash, nil @@ -270,6 +273,7 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c TxHash: txHash.String(), EncryptedTxHash: submitTx.Hash().String(), SubmissionTime: time.Now().Unix(), + IsCancellation: false, }) metrics.RequestedGasLimit.WithLabelValues(submitTx.Hash().String()).Observe(float64(tx.Gas())) From 97cd43a8172e2d73f830b43584b2bafea03c59e4 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Thu, 5 Sep 2024 18:16:30 +0530 Subject: [PATCH 5/7] feat: rebased main --- src/cache/cache.go | 3 +-- src/db/db.go | 1 + src/db/migration.go | 1 - src/db/queries.go | 7 +------ src/rpc/service_eth.go | 8 +++----- 5 files changed, 6 insertions(+), 14 deletions(-) diff --git a/src/cache/cache.go b/src/cache/cache.go index 78f4a69..dbbc009 100644 --- a/src/cache/cache.go +++ b/src/cache/cache.go @@ -74,13 +74,12 @@ func (c *Cache) ProcessTxEntry(newTx *types.Transaction, currentTime int64) (Pro }, nil } - // new tx with higher gas -> update tx utils.Logger.Debug().Msgf("A transaction already exists with a lower gas price. " + "Updating transaction and delaying transaction sending.") c.UpdateEntry(key, newTx, existing.CachedTime) return ProcessTxEntryResp{ SendStatus: false, - UpdateStatus: true, + UpdateStatus: true, // new tx with higher gas -> update tx }, nil // false -> tx won't be sent } } diff --git a/src/db/db.go b/src/db/db.go index 67c37ab..1079f22 100644 --- a/src/db/db.go +++ b/src/db/db.go @@ -11,6 +11,7 @@ func (db *PostgresDb) InsertNewTx(txDetails TransactionDetails) { db.addTxCh <- txDetails } +// account address and nonce are mandatory fields to update the finalised tx func (db *PostgresDb) FinaliseTx(receipt TransactionDetails) { db.inclusionCh <- receipt } diff --git a/src/db/migration.go b/src/db/migration.go index a8f11ee..122b44b 100644 --- a/src/db/migration.go +++ b/src/db/migration.go @@ -24,7 +24,6 @@ type TransactionDetails struct { EncryptedTxHash string `gorm:"primaryKey"` SubmissionTime int64 InclusionTime uint64 - Retries uint64 IsCancellation bool } diff --git a/src/db/queries.go b/src/db/queries.go index 85a46f4..645c712 100644 --- a/src/db/queries.go +++ b/src/db/queries.go @@ -6,17 +6,12 @@ import ( func (db *PostgresDb) updateInclusion(txDetails TransactionDetails) error { if err := db.DB.Transaction(func(tx *gorm.DB) error { - // Subquery to count rows with the same TxHash - subQuery := tx.Model(&TransactionDetails{}). - Select("COUNT(*) - 1"). - Where("tx_hash = ? AND encrypted_tx_hash IS NOT NULL AND encrypted_tx_hash <> ''", txDetails.TxHash) - // Update all rows with new inclusion_time and retries count + // Update all rows with new inclusion_time if err := tx.Model(&TransactionDetails{}). Where("tx_hash = ?", txDetails.TxHash). Updates(map[string]interface{}{ "inclusion_time": txDetails.InclusionTime, - "retries": gorm.Expr("(?)", subQuery), }).Error; err != nil { // Return any error will rollback the transaction return err diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index f75d356..154920b 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -226,17 +226,15 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c statuses, err := service.Cache.ProcessTxEntry(tx, time.Now().Unix()) if err != nil { - utils.Logger.Err(err).Msg("Failed to update the cache.") - return nil, returnError(-32603, err) - } - - if statuses.UpdateStatus { + utils.Logger.Debug().Msgf("cancellation tx | error in generating key | err: %v", err) + } else if statuses.UpdateStatus { service.Processor.Db.InsertNewTx(db.TransactionDetails{ Address: fromAddress.String(), Nonce: tx.Nonce(), TxHash: txHash.String(), IsCancellation: true, }) + } return &txHash, nil From 18825bf5f12d556652770809cdb0ae82ec07dbbf Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Thu, 5 Sep 2024 18:21:58 +0530 Subject: [PATCH 6/7] chore: resolved conflicts --- src/rpc/service_eth.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index 2ec4733..3b4a6a8 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -274,8 +274,8 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c IsCancellation: false, }) - metrics.RequestedGasLimit.WithLabelValues(submitTx.Hash().String()).Observe(float64(tx.Gas())) - metrics.TotalRequestDuration.WithLabelValues(submitTx.Hash().String(), txHash.String()).Observe(float64(time.Since(timeBefore).Seconds())) + metrics.RequestedGasLimit.Observe(float64(tx.Gas())) + metrics.TotalRequestDuration.Observe(float64(time.Since(timeBefore).Seconds())) return &txHash, nil } From 967f53450104494787f8b511de1f310e46c8f9d1 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Thu, 5 Sep 2024 18:36:06 +0530 Subject: [PATCH 7/7] chore: updated file names accroding to convention --- src/rpc/service_eth.go | 1 - src/rpc/{tx-checker.go => txchecker.go} | 0 src/rpc/{tx-checker_test.go => txchecker_test.go} | 0 src/testdata/{test-data.go => testdata.go} | 0 4 files changed, 1 deletion(-) rename src/rpc/{tx-checker.go => txchecker.go} (100%) rename src/rpc/{tx-checker_test.go => txchecker_test.go} (100%) rename src/testdata/{test-data.go => testdata.go} (100%) diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index 3b4a6a8..374fa3c 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -126,7 +126,6 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { } utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex()) } - } else { utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", info.Tx.Hash().String(), err) continue diff --git a/src/rpc/tx-checker.go b/src/rpc/txchecker.go similarity index 100% rename from src/rpc/tx-checker.go rename to src/rpc/txchecker.go diff --git a/src/rpc/tx-checker_test.go b/src/rpc/txchecker_test.go similarity index 100% rename from src/rpc/tx-checker_test.go rename to src/rpc/txchecker_test.go diff --git a/src/testdata/test-data.go b/src/testdata/testdata.go similarity index 100% rename from src/testdata/test-data.go rename to src/testdata/testdata.go