Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.tidesdb</groupId>
<artifactId>tidesdb-java</artifactId>
<version>0.7.0</version>
<version>0.8.0</version>
<packaging>jar</packaging>

<name>TidesDB Java</name>
Expand Down
148 changes: 142 additions & 6 deletions src/main/c/com_tidesdb_TidesDB.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ JNIEXPORT jlong JNICALL Java_com_tidesdb_TidesDB_nativeOpen(
jint oscMaxConcurrentUploads, jint oscMaxConcurrentDownloads, jlong oscMultipartThreshold,
jlong oscMultipartPartSize, jboolean oscSyncManifestToObject, jboolean oscReplicateWal,
jboolean oscWalUploadSync, jlong oscWalSyncThresholdBytes, jboolean oscWalSyncOnCommit,
jboolean oscReplicaMode, jlong oscReplicaSyncIntervalUs, jboolean oscReplicaReplayWal)
jboolean oscReplicaMode, jlong oscReplicaSyncIntervalUs, jboolean oscReplicaReplayWal,
jint maxConcurrentFlushes)
{
const char *path = (*env)->GetStringUTFChars(env, dbPath, NULL);
if (path == NULL)
Expand Down Expand Up @@ -149,7 +150,8 @@ JNIEXPORT jlong JNICALL Java_com_tidesdb_TidesDB_nativeOpen(
.unified_memtable_sync_mode = unifiedMemtableSyncMode,
.unified_memtable_sync_interval_us = (uint64_t)unifiedMemtableSyncIntervalUs,
.object_store = obj_store,
.object_store_config = obj_store != NULL ? &os_cfg : NULL};
.object_store_config = obj_store != NULL ? &os_cfg : NULL,
.max_concurrent_flushes = maxConcurrentFlushes};

