Skip to content

Commit 2453787

Browse files
feeblefakieKodaiD
andauthored
Backport to branch(3) : Fix option issues in Object Storage adapter (#3243)
Co-authored-by: Kodai Doki <52027276+KodaiD@users.noreply.github.com>
1 parent edf7bfa commit 2453787

File tree

12 files changed

+167
-166
lines changed

12 files changed

+167
-166
lines changed

core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,17 @@ public static Properties getPropertiesWithPerformanceOptions(String testName) {
5353
Properties properties = getProperties(testName);
5454

5555
// For Blob Storage
56-
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
57-
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
56+
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, "5242880"); // 5MB
57+
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_CONCURRENCY, "4");
5858
properties.setProperty(
59-
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
60-
properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_IN_SECONDS, "30");
59+
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB
60+
properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_SECS, "30");
6161

6262
// For S3
63-
properties.setProperty(S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
64-
properties.setProperty(S3Config.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
65-
properties.setProperty(S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
66-
properties.setProperty(S3Config.REQUEST_TIMEOUT_IN_SECONDS, "30");
63+
properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, "5242880"); // 5MB
64+
properties.setProperty(S3Config.MULTIPART_UPLOAD_MAX_CONCURRENCY, "4");
65+
properties.setProperty(S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB
66+
properties.setProperty(S3Config.REQUEST_TIMEOUT_SECS, "30");
6767

6868
return properties;
6969
}

core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,38 +36,34 @@ public class ObjectStorageWrapperLargeObjectWriteIntegrationTest {
3636
@BeforeAll
3737
public void beforeAll() throws ObjectStorageWrapperException {
3838
Properties properties = getProperties(TEST_NAME);
39-
long parallelUploadThresholdInBytes;
39+
long payloadSizeBytes;
4040

4141
if (ObjectStorageEnv.isBlobStorage()) {
4242
// Minimum block size must be greater than or equal to 256KB for Blob Storage
43-
Long parallelUploadUnit = 256 * 1024L; // 256KB
43+
Long uploadUnit = 256 * 1024L; // 256KB
4444
properties.setProperty(
45-
BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES,
46-
String.valueOf(parallelUploadUnit));
45+
BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, String.valueOf(uploadUnit));
4746
properties.setProperty(
48-
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES,
49-
String.valueOf(parallelUploadUnit * 2));
50-
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
47+
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2));
48+
payloadSizeBytes = uploadUnit * 2 + 1;
5149
} else if (ObjectStorageEnv.isCloudStorage()) {
5250
// Minimum block size must be greater than or equal to 256KB for Cloud Storage
53-
Long parallelUploadUnit = 256 * 1024L; // 256KB
51+
Long uploadUnit = 256 * 1024L; // 256KB
5452
properties.setProperty(
55-
CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES,
56-
String.valueOf(parallelUploadUnit));
57-
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
53+
CloudStorageConfig.UPLOAD_CHUNK_SIZE_BYTES, String.valueOf(uploadUnit));
54+
payloadSizeBytes = uploadUnit * 2 + 1;
5855
} else if (ObjectStorageEnv.isS3()) {
5956
// Minimum part size must be greater than or equal to 5MB for S3
60-
Long parallelUploadUnit = 5 * 1024 * 1024L; // 5MB
57+
Long uploadUnit = 5 * 1024 * 1024L; // 5MB
58+
properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, String.valueOf(uploadUnit));
6159
properties.setProperty(
62-
S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, String.valueOf(parallelUploadUnit));
63-
properties.setProperty(
64-
S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, String.valueOf(parallelUploadUnit * 2));
65-
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
60+
S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2));
61+
payloadSizeBytes = uploadUnit * 2 + 1;
6662
} else {
6763
throw new AssertionError();
6864
}
6965

70-
char[] charArray = new char[(int) parallelUploadThresholdInBytes + 1];
66+
char[] charArray = new char[(int) payloadSizeBytes];
7167
Arrays.fill(charArray, 'a');
7268
testObject1 = new String(charArray);
7369
Arrays.fill(charArray, 'b');

core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@
1313

