Skip to content

Commit ba5197b

Browse files
Merge pull request #465 from Pavani-Panakanti/port_fix
Move npagent grpc call to unix sockets
2 parents 239f518 + f54ea28 commit ba5197b

File tree

3 files changed

+78
-13
lines changed

3 files changed

+78
-13
lines changed

main.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
var (
5353
scheme = runtime.NewScheme()
5454
LOCAL_IPAMD_ADDRESS = "127.0.0.1:50051"
55+
npaSocketPath = "/var/run/aws-node/npa.sock"
5556
)
5657

5758
func init() {
@@ -134,9 +135,15 @@ func main() {
134135

135136
// CNI makes rpc calls to NP agent regardless NP is enabled or not
136137
// need to start rpc always
138+
// todo: add a liveness probe to this gRPC server and remove closing based on this errCh, liveness probe will check and re-start this container
139+
errCh, err := rpc.RunRPCHandler(policyEndpointController, npaSocketPath)
140+
if err != nil {
141+
log.Errorf("Failed to set up gRPC Handler %v", err)
142+
os.Exit(1)
143+
}
137144
go func() {
138-
if err := rpc.RunRPCHandler(policyEndpointController); err != nil {
139-
log.Errorf("Failed to set up gRPC Handler %v", err)
145+
if err := <-errCh; err != nil {
146+
log.Errorf("gRPC server stopped: %v", err)
140147
os.Exit(1)
141148
}
142149
}()

pkg/rpc/rpc_handler.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package rpc
1616
import (
1717
"context"
1818
"net"
19+
"os"
1920

2021
"github.com/aws/aws-network-policy-agent/controllers"
2122
"github.com/aws/aws-network-policy-agent/pkg/logger"
@@ -41,7 +42,6 @@ var (
4142
)
4243

4344
const (
44-
npgRPCaddress = "127.0.0.1:50052"
4545
grpcHealthServiceName = "grpc.health.v1.np-agent"
4646
)
4747

@@ -159,12 +159,21 @@ func (s *server) DeletePodNp(ctx context.Context, in *rpc.DeleteNpRequest) (*rpc
159159
}
160160

161161
// RunRPCHandler handles request from gRPC
162-
func RunRPCHandler(policyReconciler *controllers.PolicyEndpointsReconciler) error {
163-
log().Infof("Serving RPC Handler on Address: %s", npgRPCaddress)
164-
listener, err := net.Listen("tcp", npgRPCaddress)
162+
func RunRPCHandler(policyReconciler *controllers.PolicyEndpointsReconciler, npaSocketPath string) (<-chan error, error) {
163+
log().Infof("Serving RPC Handler on Unix socket: %s", npaSocketPath)
164+
165+
if _, err := os.Stat(npaSocketPath); err == nil {
166+
log().Infof("Removing stale socket file: %s", npaSocketPath)
167+
err = os.Remove(npaSocketPath)
168+
if err != nil {
169+
log().Warnf("got error in removing socket file %v", err)
170+
}
171+
}
172+
173+
listener, err := net.Listen("unix", npaSocketPath)
165174
if err != nil {
166-
log().Errorf("Failed to listen gRPC port: %v", err)
167-
return errors.Wrap(err, "network policy agent: failed to listen to gRPC port")
175+
log().Errorf("Failed to listen on unix socket: %v", err)
176+
return nil, errors.Wrap(err, "network policy agent: failed to listen on unix socket")
168177
}
169178
grpcServer := grpc.NewServer()
170179
rpc.RegisterNPBackendServer(grpcServer, &server{policyReconciler: policyReconciler})
@@ -175,10 +184,13 @@ func RunRPCHandler(policyReconciler *controllers.PolicyEndpointsReconciler) erro
175184

176185
// Register reflection service on gRPC server.
177186
reflection.Register(grpcServer)
178-
if err := grpcServer.Serve(listener); err != nil {
179-
log().Errorf("Failed to start server on gRPC port: %v", err)
180-
return errors.Wrap(err, "network policy agent: failed to start server on gPRC port")
181-
}
187+
errCh := make(chan error, 1)
188+
go func() {
189+
if err := grpcServer.Serve(listener); err != nil {
190+
errCh <- errors.Wrap(err, "network policy agent: grpc serve failed")
191+
}
192+
close(errCh)
193+
}()
182194
log().Info("Done with RPC Handler initialization")
183-
return nil
195+
return errCh, nil
184196
}

pkg/rpc/rpc_handler_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package rpc
15+
16+
import (
17+
"os"
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
)
22+
23+
func TestRunRPCHandler_NoExistingSocket(t *testing.T) {
24+
testSocketPath := "/tmp/test-rpc-handler.sock"
25+
defer os.Remove(testSocketPath)
26+
27+
errCh, err := RunRPCHandler(nil, testSocketPath)
28+
assert.Nil(t, err)
29+
assert.NotNil(t, errCh)
30+
}
31+
32+
func TestRunRPCHandler_StaleSocketCleanup(t *testing.T) {
33+
testSocketPath := "/tmp/temp-rpc-handler.sock"
34+
35+
// Create a stale socket file
36+
file, err := os.Create(testSocketPath)
37+
if err != nil {
38+
t.Fatalf("Failed to create stale socket file: %v", err)
39+
}
40+
file.Close()
41+
defer os.Remove(testSocketPath)
42+
43+
errCh, err := RunRPCHandler(nil, testSocketPath)
44+
assert.Nil(t, err)
45+
assert.NotNil(t, errCh)
46+
}

0 commit comments

Comments
 (0)