tidesdb_t *db = NULL;
int result = tidesdb_open(&config, &db);
Expand Down Expand Up @@ -189,7 +191,8 @@ JNIEXPORT void JNICALL Java_com_tidesdb_TidesDB_nativeCreateColumnFamily(
jboolean enableBlockIndexes, jint indexSampleRatio, jint blockIndexPrefixLen, jint syncMode,
jlong syncIntervalUs, jstring comparatorName, jint skipListMaxLevel, jfloat skipListProbability,
jint defaultIsolationLevel, jlong minDiskSpace, jint l1FileCountTrigger,
jint l0QueueStallThreshold, jboolean useBtree,
jint l0QueueStallThreshold, jdouble tombstoneDensityTrigger,
jlong tombstoneDensityMinEntries, jboolean useBtree,
jboolean objectLazyCompaction, jboolean objectPrefetchCompaction)
{
tidesdb_t *db = (tidesdb_t *)(uintptr_t)handle;
Expand Down Expand Up @@ -226,6 +229,8 @@ JNIEXPORT void JNICALL Java_com_tidesdb_TidesDB_nativeCreateColumnFamily(
.min_disk_space = (uint64_t)minDiskSpace,
.l1_file_count_trigger = l1FileCountTrigger,
.l0_queue_stall_threshold = l0QueueStallThreshold,
.tombstone_density_trigger = tombstoneDensityTrigger,
.tombstone_density_min_entries = (uint64_t)tombstoneDensityMinEntries,
.use_btree = useBtree ? 1 : 0,
.object_lazy_compaction = objectLazyCompaction ? 1 : 0,
.object_prefetch_compaction = objectPrefetchCompaction ? 1 : 0};
Expand Down Expand Up @@ -570,17 +575,78 @@ JNIEXPORT jobject JNICALL Java_com_tidesdb_ColumnFamily_nativeGetStats(JNIEnv *e
free(counts);
}

jlongArray levelTombstoneCounts = (*env)->NewLongArray(env, stats->num_levels);
if (stats->level_tombstone_counts != NULL)
{
jlong *counts = malloc(stats->num_levels * sizeof(jlong));
for (int i = 0; i < stats->num_levels; i++)
{
counts[i] = (jlong)stats->level_tombstone_counts[i];
}
(*env)->SetLongArrayRegion(env, levelTombstoneCounts, 0, stats->num_levels, counts);
free(counts);
}

/* Build ColumnFamilyConfig from stats->config so callers can round-trip CF settings */
jobject cfConfigObj = NULL;
if (stats->config != NULL)
{
jclass cfConfigClass = (*env)->FindClass(env, "com/tidesdb/ColumnFamilyConfig");
jmethodID fromNative = (*env)->GetStaticMethodID(
env, cfConfigClass, "fromNative",
"(JJIIJIZDZIIIJLjava/lang/String;IFIJIIDJZZZ)Lcom/tidesdb/ColumnFamilyConfig;");

jstring comparatorName = (*env)->NewStringUTF(env, stats->config->comparator_name);

cfConfigObj = (*env)->CallStaticObjectMethod(
env, cfConfigClass, fromNative,
(jlong)stats->config->write_buffer_size,
(jlong)stats->config->level_size_ratio,
(jint)stats->config->min_levels,
(jint)stats->config->dividing_level_offset,
(jlong)stats->config->klog_value_threshold,
(jint)stats->config->compression_algorithm,
stats->config->enable_bloom_filter != 0 ? JNI_TRUE : JNI_FALSE,
(jdouble)stats->config->bloom_fpr,
stats->config->enable_block_indexes != 0 ? JNI_TRUE : JNI_FALSE,
(jint)stats->config->index_sample_ratio,
(jint)stats->config->block_index_prefix_len,
(jint)stats->config->sync_mode,
(jlong)stats->config->sync_interval_us,
comparatorName,
(jint)stats->config->skip_list_max_level,
(jfloat)stats->config->skip_list_probability,
(jint)stats->config->default_isolation_level,
(jlong)stats->config->min_disk_space,
(jint)stats->config->l1_file_count_trigger,
(jint)stats->config->l0_queue_stall_threshold,
(jdouble)stats->config->tombstone_density_trigger,
(jlong)stats->config->tombstone_density_min_entries,
stats->config->use_btree != 0 ? JNI_TRUE : JNI_FALSE,
stats->config->object_lazy_compaction != 0 ? JNI_TRUE : JNI_FALSE,
stats->config->object_prefetch_compaction != 0 ? JNI_TRUE : JNI_FALSE);

(*env)->DeleteLocalRef(env, comparatorName);
(*env)->DeleteLocalRef(env, cfConfigClass);
}

jclass statsClass = (*env)->FindClass(env, "com/tidesdb/Stats");
jmethodID constructor = (*env)->GetMethodID(env, statsClass, "<init>",
"(IJ[J[ILcom/tidesdb/ColumnFamilyConfig;JJDD[JDDZJID)V");
"(IJ[J[ILcom/tidesdb/ColumnFamilyConfig;JJDD[JDDZJIDJD[JDI)V");

jobject statsObj = (*env)->NewObject(env, statsClass, constructor, stats->num_levels,
(jlong)stats->memtable_size, levelSizes, levelNumSSTables,
NULL, (jlong)stats->total_keys, (jlong)stats->total_data_size,
cfConfigObj, (jlong)stats->total_keys,
(jlong)stats->total_data_size,
stats->avg_key_size, stats->avg_value_size, levelKeyCounts,
stats->read_amp, stats->hit_rate,
stats->use_btree != 0, (jlong)stats->btree_total_nodes,
(jint)stats->btree_max_height, stats->btree_avg_height);
(jint)stats->btree_max_height, stats->btree_avg_height,
(jlong)stats->total_tombstones,
(jdouble)stats->tombstone_ratio,
levelTombstoneCounts,
(jdouble)stats->max_sst_density,
(jint)stats->max_sst_density_level);

tidesdb_free_stats(stats);

Expand All @@ -599,6 +665,55 @@ JNIEXPORT void JNICALL Java_com_tidesdb_ColumnFamily_nativeCompact(JNIEnv *env,
}
}