1414
public class BlobStorageConfig implements ObjectStorageConfig {
1515
public static final String STORAGE_NAME = "blob-storage";
16-
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
16+
public static final String STORAGE_NAME_IN_PREFIX = "blob_storage";
17+
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + ".";
1718
public static final String TABLE_METADATA_NAMESPACE = PREFIX + "table_metadata.namespace";
1819

19-
public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES =
20-
PREFIX + "parallel_upload_block_size_in_bytes";
21-
public static final String PARALLEL_UPLOAD_MAX_PARALLELISM =
22-
PREFIX + "parallel_upload_max_parallelism";
23-
public static final String PARALLEL_UPLOAD_THRESHOLD_IN_BYTES =
24-
PREFIX + "parallel_upload_threshold_in_bytes";
25-
public static final String REQUEST_TIMEOUT_IN_SECONDS = PREFIX + "request_timeout_in_seconds";
20+
public static final String PARALLEL_UPLOAD_BLOCK_SIZE_BYTES =
21+
PREFIX + "parallel_upload_block_size_bytes";
22+
public static final String PARALLEL_UPLOAD_MAX_CONCURRENCY =
23+
PREFIX + "parallel_upload_max_concurrency";
24+
public static final String PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES =
25+
PREFIX + "parallel_upload_threshold_size_bytes";
26+
public static final String REQUEST_TIMEOUT_SECS = PREFIX + "request_timeout_secs";
2627

2728
private static final Logger logger = LoggerFactory.getLogger(BlobStorageConfig.class);
2829
private final String endpoint;
@@ -31,10 +32,10 @@ public class BlobStorageConfig implements ObjectStorageConfig {
3132
private final String bucket;
3233
private final String metadataNamespace;
3334

34-
private final Long parallelUploadBlockSizeInBytes;
35-
private final Integer parallelUploadMaxParallelism;
36-
private final Long parallelUploadThresholdInBytes;
37-
private final Integer requestTimeoutInSeconds;
35+
private final Long parallelUploadBlockSizeBytes;
36+
private final Integer parallelUploadMaxConcurrency;
37+
private final Long parallelUploadThresholdSizeBytes;
38+
private final Integer requestTimeoutSecs;
3839

3940
public BlobStorageConfig(DatabaseConfig databaseConfig) {
4041
String storage = databaseConfig.getStorage();
@@ -69,14 +70,13 @@ public BlobStorageConfig(DatabaseConfig databaseConfig) {
6970
+ "\" is not applicable to Blob Storage and will be ignored.");
7071
}
7172

72-
parallelUploadBlockSizeInBytes =
73-
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null);
74-
parallelUploadMaxParallelism =
75-
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_PARALLELISM, null);
76-
parallelUploadThresholdInBytes =
77-
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, null);
78-
requestTimeoutInSeconds =
79-
getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_IN_SECONDS, null);
73+
parallelUploadBlockSizeBytes =
74+
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, null);
75+
parallelUploadMaxConcurrency =
76+
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_CONCURRENCY, null);
77+
parallelUploadThresholdSizeBytes =
78+
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, null);
79+
requestTimeoutSecs = getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_SECS, null);
8080
}
8181

8282
@Override
@@ -107,19 +107,19 @@ public String getUsername() {
107107
return username;
108108
}
109109

