Skip to content

Commit 1e10da9

Browse files
authored
Add support for S3 (#3141)
1 parent a27666c commit 1e10da9

27 files changed

+1412
-120
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
name: Test Object Storage Adapter
2+
3+
on:
4+
workflow_dispatch:
5+
inputs:
6+
INT_TEST_JAVA_RUNTIME_VERSION:
7+
description: JDK version used to run the integration test
8+
type: choice
9+
required: false
10+
default: '8'
11+
options:
12+
- '8'
13+
- '11'
14+
- '17'
15+
- '21'
16+
INT_TEST_JAVA_RUNTIME_VENDOR:
17+
description: Vendor of the JDK used to run the integration test
18+
type: choice
19+
required: false
20+
default: 'temurin'
21+
options:
22+
- 'corretto'
23+
- 'microsoft'
24+
- 'oracle'
25+
- 'temurin'
26+
27+
env:
28+
TERM: dumb
29+
JAVA_VERSION: '8'
30+
JAVA_VENDOR: 'temurin'
31+
INT_TEST_JAVA_RUNTIME_VERSION: "${{ github.event_name != 'workflow_dispatch' && '8' || inputs.INT_TEST_JAVA_RUNTIME_VERSION }}"
32+
INT_TEST_JAVA_RUNTIME_VENDOR: "${{ github.event_name != 'workflow_dispatch' && 'temurin' || inputs.INT_TEST_JAVA_RUNTIME_VENDOR }}"
33+
# Gradle will parse 'ORG_GRADLE_PROJECT_<project_property_name>' environment variables as project properties.
34+
# The following variables configure the 'com.scalar.db.jdk-configuration' Gradle plugin.
35+
ORG_GRADLE_PROJECT_javaVersion: '8'
36+
ORG_GRADLE_PROJECT_javaVendor: 'temurin'
37+
ORG_GRADLE_PROJECT_integrationTestJavaRuntimeVersion: "${{ github.event_name != 'workflow_dispatch' && '8' || inputs.INT_TEST_JAVA_RUNTIME_VERSION }}"
38+
ORG_GRADLE_PROJECT_integrationTestJavaRuntimeVendor: "${{ github.event_name != 'workflow_dispatch' && 'temurin' || inputs.INT_TEST_JAVA_RUNTIME_VENDOR }}"
39+
# This variable evaluates to: if {!(Temurin JDK 8) && !(Oracle JDK)} then {true} else {false}
40+
# Oracle JDK that are linux compatible and publicly available through direct download exist for all LTS versions
41+
SET_UP_INT_TEST_RUNTIME_NON_ORACLE_JDK: "${{ (github.event_name == 'workflow_dispatch' && !(inputs.INT_TEST_JAVA_RUNTIME_VERSION == '8' && inputs.INT_TEST_JAVA_RUNTIME_VENDOR == 'temurin') && !(inputs.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle')) && 'true' || 'false' }}"
42+
INT_TEST_GRADLE_OPTIONS_FOR_GROUP_COMMIT: '"-Dscalardb.consensus_commit.coordinator.group_commit.enabled=true" "-Dscalardb.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis=15000" --tests "**.ConsensusCommit**"'
43+
AWS_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY }}
44+
AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_ACCESS_KEY }}
45+
S3_REGION: ap-northeast-1
46+
S3_BUCKET_NAME: scalardb-test-bucket
47+
48+
jobs:
49+
integration-test-s3:
50+
name: S3 integration test (${{ matrix.mode.label }})
51+
runs-on: ubuntu-latest
52+
53+
strategy:
54+
fail-fast: false
55+
matrix:
56+
mode:
57+
- label: default
58+
group_commit_enabled: false
59+
- label: with_group_commit
60+
group_commit_enabled: true
61+
62+
steps:
63+
- uses: actions/checkout@v5
64+
65+
- name: Set up JDK ${{ env.JAVA_VERSION }} (${{ env.JAVA_VENDOR }})
66+
uses: actions/setup-java@v5
67+
with:
68+
java-version: ${{ env.JAVA_VERSION }}
69+
distribution: ${{ env.JAVA_VENDOR }}
70+
71+
- name: Set up JDK ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} (${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }}) to run integration test
72+
uses: actions/setup-java@v5
73+
if: ${{ env.SET_UP_INT_TEST_RUNTIME_NON_ORACLE_JDK == 'true'}}
74+
with:
75+
java-version: ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }}
76+
distribution: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }}
77+
78+
- name: Login to Oracle container registry
79+
uses: docker/login-action@v3
80+
if: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle' }}
81+
with:
82+
registry: container-registry.oracle.com
83+
username: ${{ secrets.OCR_USERNAME }}
84+
password: ${{ secrets.OCR_TOKEN }}
85+
86+
- name: Set up JDK ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} (oracle) to run the integration test
87+
if: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle' }}
88+
run: |
89+
container_id=$(docker create "container-registry.oracle.com/java/jdk:${{ env.INT_TEST_JAVA_RUNTIME_VERSION }}")
90+
docker cp -L "$container_id:/usr/java/default" /usr/lib/jvm/oracle-jdk && docker rm "$container_id"
91+
- name: Setup Gradle
92+
uses: gradle/actions/setup-gradle@v5
93+
94+
- name: Execute Gradle 'integrationTestObjectStorage' task
95+
run: ./gradlew integrationTestObjectStorage -Dscalardb.object_storage.storage=s3 -Dscalardb.object_storage.endpoint=${{ env.S3_REGION }}/${{ env.S3_BUCKET_NAME }} ${{ matrix.mode.group_commit_enabled && env.INT_TEST_GRADLE_OPTIONS_FOR_GROUP_COMMIT || '' }}
96+
97+
- name: Upload Gradle test reports
98+
if: always()
99+
uses: actions/upload-artifact@v5
100+
with:
101+
name: cassandra_3.0_integration_test_reports_${{ matrix.mode.label }}
102+
path: core/build/reports/tests/integrationTestObjectStorage