JNIEXPORT void JNICALL Java_com_tidesdb_ColumnFamily_nativeCompactRange(JNIEnv *env, jclass cls,
jlong handle,
jbyteArray startKey,
jbyteArray endKey)
{
tidesdb_column_family_t *cf = (tidesdb_column_family_t *)(uintptr_t)handle;

/* Map null/empty byte arrays to NULL pointers for unbounded endpoints. The C API
rejects both-NULL with TDB_ERR_INVALID_ARGS, so we don't need to filter here. */
jbyte *startBytes = NULL;
jsize startLen = 0;
if (startKey != NULL)
{
startLen = (*env)->GetArrayLength(env, startKey);
if (startLen > 0)
{
startBytes = (*env)->GetByteArrayElements(env, startKey, NULL);
}
}

jbyte *endBytes = NULL;
jsize endLen = 0;
if (endKey != NULL)
{
endLen = (*env)->GetArrayLength(env, endKey);
if (endLen > 0)
{
endBytes = (*env)->GetByteArrayElements(env, endKey, NULL);
}
}

int result = tidesdb_compact_range(cf, (const uint8_t *)startBytes, (size_t)startLen,
(const uint8_t *)endBytes, (size_t)endLen);

if (startBytes != NULL)
{
(*env)->ReleaseByteArrayElements(env, startKey, startBytes, JNI_ABORT);
}
if (endBytes != NULL)
{
(*env)->ReleaseByteArrayElements(env, endKey, endBytes, JNI_ABORT);
}

if (result != TDB_SUCCESS)
{
throwTidesDBException(env, result, getErrorMessage(result));
}
}

JNIEXPORT void JNICALL Java_com_tidesdb_ColumnFamily_nativeFlushMemtable(JNIEnv *env, jclass cls,
jlong handle)
{
Expand Down Expand Up @@ -1307,6 +1422,27 @@ JNIEXPORT void JNICALL Java_com_tidesdb_TidesDB_nativePromoteToPrimary(JNIEnv *e
}
}

JNIEXPORT jint JNICALL Java_com_tidesdb_Config_nativeDefaultMaxConcurrentFlushes(JNIEnv *env,
jclass cls)
{
tidesdb_config_t cfg = tidesdb_default_config();
return (jint)cfg.max_concurrent_flushes;
}

JNIEXPORT jdouble JNICALL Java_com_tidesdb_ColumnFamilyConfig_nativeDefaultTombstoneDensityTrigger(
JNIEnv *env, jclass cls)
{
tidesdb_column_family_config_t cfg = tidesdb_default_column_family_config();
return (jdouble)cfg.tombstone_density_trigger;
}

JNIEXPORT jlong JNICALL Java_com_tidesdb_ColumnFamilyConfig_nativeDefaultTombstoneDensityMinEntries(
JNIEnv *env, jclass cls)
{
tidesdb_column_family_config_t cfg = tidesdb_default_column_family_config();
return (jlong)cfg.tombstone_density_min_entries;
}

JNIEXPORT jobject JNICALL Java_com_tidesdb_TidesDBIterator_nativeKeyValue(JNIEnv *env, jclass cls,
jlong handle)
{
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/tidesdb/ColumnFamily.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ public Stats getStats() throws TidesDBException {
public void compact() throws TidesDBException {
nativeCompact(nativeHandle);
}

/**
* Synchronously compacts every SSTable whose key range overlaps {@code [startKey, endKey)}.
* Blocks the calling thread until the merge commits or fails - does not enqueue work
* onto the compaction thread pool.
*
* <p>A {@code null} or empty endpoint means unbounded on that side. Both endpoints
* being {@code null} or empty is rejected with {@link TidesDBException}; callers
* wanting full-CF compaction must use {@link #compact()}.</p>
*
* @param startKey lower bound of the range, or {@code null}/empty for unbounded
* @param endKey upper bound (exclusive), or {@code null}/empty for unbounded
* @throws TidesDBException if the range is invalid, another compaction is running,
* or the merge fails
*/
public void compactRange(byte[] startKey, byte[] endKey) throws TidesDBException {
nativeCompactRange(nativeHandle, startKey, endKey);
}

/**
* Manually triggers memtable flush for this column family.
Expand Down Expand Up @@ -203,6 +221,7 @@ long getNativeHandle() {

private static native Stats nativeGetStats(long handle) throws TidesDBException;
private static native void nativeCompact(long handle) throws TidesDBException;
private static native void nativeCompactRange(long handle, byte[] startKey, byte[] endKey) throws TidesDBException;
private static native void nativeFlushMemtable(long handle) throws TidesDBException;
private static native boolean nativeIsFlushing(long handle);
private static native boolean nativeIsCompacting(long handle);
Expand Down
Loading
Loading