|
1 | 1 | package org.embulk.output.sftp; |
2 | 2 |
|
| 3 | +import com.google.common.base.Charsets; |
| 4 | +import com.google.common.base.Optional; |
| 5 | +import com.google.common.base.Splitter; |
| 6 | +import com.google.common.base.Throwables; |
| 7 | +import com.google.common.collect.Lists; |
| 8 | +import com.google.common.io.Resources; |
| 9 | +import org.apache.commons.vfs2.FileSystemException; |
| 10 | +import org.apache.sshd.common.NamedFactory; |
| 11 | +import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory; |
| 12 | +import org.apache.sshd.server.Command; |
| 13 | +import org.apache.sshd.server.SshServer; |
| 14 | +import org.apache.sshd.server.auth.password.PasswordAuthenticator; |
| 15 | +import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator; |
| 16 | +import org.apache.sshd.server.command.ScpCommandFactory; |
| 17 | +import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; |
| 18 | +import org.apache.sshd.server.session.ServerSession; |
| 19 | +import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; |
| 20 | +import org.embulk.EmbulkTestRuntime; |
| 21 | +import org.embulk.config.ConfigLoader; |
| 22 | +import org.embulk.config.ConfigSource; |
| 23 | +import org.embulk.config.TaskReport; |
| 24 | +import org.embulk.config.TaskSource; |
| 25 | +import org.embulk.output.sftp.SftpFileOutputPlugin.PluginTask; |
| 26 | +import org.embulk.spi.Exec; |
| 27 | +import org.embulk.spi.FileOutputRunner; |
| 28 | +import org.embulk.spi.OutputPlugin.Control; |
| 29 | +import org.embulk.spi.Page; |
| 30 | +import org.embulk.spi.PageTestUtils; |
| 31 | +import org.embulk.spi.Schema; |
| 32 | +import org.embulk.spi.TransactionalPageOutput; |
| 33 | +import org.embulk.spi.time.Timestamp; |
| 34 | +import org.hamcrest.CoreMatchers; |
| 35 | +import org.hamcrest.Matcher; |
| 36 | +import org.junit.After; |
| 37 | +import org.junit.Before; |
| 38 | +import org.junit.Rule; |
| 39 | +import org.junit.Test; |
| 40 | +import org.junit.rules.ExpectedException; |
| 41 | +import org.junit.rules.TemporaryFolder; |
| 42 | +import org.slf4j.Logger; |
| 43 | + |
| 44 | +import java.io.File; |
| 45 | +import java.io.IOException; |
| 46 | +import java.nio.file.DirectoryStream; |
| 47 | +import java.nio.file.FileSystems; |
| 48 | +import java.nio.file.Files; |
| 49 | +import java.nio.file.Path; |
| 50 | +import java.nio.file.PathMatcher; |
| 51 | +import java.nio.file.Paths; |
| 52 | +import java.security.PublicKey; |
| 53 | +import java.util.Arrays; |
| 54 | +import java.util.Collections; |
| 55 | +import java.util.List; |
| 56 | + |
| 57 | +import static com.google.common.io.Files.readLines; |
| 58 | +import static org.embulk.spi.type.Types.*; |
| 59 | +import static org.hamcrest.CoreMatchers.containsString; |
| 60 | +import static org.hamcrest.CoreMatchers.hasItem; |
| 61 | +import static org.hamcrest.CoreMatchers.instanceOf; |
| 62 | +import static org.hamcrest.MatcherAssert.*; |
| 63 | +import static org.hamcrest.core.Is.is; |
| 64 | +import static org.junit.Assert.assertEquals; |
| 65 | +//import static org.hamcrest.Matchers.*; |
| 66 | + |
3 | 67 | public class TestSftpFileOutputPlugin |
4 | 68 | { |
| 69 | + @Rule |
| 70 | + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); |
| 71 | + |
| 72 | + @Rule |
| 73 | + public ExpectedException exception = ExpectedException.none(); |
| 74 | + |
| 75 | + @Rule |
| 76 | + public TemporaryFolder testFolder = new TemporaryFolder(); |
| 77 | + |
| 78 | + private Logger logger = runtime.getExec().getLogger(TestSftpFileOutputPlugin.class); |
| 79 | + private FileOutputRunner runner; |
| 80 | + private SshServer sshServer; |
| 81 | + private static final String HOST = "localhost"; |
| 82 | + private static final int PORT = 20022; |
| 83 | + private static final String USERNAME = "username"; |
| 84 | + private static final String PASSWORD = "password"; |
| 85 | + private static final String SECRET_KEY_FILE = Resources.getResource("id_rsa").getPath(); |
| 86 | + private static final String SECRET_KEY_PASSPHRASE = "SECRET_KEY_PASSPHRASE"; |
| 87 | + private static final Schema SCHEMA = new Schema.Builder() |
| 88 | + .add("_c0", BOOLEAN) |
| 89 | + .add("_c1", LONG) |
| 90 | + .add("_c2", DOUBLE) |
| 91 | + .add("_c3", STRING) |
| 92 | + .add("_c4", TIMESTAMP) |
| 93 | + .build(); |
| 94 | + |
| 95 | + @Before |
| 96 | + public void createResources() |
| 97 | + throws IOException |
| 98 | + { |
| 99 | + // setup the plugin |
| 100 | + SftpFileOutputPlugin sftpFileOutputPlugin = new SftpFileOutputPlugin(); |
| 101 | + runner = new FileOutputRunner(sftpFileOutputPlugin); |
| 102 | + |
| 103 | + // setup a mock sftp server |
| 104 | + sshServer = SshServer.setUpDefaultServer(); |
| 105 | + VirtualFileSystemFactory fsFactory = new VirtualFileSystemFactory(); |
| 106 | + fsFactory.setUserHomeDir(USERNAME, testFolder.getRoot().getAbsolutePath()); |
| 107 | + sshServer.setFileSystemFactory(fsFactory); |
| 108 | + sshServer.setHost(HOST); |
| 109 | + sshServer.setPort(PORT); |
| 110 | + sshServer.setSubsystemFactories(Collections.<NamedFactory<Command>>singletonList(new SftpSubsystemFactory())); |
| 111 | + sshServer.setCommandFactory(new ScpCommandFactory()); |
| 112 | + sshServer.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); |
| 113 | +// sshServer.setPasswordAuthenticator(new PasswordAuthenticator() |
| 114 | +// { |
| 115 | +// @Override |
| 116 | +// public boolean authenticate(final String username, final String password, final ServerSession session) |
| 117 | +// { |
| 118 | +// return USERNAME.contentEquals(username) && PASSWORD.contentEquals(password); |
| 119 | +// } |
| 120 | +// }); |
| 121 | + sshServer.setPublickeyAuthenticator(new PublickeyAuthenticator() |
| 122 | + { |
| 123 | + @Override |
| 124 | + public boolean authenticate(String username, PublicKey key, ServerSession session) |
| 125 | + { |
| 126 | + return true; |
| 127 | + } |
| 128 | + }); |
| 129 | + |
| 130 | + try { |
| 131 | + sshServer.start(); |
| 132 | + } |
| 133 | + catch (IOException e) { |
| 134 | + logger.debug(e.getMessage(), e); |
| 135 | + } |
| 136 | + } |
| 137 | + |
| 138 | + @After |
| 139 | + public void cleanup() throws InterruptedException { |
| 140 | + try { |
| 141 | + sshServer.stop(true); |
| 142 | + } |
| 143 | + catch (Exception e) { |
| 144 | + logger.debug(e.getMessage(), e); |
| 145 | + } |
| 146 | + } |
| 147 | + |
| 148 | + private ConfigSource getConfigFromYaml(String yaml) |
| 149 | + { |
| 150 | + ConfigLoader loader = new ConfigLoader(Exec.getModelManager()); |
| 151 | + return loader.fromYamlString(yaml); |
| 152 | + } |
| 153 | + |
| 154 | + private List<String> lsR(List<String> fileNames, Path dir) { |
| 155 | + try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) { |
| 156 | + for (Path path : stream) { |
| 157 | + if(path.toFile().isDirectory()) { |
| 158 | + lsR(fileNames, path); |
| 159 | + } else { |
| 160 | + fileNames.add(path.toAbsolutePath().toString()); |
| 161 | + } |
| 162 | + } |
| 163 | + } |
| 164 | + catch(IOException e) { |
| 165 | + logger.debug(e.getMessage(), e); |
| 166 | + } |
| 167 | + return fileNames; |
| 168 | + } |
| 169 | + |
| 170 | + private void run(String configYaml, final Optional<Integer> sleep) |
| 171 | + { |
| 172 | + ConfigSource config = getConfigFromYaml(configYaml); |
| 173 | + runner.transaction(config, SCHEMA, 1, new Control() |
| 174 | + { |
| 175 | + @Override |
| 176 | + public List<TaskReport> run(TaskSource taskSource) |
| 177 | + { |
| 178 | + TransactionalPageOutput pageOutput = runner.open(taskSource, SCHEMA, 1); |
| 179 | + boolean committed = false; |
| 180 | + try { |
| 181 | + // Result: |
| 182 | + // _c0,_c1,_c2,_c3,_c4 |
| 183 | + // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000 |
| 184 | + // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000 |
| 185 | + for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), SCHEMA, true, 2L, 3.0D, "45", |
| 186 | + Timestamp.ofEpochMilli(678L), true, 2L, 3.0D, "45", |
| 187 | + Timestamp.ofEpochMilli(678L))) { |
| 188 | + pageOutput.add(page); |
| 189 | + if (sleep.isPresent()) { |
| 190 | + Thread.sleep(sleep.get() * 1000); |
| 191 | + } |
| 192 | + } |
| 193 | + pageOutput.commit(); |
| 194 | + committed = true; |
| 195 | + } |
| 196 | + catch (InterruptedException e) { |
| 197 | + logger.debug(e.getMessage(), e); |
| 198 | + } |
| 199 | + finally { |
| 200 | + if (!committed) { |
| 201 | + pageOutput.abort(); |
| 202 | + } |
| 203 | + pageOutput.close(); |
| 204 | + } |
| 205 | + return Lists.newArrayList(); |
| 206 | + } |
| 207 | + }); |
| 208 | + } |
| 209 | + |
| 210 | + private void assertRecordsInFile(String filePath) |
| 211 | + { |
| 212 | + try { |
| 213 | + List<String> lines = readLines(new File(filePath), |
| 214 | + Charsets.UTF_8); |
| 215 | + for (int i = 0; i < lines.size(); i++) { |
| 216 | + String[] record = lines.get(i).split(","); |
| 217 | + if (i == 0) { |
| 218 | + for (int j = 0; j <= 4 ; j++) { |
| 219 | + assertEquals("_c" + j, record[j]); |
| 220 | + } |
| 221 | + } |
| 222 | + else { |
| 223 | + // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000 |
| 224 | + assertEquals("true", record[0]); |
| 225 | + assertEquals("2", record[1]); |
| 226 | + assertEquals("3.0", record[2]); |
| 227 | + assertEquals("45", record[3]); |
| 228 | + assertEquals("1970-01-01 00:00:00.678000 +0000", record[4]); |
| 229 | + } |
| 230 | + } |
| 231 | + } |
| 232 | + catch (IOException e) { |
| 233 | + logger.debug(e.getMessage(), e); |
| 234 | + } |
| 235 | + } |
| 236 | + |
| 237 | + @Test |
| 238 | + public void testConfigValuesIncludingDefault() |
| 239 | + { |
| 240 | + // setting embulk config |
| 241 | + final String pathPrefix = "/test/testUserPassword"; |
| 242 | + String configYaml = "" + |
| 243 | + "type: sftp\n" + |
| 244 | + "host: " + HOST + "\n" + |
| 245 | + "user: " + USERNAME + "\n" + |
| 246 | + "path_prefix: " + pathPrefix + "\n" + |
| 247 | + "file_ext: txt\n" + |
| 248 | + "formatter:\n" + |
| 249 | + " type: csv\n" + |
| 250 | + " newline: CRLF\n" + |
| 251 | + " newline_in_field: LF\n" + |
| 252 | + " header_line: true\n" + |
| 253 | + " charset: UTF-8\n" + |
| 254 | + " quote_policy: NONE\n" + |
| 255 | + " quote: \"\\\"\"\n" + |
| 256 | + " escape: \"\\\\\"\n" + |
| 257 | + " null_string: \"\"\n" + |
| 258 | + " default_timezone: 'UTC'"; |
| 259 | + |
| 260 | + ConfigSource config = getConfigFromYaml(configYaml); |
| 261 | + PluginTask task = config.loadConfig(PluginTask.class); |
| 262 | + |
| 263 | + assertEquals(HOST, task.getHost()); |
| 264 | + assertEquals(22, task.getPort()); |
| 265 | + assertEquals(USERNAME, task.getUser()); |
| 266 | + assertEquals(Optional.absent(), task.getPassword()); |
| 267 | + assertEquals(Optional.absent(), task.getSecretKeyFilePath()); |
| 268 | + assertEquals("", task.getSecretKeyPassphrase()); |
| 269 | + assertEquals(true, task.getUserDirIsRoot()); |
| 270 | + assertEquals(600, task.getSftpConnectionTimeout()); |
| 271 | + assertEquals(5, task.getMaxConnectionRetry()); |
| 272 | + assertEquals(pathPrefix, task.getPathPrefix()); |
| 273 | + assertEquals("txt", task.getFileNameExtension()); |
| 274 | + assertEquals("%03d.%02d.", task.getSequenceFormat()); |
| 275 | + } |
| 276 | + |
| 277 | + // Cases |
| 278 | + // login(all cases needs host + port) |
| 279 | + // user + password |
| 280 | + // user + secret_key_file + secret_key_passphrase |
| 281 | + // put files |
| 282 | + // user_directory_is_root |
| 283 | + // not user_directory_is_root |
| 284 | + // timeout |
| 285 | + // 0 second |
| 286 | + |
| 287 | + |
| 288 | + @Test |
| 289 | + public void testUserPasswordAndPutToUserDirectoryRoot() |
| 290 | + { |
| 291 | + // setting embulk config |
| 292 | + final String pathPrefix = "/test/testUserPassword"; |
| 293 | + String configYaml = "" + |
| 294 | + "type: sftp\n" + |
| 295 | + "host: " + HOST + "\n" + |
| 296 | + "port: " + PORT + "\n" + |
| 297 | + "user: " + USERNAME + "\n" + |
| 298 | + "password: " + PASSWORD + "\n" + |
| 299 | + "path_prefix: " + pathPrefix + "\n" + |
| 300 | + "file_ext: txt\n" + |
| 301 | + "formatter:\n" + |
| 302 | + " type: csv\n" + |
| 303 | + " newline: CRLF\n" + |
| 304 | + " newline_in_field: LF\n" + |
| 305 | + " header_line: true\n" + |
| 306 | + " charset: UTF-8\n" + |
| 307 | + " quote_policy: NONE\n" + |
| 308 | + " quote: \"\\\"\"\n" + |
| 309 | + " escape: \"\\\\\"\n" + |
| 310 | + " null_string: \"\"\n" + |
| 311 | + " default_timezone: 'UTC'"; |
| 312 | + |
| 313 | + // runner.transaction -> ... |
| 314 | + run(configYaml, Optional.<Integer>absent()); |
| 315 | + |
| 316 | + List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); |
| 317 | + assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt"))); |
| 318 | + assertRecordsInFile(String.format("%s/%s001.00.txt", |
| 319 | + testFolder.getRoot().getAbsolutePath(), |
| 320 | + pathPrefix)); |
| 321 | + |
| 322 | + } |
| 323 | + |
| 324 | + @Test |
| 325 | + public void testUserSecretKeyFileAndPutToRootDirectory() |
| 326 | + { |
| 327 | + // setting embulk config |
| 328 | + final String pathPrefix = "/test/testUserPassword"; |
| 329 | + String configYaml = "" + |
| 330 | + "type: sftp\n" + |
| 331 | + "host: " + HOST + "\n" + |
| 332 | + "port: " + PORT + "\n" + |
| 333 | + "user: " + USERNAME + "\n" + |
| 334 | + "secret_key_file: " + SECRET_KEY_FILE + "\n" + |
| 335 | + "secret_key_passphrase: " + SECRET_KEY_PASSPHRASE + "\n" + |
| 336 | + "path_prefix: " + testFolder.getRoot().getAbsolutePath() + pathPrefix + "\n" + |
| 337 | + "file_ext: txt\n" + |
| 338 | + "formatter:\n" + |
| 339 | + " type: csv\n" + |
| 340 | + " newline: CRLF\n" + |
| 341 | + " newline_in_field: LF\n" + |
| 342 | + " header_line: true\n" + |
| 343 | + " charset: UTF-8\n" + |
| 344 | + " quote_policy: NONE\n" + |
| 345 | + " quote: \"\\\"\"\n" + |
| 346 | + " escape: \"\\\\\"\n" + |
| 347 | + " null_string: \"\"\n" + |
| 348 | + " default_timezone: 'UTC'"; |
| 349 | + |
| 350 | + // runner.transaction -> ... |
| 351 | + run(configYaml, Optional.<Integer>absent()); |
| 352 | + |
| 353 | + List<String> fileList = lsR(Lists.<String>newArrayList(), Paths.get(testFolder.getRoot().getAbsolutePath())); |
| 354 | + assertThat(fileList, hasItem(containsString(pathPrefix + "001.00.txt"))); |
| 355 | + |
| 356 | + assertRecordsInFile(String.format("%s/%s001.00.txt", |
| 357 | + testFolder.getRoot().getAbsolutePath(), |
| 358 | + pathPrefix)); |
| 359 | + } |
| 360 | + |
| 361 | + @Test |
| 362 | + public void testTimeout() |
| 363 | + { |
| 364 | + // setting embulk config |
| 365 | + final String pathPrefix = "/test/testUserPassword"; |
| 366 | + String configYaml = "" + |
| 367 | + "type: sftp\n" + |
| 368 | + "host: " + HOST + "\n" + |
| 369 | + "port: " + PORT + "\n" + |
| 370 | + "user: " + USERNAME + "\n" + |
| 371 | + "secret_key_file: " + SECRET_KEY_FILE + "\n" + |
| 372 | + "secret_key_passphrase: " + SECRET_KEY_PASSPHRASE + "\n" + |
| 373 | + "path_prefix: " + testFolder.getRoot().getAbsolutePath() + pathPrefix + "\n" + |
| 374 | + "timeout: 1\n" + |
| 375 | + "file_ext: txt\n" + |
| 376 | + "formatter:\n" + |
| 377 | + " type: csv\n" + |
| 378 | + " newline: CRLF\n" + |
| 379 | + " newline_in_field: LF\n" + |
| 380 | + " header_line: true\n" + |
| 381 | + " charset: UTF-8\n" + |
| 382 | + " quote_policy: NONE\n" + |
| 383 | + " quote: \"\\\"\"\n" + |
| 384 | + " escape: \"\\\\\"\n" + |
| 385 | + " null_string: \"\"\n" + |
| 386 | + " default_timezone: 'UTC'"; |
| 387 | + |
| 388 | + // exception |
| 389 | + exception.expect(RuntimeException.class); |
| 390 | + exception.expectCause(CoreMatchers.<Throwable>instanceOf(FileSystemException.class)); |
| 391 | + exception.expectMessage("Could not connect to SFTP server"); |
| 392 | + |
| 393 | + // runner.transaction -> ... |
| 394 | + run(configYaml, Optional.of(60)); // sleep 1 minute while processing |
| 395 | + } |
5 | 396 | } |
0 commit comments