core/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ dependencies {
174174
implementation platform("software.amazon.awssdk:bom:${awssdkVersion}")
175175
implementation 'software.amazon.awssdk:applicationautoscaling'
176176
implementation 'software.amazon.awssdk:dynamodb'
177+
implementation 'software.amazon.awssdk:s3'
178+
implementation 'software.amazon.awssdk:aws-crt-client'
177179
testImplementation 'software.amazon.awssdk:iam'
178180
testImplementation 'software.amazon.awssdk:iam-policy-builder'
179181
implementation "org.apache.commons:commons-dbcp2:${commonsDbcp2Version}"

core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
import com.scalar.db.config.DatabaseConfig;
44
import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig;
5+
import com.scalar.db.storage.objectstorage.s3.S3Config;
56
import java.util.Collections;
67
import java.util.Map;
78
import java.util.Properties;
89

910
public class ObjectStorageEnv {
11+
private static final String PROP_OBJECT_STORAGE_STORAGE = "scalardb.object_storage.storage";
1012
private static final String PROP_OBJECT_STORAGE_ENDPOINT = "scalardb.object_storage.endpoint";
1113
private static final String PROP_OBJECT_STORAGE_USERNAME = "scalardb.object_storage.username";
1214
private static final String PROP_OBJECT_STORAGE_PASSWORD = "scalardb.object_storage.password";
1315

16+
private static final String DEFAULT_OBJECT_STORAGE_STORAGE = BlobStorageConfig.STORAGE_NAME;
1417
private static final String DEFAULT_OBJECT_STORAGE_ENDPOINT =
1518
"http://localhost:10000/test/test-container";
1619
private static final String DEFAULT_OBJECT_STORAGE_USERNAME = "test";
@@ -19,6 +22,8 @@ public class ObjectStorageEnv {
1922
private ObjectStorageEnv() {}
2023

2124
public static Properties getProperties(String testName) {
25+
String storage =
26+
System.getProperty(PROP_OBJECT_STORAGE_STORAGE, DEFAULT_OBJECT_STORAGE_STORAGE);
2227
String accountName =
2328
System.getProperty(PROP_OBJECT_STORAGE_USERNAME, DEFAULT_OBJECT_STORAGE_USERNAME);
2429
String accountKey =
@@ -30,7 +35,7 @@ public static Properties getProperties(String testName) {
3035
properties.setProperty(DatabaseConfig.CONTACT_POINTS, endpoint);
3136
properties.setProperty(DatabaseConfig.USERNAME, accountName);
3237
properties.setProperty(DatabaseConfig.PASSWORD, accountKey);
33-
properties.setProperty(DatabaseConfig.STORAGE, BlobStorageConfig.STORAGE_NAME);
38+
properties.setProperty(DatabaseConfig.STORAGE, storage);
3439
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN, "true");
3540
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_FILTERING, "true");
3641
properties.setProperty(DatabaseConfig.CROSS_PARTITION_SCAN_ORDERING, "false");
@@ -43,7 +48,36 @@ public static Properties getProperties(String testName) {
4348
return properties;
4449
}
4550

51+
public static Properties getPropertiesWithPerformanceOptions(String testName) {
52+
Properties properties = getProperties(testName);
53+
54+
// For Blob Storage
55+
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
56+
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
57+
properties.setProperty(
58+
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
59+
properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_IN_SECONDS, "30");
60+
61+
// For S3
62+
properties.setProperty(S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
63+
properties.setProperty(S3Config.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
64+
properties.setProperty(S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
65+
properties.setProperty(S3Config.REQUEST_TIMEOUT_IN_SECONDS, "30");
66+
67+
return properties;
68+
}
69+
4670
public static Map<String, String> getCreationOptions() {
4771
return Collections.emptyMap();
4872
}
73+
74+
public static boolean isBlobStorage() {
75+
return System.getProperty(PROP_OBJECT_STORAGE_STORAGE, DEFAULT_OBJECT_STORAGE_STORAGE)
76+
.equals(BlobStorageConfig.STORAGE_NAME);
77+
}
78+
79+
public static boolean isS3() {
80+
return System.getProperty(PROP_OBJECT_STORAGE_STORAGE, DEFAULT_OBJECT_STORAGE_STORAGE)
81+
.equals(S3Config.STORAGE_NAME);
82+
}
4983
}

core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperIntegrationTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,11 @@ public void getKeys_WithPrefixForTheNumberOfObjectsExceedingTheListLimit_ShouldR
270270
}
271271
} finally {
272272
for (int i = 0; i < numberOfObjects; i++) {
273-
wrapper.delete(prefix + i);
273+
try {
274+
wrapper.delete(prefix + i);
275+
} catch (PreconditionFailedException e) {
276+
// The object may not exist if the setup failed partially, so do nothing
277+
}
274278
}
275279
}
276280
}
@@ -336,8 +340,12 @@ public void deleteByPrefix_WithNonExistingPrefix_ShouldDoNothing() throws Except
336340
@Test
337341
public void close_ShouldNotThrowException() {
338342
// Arrange
343+
Properties properties = getProperties(TEST_NAME);
344+
ObjectStorageConfig objectStorageConfig =
345+
ObjectStorageUtils.getObjectStorageConfig(new DatabaseConfig(properties));
346+
ObjectStorageWrapper wrapper = ObjectStorageWrapperFactory.create(objectStorageConfig);
339347

340348
// Act Assert
341-
assertThatCode(() -> wrapper.close()).doesNotThrowAnyException();
349+
assertThatCode(wrapper::close).doesNotThrowAnyException();
342350
}
343351
}

core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
import com.scalar.db.config.DatabaseConfig;
77
import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig;
8+
import com.scalar.db.storage.objectstorage.s3.S3Config;
89
import java.util.Arrays;
910
import java.util.Optional;
1011
import java.util.Properties;
11-
import java.util.stream.LongStream;
1212
import org.junit.jupiter.api.AfterAll;
1313
import org.junit.jupiter.api.BeforeAll;
1414
import org.junit.jupiter.api.Test;
@@ -35,23 +35,42 @@ public class ObjectStorageWrapperLargeObjectWriteIntegrationTest {
3535
@BeforeAll
3636
public void beforeAll() throws ObjectStorageWrapperException {
3737
Properties properties = getProperties(TEST_NAME);
38-
ObjectStorageConfig objectStorageConfig =
39-
ObjectStorageUtils.getObjectStorageConfig(new DatabaseConfig(properties));
40-
wrapper = ObjectStorageWrapperFactory.create(objectStorageConfig);
41-
long objectSizeInBytes =
42-
LongStream.of(BlobStorageConfig.DEFAULT_PARALLEL_UPLOAD_THRESHOLD_IN_BYTES)
43-
.max()
44-
.getAsLong()
45-
+ 1;
38+
long parallelUploadThresholdInBytes;
39+
40+
if (ObjectStorageEnv.isBlobStorage()) {
41+
// Minimum block size must be greater than or equal to 256KB for Blob Storage
42+
Long parallelUploadUnit = 256 * 1024L; // 256KB
43+
properties.setProperty(
44+
BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES,
45+
String.valueOf(parallelUploadUnit));
46+
properties.setProperty(
47+
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES,
48+
String.valueOf(parallelUploadUnit * 2));
49+
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
50+
} else if (ObjectStorageEnv.isS3()) {
51+
// Minimum part size must be greater than or equal to 5MB for S3
52+
Long parallelUploadUnit = 5 * 1024 * 1024L; // 5MB
53+
properties.setProperty(
54+
S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, String.valueOf(parallelUploadUnit));
55+
properties.setProperty(
56+
S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, String.valueOf(parallelUploadUnit * 2));
57+
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
58+
} else {
59+
throw new AssertionError();
60+
}
4661

47-
char[] charArray = new char[(int) objectSizeInBytes];
62+
char[] charArray = new char[(int) parallelUploadThresholdInBytes];
4863
Arrays.fill(charArray, 'a');
4964
testObject1 = new String(charArray);
5065
Arrays.fill(charArray, 'b');
5166
testObject2 = new String(charArray);
5267
Arrays.fill(charArray, 'c');
5368
testObject3 = new String(charArray);
5469

70+
ObjectStorageConfig objectStorageConfig =
71+
ObjectStorageUtils.getObjectStorageConfig(new DatabaseConfig(properties));
72+
wrapper = ObjectStorageWrapperFactory.create(objectStorageConfig);
73+
5574
createObjects();
5675
}
5776

@@ -73,7 +92,7 @@ public void afterAll() {
7392
}
7493

7594
protected Properties getProperties(String testName) {
76-
return ObjectStorageEnv.getProperties(testName);
95+
return ObjectStorageEnv.getPropertiesWithPerformanceOptions(testName);
7796
}
7897

7998
private void createObjects() throws ObjectStorageWrapperException {
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.scalar.db.storage.objectstorage;
2+
3+
public class ConflictOccurredException extends ObjectStorageWrapperException {
4+
5+
public ConflictOccurredException(String message, Throwable cause) {
6+
super(message, cause);
7+
}
8+
}

core/src/main/java/com/scalar/db/storage/objectstorage/MutateStatementHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
import com.scalar.db.exception.storage.RetriableExecutionException;
1111
import java.util.Collections;
1212
import java.util.List;
13+
import javax.annotation.concurrent.ThreadSafe;
1314

15+
@ThreadSafe
1416
public class MutateStatementHandler extends StatementHandler {
1517
public MutateStatementHandler(
1618
ObjectStorageWrapper wrapper, TableMetadataManager metadataManager) {
@@ -78,7 +80,7 @@ private void writePartition(ObjectStoragePartitionSnapshot snapshot) throws Exec
7880
wrapper.insert(snapshot.getObjectKey(), snapshot.getPartition().serialize());
7981
}
8082
}
81-
} catch (PreconditionFailedException e) {
83+
} catch (PreconditionFailedException | ConflictOccurredException e) {
8284
throw new RetriableExecutionException(
8385
CoreError.OBJECT_STORAGE_CONFLICT_OCCURRED_IN_MUTATION.buildMessage(e.getMessage()), e);
8486
} catch (ObjectStorageWrapperException e) {

core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageAdmin.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import java.util.Set;
2121
import java.util.stream.Collectors;
2222
import javax.annotation.Nullable;
23+
import javax.annotation.concurrent.ThreadSafe;
2324

25+
@ThreadSafe
2426
public class ObjectStorageAdmin implements DistributedStorageAdmin {
2527
public static final String NAMESPACE_METADATA_TABLE = "namespaces";
2628
public static final String TABLE_METADATA_TABLE = "metadata";
@@ -48,6 +50,29 @@ public ObjectStorageAdmin(DatabaseConfig databaseConfig) {
4850
metadataNamespace = objectStorageConfig.getMetadataNamespace();
4951
}
5052

53+
private static String getTableMetadataKey(String namespace, String table) {
54+
return String.join(
55+
String.valueOf(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER), namespace, table);
56+
}
57+
58+
private static String getNamespaceNameFromTableMetadataKey(String tableMetadataKey) {
59+
List<String> parts =
60+
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
61+
if (parts.size() != 2 || parts.get(0).isEmpty()) {
62+
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
63+
}
64+
return parts.get(0);
65+
}
66+
67+
private static String getTableNameFromTableMetadataKey(String tableMetadataKey) {
68+
List<String> parts =
69+
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
70+
if (parts.size() != 2 || parts.get(1).isEmpty()) {
71+
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
72+
}
73+
return parts.get(1);
74+
}
75+
5176
@Override
5277
public StorageInfo getStorageInfo(String namespace) throws ExecutionException {
5378
return STORAGE_INFO;
@@ -450,29 +475,6 @@ private void deleteTableData(String namespace, String table) throws ExecutionExc
450475
}
451476
}
452477

453-
private static String getTableMetadataKey(String namespace, String table) {
454-
return String.join(
455-
String.valueOf(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER), namespace, table);
456-
}
457-
458-
private static String getNamespaceNameFromTableMetadataKey(String tableMetadataKey) {
459-
List<String> parts =
460-
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
461-
if (parts.size() != 2 || parts.get(0).isEmpty()) {
462-
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
463-
}
464-
return parts.get(0);
465-
}
466-
467-
private static String getTableNameFromTableMetadataKey(String tableMetadataKey) {
468-
List<String> parts =
469-
Splitter.on(ObjectStorageUtils.CONCATENATED_KEY_DELIMITER).splitToList(tableMetadataKey);
470-
if (parts.size() != 2 || parts.get(1).isEmpty()) {
471-
throw new IllegalArgumentException("Invalid table metadata key: " + tableMetadataKey);
472-
}
473-
return parts.get(1);
474-
}
475-
476478
private void checkTableMetadata(TableMetadata metadata) {
477479
Set<String> secondaryIndexNames = metadata.getSecondaryIndexNames();
478480
if (!secondaryIndexNames.isEmpty()) {

core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageConfig.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,6 @@ public interface ObjectStorageConfig {
99
*/
1010
String getStorageName();
1111

12-
/**
13-
* Returns the endpoint for the object storage service.
14-
*
15-
* @return the endpoint
16-
*/
17-
String getEndpoint();
18-
1912
/**
2013
* Returns the username for authentication.
2114
*

0 commit comments

Comments
 (0)