Skip to content

Commit 089e36a

Browse files
committed
Add proxy support
1 parent a9c6b96 commit 089e36a

File tree

5 files changed

+265
-9
lines changed

5 files changed

+265
-9
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ dependencies {
2929
testCompile "org.embulk:embulk-core:0.8.6:tests"
3030
testCompile "org.embulk:embulk-standards:0.8.6"
3131
testCompile "org.apache.sshd:apache-sshd:1.1.0+"
32+
testCompile "org.littleshoot:littleproxy:1.1.0-beta1"
33+
testCompile "io.netty:netty-all:4.0.34.Final"
3234
}
3335

3436
jacocoTestReport {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.embulk.output.sftp;
2+
3+
import com.fasterxml.jackson.annotation.JsonCreator;
4+
import com.fasterxml.jackson.annotation.JsonValue;
5+
import com.google.common.base.Optional;
6+
import org.apache.commons.vfs2.FileSystemOptions;
7+
import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
8+
import org.embulk.config.Config;
9+
import org.embulk.config.ConfigDefault;
10+
import org.embulk.config.ConfigException;
11+
import org.embulk.config.Task;
12+
13+
import java.util.Locale;
14+
15+
interface ProxyTask
16+
extends Task
17+
{
18+
@Config("type")
19+
public ProxyType getType();
20+
21+
@Config("host")
22+
public Optional<String> getHost();
23+
24+
@Config("user")
25+
@ConfigDefault("null")
26+
public Optional<String> getUser();
27+
28+
@Config("password")
29+
@ConfigDefault("null")
30+
public Optional<String> getPassword();
31+
32+
@Config("port")
33+
@ConfigDefault("22")
34+
public int getPort();
35+
36+
@Config("command")
37+
@ConfigDefault("null")
38+
public Optional<String> getCommand();
39+
40+
public enum ProxyType
41+
{
42+
HTTP,
43+
SOCKS,
44+
STREAM;
45+
46+
@JsonValue
47+
@Override
48+
public String toString()
49+
{
50+
return name().toLowerCase(Locale.ENGLISH);
51+
}
52+
53+
@JsonCreator
54+
public static ProxyType fromString(String value)
55+
{
56+
switch (value) {
57+
case "http":
58+
return HTTP;
59+
case "socks":
60+
return SOCKS;
61+
case "stream":
62+
return STREAM;
63+
default:
64+
throw new ConfigException(String.format("Unknown proxy type '%s'. Supported proxy types are http, socks, stream", value));
65+
}
66+
}
67+
68+
public static SftpFileSystemConfigBuilder setProxyType(SftpFileSystemConfigBuilder builder, FileSystemOptions fsOptions, ProxyTask.ProxyType type)
69+
{
70+
SftpFileSystemConfigBuilder.ProxyType setType = null;
71+
switch (type) {
72+
case HTTP:
73+
setType = SftpFileSystemConfigBuilder.PROXY_HTTP;
74+
break;
75+
case SOCKS:
76+
setType = SftpFileSystemConfigBuilder.PROXY_SOCKS5;
77+
break;
78+
case STREAM:
79+
setType = SftpFileSystemConfigBuilder.PROXY_STREAM;
80+
}
81+
builder.setProxyType(fsOptions, setType);
82+
return builder;
83+
}
84+
}
85+
}

src/main/java/org/embulk/output/sftp/SftpFileOutput.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,41 @@ private FileSystemOptions initializeFsOptions(PluginTask task)
7979
FileSystemOptions fsOptions = new FileSystemOptions();
8080

8181
try {
82-
SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(fsOptions, task.getUserDirIsRoot());
83-
SftpFileSystemConfigBuilder.getInstance().setTimeout(fsOptions, task.getSftpConnectionTimeout());
84-
SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(fsOptions, "no");
82+
SftpFileSystemConfigBuilder builder = SftpFileSystemConfigBuilder.getInstance();
83+
builder.setUserDirIsRoot(fsOptions, task.getUserDirIsRoot());
84+
builder.setTimeout(fsOptions, task.getSftpConnectionTimeout());
85+
builder.setStrictHostKeyChecking(fsOptions, "no");
8586
if (task.getSecretKeyFilePath().isPresent()) {
8687
IdentityInfo identityInfo = new IdentityInfo(
8788
new File((task.getSecretKeyFilePath().transform(localFileToPathString()).get())),
8889
task.getSecretKeyPassphrase().getBytes()
8990
);
90-
SftpFileSystemConfigBuilder.getInstance().setIdentityInfo(fsOptions, identityInfo);
91+
builder.setIdentityInfo(fsOptions, identityInfo);
9192
logger.info("set identity: {}", task.getSecretKeyFilePath().get());
9293
}
94+
95+
if (task.getProxy().isPresent()) {
96+
ProxyTask proxy = task.getProxy().get();
97+
98+
ProxyTask.ProxyType.setProxyType(builder, fsOptions, proxy.getType());
99+
100+
if (proxy.getHost().isPresent()) {
101+
builder.setProxyHost(fsOptions, proxy.getHost().get());
102+
builder.setProxyPort(fsOptions, proxy.getPort());
103+
}
104+
105+
if (proxy.getUser().isPresent()) {
106+
builder.setProxyUser(fsOptions, proxy.getUser().get());
107+
}
108+
109+
if (proxy.getPassword().isPresent()) {
110+
builder.setProxyPassword(fsOptions, proxy.getPassword().get());
111+
}
112+
113+
if (proxy.getCommand().isPresent()) {
114+
builder.setProxyCommand(fsOptions, proxy.getCommand().get());
115+
}
116+
}
93117
}
94118
catch (FileSystemException e) {
95119
logger.error(e.getMessage());

src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public interface PluginTask
6868
@Config("sequence_format")
6969
@ConfigDefault("\"%03d.%02d.\"")
7070
public String getSequenceFormat();
71+
72+
@Config("proxy")
73+
@ConfigDefault("null")
74+
public Optional<ProxyTask> getProxy();
7175
}
7276

7377
@Override

src/test/java/org/embulk/output/sftp/TestSftpFileOutputPlugin.java

Lines changed: 146 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.sshd.server.session.ServerSession;
1717
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
1818
import org.embulk.EmbulkTestRuntime;
19+
import org.embulk.config.ConfigException;
1920
import org.embulk.config.ConfigLoader;
2021
import org.embulk.config.ConfigSource;
2122
import org.embulk.config.TaskReport;
@@ -36,6 +37,8 @@
3637
import org.junit.Test;
3738
import org.junit.rules.ExpectedException;
3839
import org.junit.rules.TemporaryFolder;
40+
import org.littleshoot.proxy.HttpProxyServer;
41+
import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
3942
import org.slf4j.Logger;
4043

4144
import java.io.File;
@@ -74,6 +77,8 @@ public class TestSftpFileOutputPlugin
7477
private SshServer sshServer;
7578
private static final String HOST = "127.0.0.1";
7679
private static final int PORT = 20022;
80+
private static final String PROXY_HOST = "127.0.0.1";
81+
private static final int PROXY_PORT = 8080;
7782
private static final String USERNAME = "username";
7883
private static final String PASSWORD = "password";
7984
private static final String SECRET_KEY_FILE = Resources.getResource("id_rsa").getPath();
@@ -94,13 +99,18 @@ public void createResources()
9499
SftpFileOutputPlugin sftpFileOutputPlugin = new SftpFileOutputPlugin();
95100
runner = new FileOutputRunner(sftpFileOutputPlugin);
96101

102+
sshServer = createSshServer(HOST, PORT, USERNAME, PASSWORD);
103+
}
104+
105+
private SshServer createSshServer(String host, int port, final String sshUsername, final String sshPassword)
106+
{
97107
// setup a mock sftp server
98-
sshServer = SshServer.setUpDefaultServer();
108+
SshServer sshServer = SshServer.setUpDefaultServer();
99109
VirtualFileSystemFactory fsFactory = new VirtualFileSystemFactory();
100-
fsFactory.setUserHomeDir(USERNAME, testFolder.getRoot().toPath());
110+
fsFactory.setUserHomeDir(sshUsername, testFolder.getRoot().toPath());
101111
sshServer.setFileSystemFactory(fsFactory);
102-
sshServer.setHost(HOST);
103-
sshServer.setPort(PORT);
112+
sshServer.setHost(host);
113+
sshServer.setPort(port);
104114
sshServer.setSubsystemFactories(Collections.<NamedFactory<Command>>singletonList(new SftpSubsystemFactory()));
105115
sshServer.setCommandFactory(new ScpCommandFactory());
106116
sshServer.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
@@ -109,7 +119,7 @@ public void createResources()
109119
@Override
110120
public boolean authenticate(final String username, final String password, final ServerSession session)
111121
{
112-
return USERNAME.contentEquals(username) && PASSWORD.contentEquals(password);
122+
return sshUsername.contentEquals(username) && sshPassword.contentEquals(password);
113123
}
114124
});
115125
sshServer.setPublickeyAuthenticator(new PublickeyAuthenticator()
@@ -127,6 +137,14 @@ public boolean authenticate(String username, PublicKey key, ServerSession sessio
127137
catch (IOException e) {
128138
logger.debug(e.getMessage(), e);
129139
}
140+
return sshServer;
141+
}
142+
143+
private HttpProxyServer createProxyServer(int port)
144+
{
145+
return DefaultHttpProxyServer.bootstrap()
146+
.withPort(port)
147+
.start();
130148
}
131149

132150
@After
@@ -269,6 +287,49 @@ public void testConfigValuesIncludingDefault()
269287
assertEquals(pathPrefix, task.getPathPrefix());
270288
assertEquals("txt", task.getFileNameExtension());
271289
assertEquals("%03d.%02d.", task.getSequenceFormat());
290+
assertEquals(Optional.absent(), task.getProxy());
291+
}
292+
293+
@Test
294+
public void testConfigValuesIncludingProxy()
295+
{
296+
// setting embulk config
297+
final String pathPrefix = "/test/testUserPassword";
298+
String configYaml = "" +
299+
"type: sftp\n" +
300+
"host: " + HOST + "\n" +
301+
"user: " + USERNAME + "\n" +
302+
"path_prefix: " + pathPrefix + "\n" +
303+
"file_ext: txt\n" +
304+
"proxy: \n" +
305+
" type: http\n" +
306+
" host: proxy_host\n" +
307+
" port: 80 \n" +
308+
" user: proxy_user\n" +
309+
" password: proxy_pass\n" +
310+
" command: proxy_command\n" +
311+
"formatter:\n" +
312+
" type: csv\n" +
313+
" newline: CRLF\n" +
314+
" newline_in_field: LF\n" +
315+
" header_line: true\n" +
316+
" charset: UTF-8\n" +
317+
" quote_policy: NONE\n" +
318+
" quote: \"\\\"\"\n" +
319+
" escape: \"\\\\\"\n" +
320+
" null_string: \"\"\n" +
321+
" default_timezone: 'UTC'";
322+
323+
ConfigSource config = getConfigFromYaml(configYaml);
324+
PluginTask task = config.loadConfig(PluginTask.class);
325+
326+
ProxyTask proxy = task.getProxy().get();
327+
assertEquals("proxy_command", proxy.getCommand().get());
328+
assertEquals("proxy_host", proxy.getHost().get());
329+
assertEquals("proxy_user", proxy.getUser().get());
330+
assertEquals("proxy_pass", proxy.getPassword().get());
331+
assertEquals(80, proxy.getPort());
332+
assertEquals(ProxyTask.ProxyType.HTTP, proxy.getType());
272333
}
273334

