Skip to content

Commit c082480

Browse files
committed
implement basic functions
1 parent 90c5c91 commit c082480

File tree

2 files changed

+313
-31
lines changed

2 files changed

+313
-31
lines changed
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
package org.embulk.output.sftp;
2+
3+
import com.google.common.base.Throwables;
4+
import io.netty.util.internal.logging.Slf4JLoggerFactory;
5+
import org.apache.commons.logging.Log;
6+
import org.apache.commons.logging.LogFactory;
7+
import org.apache.commons.vfs2.FileObject;
8+
import org.apache.commons.vfs2.FileSystemException;
9+
import org.apache.commons.vfs2.FileSystemManager;
10+
import org.apache.commons.vfs2.FileSystemOptions;
11+
import org.apache.commons.vfs2.Selectors;
12+
import org.apache.commons.vfs2.VFS;
13+
import org.apache.commons.vfs2.impl.StandardFileSystemManager;
14+
import org.apache.commons.vfs2.provider.sftp.IdentityInfo;
15+
import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
16+
import org.embulk.config.TaskReport;
17+
import org.embulk.spi.Buffer;
18+
import org.embulk.spi.Exec;
19+
import org.embulk.spi.FileOutput;
20+
import org.embulk.spi.TransactionalFileOutput;
21+
import org.slf4j.Logger;
22+
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.io.OutputStream;
27+
import java.net.URI;
28+
import java.net.URISyntaxException;
29+
30+
import static org.embulk.output.sftp.SftpFileOutputPlugin.*;
31+
32+
/**
33+
* Created by takahiro.nakayama on 10/20/15.
34+
*/
35+
public class SftpFileOutput
36+
implements FileOutput, TransactionalFileOutput
37+
{
38+
private final Logger logger = Exec.getLogger(SftpFileOutput.class);
39+
private final StandardFileSystemManager manager;
40+
private final FileSystemOptions fsOptions;
41+
private final String userInfo;
42+
private final String host;
43+
private final int port;
44+
private final String pathPrefix;
45+
private final String sequenceFormat;
46+
private final String fileNameExtension;
47+
48+
private final int taskIndex;
49+
private int fileIndex = 0;
50+
private FileObject currentRamFile;
51+
52+
private StandardFileSystemManager initializeStandardFileSystemManager()
53+
{
54+
// TODO: change logging format: org.apache.commons.logging.Log
55+
// System.setProperty("org.apache.commons.logging.Log",
56+
// "org.apache.commons.logging.impl.NoOpLog");
57+
StandardFileSystemManager manager = new StandardFileSystemManager();
58+
try {
59+
manager.init();
60+
}
61+
catch (FileSystemException e) {
62+
logger.error(e.getMessage());
63+
throw new RuntimeException(e);
64+
}
65+
66+
return manager;
67+
}
68+
69+
private String initializeUserInfo(PluginTask task)
70+
{
71+
String userInfo = task.getUser();
72+
if (task.getPassword().isPresent()) {
73+
userInfo += ":" + task.getPassword().get();
74+
}
75+
return userInfo;
76+
}
77+
78+
private FileSystemOptions initializeFsOptions(PluginTask task)
79+
{
80+
FileSystemOptions fsOptions = new FileSystemOptions();
81+
82+
try {
83+
SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(fsOptions, task.getUserDirIsRoot());
84+
SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(fsOptions, "no");
85+
if (task.getSecretKeyFilePath().isPresent()) {
86+
IdentityInfo identityInfo = new IdentityInfo(new File((task.getSecretKeyFilePath().get())), task.getSecretKeyPassphrase().getBytes());
87+
SftpFileSystemConfigBuilder.getInstance().setIdentityInfo(fsOptions, identityInfo);
88+
}
89+
}
90+
catch (FileSystemException e) {
91+
logger.error(e.getMessage());
92+
throw new RuntimeException(e);
93+
}
94+
95+
return fsOptions;
96+
}
97+
98+
SftpFileOutput(PluginTask task, int taskIndex)
99+
{
100+
this.manager = initializeStandardFileSystemManager();
101+
this.userInfo = initializeUserInfo(task);
102+
this.fsOptions = initializeFsOptions(task);
103+
this.host = task.getHost();
104+
this.port = task.getPort();
105+
this.pathPrefix = task.getPathPrefix();
106+
this.sequenceFormat = task.getSequenceFormat();
107+
this.fileNameExtension = task.getFileNameExtension();
108+
this.taskIndex = taskIndex;
109+
}
110+
111+
@Override
112+
public void nextFile()
113+
{
114+
closeCurrentWithUpload();
115+
116+
try {
117+
currentRamFile = newRamFile(getRamUri(getOutputFilePath()));
118+
logger.info("new ram file: {}", currentRamFile.getPublicURIString());
119+
}
120+
catch (FileSystemException e) {
121+
logger.error(e.getMessage());
122+
Throwables.propagate(e);
123+
}
124+
catch (URISyntaxException e) {
125+
logger.error(e.getMessage());
126+
Throwables.propagate(e);
127+
}
128+
}
129+
130+
@Override
131+
public void add(Buffer buffer)
132+
{
133+
if (currentRamFile == null) {
134+
throw new IllegalStateException("nextFile() must be called before poll()");
135+
}
136+
137+
try {
138+
getCurrentRamFileOutputStream().write(buffer.array(), buffer.offset(), buffer.limit());
139+
}
140+
catch (IOException e) {
141+
logger.error(e.getMessage());
142+
Throwables.propagate(e);
143+
}
144+
buffer.release();
145+
}
146+
147+
@Override
148+
public void finish()
149+
{
150+
closeCurrentWithUpload();
151+
}
152+
153+
@Override
154+
public void close()
155+
{
156+
closeCurrentWithUpload();
157+
manager.close();
158+
}
159+
160+
@Override
161+
public void abort()
162+
{
163+
}
164+
165+
@Override
166+
public TaskReport commit()
167+
{
168+
return null;
169+
}
170+
171+
private void closeCurrentWithUpload()
172+
{
173+
try {
174+
closeCurrentRamFileContent();
175+
uploadCurrentRamFileToSftp();
176+
closeCurrentRamFile();
177+
}
178+
catch (FileSystemException e) {
179+
logger.error(e.getMessage());
180+
Throwables.propagate(e);
181+
}
182+
catch (URISyntaxException e) {
183+
logger.error(e.getMessage());
184+
Throwables.propagate(e);
185+
}
186+
fileIndex++;
187+
currentRamFile = null;
188+
}
189+
190+
private void closeCurrentRamFileContent()
191+
throws FileSystemException
192+
{
193+
if (currentRamFile == null) {
194+
return;
195+
}
196+
currentRamFile.getContent().close();
197+
}
198+
199+
private void uploadCurrentRamFileToSftp()
200+
throws FileSystemException, URISyntaxException
201+
{
202+
if (currentRamFile == null) {
203+
return;
204+
}
205+
206+
try (FileObject remoteSftpFile = newSftpFile(getSftpUri(getOutputFilePath()))) {
207+
remoteSftpFile.copyFrom(currentRamFile, Selectors.SELECT_SELF);
208+
logger.info("Upload: {}", remoteSftpFile.getPublicURIString());
209+
}
210+
}
211+
212+
private void closeCurrentRamFile()
213+
throws FileSystemException
214+
{
215+
if (currentRamFile == null) {
216+
return;
217+
}
218+
219+
currentRamFile.close();
220+
currentRamFile.delete();
221+
}
222+
223+
private URI getSftpUri(String remoteFilePath)
224+
throws URISyntaxException
225+
{
226+
return new URI("sftp", userInfo, host, port, remoteFilePath, null, null);
227+
}
228+
229+
private URI getRamUri(String remoteFilePath)
230+
throws URISyntaxException
231+
{
232+
return new URI("ram", null, remoteFilePath, null);
233+
}
234+
235+
private String getOutputFilePath()
236+
{
237+
return pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + fileNameExtension;
238+
}
239+
240+
private FileObject newSftpFile(URI sftpUri)
241+
throws FileSystemException
242+
{
243+
return manager.resolveFile(sftpUri.toString(), fsOptions);
244+
}
245+
246+
private FileObject newRamFile(URI ramUri)
247+
throws FileSystemException
248+
{
249+
FileObject ramFile = manager.resolveFile(ramUri);
250+
ramFile.createFile();
251+
return ramFile;
252+
}
253+
254+
private OutputStream getCurrentRamFileOutputStream()
255+
throws FileSystemException
256+
{
257+
return currentRamFile.getContent().getOutputStream(true);
258+
}
259+
}
Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,81 @@
11
package org.embulk.output.sftp;
22

