Skip to content

Commit 498ae20

Browse files
lonnblad authored and Michal Witkowski committed
Added retry for connection errors on server stream call (#161)
* Added retry on server stream call * Updated interceptor suite with restart server and added test cases for stream connection retry * Fixed a typo when initializing the restart channel in InterceptorTestSuite * Added a delay when stopping the interceptor suite server * Added another test for ServerStream CallFailsOnDeadlineExceeded * Actually validates the errors in the stream call retry test cases
1 parent 15ea740 commit 498ae20

File tree

4 files changed

+128
-40
lines changed

4 files changed

+128
-40
lines changed

retry/retry.go

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,22 +85,45 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto
8585
if desc.ClientStreams {
8686
return nil, grpc.Errorf(codes.Unimplemented, "grpc_retry: cannot retry on ClientStreams, set grpc_retry.Disable()")
8787
}
88-
logTrace(parentCtx, "grpc_retry attempt: %d, no backoff for this call", 0)
89-
callCtx := perCallContext(parentCtx, callOpts, 0)
90-
newStreamer, err := streamer(callCtx, desc, cc, method, grpcOpts...)
91-
if err != nil {
92-
// TODO(mwitkow): Maybe dial and transport errors should be retriable?
93-
return nil, err
94-
}
95-
retryingStreamer := &serverStreamingRetryingStream{
96-
ClientStream: newStreamer,
97-
callOpts: callOpts,
98-
parentCtx: parentCtx,
99-
streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
100-
return streamer(ctx, desc, cc, method, grpcOpts...)
101-
},
88+
89+
var lastErr error
90+
for attempt := uint(0); attempt < callOpts.max; attempt++ {
91+
if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil {
92+
return nil, err
93+
}
94+
callCtx := perCallContext(parentCtx, callOpts, 0)
95+
96+
var newStreamer grpc.ClientStream
97+
newStreamer, lastErr = streamer(callCtx, desc, cc, method, grpcOpts...)
98+
if lastErr == nil {
99+
retryingStreamer := &serverStreamingRetryingStream{
100+
ClientStream: newStreamer,
101+
callOpts: callOpts,
102+
parentCtx: parentCtx,
103+
streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
104+
return streamer(ctx, desc, cc, method, grpcOpts...)
105+
},
106+
}
107+
return retryingStreamer, nil
108+
}
109+
110+
logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
111+
if isContextError(lastErr) {
112+
if parentCtx.Err() != nil {
113+
logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
114+
// it's the parent context deadline or cancellation.
115+
return nil, lastErr
116+
} else {
117+
logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt)
118+
// it's the callCtx deadline or cancellation, in which case try again.
119+
continue
120+
}
121+
}
122+
if !isRetriable(lastErr, callOpts) {
123+
return nil, lastErr
124+
}
102125
}
103-
return retryingStreamer, nil
126+
return nil, lastErr
104127
}
105128
}
106129

