Skip to content

Commit 68fc8c7

Browse files
authored
Merge pull request #51 from embulk/fix_stalling_issue
Fix stalling issue when closing `remoteFile`
2 parents 91a4a63 + 443c44c commit 68fc8c7

File tree

4 files changed

+32
-22
lines changed

4 files changed

+32
-22
lines changed

src/main/java/org/embulk/output/sftp/SftpUtils.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.slf4j.Logger;
2121

2222
import java.io.BufferedOutputStream;
23+
import java.io.Closeable;
2324
import java.io.File;
2425
import java.io.FileInputStream;
2526
import java.io.IOException;
@@ -39,7 +40,7 @@
3940
public class SftpUtils
4041
{
4142
private final Logger logger = Exec.getLogger(SftpUtils.class);
42-
private final DefaultFileSystemManager manager;
43+
private DefaultFileSystemManager manager;
4344
private final FileSystemOptions fsOptions;
4445
private final String userInfo;
4546
private final String user;
@@ -159,15 +160,24 @@ public Void call() throws Exception
159160
final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
160161
final BufferedOutputStream outputStream = openStream(remoteFile);
161162
// When channel is broken, closing resource may hang, hence the time-out wrapper
162-
// Note: closing FileObject will also close OutputStream
163-
try (TimeoutCloser ignored = new TimeoutCloser(outputStream)) {
163+
try (final TimeoutCloser ignored = new TimeoutCloser(outputStream)) {
164164
appendFile(localTempFile, remoteFile, outputStream);
165165
return null;
166166
}
167167
finally {
168-
remoteFile.close();
168+
// closing sequentially
169+
new TimeoutCloser(remoteFile).close();
169170
}
170171
}
172+
173+
@Override
174+
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
175+
{
176+
super.onRetry(exception, retryCount, retryLimit, retryWait);
177+
// re-connect
178+
manager.close();
179+
manager = initializeStandardFileSystemManager();
180+
}
171181
});
172182
}
173183

@@ -258,18 +268,9 @@ FileObject resolve(final String remoteFilePath) throws FileSystemException
258268
return manager.resolveFile(getSftpFileUri(remoteFilePath).toString(), fsOptions);
259269
}
260270

261-
BufferedOutputStream openStream(final FileObject remoteFile)
271+
BufferedOutputStream openStream(final FileObject remoteFile) throws FileSystemException
262272
{
263-
// output stream is already a BufferedOutputStream, no need to wrap
264-
final String taskName = "SFTP open stream";
265-
return withRetry(new DefaultRetry<BufferedOutputStream>(taskName)
266-
{
267-
@Override
268-
public BufferedOutputStream call() throws Exception
269-
{
270-
return new BufferedOutputStream(remoteFile.getContent().getOutputStream());
271-
}
272-
});
273+
return new BufferedOutputStream(remoteFile.getContent().getOutputStream());
273274
}
274275

275276
URI getSftpFileUri(String remoteFilePath)

src/main/java/org/embulk/output/sftp/utils/TimeoutCloser.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010

1111
public class TimeoutCloser implements Closeable
1212
{
13-
@VisibleForTesting
14-
int timeout = 300; // 5 minutes
13+
private static int timeout = 300; // 5 minutes
1514
private Closeable wrapped;
1615

1716
public TimeoutCloser(Closeable wrapped)
@@ -39,4 +38,10 @@ public Void call() throws Exception
3938
throw Throwables.propagate(e);
4039
}
4140
}
41+
42+
@VisibleForTesting
43+
public static void setTimeout(int timeout)
44+
{
45+
TimeoutCloser.timeout = timeout;
46+
}
4247
}

src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,7 @@ public void testResolveWithoutRetry()
768768
}
769769

770770
@Test
771-
public void testOpenStreamWithRetry() throws FileSystemException
771+
public void testOpenStreamWithoutRetry() throws FileSystemException
772772
{
773773
SftpFileOutputPlugin.PluginTask task = defaultTask();
774774
SftpUtils utils = new SftpUtils(task);
@@ -778,9 +778,13 @@ public void testOpenStreamWithRetry() throws FileSystemException
778778
.doCallRealMethod()
779779
.when(mock).getContent();
780780

781-
OutputStream stream = utils.openStream(mock);
782-
assertNotNull(stream);
783-
Mockito.verify(mock, Mockito.times(2)).getContent();
781+
try {
782+
utils.openStream(mock);
783+
fail("Should not reach here");
784+
}
785+
catch (FileSystemException e) {
786+
Mockito.verify(mock, Mockito.times(1)).getContent();
787+
}
784788
}
785789

786790
@Test

src/test/java/org/embulk/output/sftp/utils/TestTimeoutCloser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void close()
3232
}
3333
}
3434
});
35-
closer.timeout = 1;
35+
TimeoutCloser.setTimeout(1);
3636
try {
3737
closer.close();
3838
fail("Should not finish");

0 commit comments

Comments
 (0)