Skip to content
Draft
14 changes: 6 additions & 8 deletions src/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ type TransactionInfo struct {

type Cache struct {
sync.RWMutex
Data map[string]TransactionInfo
WaitingForReceiptCache map[string]bool //the key here should be tx hash
DelayFactor int64
Data map[string]TransactionInfo
DelayFactor int64
}

type ProcessTxEntryResp struct {
Expand All @@ -27,9 +26,8 @@ type ProcessTxEntryResp struct {

func NewCache(delayFactor int64) *Cache {
return &Cache{
Data: make(map[string]TransactionInfo),
DelayFactor: delayFactor,
WaitingForReceiptCache: make(map[string]bool),
Data: make(map[string]TransactionInfo),
DelayFactor: delayFactor,
}
}

Expand All @@ -38,8 +36,8 @@ func (c *Cache) Key(tx *types.Transaction) (string, error) {
if err != nil {
return "", err
}

return fmt.Sprintf("%s-%d", fromAddress.Hex(), tx.Nonce()), nil
isCancellation := utils.IsCancellationTransaction(tx, fromAddress)
return fmt.Sprintf("%s-%d-%t", fromAddress.Hex(), tx.Nonce(), isCancellation), nil
}

func (c *Cache) UpdateEntry(key string, tx *types.Transaction, cachedTime int64) {
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/shutter-network/encrypting-rpc-server/db"
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/url"
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/metricsserver"
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
)

type Processor struct {
Expand Down Expand Up @@ -40,10 +41,9 @@ type Config struct {

type RPCService interface {
Name() string
NewTimeEvent(ctx context.Context, newTime int64)
SendRawTransaction(ctx context.Context, s string) (*common.Hash, error)
Init(processor Processor, config Config)
SendTimeEvents(ctx context.Context, delayInSeconds int)
Start(ctx context.Context, group service.Runner) error
}

type EthereumClient interface {
Expand Down
155 changes: 72 additions & 83 deletions src/rpc/service_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
txtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/shutter-network/encrypting-rpc-server/cache"
"github.com/shutter-network/encrypting-rpc-server/db"
"github.com/shutter-network/encrypting-rpc-server/metrics"
"github.com/shutter-network/encrypting-rpc-server/utils"
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/identitypreimage"
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
"github.com/shutter-network/shutter/shlib/shcrypto"
)

Expand Down Expand Up @@ -61,14 +61,14 @@ func (s *EthService) Name() string {
return "eth"
}

func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) {
func (s *EthService) SendTimeEvents(ctx context.Context, delayInSeconds int) error {
timer := time.NewTicker(time.Duration(delayInSeconds) * time.Second)

for {
select {
case <-ctx.Done():
utils.Logger.Info().Msg("Stopping because context is done.")
return
return nil

case tickTime := <-timer.C:
newTime := tickTime.Unix()
Expand All @@ -88,22 +88,49 @@ func (s *EthService) NewTimeEvent(ctx context.Context, newTime int64) {
delete(s.Cache.Data, key)

if info.Tx != nil {
utils.Logger.Debug().Msgf("Sending transaction [%s]", info.Tx.Hash().Hex())
rawTxBytes, err := info.Tx.MarshalBinary()
if err != nil {
utils.Logger.Error().Err(err).Msg("Failed to marshal data")
utils.Logger.Debug().Msgf("Checking for tx inclusion [%s]", info.Tx.Hash().Hex())
receipt, err := s.Processor.Client.TransactionReceipt(ctx, info.Tx.Hash())
if err == nil {
block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash)
if err != nil {
utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String())
continue
}
s.Processor.Db.FinaliseTx(db.TransactionDetails{
TxHash: info.Tx.Hash().String(),
InclusionTime: block.Time(),
})
continue
}
fromAddress, err := utils.SenderAddress(info.Tx)
if err != nil {
utils.Logger.Error().Err(err).Msg("Failed to get sender address from tx in cache")

rawTx := "0x" + common.Bytes2Hex(rawTxBytes)
txHash, err := s.SendRawTransaction(ctx, rawTx)
}

if err != nil {
metrics.ErrorReturnedGauge.Dec()
utils.Logger.Error().Err(err).Msgf("Failed to send transaction.")
if errors.Is(err, ethereum.NotFound) {
if !utils.IsCancellationTransaction(info.Tx, fromAddress) {
utils.Logger.Debug().Msgf("Transaction not yet mined | re-sending | txHash: %s", info.Tx.Hash().String())
rawTxBytes, err := info.Tx.MarshalBinary()
if err != nil {
utils.Logger.Error().Err(err).Msg("Failed to marshal data")
continue
}
rawTx := "0x" + common.Bytes2Hex(rawTxBytes)

txHash, err := s.SendRawTransaction(ctx, rawTx)
if err != nil {
metrics.ErrorReturnedGauge.Dec()
utils.Logger.Error().Err(err).Msgf("Failed to send transaction.")
continue
}
utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex())
}
} else {
utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", info.Tx.Hash().String(), err)
continue
}

utils.Logger.Info().Msg("Transaction sent internally: " + txHash.Hex())
}
}
}
Expand Down Expand Up @@ -196,16 +223,19 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c
metrics.CancellationTxGauge.Inc()
utils.Logger.Info().Msg("Transaction forwarded with hash: " + txHash.Hex())

service.Processor.Db.InsertNewTx(db.TransactionDetails{
Address: fromAddress.String(),
Nonce: tx.Nonce(),
TxHash: txHash.String(),
SubmissionTime: time.Now().Unix(),
IsCancellation: true,
})
statuses, err := service.Cache.ProcessTxEntry(tx, time.Now().Unix())
if err != nil {
utils.Logger.Debug().Msgf("cancellation tx | error in generating key | err: %v", err)
} else if statuses.UpdateStatus {
service.Processor.Db.InsertNewTx(db.TransactionDetails{
Address: fromAddress.String(),
Nonce: tx.Nonce(),
TxHash: txHash.String(),
IsCancellation: true,
})

}

_ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(service.Config.WaitMinedInterval)*10*time.Second)
go service.WaitTillMined(_ctx, cancelFunc, tx, service.Config.WaitMinedInterval)
return &txHash, nil
}

Expand All @@ -219,9 +249,10 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c
utils.Logger.Info().Hex("Tx hash", txHash.Bytes()).Msg("Transaction delayed")
if statuses.UpdateStatus { // this is the same tx, just requested more than once so we do not add it to db
service.Processor.Db.InsertNewTx(db.TransactionDetails{
Address: fromAddress.String(),
Nonce: tx.Nonce(),
TxHash: txHash.String(),
Address: fromAddress.String(),
Nonce: tx.Nonce(),
TxHash: txHash.String(),
IsCancellation: false,
})
}
return &txHash, nil
Expand All @@ -239,11 +270,9 @@ func (service *EthService) SendRawTransaction(ctx context.Context, s string) (*c
TxHash: txHash.String(),
EncryptedTxHash: submitTx.Hash().String(),
SubmissionTime: time.Now().Unix(),
IsCancellation: false,
})

_ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(service.Config.WaitMinedInterval)*10*time.Second)
go service.WaitTillMined(_ctx, cancelFunc, tx, service.Config.WaitMinedInterval)

metrics.RequestedGasLimit.Observe(float64(tx.Gas()))
metrics.TotalRequestDuration.Observe(float64(time.Since(timeBefore).Seconds()))

Expand Down Expand Up @@ -308,69 +337,17 @@ var DefaultProcessTransaction = func(tx *txtypes.Transaction, ctx context.Contex
return submitTx, nil
}

func (s *EthService) WaitTillMined(ctx context.Context, cancelFunc context.CancelFunc, tx *types.Transaction, waitMinedInterval int) {
key := tx.Hash().String()
value := s.Cache.WaitingForReceiptCache[key]

if !value {
queryTicker := time.NewTicker(time.Duration(waitMinedInterval) * time.Second)
defer queryTicker.Stop()
s.Cache.WaitingForReceiptCache[key] = true
utils.Logger.Info().Msgf("New tx recorded to check for inclusion | txHash: %s", tx.Hash().String())
for {
if s.Cache.WaitingForReceiptCache[key] {
receipt, err := s.Processor.Client.TransactionReceipt(ctx, tx.Hash())
if err == nil {
delete(s.Cache.WaitingForReceiptCache, key)
block, err := s.Processor.Client.BlockByHash(ctx, receipt.BlockHash)
if err != nil {
utils.Logger.Debug().Msgf("Error getting block | blockHash: %s", receipt.BlockHash.String())
cancelFunc()

}

s.Processor.Db.FinaliseTx(db.TransactionDetails{
TxHash: tx.Hash().String(),
InclusionTime: block.Time(),
})
cancelFunc()
}

if errors.Is(err, ethereum.NotFound) {
utils.Logger.Debug().Msgf("Transaction not yet mined | txHash: %s", tx.Hash().String())
} else {
delete(s.Cache.WaitingForReceiptCache, key)
utils.Logger.Debug().Msgf("receipt retrieval failed | txHash: %s | err: %W", tx.Hash().String(), err)
cancelFunc()
}

} else {
// return if tx is explicitely set to not check
cancelFunc()
}
// Wait for the next round.
select {
case <-ctx.Done():
// deleting cache here as we have stopped waiting for tx inclusion
delete(s.Cache.WaitingForReceiptCache, key)
return
case <-queryTicker.C:
}
}
}
}

func (p *Processor) MonitorBalance(ctx context.Context, delayInSeconds int) {
func (s *EthService) monitorBalance(ctx context.Context, delayInSeconds int) error {
timer := time.NewTicker(time.Duration(delayInSeconds) * time.Second)

for {
select {
case <-ctx.Done():
utils.Logger.Info().Msg("Stopping because context is done.")
return
return ctx.Err()

case <-timer.C:
balance, err := p.Client.BalanceAt(ctx, *p.SigningAddress, nil)
balance, err := s.Processor.Client.BalanceAt(ctx, *s.Processor.SigningAddress, nil)
if err != nil {
utils.Logger.Err(err).Msg("Failed to get balance")
continue
Expand All @@ -392,3 +369,15 @@ func returnError(status int, msg error) *EncodingError {
Err: msg,
}
}

func (s *EthService) Start(ctx context.Context, group service.Runner) error {
group.Go(func() error {
return s.SendTimeEvents(ctx, s.Config.DelayInSeconds)
})
if s.Processor.MetricsConfig.Enabled {
group.Go(func() error {
return s.monitorBalance(ctx, s.Config.FetchBalanceDelay)
})
}
return nil
}
7 changes: 4 additions & 3 deletions src/rpc/service_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package rpc_test

import (
"context"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/shutter-network/encrypting-rpc-server/cache"
"github.com/shutter-network/encrypting-rpc-server/rpc"
"github.com/shutter-network/encrypting-rpc-server/testdata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"math/big"
"testing"
"time"
)

func initTest(t *testing.T) *rpc.EthService {
Expand Down
File renamed without changes.
File renamed without changes.
33 changes: 19 additions & 14 deletions src/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,45 +88,41 @@ func NewRPCService(processor rpc.Processor, config rpc.Config, pgDb *db.Postgres
}
}

func (srv *server) rpcHandler(ctx context.Context) (http.Handler, error) {
func (srv *server) rpcHandler(ctx context.Context) (http.Handler, *[]rpc.RPCService, error) {
rpcServices := []rpc.RPCService{
&rpc.EthService{},
}

rpcServer := ethrpc.NewServer()
for _, service := range rpcServices {
service.Init(srv.processor, srv.config)
go service.SendTimeEvents(ctx, srv.config.DelayInSeconds)
if srv.processor.MetricsConfig.Enabled {
go srv.processor.MonitorBalance(ctx, srv.config.FetchBalanceDelay)
}
err := rpcServer.RegisterName(service.Name(), service)
if err != nil {
return nil, errors.Wrap(err, "error while trying to register RPCService")
return nil, nil, errors.Wrap(err, "error while trying to register RPCService")
}
}

p := &JSONRPCProxy{
backend: NewReverseProxy(srv.config.BackendURL),
processor: rpcServer,
}
return p, nil
return p, &rpcServices, nil
}

func (srv *server) setupRouter(ctx context.Context) (*chi.Mux, error) {
func (srv *server) setupRouter(ctx context.Context) (*chi.Mux, *[]rpc.RPCService, error) {
router := chi.NewRouter()
router.Use(middleware.Logger)
router.Use(middleware.Recoverer)
handler, err := srv.rpcHandler(ctx)
handler, services, err := srv.rpcHandler(ctx)
if err != nil {
return nil, err
return nil, nil, err
}
router.Mount("/", handler)
return router, nil
return router, services, nil
}

func (srv *server) Start(ctx context.Context, runner medleyService.Runner) error {
handler, err := srv.setupRouter(ctx)
handler, services, err := srv.setupRouter(ctx)

if err != nil {
return err
Expand All @@ -136,8 +132,17 @@ func (srv *server) Start(ctx context.Context, runner medleyService.Runner) error
Handler: handler,
ReadHeaderTimeout: 5 * time.Second,
}

go srv.postgresDatabase.Start(ctx)
runner.Go(func() error {
srv.postgresDatabase.Start(ctx)
return nil
})

for _, service := range *services {
if err := runner.StartService(service); err != nil {
return err
}
}

if srv.processor.MetricsConfig.Enabled {
if err := runner.StartService(srv.processor.MetricsServer); err != nil {
return err
Expand Down
File renamed without changes.