110-
public Optional<Long> getParallelUploadBlockSizeInBytes() {
111-
return Optional.ofNullable(parallelUploadBlockSizeInBytes);
110+
public Optional<Long> getParallelUploadBlockSizeBytes() {
111+
return Optional.ofNullable(parallelUploadBlockSizeBytes);
112112
}
113113

114-
public Optional<Integer> getParallelUploadMaxParallelism() {
115-
return Optional.ofNullable(parallelUploadMaxParallelism);
114+
public Optional<Integer> getParallelUploadMaxConcurrency() {
115+
return Optional.ofNullable(parallelUploadMaxConcurrency);
116116
}
117117

118-
public Optional<Long> getParallelUploadThresholdInBytes() {
119-
return Optional.ofNullable(parallelUploadThresholdInBytes);
118+
public Optional<Long> getParallelUploadThresholdSizeBytes() {
119+
return Optional.ofNullable(parallelUploadThresholdSizeBytes);
120120
}
121121

122-
public Optional<Integer> getRequestTimeoutInSeconds() {
123-
return Optional.ofNullable(requestTimeoutInSeconds);
122+
public Optional<Integer> getRequestTimeoutSecs() {
123+
return Optional.ofNullable(requestTimeoutSecs);
124124
}
125125
}

core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageWrapper.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
@ThreadSafe
2828
public class BlobStorageWrapper implements ObjectStorageWrapper {
2929
private final BlobContainerClient client;
30-
private final Duration requestTimeoutInSeconds;
30+
private final Duration requestTimeoutSecs;
3131
private final ParallelTransferOptions parallelTransferOptions;
3232

3333
public BlobStorageWrapper(BlobStorageConfig config) {
@@ -37,18 +37,17 @@ public BlobStorageWrapper(BlobStorageConfig config) {
3737
.credential(new StorageSharedKeyCredential(config.getUsername(), config.getPassword()))
3838
.buildClient()
3939
.getBlobContainerClient(config.getBucket());
40-
this.requestTimeoutInSeconds =
41-
config.getRequestTimeoutInSeconds().map(Duration::ofSeconds).orElse(null);
40+
this.requestTimeoutSecs = config.getRequestTimeoutSecs().map(Duration::ofSeconds).orElse(null);
4241
this.parallelTransferOptions = new ParallelTransferOptions();
43-
if (config.getParallelUploadBlockSizeInBytes().isPresent()) {
44-
parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeInBytes().get());
42+
if (config.getParallelUploadBlockSizeBytes().isPresent()) {
43+
parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeBytes().get());
4544
}
46-
if (config.getParallelUploadMaxParallelism().isPresent()) {
47-
parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxParallelism().get());
45+
if (config.getParallelUploadMaxConcurrency().isPresent()) {
46+
parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxConcurrency().get());
4847
}
49-
if (config.getParallelUploadThresholdInBytes().isPresent()) {
48+
if (config.getParallelUploadThresholdSizeBytes().isPresent()) {
5049
parallelTransferOptions.setMaxSingleUploadSizeLong(
51-
config.getParallelUploadThresholdInBytes().get());
50+
config.getParallelUploadThresholdSizeBytes().get());
5251
}
5352
}
5453

@@ -58,7 +57,7 @@ public Optional<ObjectStorageWrapperResponse> get(String key)
5857
try {
5958
BlobClient blobClient = client.getBlobClient(key);
6059
BlobDownloadContentResponse response =
61-
blobClient.downloadContentWithResponse(null, null, requestTimeoutInSeconds, null);
60+
blobClient.downloadContentWithResponse(null, null, requestTimeoutSecs, null);
6261
String data = response.getValue().toString();
6362
String eTag = response.getHeaders().getValue(HttpHeaderName.ETAG);
6463
return Optional.of(new ObjectStorageWrapperResponse(data, eTag));
@@ -77,8 +76,7 @@ public Optional<ObjectStorageWrapperResponse> get(String key)
7776
@Override
7877
public Set<String> getKeys(String prefix) throws ObjectStorageWrapperException {
7978
try {
80-
return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds)
81-
.stream()
79+
return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs).stream()
8280
.map(BlobItem::getName)
8381
.collect(Collectors.toSet());
8482
} catch (Exception e) {
@@ -95,7 +93,7 @@ public void insert(String key, String object) throws ObjectStorageWrapperExcepti
9593
new BlobParallelUploadOptions(BinaryData.fromString(object))
9694
.setRequestConditions(new BlobRequestConditions().setIfNoneMatch("*"))
9795
.setParallelTransferOptions(parallelTransferOptions);
98-
blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null);
96+
blobClient.uploadWithResponse(options, requestTimeoutSecs, null);
9997
} catch (BlobStorageException e) {
10098
if (e.getErrorCode().equals(BlobErrorCode.BLOB_ALREADY_EXISTS)) {
10199
throw new PreconditionFailedException(
@@ -120,7 +118,7 @@ public void update(String key, String object, String version)
120118
new BlobParallelUploadOptions(BinaryData.fromString(object))
121119
.setRequestConditions(new BlobRequestConditions().setIfMatch(version))
122120
.setParallelTransferOptions(parallelTransferOptions);
123-
blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null);
121+
blobClient.uploadWithResponse(options, requestTimeoutSecs, null);
124122
} catch (BlobStorageException e) {
125123
if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET)
126124
|| e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
@@ -162,7 +160,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept
162160
try {
163161
BlobClient blobClient = client.getBlobClient(key);
164162
blobClient.deleteWithResponse(
165-
null, new BlobRequestConditions().setIfMatch(version), requestTimeoutInSeconds, null);
163+
null, new BlobRequestConditions().setIfMatch(version), requestTimeoutSecs, null);
166164
} catch (BlobStorageException e) {
167165
if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET)
168166
|| e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
@@ -183,7 +181,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept
183181
public void deleteByPrefix(String prefix) throws ObjectStorageWrapperException {
184182
try {
185183
client
186-
.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds)
184+
.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs)
187185
.forEach(
188186
blobItem -> {
189187
try {

core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
public class CloudStorageConfig implements ObjectStorageConfig {
1919
public static final String STORAGE_NAME = "cloud-storage";
20-
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
20+
public static final String STORAGE_NAME_IN_PREFIX = "cloud_storage";
21+
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + ".";
2122
public static final String TABLE_METADATA_NAMESPACE = PREFIX + "table_metadata.namespace";
2223

23-
public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES =
24-
PREFIX + "parallel_upload_block_size_in_bytes";
24+
public static final String UPLOAD_CHUNK_SIZE_BYTES = PREFIX + "upload_chunk_size_bytes";
2525

2626
private static final Logger logger = LoggerFactory.getLogger(CloudStorageConfig.class);
2727
private final String password;
2828
private final String bucket;
2929
private final String metadataNamespace;
3030
private final String projectId;
31-
private final Integer parallelUploadBlockSizeInBytes;
31+
private final Integer uploadChunkSizeBytes;
3232

3333
public CloudStorageConfig(DatabaseConfig databaseConfig) {
3434
String storage = databaseConfig.getStorage();
@@ -55,8 +55,7 @@ public CloudStorageConfig(DatabaseConfig databaseConfig) {
5555
+ "\" is not applicable to Cloud Storage and will be ignored.");
5656
}
5757

58-
parallelUploadBlockSizeInBytes =
59-
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null);
58+
uploadChunkSizeBytes = getInt(databaseConfig.getProperties(), UPLOAD_CHUNK_SIZE_BYTES, null);
6059
}
6160

6261
@Override
@@ -98,7 +97,7 @@ public Credentials getCredentials() {
9897
}
9998
}
10099

101-
public Optional<Integer> getParallelUploadBlockSizeInBytes() {
102-
return Optional.ofNullable(parallelUploadBlockSizeInBytes);
100+
public Optional<Integer> getUploadChunkSizeBytes() {
101+
return Optional.ofNullable(uploadChunkSizeBytes);
103102
}
104103
}

core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class CloudStorageWrapper implements ObjectStorageWrapper {
3131

3232
private final Storage storage;
3333
private final String bucket;
34-
private final Integer parallelUploadBlockSizeInBytes;
34+
private final Integer uploadChunkSizeBytes;
3535

3636
public CloudStorageWrapper(CloudStorageConfig config) {
3737
storage =
@@ -41,15 +41,15 @@ public CloudStorageWrapper(CloudStorageConfig config) {
4141
.build()
4242
.getService();
4343
bucket = config.getBucket();
44-
parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null);
44+
uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null);
4545
}
4646

4747
@VisibleForTesting
4848
@SuppressFBWarnings("EI_EXPOSE_REP2")
4949
public CloudStorageWrapper(CloudStorageConfig config, Storage storage) {
5050
this.storage = storage;
5151
this.bucket = config.getBucket();
52-
parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null);
52+
uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null);
5353
}
5454

5555
@Override
@@ -209,8 +209,8 @@ private void writeData(String key, String object, Storage.BlobWriteOption precon
209209
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucket, key)).build();
210210

211211
try (WriteChannel writer = storage.writer(blobInfo, precondition)) {
212-
if (parallelUploadBlockSizeInBytes != null) {
213-
writer.setChunkSize(parallelUploadBlockSizeInBytes);
212+
if (uploadChunkSizeBytes != null) {
213+
writer.setChunkSize(uploadChunkSizeBytes);
214214
}
215215
ByteBuffer buffer = ByteBuffer.wrap(data);
216216
while (buffer.hasRemaining()) {

0 commit comments

Comments
 (0)