274335
// Cases
@@ -353,6 +414,60 @@ public void testUserSecretKeyFileAndPutToRootDirectory()
353414
pathPrefix));
354415
}
355416

417+
@Test
418+
public void testUserSecretKeyFileWithProxy()
419+
{
420+
HttpProxyServer proxyServer = null;
421+
try {
422+
proxyServer = createProxyServer(PROXY_PORT);
423+
424+
// setting embulk config
425+
final String pathPrefix = "/test/testUserPassword";
426+
String configYaml = "" +
427+
"type: sftp\n" +
428+
"host: " + HOST + "\n" +
429+
"port: " + PORT + "\n" +
430+
"user: " + USERNAME + "\n" +
431+
"secret_key_file: " + SECRET_KEY_FILE + "\n" +
432+
"secret_key_passphrase: " + SECRET_KEY_PASSPHRASE + "\n" +
433+
"path_prefix: " + testFolder.getRoot().getAbsolutePath() + pathPrefix + "\n" +
434+
"file_ext: txt\n" +
435+
"proxy: \n" +
436+
" type: http\n" +
437+
" host: " + PROXY_HOST + "\n" +
438+
" port: " + PROXY_PORT + " \n" +
439+
" user: " + USERNAME + "\n" +
440+
" password: " + PASSWORD + "\n" +
441+
" command: \n" +
442+
"formatter:\n" +
443+
" type: csv\n" +
444+
" newline: CRLF\n" +
445+
" newline_in_field: LF\n" +
446+
" header_line: true\n" +
447+
" charset: UTF-8\n" +
448+
" quote_policy: NONE\n" +
449+
" quote: \"\\\"\"\n" +
450+
" escape: \"\\\\\"\n" +
451+
" null_string: \"\"\n" +
452+
" default_timezone: 'UTC'";
453+
454+
// runner.transaction -> ...
455+
run(configYaml, Optional.<Integer>absent());
456+
457+
List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath()));
458+
assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt")));
459+
460+
assertRecordsInFile(String.format("%s/%s001.00.txt",
461+
testFolder.getRoot().getAbsolutePath(),
462+
pathPrefix));
463+
}
464+
finally {
465+
if (proxyServer != null) {
466+
proxyServer.stop();
467+
}
468+
}
469+
}
470+
356471
@Test
357472
public void testTimeout()
358473
{
@@ -388,4 +503,30 @@ public void testTimeout()
388503
// runner.transaction -> ...
389504
run(configYaml, Optional.of(60)); // sleep 1 minute while processing
390505
}
506+
507+
@Test
508+
public void testProxyType()
509+
{
510+
// test valueOf()
511+
assertEquals("http", ProxyTask.ProxyType.valueOf("HTTP").toString());
512+
assertEquals("socks", ProxyTask.ProxyType.valueOf("SOCKS").toString());
513+
assertEquals("stream", ProxyTask.ProxyType.valueOf("STREAM").toString());
514+
try {
515+
ProxyTask.ProxyType.valueOf("non-existing-type");
516+
}
517+
catch (Exception e) {
518+
assertEquals(IllegalArgumentException.class, e.getClass());
519+
}
520+
521+
// test fromString
522+
assertEquals(ProxyTask.ProxyType.HTTP, ProxyTask.ProxyType.fromString("http"));
523+
assertEquals(ProxyTask.ProxyType.SOCKS, ProxyTask.ProxyType.fromString("socks"));
524+
assertEquals(ProxyTask.ProxyType.STREAM, ProxyTask.ProxyType.fromString("stream"));
525+
try {
526+
ProxyTask.ProxyType.fromString("non-existing-type");
527+
}
528+
catch (Exception e) {
529+
assertEquals(ConfigException.class, e.getClass());
530+
}
531+
}
391532
}

0 commit comments

Comments
 (0)