Skip to content

Commit 3c0c855

Browse files
committed
Remove retry of openStream(), improve uploadFile() (closing resources sequentially, add re-connect to onRetry())
1 parent c0308df commit 3c0c855

File tree

1 file changed

+18
-15
lines changed

1 file changed

+18
-15
lines changed

src/main/java/org/embulk/output/sftp/SftpUtils.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.slf4j.Logger;
2121

2222
import java.io.BufferedOutputStream;
23+
import java.io.Closeable;
2324
import java.io.File;
2425
import java.io.FileInputStream;
2526
import java.io.IOException;
@@ -39,7 +40,7 @@
3940
public class SftpUtils
4041
{
4142
private final Logger logger = Exec.getLogger(SftpUtils.class);
42-
private final DefaultFileSystemManager manager;
43+
private DefaultFileSystemManager manager;
4344
private final FileSystemOptions fsOptions;
4445
private final String userInfo;
4546
private final String user;
@@ -159,12 +160,23 @@ public Void call() throws Exception
159160
final FileObject remoteFile = newSftpFile(getSftpFileUri(remotePath));
160161
final BufferedOutputStream outputStream = openStream(remoteFile);
161162
// When channel is broken, closing resource may hang, hence the time-out wrapper
162-
// Note: closing FileObject will also close OutputStream
163-
try (final TimeoutCloser ignored1 = new TimeoutCloser(outputStream);
164-
final TimeoutCloser ignored2 = new TimeoutCloser(remoteFile)) {
163+
try (final TimeoutCloser ignored = new TimeoutCloser(outputStream)) {
165164
appendFile(localTempFile, remoteFile, outputStream);
166165
return null;
167166
}
167+
finally {
168+
// closing sequentially
169+
new TimeoutCloser(remoteFile).close();
170+
}
171+
}
172+
173+
@Override
174+
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
175+
{
176+
super.onRetry(exception, retryCount, retryLimit, retryWait);
177+
// re-connect
178+
manager.close();
179+
manager = initializeStandardFileSystemManager();
168180
}
169181
});
170182
}
@@ -256,18 +268,9 @@ FileObject resolve(final String remoteFilePath) throws FileSystemException
256268
return manager.resolveFile(getSftpFileUri(remoteFilePath).toString(), fsOptions);
257269
}
258270

259-
BufferedOutputStream openStream(final FileObject remoteFile)
271+
BufferedOutputStream openStream(final FileObject remoteFile) throws FileSystemException
260272
{
261-
// output stream is already a BufferedOutputStream, no need to wrap
262-
final String taskName = "SFTP open stream";
263-
return withRetry(new DefaultRetry<BufferedOutputStream>(taskName)
264-
{
265-
@Override
266-
public BufferedOutputStream call() throws Exception
267-
{
268-
return new BufferedOutputStream(remoteFile.getContent().getOutputStream());
269-
}
270-
});
273+
return new BufferedOutputStream(remoteFile.getContent().getOutputStream());
271274
}
272275

273276
URI getSftpFileUri(String remoteFilePath)

0 commit comments

Comments
 (0)