1717import org .embulk .spi .unit .LocalFile ;
1818import org .slf4j .Logger ;
1919
20+ import java .lang .Void ;
2021import java .io .File ;
2122import java .io .IOException ;
2223import java .io .OutputStream ;
@@ -144,7 +145,7 @@ public void nextFile()
144145
145146 try {
146147 currentFile = newSftpFile (getSftpFileUri (getOutputFilePath ()));
147- currentFileOutputStream = currentFile . getContent (). getOutputStream ( );
148+ currentFileOutputStream = newSftpOutputStream ( currentFile );
148149 logger .info ("new sftp file: {}" , currentFile .getPublicURIString ());
149150 }
150151 catch (FileSystemException e ) {
@@ -154,14 +155,26 @@ public void nextFile()
154155 }
155156
156157 @ Override
157- public void add (Buffer buffer )
158+ public void add (final Buffer buffer )
158159 {
159160 if (currentFile == null ) {
160161 throw new IllegalStateException ("nextFile() must be called before poll()" );
161162 }
162163
163164 try {
164- currentFileOutputStream .write (buffer .array (), buffer .offset (), buffer .limit ());
165+ Retriable <Void > retriable = new Retriable <Void >() {
166+ public Void execute () throws IOException
167+ {
168+ currentFileOutputStream .write (buffer .array (), buffer .offset (), buffer .limit ());
169+ return null ;
170+ }
171+ };
172+ try {
173+ withConnectionRetry (retriable );
174+ }
175+ catch (Exception e ) {
176+ throw (IOException )e ;
177+ }
165178 }
166179 catch (IOException e ) {
167180 logger .error (e .getMessage ());
@@ -204,18 +217,21 @@ private void closeCurrentFile()
204217
205218 try {
206219 currentFileOutputStream .close ();
207- currentFile .getContent ().close ();
208- currentFile .close ();
209220 }
210221 catch (IOException e ) {
211- logger .error (e .getMessage ());
212- Throwables .propagate (e );
222+ logger .info (e .getMessage ());
213223 }
214- finally {
215- fileIndex ++;
216- currentFile = null ;
217- currentFileOutputStream = null ;
224+
225+ try {
226+ currentFile .close ();
218227 }
228+ catch (FileSystemException e ) {
229+ logger .warn (e .getMessage ());
230+ }
231+
232+ fileIndex ++;
233+ currentFile = null ;
234+ currentFileOutputStream = null ;
219235 }
220236
221237 private URI getSftpFileUri (String remoteFilePath )
@@ -234,24 +250,25 @@ private String getOutputFilePath()
234250 return pathPrefix + String .format (sequenceFormat , taskIndex , fileIndex ) + fileNameExtension ;
235251 }
236252
237- private FileObject newSftpFile (URI sftpUri )
238- throws FileSystemException
253+ interface Retriable <T >
239254 {
255+ /**
256+ * Execute the operation with the given (or null) return value.
257+ *
258+ * @return any return value from the operation
259+ * @throws Exception
260+ */
261+ public T execute () throws Exception ;
262+ }
263+
264+ private <T > T withConnectionRetry ( final Retriable <T > op ) throws Exception {
240265 int count = 0 ;
241266 while (true ) {
242267 try {
243- FileObject file = manager .resolveFile (sftpUri .toString (), fsOptions );
244- if (file .getParent ().exists ()) {
245- logger .info ("parent directory {} exists there" , file .getParent ());
246- return file ;
247- }
248- else {
249- logger .info ("trying to create parent directory {}" , file .getParent ());
250- file .getParent ().createFolder ();
251- }
268+ return op .execute ();
252269 }
253- catch ( FileSystemException e ) {
254- if (++count == maxConnectionRetry ) {
270+ catch ( final Exception e ) {
271+ if (++count > maxConnectionRetry ) {
255272 throw e ;
256273 }
257274 logger .warn ("failed to connect sftp server: " + e .getMessage (), e );
@@ -270,6 +287,48 @@ private FileObject newSftpFile(URI sftpUri)
270287 }
271288 }
272289
290+ private FileObject newSftpFile (final URI sftpUri )
291+ throws FileSystemException
292+ {
293+ Retriable <FileObject > retriable = new Retriable <FileObject >() {
294+ public FileObject execute () throws FileSystemException
295+ {
296+ FileObject file = manager .resolveFile (sftpUri .toString (), fsOptions );
297+ if (file .getParent ().exists ()) {
298+ logger .info ("parent directory {} exists there" , file .getParent ());
299+ }
300+ else {
301+ logger .info ("trying to create parent directory {}" , file .getParent ());
302+ file .getParent ().createFolder ();
303+ }
304+ return file ;
305+ }
306+ };
307+ try {
308+ return withConnectionRetry (retriable );
309+ }
310+ catch (Exception e ) {
311+ throw (FileSystemException )e ;
312+ }
313+ }
314+
315+ private OutputStream newSftpOutputStream (final FileObject file )
316+ throws FileSystemException
317+ {
318+ Retriable <OutputStream > retriable = new Retriable <OutputStream >() {
319+ public OutputStream execute () throws FileSystemException
320+ {
321+ return file .getContent ().getOutputStream ();
322+ }
323+ };
324+ try {
325+ return withConnectionRetry (retriable );
326+ }
327+ catch (Exception e ) {
328+ throw (FileSystemException )e ;
329+ }
330+ }
331+
273332 private Function <LocalFile , String > localFileToPathString ()
274333 {
275334 return new Function <LocalFile , String >()
0 commit comments