3-
import java.util.List;
43
import com.google.common.base.Optional;
5-
import org.embulk.config.TaskReport;
4+
import com.google.common.base.Throwables;
5+
import org.apache.commons.logging.Log;
6+
import org.apache.commons.vfs2.FileObject;
7+
import org.apache.commons.vfs2.FileSystemException;
8+
import org.apache.commons.vfs2.FileSystemManager;
9+
import org.apache.commons.vfs2.FileSystemOptions;
10+
import org.apache.commons.vfs2.VFS;
11+
import org.apache.commons.vfs2.impl.StandardFileSystemManager;
12+
import org.apache.commons.vfs2.provider.sftp.IdentityInfo;
13+
import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
614
import org.embulk.config.Config;
715
import org.embulk.config.ConfigDefault;
816
import org.embulk.config.ConfigDiff;
917
import org.embulk.config.ConfigSource;
1018
import org.embulk.config.Task;
19+
import org.embulk.config.TaskReport;
1120
import org.embulk.config.TaskSource;
21+
import org.embulk.spi.Buffer;
1222
import org.embulk.spi.Exec;
1323
import org.embulk.spi.FileOutputPlugin;
1424
import org.embulk.spi.TransactionalFileOutput;
25+
import org.slf4j.Logger;
26+
27+
import java.io.ByteArrayOutputStream;
28+
import java.io.File;
29+
import java.io.IOException;
30+
import java.io.OutputStream;
31+
import java.net.URI;
32+
import java.net.URISyntaxException;
33+
import java.util.List;
1534

