Skip to content

Commit b088717

Browse files
authored
Merge pull request #7 from joker1007/fix-json-column
Fix jsonColumn handling, and implement test case
2 parents aaa1881 + d652969 commit b088717

File tree

3 files changed

+118
-35
lines changed

3 files changed

+118
-35
lines changed

src/main/java/org/embulk/output/dynamodb/DynamodbOutputPlugin.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,24 @@
2020
import org.embulk.config.TaskSource;
2121
import org.embulk.spi.Column;
2222
import org.embulk.spi.ColumnVisitor;
23+
import org.embulk.spi.DataException;
2324
import org.embulk.spi.Exec;
2425
import org.embulk.spi.OutputPlugin;
2526
import org.embulk.spi.Page;
2627
import org.embulk.spi.PageReader;
2728
import org.embulk.spi.Schema;
2829
import org.embulk.spi.TransactionalPageOutput;
30+
import org.msgpack.value.BooleanValue;
31+
import org.msgpack.value.FloatValue;
32+
import org.msgpack.value.IntegerValue;
33+
import org.msgpack.value.Value;
2934
import org.slf4j.Logger;
3035

36+
import java.util.ArrayList;
37+
import java.util.LinkedHashMap;
3138
import java.util.List;
3239
import java.util.Locale;
40+
import java.util.Map;
3341

3442
public class DynamodbOutputPlugin
3543
implements OutputPlugin
@@ -285,14 +293,51 @@ public void jsonColumn(Column column)
285293
addNullValue(column.getName());
286294
}
287295
else {
288-
item.withJSON(column.getName(), pageReader.getJson(column).toString());
296+
Value jsonValue = pageReader.getJson(column);
297+
if (jsonValue.isArrayValue()) {
298+
List<Object> list = new ArrayList<Object>();
299+
for (Value v : jsonValue.asArrayValue()) {
300+
list.add(getRawFromValue(v));
301+
}
302+
item.withList(column.getName(), list);
303+
} else {
304+
item.withJSON(column.getName(), jsonValue.toJson());
305+
}
289306
}
290307
}
291308

292309
private void addNullValue(String name)
293310
{
294311
item.withNull(name);
295312
}
313+
314+
private Object getRawFromValue(Value value)
315+
{
316+
if (value.isBooleanValue()) {
317+
return ((BooleanValue)value).getBoolean();
318+
} else if (value.isStringValue()) {
319+
return value.toString();
320+
} else if (value.isIntegerValue()) {
321+
return ((IntegerValue)value).asLong();
322+
} else if (value.isFloatValue()) {
323+
return ((FloatValue)value).toDouble();
324+
} else if (value.isArrayValue()) {
325+
List<Object> list = new ArrayList<>();
326+
for (Value v : value.asArrayValue()) {
327+
list.add(getRawFromValue(v));
328+
}
329+
return list;
330+
} else if (value.isMapValue()) {
331+
Map<String, Object> map = new LinkedHashMap<>();
332+
for (Map.Entry<Value, Value> entry : value.asMapValue().entrySet()) {
333+
map.put(entry.getKey().toString(), getRawFromValue(entry.getValue()));
334+
}
335+
return map;
336+
} else if (value.isNilValue()) {
337+
return null;
338+
}
339+
throw new DataException("Record has invalid json column value");
340+
}
296341
});
297342

298343
if (mode.equals(Mode.UPSERT)) {

src/test/java/org/embulk/output/dynamodb/TestDynamodbOutputPlugin.java

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package org.embulk.output.dynamodb;
22

33
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
4+
import com.amazonaws.services.dynamodbv2.document.Item;
5+
import com.amazonaws.services.dynamodbv2.document.ItemCollection;
6+
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
7+
import com.amazonaws.services.dynamodbv2.document.Table;
48
import com.google.common.collect.ImmutableList;
59
import com.google.common.collect.ImmutableMap;
610
import com.google.common.collect.Lists;
@@ -22,9 +26,15 @@
2226
import org.junit.BeforeClass;
2327
import org.junit.Rule;
2428
import org.junit.Test;
29+
import org.msgpack.value.Value;
30+
import org.msgpack.value.ValueFactory;
2531

32+
import java.math.BigDecimal;
33+
import java.util.ArrayList;
2634
import java.util.Arrays;
35+
import java.util.LinkedHashMap;
2736
import java.util.List;
37+
import java.util.Map;
2838

2939
import static org.junit.Assert.assertEquals;
3040

@@ -134,39 +144,65 @@ public List<TaskReport> run(TaskSource taskSource)
134144
});
135145
TransactionalPageOutput output = plugin.open(task.dump(), schema, 0);
136146

137-
List<Page> pages = PageTestUtils.buildPage(runtime.getBufferAllocator(), schema, 1L, 32864L, "2015-01-27T19:23:49", "2015-01-27T00:00:00", true, 123.45, "embulk");
147+
Map<Value, Value> map = new LinkedHashMap<>();
148+
map.put(ValueFactory.newString("foo"), ValueFactory.newString("bar"));
149+
map.put(ValueFactory.newString("key1"), ValueFactory.newString("val1"));
150+
List<Page> pages = PageTestUtils.buildPage(runtime.getBufferAllocator(), schema,
151+
1L,
152+
32864L,
153+
1L,
154+
1L,
155+
true,
156+
123.45,
157+
"embulk",
158+
ValueFactory.newArray(ValueFactory.newInteger(1), ValueFactory.newInteger(2), ValueFactory.newInteger(3), ValueFactory.newArray(ValueFactory.newString("inner"))),
159+
ValueFactory.newMap(map)
160+
);
138161
assertEquals(1, pages.size());
139162
for (Page page : pages) {
140163
output.add(page);
141164
}
142165

