5151import static com .google .common .io .Files .readLines ;
5252import static org .embulk .spi .type .Types .BOOLEAN ;
5353import static org .embulk .spi .type .Types .DOUBLE ;
54+ import static org .embulk .spi .type .Types .JSON ;
5455import static org .embulk .spi .type .Types .LONG ;
5556import static org .embulk .spi .type .Types .STRING ;
5657import static org .embulk .spi .type .Types .TIMESTAMP ;
5758import static org .hamcrest .CoreMatchers .containsString ;
5859import static org .hamcrest .CoreMatchers .hasItem ;
5960import static org .junit .Assert .assertEquals ;
6061import static org .junit .Assert .assertThat ;
62+ import static org .msgpack .value .ValueFactory .newMap ;
63+ import static org .msgpack .value .ValueFactory .newString ;
64+
6165public class TestSftpFileOutputPlugin
6266{
6367 @ Rule
@@ -84,6 +88,7 @@ public class TestSftpFileOutputPlugin
8488 .add ("_c2" , DOUBLE )
8589 .add ("_c3" , STRING )
8690 .add ("_c4" , TIMESTAMP )
91+ .add ("_c5" , JSON )
8792 .build ();
8893
8994 @ Before
@@ -176,12 +181,12 @@ public List<TaskReport> run(TaskSource taskSource)
176181 boolean committed = false ;
177182 try {
178183 // Result:
179- // _c0,_c1,_c2,_c3,_c4
180- // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000
181- // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000
182- for (Page page : PageTestUtils .buildPage (runtime .getBufferAllocator (), SCHEMA , true , 2L , 3.0D , "45" ,
183- Timestamp .ofEpochMilli (678L ), true , 2L , 3.0D , "45" ,
184- Timestamp .ofEpochMilli (678L ))) {
184+ // _c0,_c1,_c2,_c3,_c4,_c5
185+ // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"}
186+ // true,2,3.0,45,1970-01-01 00:00:00.678000 +0000,{\"k\":\"v\"}
187+ for (Page page : PageTestUtils .buildPage (runtime .getBufferAllocator (), SCHEMA ,
188+ true , 2L , 3.0D , "45" , Timestamp .ofEpochMilli (678L ), newMap ( newString ( "k" ), newString ( "v" )) ,
189+ true , 2L , 3.0D , "45" , Timestamp .ofEpochMilli (678L ), newMap ( newString ( "k" ), newString ( "v" ) ))) {
185190 pageOutput .add (page );
186191 if (sleep .isPresent ()) {
187192 Thread .sleep (sleep .get () * 1000 );
@@ -223,6 +228,7 @@ private void assertRecordsInFile(String filePath)
223228 assertEquals ("3.0" , record [2 ]);
224229 assertEquals ("45" , record [3 ]);
225230 assertEquals ("1970-01-01 00:00:00.678000 +0000" , record [4 ]);
231+ assertEquals ("{\" k\" :\" v\" }" , record [5 ]);
226232 }
227233 }
228234 }
0 commit comments