11package org .embulk .output ;
22
3- import java .util .List ;
3+ import java .io .FilterOutputStream ;
4+ import java .io .IOException ;
5+ import java .io .OutputStream ;
46import java .util .ArrayList ;
57import java .util .Arrays ;
68import java .util .Collections ;
7- import java .io .OutputStream ;
8- import java .io .FilterOutputStream ;
9- import java .io .IOException ;
10- import org .slf4j .Logger ;
11- import org .embulk .config .TaskReport ;
12- import org .embulk .util .config .Config ;
9+ import java .util .List ;
1310import org .embulk .config .ConfigDiff ;
1411import org .embulk .config .ConfigSource ;
15- import org .embulk .util .config .ConfigMapper ;
16- import org .embulk .util .config .ConfigMapperFactory ;
17- import org .embulk .util .config .Task ;
18- import org .embulk .util .config .TaskMapper ;
12+ import org .embulk .config .TaskReport ;
1913import org .embulk .config .TaskSource ;
2014import org .embulk .spi .Buffer ;
2115import org .embulk .spi .FileOutputPlugin ;
2216import org .embulk .spi .TransactionalFileOutput ;
17+ import org .embulk .util .config .Config ;
18+ import org .embulk .util .config .ConfigMapper ;
19+ import org .embulk .util .config .ConfigMapperFactory ;
20+ import org .embulk .util .config .Task ;
21+ import org .embulk .util .config .TaskMapper ;
22+ import org .slf4j .Logger ;
2323
2424public class CommandFileOutputPlugin
25- implements FileOutputPlugin
26- {
25+ implements FileOutputPlugin {
2726 public interface PluginTask
28- extends Task
29- {
27+ extends Task {
3028 @ Config ("command" )
3129 public String getCommand ();
3230 }
3331
3432 @ Override
3533 public ConfigDiff transaction (ConfigSource config , int taskCount ,
36- FileOutputPlugin .Control control )
37- {
34+ FileOutputPlugin .Control control ) {
3835 final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY .createConfigMapper ();
3936 final PluginTask task = configMapper .map (config , PluginTask .class );
4037
@@ -44,23 +41,20 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
4441
4542 @ Override
4643 public ConfigDiff resume (TaskSource taskSource ,
47- int taskCount ,
48- FileOutputPlugin .Control control )
49- {
44+ int taskCount ,
45+ FileOutputPlugin .Control control ) {
5046 control .run (taskSource );
5147 return CONFIG_MAPPER_FACTORY .newConfigDiff ();
5248 }
5349
5450 @ Override
5551 public void cleanup (TaskSource taskSource ,
56- int taskCount ,
57- List <TaskReport > successTaskReports )
58- {
52+ int taskCount ,
53+ List <TaskReport > successTaskReports ) {
5954 }
6055
6156 @ Override
62- public TransactionalFileOutput open (TaskSource taskSource , final int taskIndex )
63- {
57+ public TransactionalFileOutput open (TaskSource taskSource , final int taskIndex ) {
6458 final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY .createTaskMapper ();
6559 final PluginTask task = taskMapper .map (taskSource , PluginTask .class );
6660
@@ -73,36 +67,31 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
7367 return new PluginFileOutput (cmdline , taskIndex );
7468 }
7569
76- static List <String > buildShell ()
77- {
70+ static List <String > buildShell () {
7871 String osName = System .getProperty ("os.name" );
79- if (osName .indexOf ("Windows" ) >= 0 ) {
72+ if (osName .indexOf ("Windows" ) >= 0 ) {
8073 return Collections .unmodifiableList (Arrays .asList ("PowerShell.exe" , "-Command" ));
8174 } else {
8275 return Collections .unmodifiableList (Arrays .asList ("sh" , "-c" ));
8376 }
8477 }
8578
8679 private static class ProcessWaitOutputStream
87- extends FilterOutputStream
88- {
80+ extends FilterOutputStream {
8981 private Process process ;
9082
91- public ProcessWaitOutputStream (OutputStream out , Process process )
92- {
83+ public ProcessWaitOutputStream (OutputStream out , Process process ) {
9384 super (out );
9485 this .process = process ;
9586 }
9687
9788 @ Override
98- public void close () throws IOException
99- {
89+ public void close () throws IOException {
10090 super .close ();
10191 waitFor ();
10292 }
10393
104- private synchronized void waitFor () throws IOException
105- {
94+ private synchronized void waitFor () throws IOException {
10695 if (process != null ) {
10796 int code ;
10897 try {
@@ -113,38 +102,34 @@ private synchronized void waitFor() throws IOException
113102 process = null ;
114103 if (code != 0 ) {
115104 throw new IOException (String .format (
116- "Command finished with non-zero exit code. Exit code is %d." , code ));
105+ "Command finished with non-zero exit code. Exit code is %d." , code ));
117106 }
118107 }
119108 }
120109 }
121110
122111 public class PluginFileOutput
123- implements TransactionalFileOutput
124- {
112+ implements TransactionalFileOutput {
125113 private final List <String > cmdline ;
126114 private final int taskIndex ;
127115 private int seqId ;
128116 private ProcessWaitOutputStream currentProcess ;
129117
130- public PluginFileOutput (List <String > cmdline , int taskIndex )
131- {
118+ public PluginFileOutput (List <String > cmdline , int taskIndex ) {
132119 this .cmdline = cmdline ;
133120 this .taskIndex = taskIndex ;
134121 this .seqId = 0 ;
135122 this .currentProcess = null ;
136123 }
137124
138- public void nextFile ()
139- {
125+ public void nextFile () {
140126 closeCurrentProcess ();
141127 Process proc = startProcess (cmdline , taskIndex , seqId );
142128 currentProcess = new ProcessWaitOutputStream (proc .getOutputStream (), proc );
143129 seqId ++;
144130 }
145131
146- public void add (Buffer buffer )
147- {
132+ public void add (Buffer buffer ) {
148133 try {
149134 currentProcess .write (buffer .array (), buffer .offset (), buffer .limit ());
150135 } catch (IOException ex ) {
@@ -154,27 +139,22 @@ public void add(Buffer buffer)
154139 }
155140 }
156141
157- public void finish ()
158- {
142+ public void finish () {
159143 closeCurrentProcess ();
160144 }
161145
162- public void close ()
163- {
146+ public void close () {
164147 closeCurrentProcess ();
165148 }
166149
167- public void abort ()
168- {
150+ public void abort () {
169151 }
170152
171- public TaskReport commit ()
172- {
153+ public TaskReport commit () {
173154 return CONFIG_MAPPER_FACTORY .newTaskReport ();
174155 }
175156
176- private void closeCurrentProcess ()
177- {
157+ private void closeCurrentProcess () {
178158 try {
179159 if (currentProcess != null ) {
180160 currentProcess .close ();
@@ -185,11 +165,10 @@ private void closeCurrentProcess()
185165 }
186166 }
187167
188- private Process startProcess (List <String > cmdline , int taskIndex , int seqId )
189- {
168+ private Process startProcess (List <String > cmdline , int taskIndex , int seqId ) {
190169 ProcessBuilder builder = new ProcessBuilder (cmdline .toArray (new String [cmdline .size ()]))
191- .redirectError (ProcessBuilder .Redirect .INHERIT )
192- .redirectOutput (ProcessBuilder .Redirect .INHERIT );
170+ .redirectError (ProcessBuilder .Redirect .INHERIT )
171+ .redirectOutput (ProcessBuilder .Redirect .INHERIT );
193172 builder .environment ().put ("INDEX" , Integer .toString (taskIndex ));
194173 builder .environment ().put ("SEQID" , Integer .toString (seqId ));
195174 // TODO transaction_time, etc
@@ -201,6 +180,7 @@ private Process startProcess(List<String> cmdline, int taskIndex, int seqId)
201180 }
202181 }
203182 }
183+
204184 private static final Logger logger = org .slf4j .LoggerFactory .getLogger (CommandFileOutputPlugin .class );
205185
206186 private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory .builder ().addDefaultModules ().build ();
0 commit comments