143-
// output.finish();
144-
// output.commit();
145-
//
146-
// DynamodbUtils dynamoDbUtils = new DynamodbUtils();
147-
// DynamoDB dynamoDB = null;
148-
// try {
149-
// dynamoDB = dynamoDbUtils.createDynamoDB(task);
150-
//
151-
// Table table = dynamoDB.getTable(task.getTable());
152-
// ItemCollection<ScanOutcome> items = table.scan();
153-
//
154-
// while (items.iterator().hasNext()) {
155-
// Map<String, Object> item = items.iterator().next().asMap();
156-
// assertEquals(1, item.get("id"));
157-
// assertEquals(32864, item.get("account"));
158-
// assertEquals("2015-01-27T19:23:49", item.get("time"));
159-
// assertEquals("2015-01-27T00:00:00", item.get("purchase"));
160-
// assertEquals(true, item.get("flg"));
161-
// assertEquals(123.45, item.get("score"));
162-
// assertEquals("embulk", item.get("comment"));
163-
// }
164-
// }
165-
// finally {
166-
// if (dynamoDB != null) {
167-
// dynamoDB.shutdown();
168-
// }
169-
// }
166+
output.finish();
167+
output.commit();
168+
169+
DynamodbUtils dynamoDbUtils = new DynamodbUtils();
170+
DynamoDB dynamoDB = null;
171+
try {
172+
dynamoDB = dynamoDbUtils.createDynamoDB(task);
173+
174+
Table table = dynamoDB.getTable(task.getTable());
175+
ItemCollection<ScanOutcome> items = table.scan();
176+
177+
for (Item item1 : items) {
178+
assertEquals(1L, item1.getLong("id"));
179+
assertEquals(32864L, item1.getLong("account"));
180+
assertEquals("1970-01-01 00:00:01 UTC", item1.getString("time"));
181+
assertEquals("1970-01-01 00:00:01 UTC", item1.getString("purchase"));
182+
assertEquals(true, item1.getBoolean("flg"));
183+
assertEquals(new BigDecimal("123.45"), item1.get("score"));
184+
assertEquals("embulk", item1.getString("comment"));
185+
186+
List<Object> list = new ArrayList<>();
187+
List<Object> inner = new ArrayList<>();
188+
inner.add("inner");
189+
list.add(new BigDecimal(1));
190+
list.add(new BigDecimal(2));
191+
list.add(new BigDecimal(3));
192+
list.add(inner);
193+
assertEquals(list, item1.getList("list"));
194+
195+
Map<String, Object> expectedMap = new LinkedHashMap<>();
196+
expectedMap.put("foo", "bar");
197+
expectedMap.put("key1", "val1");
198+
assertEquals(expectedMap, item1.getMap("map"));
199+
}
200+
}
201+
finally {
202+
if (dynamoDB != null) {
203+
dynamoDB.shutdown();
204+
}
205+
}
170206
}
171207

172208
@Test
@@ -192,7 +228,7 @@ private ConfigSource config()
192228
.set("region", "us-west-1")
193229
.set("table", "dummy")
194230
.set("primary_key", "id")
195-
.set("primary_key_type", "string")
231+
.set("primary_key_type", "number")
196232
.set("read_capacity_units", capacityUnitConfig())
197233
.set("write_capacity_units", capacityUnitConfig())
198234
.set("auth_method", "basic")
@@ -244,6 +280,8 @@ private ImmutableList<Object> schemaConfig()
244280
builder.add(ImmutableMap.of("name", "flg", "type", "boolean"));
245281
builder.add(ImmutableMap.of("name", "score", "type", "double"));
246282
builder.add(ImmutableMap.of("name", "comment", "type", "string"));
283+
builder.add(ImmutableMap.of("name", "list", "type", "json"));
284+
builder.add(ImmutableMap.of("name", "map", "type", "json"));
247285
return builder.build();
248286
}
249287
}

src/test/resources/sample_01.csv

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
id,account,time,purchase,flg,score,comment
2-
1,32864,2015-01-27 19:23:49,20150127,1,123.45,embulk
3-
2,14824,2015-01-27 19:01:23,20150127,0,234,56,embulk
4-
3,27559,2015-01-28 02:20:02,20150128,1,678.90,embulk
5-
4,11270,2015-01-29 11:54:36,20150129,0,100.00,embulk
1+
id,account,time,purchase,flg,score,comment,list
2+
1,32864,2015-01-27 19:23:49,20150127,1,123.45,embulk,[1,2,3],{"foo": "bar", "key1": "val1"}
3+
2,14824,2015-01-27 19:01:23,20150127,0,234,56,embulk,[1,2,3],{"foo": "bar", "key1": "val1"}
4+
3,27559,2015-01-28 02:20:02,20150128,1,678.90,embulk,[1,2,3],{"foo": "bar", "key1": "val1"}
5+
4,11270,2015-01-29 11:54:36,20150129,0,100.00,embulk,[1,2,3],{"foo": "bar", "key1": "val1"}

0 commit comments

Comments
 (0)