Skip to content

Commit 4b4a24d

Browse files
authored
Merge pull request #9 from takasick/sortkey-support
HashKey/RangeKey table support at upsert_with_expression mode
2 parents ca24858 + d7e091e commit 4b4a24d

File tree

2 files changed

+45
-10
lines changed

2 files changed

+45
-10
lines changed

src/main/java/org/embulk/output/dynamodb/DynamodbOutputPlugin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public static class DynamodbPageOutput implements TransactionalPageOutput
199199
private final String table;
200200
private final Mode mode;
201201
private final Optional<String> updateExpression;
202-
private final String primaryKey;
202+
private final List primaryKeyElements;
203203
private final int maxPutItems;
204204

205205
public DynamodbPageOutput(PluginTask task, DynamoDB dynamoDB)
@@ -210,7 +210,7 @@ public DynamodbPageOutput(PluginTask task, DynamoDB dynamoDB)
210210
this.table = task.getTable();
211211
this.mode = task.getMode();
212212
this.updateExpression = task.getUpdateExpression();
213-
this.primaryKey = (mode.equals(Mode.UPSERT_WITH_EXPRESSION)) ? dynamodbUtils.getPrimaryKeyName(dynamoDB, table) : null;
213+
this.primaryKeyElements = (mode.equals(Mode.UPSERT_WITH_EXPRESSION)) ? dynamodbUtils.getPrimaryKey(dynamoDB, table) : null;
214214
this.maxPutItems = task.getMaxPutItems();
215215
}
216216

@@ -400,7 +400,7 @@ public void flush()
400400
public void updateItem(Item item)
401401
{
402402
try {
403-
dynamodbUtils.updateItem(dynamoDB, table, item, primaryKey, updateExpression);
403+
dynamodbUtils.updateItem(dynamoDB, table, item, primaryKeyElements, updateExpression);
404404
totalWroteItemSize++;
405405
if (totalWroteItemSize % 1000 == 0) {
406406
log.info(String.format("Updated %s items", totalWroteItemSize));

src/main/java/org/embulk/output/dynamodb/DynamodbUtils.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,34 @@ protected void batchWriteItem(DynamoDB dynamoDB, TableWriteItems items)
143143
}
144144
}
145145

146-
protected void updateItem(DynamoDB dynamoDB, String tableName, Item item, String primaryKey, Optional<String> expression)
146+
protected void updateItem(DynamoDB dynamoDB, String tableName, Item item, List primaryKeyelements, Optional<String> expression)
147147
{
148-
Object primaryKeyValue = null;
148+
String hashKeyname = null;
149+
String rangeKeyname = null;
150+
Object hashKeyValue = null;
151+
Object rangeKeyValue = null;
149152
Map<String, String> attributeNames = new HashMap<>();
150153
Map<String, Object> attributeValues = new HashMap<>();
151154

155+
Iterator it = primaryKeyelements.iterator();
156+
while (it.hasNext()) {
157+
KeySchemaElement element = (KeySchemaElement) it.next();
158+
if (element.getKeyType().equals(KeyType.HASH.toString())) {
159+
hashKeyname = element.getAttributeName();
160+
}
161+
else if (element.getKeyType().equals(KeyType.RANGE.toString())) {
162+
rangeKeyname = element.getAttributeName();
163+
}
164+
}
165+
152166
Map<String, Object> itemMap = item.asMap();
153167
for (Map.Entry<String, Object> e : itemMap.entrySet()) {
154168
String keyName = e.getKey();
155-
if (keyName.equals(primaryKey)) {
156-
primaryKeyValue = e.getValue();
169+
if (keyName.equals(hashKeyname)) {
170+
hashKeyValue = e.getValue();
171+
}
172+
else if (keyName.equals(rangeKeyname)) {
173+
rangeKeyValue = e.getValue();
157174
}
158175
else {
159176
if (expression.get().indexOf(keyName) > 0) {
@@ -164,9 +181,17 @@ protected void updateItem(DynamoDB dynamoDB, String tableName, Item item, String
164181
}
165182
log.debug("attribute names: " + attributeNames.toString());
166183
log.debug("attribute values: " + attributeValues.toString());
167-
log.debug(String.format("primary key %s:%s", primaryKey, primaryKeyValue));
184+
168185
Table table = dynamoDB.getTable(tableName);
169-
table.updateItem(primaryKey, primaryKeyValue, expression.get(), attributeNames, attributeValues);
186+
187+
log.debug(String.format("hash key %s:%s", hashKeyname, hashKeyValue));
188+
if (rangeKeyValue == null) {
189+
table.updateItem(hashKeyname, hashKeyValue, expression.get(), attributeNames, attributeValues);
190+
}
191+
else {
192+
log.debug(String.format("range key %s:%s", rangeKeyname, rangeKeyValue));
193+
table.updateItem(hashKeyname, hashKeyValue, rangeKeyname, rangeKeyValue, expression.get(), attributeNames, attributeValues);
194+
}
170195
}
171196

172197
protected String getPrimaryKeyName(DynamoDB dynamoDB, String tableName)
@@ -178,11 +203,21 @@ protected String getPrimaryKeyName(DynamoDB dynamoDB, String tableName)
178203
String primaryKey = null;
179204
while (schema.hasNext()) {
180205
KeySchemaElement element = schema.next();
181-
primaryKey = element.getAttributeName();
206+
if (element.getKeyType().equals(KeyType.HASH.toString())) {
207+
primaryKey = element.getAttributeName();
208+
}
182209
}
183210
return primaryKey;
184211
}
185212

213+
protected List getPrimaryKey(DynamoDB dynamoDB, String tableName)
214+
{
215+
Table table = dynamoDB.getTable(tableName);
216+
TableDescription description = table.describe();
217+
List<KeySchemaElement> keyelements = description.getKeySchema();
218+
return keyelements;
219+
}
220+
186221
protected void createTable(DynamoDB dynamoDB, DynamodbOutputPlugin.PluginTask task)
187222
throws InterruptedException
188223
{

0 commit comments

Comments
 (0)