1635
public class SftpFileOutputPlugin
1736
implements FileOutputPlugin
1837
{
38+
private Logger logger = Exec.getLogger(SftpFileOutputPlugin.class);
39+
1940
public interface PluginTask
2041
extends Task
2142
{
22-
// configuration option 1 (required integer)
23-
@Config("option1")
24-
public int getOption1();
43+
@Config("host")
44+
public String getHost();
45+
46+
@Config("port")
47+
@ConfigDefault("22")
48+
public int getPort();
2549

26-
// configuration option 2 (optional string, null is not allowed)
27-
@Config("optoin2")
28-
@ConfigDefault("\"myvalue\"")
29-
public String getOption2();
50+
@Config("user")
51+
public String getUser();
3052

31-
// configuration option 3 (optional string, null is allowed)
32-
@Config("optoin3")
53+
@Config("password")
3354
@ConfigDefault("null")
34-
public Optional<String> getOption3();
55+
public Optional<String> getPassword();
3556

36-
// usually, run() method needs to write multiple files because size of a file
37-
// can be very large. So, file name will be:
38-
//
39-
// path_prefix + String.format(sequence_format, taskIndex, sequenceCounterInRunMethod) + file_ext
40-
//
57+
@Config("secret_key_file")
58+
@ConfigDefault("null")
59+
public Optional<String> getSecretKeyFilePath();
60+
61+
@Config("secret_key_passphrase")
62+
@ConfigDefault("\"\"")
63+
public String getSecretKeyPassphrase();
64+
65+
@Config("user_directory_is_root")
66+
@ConfigDefault("true")
67+
public Boolean getUserDirIsRoot();
4168

42-
//@Config("path_prefix")
43-
//public String getPathPrefix();
69+
@Config("path_prefix")
70+
public String getPathPrefix();
4471

45-
//@Config("file_ext")
46-
//public String getFileNameExtension();
72+
@Config("file_ext")
73+
public String getFileNameExtension();
74+
75+
@Config("sequence_format")
76+
@ConfigDefault("\"%03d.%02d.\"")
77+
public String getSequenceFormat();
4778

48-
//@Config("sequence_format")
49-
//@ConfigDefault("\"%03d.%02d.\"")
50-
//public String getSequenceFormat();
5179
}
5280

5381
@Override
@@ -82,12 +110,7 @@ public void cleanup(TaskSource taskSource,
82110
@Override
83111
public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
84112
{
85-
PluginTask task = taskSource.loadTask(PluginTask.class);
86-
87-
// Write your code here :)
88-
throw new UnsupportedOperationException("SftpFileOutputPlugin.open method is not implemented yet");
89-
90-
// See LocalFileOutputPlugin as an example implementation:
91-
// https://github.com/embulk/embulk/blob/master/embulk-standards/src/main/java/org/embulk/standards/LocalFileOutputPlugin.java
113+
final PluginTask task = taskSource.loadTask(PluginTask.class);
114+
return new SftpFileOutput(task, taskIndex);
92115
}
93116
}

0 commit comments

Comments
 (0)