diff --git a/api/grpcserver.go b/api/grpcserver.go index 830f41547a..d09c99b65c 100644 --- a/api/grpcserver.go +++ b/api/grpcserver.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/health" @@ -52,6 +53,7 @@ type ( GRPCServer struct { port string svr *grpc.Server + sem *semaphore.Weighted } // GRPCHandler contains the pointer to api coreservice @@ -87,6 +89,7 @@ func NewGRPCServer(core CoreService, bds *blockDAOService, grpcPort int) *GRPCSe return nil } + sem := semaphore.NewWeighted(_maxRequestLimit) gSvr := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_prometheus.StreamServerInterceptor, @@ -96,6 +99,20 @@ func NewGRPCServer(core CoreService, bds *blockDAOService, grpcPort int) *GRPCSe grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_prometheus.UnaryServerInterceptor, otelgrpc.UnaryServerInterceptor(), + grpc.UnaryServerInterceptor(func( + ctx context.Context, + req any, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (any, error) { + acquireCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := sem.Acquire(acquireCtx, 1); err != nil { + return nil, status.Error(codes.ResourceExhausted, "server busy") + } + defer sem.Release(1) + return handler(ctx, req) + }), grpc_recovery.UnaryServerInterceptor(RecoveryInterceptor()), )), grpc.KeepaliveEnforcementPolicy(kaep), @@ -114,6 +131,7 @@ func NewGRPCServer(core CoreService, bds *blockDAOService, grpcPort int) *GRPCSe return &GRPCServer{ port: ":" + strconv.Itoa(grpcPort), svr: gSvr, + sem: sem, } } diff --git a/api/http.go b/api/http.go index b2950051a2..27deff94a1 100644 --- a/api/http.go +++ b/api/http.go @@ -8,6 +8,7 @@ import ( "time" "go.uber.org/zap" + "golang.org/x/sync/semaphore" apitypes "github.com/iotexproject/iotex-core/v2/api/types" "github.com/iotexproject/iotex-core/v2/pkg/log" @@ -24,6 +25,7 @@ type ( // hTTPHandler handles requests from http protocol hTTPHandler struct { msgHandler Web3Handler + sem *semaphore.Weighted } ) @@ -60,6 +62,7 @@ func (hSvr *HTTPServer) Stop(ctx context.Context) error { func newHTTPHandler(web3Handler Web3Handler) *hTTPHandler { return &hTTPHandler{ msgHandler: web3Handler, + sem: semaphore.NewWeighted(_maxRequestLimit), } } @@ -71,6 +74,16 @@ func (handler *hTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) ctx, span := tracer.NewSpan(req.Context(), "http") defer span.End() + acquireCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := handler.sem.Acquire(acquireCtx, 1); err != nil { + w.WriteHeader(http.StatusTooManyRequests) + log.L().Error("fail to acquire semaphore", zap.Error(err)) + return + } + // TODO: add metrics + defer handler.sem.Release(1) + if err := handler.msgHandler.HandlePOSTReq(ctx, req.Body, apitypes.NewResponseWriter( func(resp interface{}) (int, error) { diff --git a/api/web3server.go b/api/web3server.go index 1d69c1f1ef..b09ec11d14 100644 --- a/api/web3server.go +++ b/api/web3server.go @@ -40,6 +40,8 @@ const ( _metamaskBalanceContractAddr = "io1k8uw2hrlvnfq8s2qpwwc24ws2ru54heenx8chr" // _defaultBatchRequestLimit is the default maximum number of items in a batch. _defaultBatchRequestLimit = 100 // Maximum number of items in a batch. + // _maxRequestLimit is the maximum number of concurrent requests. + _maxRequestLimit = 100 ) type (