Skip to content

Commit b15b5ec

Browse files
committed
support alternative working file scheme
1 parent 8ab7f2c commit b15b5ec

File tree

2 files changed

+36
-36
lines changed

2 files changed

+36
-36
lines changed

src/main/java/org/embulk/output/sftp/SftpFileOutput.java

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ public class SftpFileOutput
3535
private final String userInfo;
3636
private final String host;
3737
private final int port;
38+
private final String workingFileScheme;
3839
private final String pathPrefix;
3940
private final String sequenceFormat;
4041
private final String fileNameExtension;
4142

4243
private final int taskIndex;
4344
private int fileIndex = 0;
44-
private FileObject currentRamFile;
45-
private OutputStream currentRamFileOutputStream;
45+
private FileObject currentWorkingFile;
46+
private OutputStream currentWorkingFileOutputStream;
4647

4748
private StandardFileSystemManager initializeStandardFileSystemManager()
4849
{
@@ -99,6 +100,7 @@ private FileSystemOptions initializeFsOptions(PluginTask task)
99100
this.host = task.getHost();
100101
this.port = task.getPort();
101102
this.pathPrefix = task.getPathPrefix();
103+
this.workingFileScheme = task.getWorkingFileScheme();
102104
this.sequenceFormat = task.getSequenceFormat();
103105
this.fileNameExtension = task.getFileNameExtension();
104106
this.taskIndex = taskIndex;
@@ -110,9 +112,9 @@ public void nextFile()
110112
closeCurrentWithUpload();
111113

112114
try {
113-
currentRamFile = newRamFile(getRamUri(getOutputFilePath()));
114-
currentRamFileOutputStream = getCurrentRamFileOutputStream();
115-
logger.info("new ram file: {}", currentRamFile.getPublicURIString());
115+
currentWorkingFile = newWorkingFile(getWorkingFileUri(getOutputFilePath()));
116+
currentWorkingFileOutputStream = currentWorkingFile.getContent().getOutputStream();
117+
logger.info("new working file: {}", currentWorkingFile.getPublicURIString());
116118
}
117119
catch (FileSystemException e) {
118120
logger.error(e.getMessage());
@@ -127,12 +129,12 @@ public void nextFile()
127129
@Override
128130
public void add(Buffer buffer)
129131
{
130-
if (currentRamFile == null) {
132+
if (currentWorkingFile == null) {
131133
throw new IllegalStateException("nextFile() must be called before poll()");
132134
}
133135

134136
try {
135-
currentRamFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
137+
currentWorkingFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
136138
}
137139
catch (IOException e) {
138140
logger.error(e.getMessage());
@@ -168,9 +170,9 @@ public TaskReport commit()
168170
private void closeCurrentWithUpload()
169171
{
170172
try {
171-
closeCurrentRamFileContent();
172-
uploadCurrentRamFileToSftp();
173-
closeCurrentRamFile();
173+
closeCurrentWorkingFileContent();
174+
uploadCurrentWorkingFileToSftp();
175+
closeCurrentWorkingFile();
174176
}
175177
catch (URISyntaxException e) {
176178
logger.error(e.getMessage());
@@ -181,53 +183,53 @@ private void closeCurrentWithUpload()
181183
Throwables.propagate(e);
182184
}
183185
fileIndex++;
184-
currentRamFile = null;
186+
currentWorkingFile = null;
185187
}
186188

187-
private void closeCurrentRamFileContent()
189+
private void closeCurrentWorkingFileContent()
188190
throws IOException
189191
{
190-
if (currentRamFile == null) {
192+
if (currentWorkingFile == null) {
191193
return;
192194
}
193-
currentRamFileOutputStream.close();
194-
currentRamFile.getContent().close();
195+
currentWorkingFileOutputStream.close();
196+
currentWorkingFile.getContent().close();
195197
}
196198

197-
private void uploadCurrentRamFileToSftp()
199+
private void uploadCurrentWorkingFileToSftp()
198200
throws FileSystemException, URISyntaxException
199201
{
200-
if (currentRamFile == null) {
202+
if (currentWorkingFile == null) {
201203
return;
202204
}
203205

204-
try (FileObject remoteSftpFile = newSftpFile(getSftpUri(getOutputFilePath()))) {
205-
remoteSftpFile.copyFrom(currentRamFile, Selectors.SELECT_SELF);
206+
try (FileObject remoteSftpFile = newSftpFile(getSftpFileUri(getOutputFilePath()))) {
207+
remoteSftpFile.copyFrom(currentWorkingFile, Selectors.SELECT_SELF);
206208
logger.info("Upload: {}", remoteSftpFile.getPublicURIString());
207209
}
208210
}
209211

210-
private void closeCurrentRamFile()
212+
private void closeCurrentWorkingFile()
211213
throws FileSystemException
212214
{
213-
if (currentRamFile == null) {
215+
if (currentWorkingFile == null) {
214216
return;
215217
}
216218

217-
currentRamFile.close();
218-
currentRamFile.delete();
219+
currentWorkingFile.close();
220+
currentWorkingFile.delete();
219221
}
220222

221-
private URI getSftpUri(String remoteFilePath)
223+
private URI getSftpFileUri(String remoteFilePath)
222224
throws URISyntaxException
223225
{
224226
return new URI("sftp", userInfo, host, port, remoteFilePath, null, null);
225227
}
226228

227-
private URI getRamUri(String remoteFilePath)
229+
private URI getWorkingFileUri(String remoteFilePath)
228230
throws URISyntaxException
229231
{
230-
return new URI("ram", null, remoteFilePath, null);
232+
return new URI(workingFileScheme, null, remoteFilePath, null);
231233
}
232234

233235
private String getOutputFilePath()
@@ -241,17 +243,11 @@ private FileObject newSftpFile(URI sftpUri)
241243
return manager.resolveFile(sftpUri.toString(), fsOptions);
242244
}
243245

244-
private FileObject newRamFile(URI ramUri)
246+
private FileObject newWorkingFile(URI workingFileUri)
245247
throws FileSystemException
246248
{
247-
FileObject ramFile = manager.resolveFile(ramUri);
248-
ramFile.createFile();
249-
return ramFile;
250-
}
251-
252-
private OutputStream getCurrentRamFileOutputStream()
253-
throws FileSystemException
254-
{
255-
return currentRamFile.getContent().getOutputStream(true);
249+
FileObject workingFile = manager.resolveFile(workingFileUri);
250+
workingFile.createFile();
251+
return workingFile;
256252
}
257253
}

src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public interface PluginTask
5353
@ConfigDefault("600") // 10 munites
5454
public int getSftpConnectionTimeout();
5555

56+
@Config("working_file_schema")
57+
@ConfigDefault("ram")
58+
public String getWorkingFileScheme();
59+
5660
@Config("path_prefix")
5761
public String getPathPrefix();
5862

0 commit comments

Comments
 (0)