diff --git a/src/cache/cache.go b/src/cache/cache.go index c6986b9..dbbc009 100644 --- a/src/cache/cache.go +++ b/src/cache/cache.go @@ -15,9 +15,8 @@ type TransactionInfo struct { type Cache struct { sync.RWMutex - Data map[string]TransactionInfo - WaitingForReceiptCache map[string]bool //the key here should be tx hash - DelayFactor int64 + Data map[string]TransactionInfo + DelayFactor int64 } type ProcessTxEntryResp struct { @@ -27,9 +26,8 @@ type ProcessTxEntryResp struct { func NewCache(delayFactor int64) *Cache { return &Cache{ - Data: make(map[string]TransactionInfo), - DelayFactor: delayFactor, - WaitingForReceiptCache: make(map[string]bool), + Data: make(map[string]TransactionInfo), + DelayFactor: delayFactor, } } @@ -38,8 +36,8 @@ func (c *Cache) Key(tx *types.Transaction) (string, error) { if err != nil { return "", err } - - return fmt.Sprintf("%s-%d", fromAddress.Hex(), tx.Nonce()), nil + isCancellation := utils.IsCancellationTransaction(tx, fromAddress) + return fmt.Sprintf("%s-%d-%t", fromAddress.Hex(), tx.Nonce(), isCancellation), nil } func (c *Cache) UpdateEntry(key string, tx *types.Transaction, cachedTime int64) { diff --git a/src/rpc/service.go b/src/rpc/service.go index 98e47e8..836a143 100644 --- a/src/rpc/service.go +++ b/src/rpc/service.go @@ -12,6 +12,7 @@ import ( "github.com/shutter-network/encrypting-rpc-server/db" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/url" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/metricsserver" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" ) type Processor struct { @@ -40,10 +41,9 @@ type Config struct { type RPCService interface { Name() string - NewTimeEvent(ctx context.Context, newTime int64) SendRawTransaction(ctx context.Context, s string) (*common.Hash, error) Init(processor Processor, config Config) - SendTimeEvents(ctx context.Context, delayInSeconds int) + Start(ctx context.Context, group service.Runner) error } type EthereumClient interface { diff --git a/src/rpc/service_eth.go b/src/rpc/service_eth.go index 3cf413e..374fa3c 100644 --- a/src/rpc/service_eth.go +++ b/src/rpc/service_eth.go @@ -14,7 +14,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" txtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" "github.com/shutter-network/encrypting-rpc-server/cache" @@ -22,6 +21,7 @@ import ( "github.com/shutter-network/encrypting-rpc-server/metrics" "github.com/shutter-network/encrypting-rpc-server/utils" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/identitypreimage" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service" "github.com/shutter-network/shutter/shlib/shcrypto" ) @@ -61,14 +61,14 @@ func (s *EthService) Name() string { return "eth" } -func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) { +func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) error { timer := time.NewTicker(time.Duration(delayInSeconds) * time.Second) for { select { case <-ctx.Done(): utils.Logger.Info().Msg("Stopping because context is done.") - return + return nil case tickTime := <-timer.C: newTime := tickTime.Unix() @@ -88,22 +88,49 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) { delete(s.Cache.Data, key) if info.Tx != nil { - utils.Logger.Debug().Msgf("Sending transaction [%s]", info.Tx.Hash().Hex()) - rawTxBytes, err := info.Tx.MarshalBinary() - if err != nil { - utils.Logger.Error().Err(err).Msg("Failed to marshal data") + utils.Logger.Debug().Msgf("Checking for tx inclusion [%s]", info.Tx.Hash().Hex()) + receipt, err := s.Processor.Client.TransactionReceipt(ctx, info.Tx.Hash()) + if err == nil { + block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash) + if err != nil { + utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String()) + continue + } + s.Processor.Db.FinaliseTx(db.TransactionDetails{ + TxHash: info.Tx.Hash().String(), + InclusionTime: block.Time(), + }) + continue } + fromAddress, err := utils.SenderAddress(info.Tx) + if err != nil { + utils.Logger.Error().Err(err).Msg("Failed to get sender address from tx in cache") - rawTx := "0x" + common.Bytes2Hex(rawTxBytes) - txHash, err := s.SendRawTransaction(ctx, rawTx) + } - if err != nil { - metrics.ErrorReturnedGauge.Dec() - utils.Logger.Error().Err(err).Msgf("Failed to send transaction.") + if errors.Is(err, ethereum.NotFound) { + if !utils.IsCancellationTransaction(info.Tx, fromAddress) { + utils.Logger.Debug().Msgf("Transaction not yet mined | re-sending | txHash: %s", info.Tx.Hash().String()) + rawTxBytes, err := info.Tx.MarshalBinary() + if err != nil { + utils.Logger.Error().Err(err).Msg("Failed to marshal data") + continue + } + rawTx := "0x" + common.Bytes2Hex(rawTxBytes) + + txHash, err := s.SendRawTransaction(ctx, rawTx) + if err != nil { + metrics.ErrorReturnedGauge.Dec() + utils.Logger.Error().Err(err).Msgf("Failed to send transaction.") + continue + } + utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex()) + } + } else { + utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", info.Tx.Hash().String(), err) continue } - utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex()) } } } @@ -196,16 +223,19 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c metrics.CancellationTxGauge.Inc() utils.Logger.Info().Msg("Transaction forwarded with hash: " + txHash.Hex()) - service.Processor.Db.InsertNewTx(db.TransactionDetails{ - Address: fromAddress.String(), - Nonce: tx.Nonce(), - TxHash: txHash.String(), - SubmissionTime: time.Now().Unix(), - IsCancellation: true, - }) + statuses, err := service.Cache.ProcessTxEntry(tx, time.Now().Unix()) + if err != nil { + utils.Logger.Debug().Msgf("cancellation tx | error in generating key | err: %v", err) + } else if statuses.UpdateStatus { + service.Processor.Db.InsertNewTx(db.TransactionDetails{ + Address: fromAddress.String(), + Nonce: tx.Nonce(), + TxHash: txHash.String(), + IsCancellation: true, + }) + + } - _ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(service.Config.WaitMinedInterval)*10*time.Second) - go service.WaitTillMined(_ctx, cancelFunc, tx, service.Config.WaitMinedInterval) return &txHash, nil } @@ -219,9 +249,10 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c utils.Logger.Info().Hex("Tx hash", txHash.Bytes()).Msg("Transaction delayed") if statuses.UpdateStatus { // this is the same tx, just requested more than once so we do not add it to db service.Processor.Db.InsertNewTx(db.TransactionDetails{ - Address: fromAddress.String(), - Nonce: tx.Nonce(), - TxHash: txHash.String(), + Address: fromAddress.String(), + Nonce: tx.Nonce(), + TxHash: txHash.String(), + IsCancellation: false, }) } return &txHash, nil @@ -239,11 +270,9 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c TxHash: txHash.String(), EncryptedTxHash: submitTx.Hash().String(), SubmissionTime: time.Now().Unix(), + IsCancellation: false, }) - _ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(service.Config.WaitMinedInterval)*10*time.Second) - go service.WaitTillMined(_ctx, cancelFunc, tx, service.Config.WaitMinedInterval) - metrics.RequestedGasLimit.Observe(float64(tx.Gas())) metrics.TotalRequestDuration.Observe(float64(time.Since(timeBefore).Seconds())) @@ -308,69 +337,17 @@ var DefaultProcessTransaction = func(tx *txtypes.Transaction, ctx context.Contex return submitTx, nil } -func (s *EthService) WaitTillMined(ctx context.Context, cancelFunc context.CancelFunc, tx *types.Transaction, waitMinedInterval int) { - key := tx.Hash().String() - value := s.Cache.WaitingForReceiptCache[key] - - if !value { - queryTicker := time.NewTicker(time.Duration(waitMinedInterval) * time.Second) - defer queryTicker.Stop() - s.Cache.WaitingForReceiptCache[key] = true - utils.Logger.Info().Msgf("New tx recorded to check for inclusion | txHash: %s", tx.Hash().String()) - for { - if s.Cache.WaitingForReceiptCache[key] { - receipt, err := s.Processor.Client.TransactionReceipt(ctx, tx.Hash()) - if err == nil { - delete(s.Cache.WaitingForReceiptCache, key) - block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash) - if err != nil { - utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String()) - cancelFunc() - - } - - s.Processor.Db.FinaliseTx(db.TransactionDetails{ - TxHash: tx.Hash().String(), - InclusionTime: block.Time(), - }) - cancelFunc() - } - - if errors.Is(err, ethereum.NotFound) { - utils.Logger.Debug().Msgf("Transaction not yet mined | txHash: %s", tx.Hash().String()) - } else { - delete(s.Cache.WaitingForReceiptCache, key) - utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", tx.Hash().String(), err) - cancelFunc() - } - - } else { - // return if tx is explicitely set to not check - cancelFunc() - } - // Wait for the next round. - select { - case <-ctx.Done(): - // deleting cache here as we have stopped waiting for tx inclusion - delete(s.Cache.WaitingForReceiptCache, key) - return - case <-queryTicker.C: - } - } - } -} - -func (p *Processor) MonitorBalance(ctx context.Context, delayInSeconds int) { +func (s *EthService) monitorBalance(ctx context.Context, delayInSeconds int) error { timer := time.NewTicker(time.Duration(delayInSeconds) * time.Second) for { select { case <-ctx.Done(): utils.Logger.Info().Msg("Stopping because context is done.") - return + return ctx.Err() case <-timer.C: - balance, err := p.Client.BalanceAt(ctx, *p.SigningAddress, nil) + balance, err := s.Processor.Client.BalanceAt(ctx, *s.Processor.SigningAddress, nil) if err != nil { utils.Logger.Err(err).Msg("Failed to get balance") continue @@ -392,3 +369,15 @@ func returnError(status int, msg error) *EncodingError { Err: msg, } } + +func (s *EthService) Start(ctx context.Context, group service.Runner) error { + group.Go(func() error { + return s.SendTimeEvents(ctx, s.Config.DelayInSeconds) + }) + if s.Processor.MetricsConfig.Enabled { + group.Go(func() error { + return s.monitorBalance(ctx, s.Config.FetchBalanceDelay) + }) + } + return nil +} diff --git a/src/rpc/service_eth_test.go b/src/rpc/service_eth_test.go index 4bff6f5..245ffee 100644 --- a/src/rpc/service_eth_test.go +++ b/src/rpc/service_eth_test.go @@ -2,15 +2,16 @@ package rpc_test import ( "context" + "math/big" + "testing" + "time" + "github.com/ethereum/go-ethereum/core/types" "github.com/shutter-network/encrypting-rpc-server/cache" "github.com/shutter-network/encrypting-rpc-server/rpc" "github.com/shutter-network/encrypting-rpc-server/testdata" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "math/big" - "testing" - "time" ) func initTest(t *testing.T) *rpc.EthService { diff --git a/src/rpc/tx-checker.go b/src/rpc/txchecker.go similarity index 100% rename from src/rpc/tx-checker.go rename to src/rpc/txchecker.go diff --git a/src/rpc/tx-checker_test.go b/src/rpc/txchecker_test.go similarity index 100% rename from src/rpc/tx-checker_test.go rename to src/rpc/txchecker_test.go diff --git a/src/server/server.go b/src/server/server.go index 04b232d..85aecc5 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -88,7 +88,7 @@ func NewRPCService(processor rpc.Processor, config rpc.Config, pgDb *db.Postgres } } -func (srv *server) rpcHandler(ctx context.Context) (http.Handler, error) { +func (srv *server) rpcHandler(ctx context.Context) (http.Handler, *[]rpc.RPCService, error) { rpcServices := []rpc.RPCService{ &rpc.EthService{}, } @@ -96,13 +96,9 @@ func (srv *server) rpcHandler(ctx context.Context) (http.Handler, error) { rpcServer := ethrpc.NewServer() for _, service := range rpcServices { service.Init(srv.processor, srv.config) - go service.SendTimeEvents(ctx, srv.config.DelayInSeconds) - if srv.processor.MetricsConfig.Enabled { - go srv.processor.MonitorBalance(ctx, srv.config.FetchBalanceDelay) - } err := rpcServer.RegisterName(service.Name(), service) if err != nil { - return nil, errors.Wrap(err, "error while trying to register RPCService") + return nil, nil, errors.Wrap(err, "error while trying to register RPCService") } } @@ -110,23 +106,23 @@ func (srv *server) rpcHandler(ctx context.Context) (http.Handler, error) { backend: NewReverseProxy(srv.config.BackendURL), processor: rpcServer, } - return p, nil + return p, &rpcServices, nil } -func (srv *server) setupRouter(ctx context.Context) (*chi.Mux, error) { +func (srv *server) setupRouter(ctx context.Context) (*chi.Mux, *[]rpc.RPCService, error) { router := chi.NewRouter() router.Use(middleware.Logger) router.Use(middleware.Recoverer) - handler, err := srv.rpcHandler(ctx) + handler, services, err := srv.rpcHandler(ctx) if err != nil { - return nil, err + return nil, nil, err } router.Mount("/", handler) - return router, nil + return router, services, nil } func (srv *server) Start(ctx context.Context, runner medleyService.Runner) error { - handler, err := srv.setupRouter(ctx) + handler, services, err := srv.setupRouter(ctx) if err != nil { return err @@ -136,8 +132,17 @@ func (srv *server) Start(ctx context.Context, runner medleyService.Runner) error Handler: handler, ReadHeaderTimeout: 5 * time.Second, } - - go srv.postgresDatabase.Start(ctx) + runner.Go(func() error { + srv.postgresDatabase.Start(ctx) + return nil + }) + + for _, service := range *services { + if err := runner.StartService(service); err != nil { + return err + } + } + if srv.processor.MetricsConfig.Enabled { if err := runner.StartService(srv.processor.MetricsServer); err != nil { return err diff --git a/src/testdata/test-data.go b/src/testdata/testdata.go similarity index 100% rename from src/testdata/test-data.go rename to src/testdata/testdata.go