Skip to content

Commit 4642f2f

Browse files
javier-aliagakgal-akl
authored andcommitted
chore: Handle sftp reconnections (dapr#4075)
Signed-off-by: Javier Aliaga <javier@diagrid.io> Signed-off-by: Kobbi Gal <kobbi.g@akeyless.io>
1 parent 12c9e05 commit 4642f2f

File tree

1 file changed

+233
-12
lines changed

1 file changed

+233
-12
lines changed

bindings/sftp/sftp_integration_test.go

Lines changed: 233 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,92 @@
1+
//go:build integration_test
2+
3+
/*
4+
Copyright 2025 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
116
package sftp
217

318
import (
19+
"context"
420
"encoding/json"
21+
"math/rand"
522
"os"
23+
"sync/atomic"
624
"testing"
25+
"time"
726

827
"github.com/stretchr/testify/assert"
928
"github.com/stretchr/testify/require"
1029

1130
"github.com/dapr/components-contrib/bindings"
31+
"github.com/dapr/components-contrib/tests/certification/flow"
32+
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
33+
sftp "github.com/dapr/components-contrib/tests/utils/sftpproxy"
34+
"github.com/dapr/kit/logger"
1235
)
1336

14-
var connectionStringEnvKey = "DAPR_TEST_SFTP_CONNSTRING"
37+
const (
38+
ProxySftp = "0.0.0.0:2223"
39+
ConnectionString = "0.0.0.0:2222"
40+
)
1541

16-
// Run docker from the file location as the upload folder is relative to the test
17-
// docker run -v ./upload:/home/foo/upload -p 2222:22 -d atmoz/sftp foo:pass:1001
1842
func TestIntegrationCases(t *testing.T) {
19-
connectionString := os.Getenv(connectionStringEnvKey)
20-
if connectionString == "" {
21-
t.Skipf(`sftp binding integration tests skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s="localhost:2222")'`, connectionStringEnvKey)
22-
}
23-
43+
cleanUp := setupSftp(t)
44+
defer cleanUp()
45+
time.Sleep(1 * time.Second)
2446
t.Run("List operation", testListOperation)
2547
t.Run("Create operation", testCreateOperation)
48+
t.Run("Reconnections", testReconnect)
49+
}
50+
51+
func setupSftp(t *testing.T) func() {
52+
dc := dockercompose.New("sftp", "docker-compose.yaml")
53+
ctx := flow.Context{
54+
T: t,
55+
Context: t.Context(),
56+
Flow: nil,
57+
}
58+
err := dc.Up(ctx)
59+
60+
if err != nil {
61+
t.Fatal(err)
62+
}
63+
64+
return func() { dc.Down(ctx) }
2665
}
2766

2867
func testListOperation(t *testing.T) {
29-
c := Sftp{}
68+
proxy := &sftp.Proxy{
69+
ListenAddr: ProxySftp,
70+
UpstreamAddr: ConnectionString,
71+
}
72+
73+
defer proxy.Close()
74+
go proxy.ListenAndServe()
75+
76+
c := Sftp{
77+
logger: logger.NewLogger("sftp"),
78+
}
79+
3080
m := bindings.Metadata{}
81+
3182
m.Properties = map[string]string{
3283
"rootPath": "/upload",
33-
"address": os.Getenv(connectionStringEnvKey),
84+
"address": ProxySftp,
3485
"username": "foo",
3586
"password": "pass",
3687
"insecureIgnoreHostKey": "true",
3788
}
89+
3890
err := c.Init(t.Context(), m)
3991
require.NoError(t, err)
4092

@@ -45,14 +97,24 @@ func testListOperation(t *testing.T) {
4597
var d []listResponse
4698
err = json.Unmarshal(r.Data, &d)
4799
require.NoError(t, err)
100+
101+
assert.EqualValues(t, 1, proxy.ReconnectionCount.Load())
48102
}
49103

50104
func testCreateOperation(t *testing.T) {
51-
c := Sftp{}
105+
proxy := &sftp.Proxy{
106+
ListenAddr: ProxySftp,
107+
UpstreamAddr: ConnectionString,
108+
}
109+
defer proxy.Close()
110+
go proxy.ListenAndServe()
111+
c := Sftp{
112+
logger: logger.NewLogger("sftp"),
113+
}
52114
m := bindings.Metadata{}
53115
m.Properties = map[string]string{
54116
"rootPath": "/upload",
55-
"address": os.Getenv(connectionStringEnvKey),
117+
"address": ProxySftp,
56118
"username": "foo",
57119
"password": "pass",
58120
"insecureIgnoreHostKey": "true",
@@ -79,4 +141,163 @@ func testCreateOperation(t *testing.T) {
79141
file, err := os.Stat("./upload/test.txt")
80142
require.NoError(t, err)
81143
assert.Equal(t, "test.txt", file.Name())
144+
assert.EqualValues(t, 1, proxy.ReconnectionCount.Load())
145+
}
146+
147+
func testReconnect(t *testing.T) {
148+
proxy := &sftp.Proxy{
149+
ListenAddr: ProxySftp,
150+
UpstreamAddr: ConnectionString,
151+
}
152+
defer proxy.Close()
153+
go proxy.ListenAndServe()
154+
155+
c := Sftp{
156+
logger: logger.NewLogger("sftp"),
157+
}
158+
159+
m := bindings.Metadata{}
160+
161+
m.Properties = map[string]string{
162+
"rootPath": "/upload",
163+
"address": ProxySftp,
164+
"username": "foo",
165+
"password": "pass",
166+
"insecureIgnoreHostKey": "true",
167+
}
168+
169+
err := c.Init(t.Context(), m)
170+
require.NoError(t, err)
171+
172+
t.Run("List operation", func(t *testing.T) {
173+
r, err := c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation})
174+
require.NoError(t, err)
175+
assert.NotNil(t, r.Data)
176+
177+
_ = proxy.KillServerConn()
178+
179+
r, err = c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation})
180+
require.NoError(t, err)
181+
assert.NotNil(t, r.Data)
182+
183+
var d []listResponse
184+
err = json.Unmarshal(r.Data, &d)
185+
require.NoError(t, err)
186+
187+
assert.EqualValues(t, 2, proxy.ReconnectionCount.Load())
188+
})
189+
190+
t.Run("List delete - no reconnection", func(t *testing.T) {
191+
numReconnects := proxy.ReconnectionCount.Load()
192+
_, err := c.Invoke(t.Context(), &bindings.InvokeRequest{
193+
Operation: bindings.DeleteOperation,
194+
Metadata: map[string]string{
195+
"fileName": "file_does_not_exist.txt",
196+
},
197+
})
198+
199+
require.Error(t, err)
200+
201+
assert.EqualValues(t, numReconnects, proxy.ReconnectionCount.Load())
202+
})
203+
204+
t.Run("List delete - reconnection", func(t *testing.T) {
205+
numReconnects := proxy.ReconnectionCount.Load()
206+
_ = proxy.KillServerConn()
207+
_, err := c.Invoke(t.Context(), &bindings.InvokeRequest{
208+
Operation: bindings.DeleteOperation,
209+
Metadata: map[string]string{
210+
"fileName": "file_does_not_exist.txt",
211+
},
212+
})
213+
214+
require.Error(t, err)
215+
216+
assert.EqualValues(t, numReconnects+1, proxy.ReconnectionCount.Load())
217+
})
218+
219+
t.Run("List get - no reconnection", func(t *testing.T) {
220+
numReconnects := proxy.ReconnectionCount.Load()
221+
_, err := c.Invoke(t.Context(), &bindings.InvokeRequest{
222+
Operation: bindings.GetOperation,
223+
Metadata: map[string]string{
224+
"fileName": "file_does_not_exist.txt",
225+
},
226+
})
227+
228+
require.Error(t, err)
229+
230+
assert.EqualValues(t, numReconnects, proxy.ReconnectionCount.Load())
231+
})
232+
233+
t.Run("List get - reconnection", func(t *testing.T) {
234+
numReconnects := proxy.ReconnectionCount.Load()
235+
_ = proxy.KillServerConn()
236+
_, err := c.Invoke(t.Context(), &bindings.InvokeRequest{
237+
Operation: bindings.GetOperation,
238+
Metadata: map[string]string{
239+
"fileName": "file_does_not_exist.txt",
240+
},
241+
})
242+
243+
require.Error(t, err)
244+
245+
assert.EqualValues(t, numReconnects+1, proxy.ReconnectionCount.Load())
246+
})
247+
248+
t.Run("Parallel ops - reconnection", func(t *testing.T) {
249+
numReconnects := proxy.ReconnectionCount.Load()
250+
ctx, cancelFn := context.WithCancel(t.Context())
251+
opCount := atomic.Int32{}
252+
opFailed := atomic.Int32{}
253+
for range 10 {
254+
go func(ctx context.Context) {
255+
for {
256+
select {
257+
case <-ctx.Done():
258+
return
259+
case <-time.After(time.Duration(100*rand.Float32()) * time.Millisecond):
260+
opCount.Add(1)
261+
r, err := c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation})
262+
if err != nil {
263+
opFailed.Add(1)
264+
break
265+
}
266+
267+
assert.NotNil(t, r.Data)
268+
}
269+
}
270+
}(ctx)
271+
}
272+
273+
go func(ctx context.Context) {
274+
for {
275+
select {
276+
case <-ctx.Done():
277+
return
278+
case <-time.After(1 * time.Second):
279+
_ = proxy.KillServerConn()
280+
}
281+
}
282+
283+
}(ctx)
284+
285+
time.Sleep(time.Second * 5)
286+
cancelFn()
287+
288+
totalOps := opCount.Load()
289+
failedOps := opFailed.Load()
290+
291+
// Calculate 5% tolerance
292+
tolerance := float64(totalOps) * 0.05
293+
294+
// Assert that failed operations are within 1% of total operations
295+
assert.InDelta(t, 0, failedOps, tolerance,
296+
"Expected less than 1%% of operations to fail. Total: %d, Failed: %d (%.2f%%)",
297+
totalOps, failedOps, (float64(failedOps)/float64(totalOps))*100)
298+
299+
expectedReconnects := numReconnects + 5
300+
currentReconnects := proxy.ReconnectionCount.Load()
301+
assert.InDelta(t, expectedReconnects, currentReconnects, 2.0, "Expected %d reconnections, got %d", expectedReconnects, currentReconnects)
302+
})
82303
}

0 commit comments

Comments
 (0)