retry/retry_test.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ import (
1111

1212
pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
1313

14-
"github.com/grpc-ecosystem/go-grpc-middleware"
15-
"github.com/grpc-ecosystem/go-grpc-middleware/retry"
14+
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
15+
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
1616
"github.com/grpc-ecosystem/go-grpc-middleware/testing"
1717

18+
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
1920
"github.com/stretchr/testify/suite"
2021
"golang.org/x/net/context"
@@ -31,16 +32,18 @@ var (
3132

3233
type failingService struct {
3334
pb_testproto.TestServiceServer
35+
mu sync.Mutex
36+
3437
reqCounter uint
3538
reqModulo uint
3639
reqSleep time.Duration
3740
reqError codes.Code
38-
mu sync.Mutex
3941
}
4042

4143
func (s *failingService) resetFailingConfiguration(modulo uint, errorCode codes.Code, sleepTime time.Duration) {
4244
s.mu.Lock()
4345
defer s.mu.Unlock()
46+
4447
s.reqCounter = 0
4548
s.reqModulo = modulo
4649
s.reqError = errorCode
@@ -242,6 +245,38 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
242245
require.Equal(s.T(), codes.DeadlineExceeded, grpc.Code(err), "failre code must be a gRPC error of Deadline class")
243246
}
244247

248+
func (s *RetrySuite) TestServerStream_CallFailsOnOutOfRetries() {
249+
restarted := s.RestartServer(3 * retryTimeout)
250+
_, err := s.Client.PingList(s.SimpleCtx(), goodPing)
251+
252+
require.Error(s.T(), err, "establishing the connection should not succeed")
253+
assert.Equal(s.T(), codes.Unavailable, grpc.Code(err))
254+
255+
<-restarted
256+
}
257+
258+
func (s *RetrySuite) TestServerStream_CallFailsOnDeadlineExceeded() {
259+
restarted := s.RestartServer(3 * retryTimeout)
260+
ctx, _ := context.WithTimeout(context.TODO(), retryTimeout)
261+
_, err := s.Client.PingList(ctx, goodPing)
262+
263+
require.Error(s.T(), err, "establishing the connection should not succeed")
264+
assert.Equal(s.T(), codes.DeadlineExceeded, grpc.Code(err))
265+
266+
<-restarted
267+
}
268+
269+
func (s *RetrySuite) TestServerStream_CallRetrySucceeds() {
270+
restarted := s.RestartServer(retryTimeout)
271+
272+
_, err := s.Client.PingList(s.SimpleCtx(), goodPing,
273+
grpc_retry.WithMax(40),
274+
)
275+
276+
assert.NoError(s.T(), err, "establishing the connection should succeed")
277+
<-restarted
278+
}
279+
245280
func (s *RetrySuite) assertPingListWasCorrect(stream pb_testproto.TestService_PingListClient) {
246281
count := 0
247282
for {

testing/interceptor_suite.go

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,36 +36,66 @@ type InterceptorTestSuite struct {
3636
ServerOpts []grpc.ServerOption
3737
ClientOpts []grpc.DialOption
3838

39+
serverAddr string
3940
ServerListener net.Listener
4041
Server *grpc.Server
4142
clientConn *grpc.ClientConn
4243
Client pb_testproto.TestServiceClient
44+
45+
restartServerWithDelayedStart chan time.Duration
46+
serverRunning chan bool
4347
}
4448

4549
func (s *InterceptorTestSuite) SetupSuite() {
46-
var err error
47-
s.ServerListener, err = net.Listen("tcp", "127.0.0.1:0")
48-
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
49-
if *flagTls {
50-
creds, err := credentials.NewServerTLSFromFile(
51-
path.Join(getTestingCertsPath(), "localhost.crt"),
52-
path.Join(getTestingCertsPath(), "localhost.key"),
53-
)
54-
require.NoError(s.T(), err, "failed reading server credentials for localhost.crt")
55-
s.ServerOpts = append(s.ServerOpts, grpc.Creds(creds))
56-
}
57-
// This is the point where we hook up the interceptor
58-
s.Server = grpc.NewServer(s.ServerOpts...)
59-
// Create a service if the instantiator hasn't provided one.
60-
if s.TestService == nil {
61-
s.TestService = &TestPingService{T: s.T()}
62-
}
63-
pb_testproto.RegisterTestServiceServer(s.Server, s.TestService)
50+
s.restartServerWithDelayedStart = make(chan time.Duration)
51+
s.serverRunning = make(chan bool)
52+
53+
s.serverAddr = "127.0.0.1:0"
6454

6555
go func() {
66-
s.Server.Serve(s.ServerListener)
56+
for {
57+
var err error
58+
s.ServerListener, err = net.Listen("tcp", s.serverAddr)
59+
s.serverAddr = s.ServerListener.Addr().String()
60+
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
61+
if *flagTls {
62+
creds, err := credentials.NewServerTLSFromFile(
63+
path.Join(getTestingCertsPath(), "localhost.crt"),
64+
path.Join(getTestingCertsPath(), "localhost.key"),
65+
)
66+
require.NoError(s.T(), err, "failed reading server credentials for localhost.crt")
67+
s.ServerOpts = append(s.ServerOpts, grpc.Creds(creds))
68+
}
69+
// This is the point where we hook up the interceptor
70+
s.Server = grpc.NewServer(s.ServerOpts...)
71+
// Create a service if the instantiator hasn't provided one.
72+
if s.TestService == nil {
73+
s.TestService = &TestPingService{T: s.T()}
74+
}
75+
pb_testproto.RegisterTestServiceServer(s.Server, s.TestService)
76+
77+
go func() {
78+
s.Server.Serve(s.ServerListener)
79+
}()
80+
if s.Client == nil {
81+
s.Client = s.NewClient(s.ClientOpts...)
82+
}
83+
84+
s.serverRunning <- true
85+
86+
d := <-s.restartServerWithDelayedStart
87+
s.Server.Stop()
88+
time.Sleep(d)
89+
}
6790
}()
68-
s.Client = s.NewClient(s.ClientOpts...)
91+
92+
<-s.serverRunning
93+
}
94+
95+
func (s *InterceptorTestSuite) RestartServer(delayedStart time.Duration) <-chan bool {
96+
s.restartServerWithDelayedStart <- delayedStart
97+
time.Sleep(10 * time.Millisecond)
98+
return s.serverRunning
6999
}
70100

71101
func (s *InterceptorTestSuite) NewClient(dialOpts ...grpc.DialOption) pb_testproto.TestServiceClient {
@@ -84,7 +114,7 @@ func (s *InterceptorTestSuite) NewClient(dialOpts ...grpc.DialOption) pb_testpro
84114
}
85115

86116
func (s *InterceptorTestSuite) ServerAddr() string {
87-
return s.ServerListener.Addr().String()
117+
return s.serverAddr
88118
}
89119

90120
func (s *InterceptorTestSuite) SimpleCtx() context.Context {

tracing/opentracing/interceptors_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (jaegerFormatInjector) Inject(ctx mocktracer.MockSpanContext, carrier inter
245245
type jaegerFormatExtractor struct{}
246246

247247
func (jaegerFormatExtractor) Extract(carrier interface{}) (mocktracer.MockSpanContext, error) {
248-
rval := mocktracer.MockSpanContext{0, 0, true, nil}
248+
rval := mocktracer.MockSpanContext{Sampled: true}
249249
reader, ok := carrier.(opentracing.TextMapReader)
250250
if !ok {
251251
return rval, opentracing.ErrInvalidCarrier

0 commit comments

Comments
 (0)