22
33import java .util .List ;
44import java .util .ArrayList ;
5+ import java .util .Arrays ;
6+ import java .util .Collections ;
57import java .io .OutputStream ;
68import java .io .FilterOutputStream ;
79import java .io .IOException ;
8- import com .google .common .annotations .VisibleForTesting ;
910import org .slf4j .Logger ;
10- import com .google .common .base .Throwables ;
11- import com .google .common .collect .ImmutableList ;
1211import org .embulk .config .TaskReport ;
13- import org .embulk .config .Config ;
14- import org .embulk .config .ConfigDefault ;
12+ import org .embulk .util .config .Config ;
1513import org .embulk .config .ConfigDiff ;
1614import org .embulk .config .ConfigSource ;
17- import org .embulk .config .Task ;
15+ import org .embulk .util .config .ConfigMapper ;
16+ import org .embulk .util .config .ConfigMapperFactory ;
17+ import org .embulk .util .config .Task ;
18+ import org .embulk .util .config .TaskMapper ;
1819import org .embulk .config .TaskSource ;
1920import org .embulk .spi .Buffer ;
20- import org .embulk .spi .Exec ;
2121import org .embulk .spi .FileOutputPlugin ;
2222import org .embulk .spi .TransactionalFileOutput ;
2323
2424public class CommandFileOutputPlugin
2525 implements FileOutputPlugin
2626{
27- private final Logger logger = Exec .getLogger (getClass ());
28-
2927 public interface PluginTask
3028 extends Task
3129 {
@@ -37,10 +35,11 @@ public interface PluginTask
3735 public ConfigDiff transaction (ConfigSource config , int taskCount ,
3836 FileOutputPlugin .Control control )
3937 {
40- PluginTask task = config .loadConfig (PluginTask .class );
38+ final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY .createConfigMapper ();
39+ final PluginTask task = configMapper .map (config , PluginTask .class );
4140
4241 // retryable (idempotent) output:
43- return resume (task .dump (), taskCount , control );
42+ return resume (task .toTaskSource (), taskCount , control );
4443 }
4544
4645 @ Override
@@ -49,7 +48,7 @@ public ConfigDiff resume(TaskSource taskSource,
4948 FileOutputPlugin .Control control )
5049 {
5150 control .run (taskSource );
52- return Exec .newConfigDiff ();
51+ return CONFIG_MAPPER_FACTORY .newConfigDiff ();
5352 }
5453
5554 @ Override
@@ -62,7 +61,8 @@ public void cleanup(TaskSource taskSource,
6261 @ Override
6362 public TransactionalFileOutput open (TaskSource taskSource , final int taskIndex )
6463 {
65- PluginTask task = taskSource .loadTask (PluginTask .class );
64+ final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY .createTaskMapper ();
65+ final PluginTask task = taskMapper .map (taskSource , PluginTask .class );
6666
6767 List <String > cmdline = new ArrayList <String >();
6868 cmdline .addAll (buildShell ());
@@ -73,14 +73,13 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
7373 return new PluginFileOutput (cmdline , taskIndex );
7474 }
7575
76- @ VisibleForTesting
7776 static List <String > buildShell ()
7877 {
7978 String osName = System .getProperty ("os.name" );
8079 if (osName .indexOf ("Windows" ) >= 0 ) {
81- return ImmutableList . of ( "PowerShell.exe" , "-Command" );
80+ return Collections . unmodifiableList ( Arrays . asList ( "PowerShell.exe" , "-Command" ) );
8281 } else {
83- return ImmutableList . of ( "sh" , "-c" );
82+ return Collections . unmodifiableList ( Arrays . asList ( "sh" , "-c" ) );
8483 }
8584 }
8685
@@ -109,7 +108,7 @@ private synchronized void waitFor() throws IOException
109108 try {
110109 code = process .waitFor ();
111110 } catch (InterruptedException ex ) {
112- throw Throwables . propagate (ex );
111+ throw new RuntimeException (ex );
113112 }
114113 process = null ;
115114 if (code != 0 ) {
@@ -171,7 +170,7 @@ public void abort()
171170
172171 public TaskReport commit ()
173172 {
174- return Exec .newTaskReport ();
173+ return CONFIG_MAPPER_FACTORY .newTaskReport ();
175174 }
176175
177176 private void closeCurrentProcess ()
@@ -182,7 +181,7 @@ private void closeCurrentProcess()
182181 currentProcess = null ;
183182 }
184183 } catch (IOException ex ) {
185- throw Throwables . propagate (ex );
184+ throw new RuntimeException (ex );
186185 }
187186 }
188187
@@ -198,8 +197,11 @@ private Process startProcess(List<String> cmdline, int taskIndex, int seqId)
198197 try {
199198 return builder .start ();
200199 } catch (IOException ex ) {
201- throw Throwables . propagate (ex );
200+ throw new RuntimeException (ex );
202201 }
203202 }
204203 }
204+ private static final Logger logger = org .slf4j .LoggerFactory .getLogger (CommandFileOutputPlugin .class );
205+
206+ private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory .builder ().addDefaultModules ().build ();
205207}
0 commit comments