diff --git a/pom.xml b/pom.xml index 7c382c9..5c23001 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.tidesdb tidesdb-java - 0.8.0 + 0.8.1 jar TidesDB Java @@ -142,6 +142,12 @@ org.apache.maven.plugins maven-javadoc-plugin 3.12.0 + + + all,-missing + attach-javadocs diff --git a/src/main/c/com_tidesdb_TidesDB.c b/src/main/c/com_tidesdb_TidesDB.c index 2c8f806..c8b4985 100644 --- a/src/main/c/com_tidesdb_TidesDB.c +++ b/src/main/c/com_tidesdb_TidesDB.c @@ -70,6 +70,8 @@ static const char *getErrorMessage(int code) return "database is locked"; case TDB_ERR_READONLY: return "database is read-only"; + case TDB_ERR_BUSY: + return "resource is busy"; default: return "unknown error"; } @@ -87,7 +89,7 @@ JNIEXPORT jlong JNICALL Java_com_tidesdb_TidesDB_nativeOpen( jlong oscMultipartPartSize, jboolean oscSyncManifestToObject, jboolean oscReplicateWal, jboolean oscWalUploadSync, jlong oscWalSyncThresholdBytes, jboolean oscWalSyncOnCommit, jboolean oscReplicaMode, jlong oscReplicaSyncIntervalUs, jboolean oscReplicaReplayWal, - jint maxConcurrentFlushes) + jint maxConcurrentFlushes, jboolean finishCompactionsOnClose) { const char *path = (*env)->GetStringUTFChars(env, dbPath, NULL); if (path == NULL) @@ -151,7 +153,8 @@ JNIEXPORT jlong JNICALL Java_com_tidesdb_TidesDB_nativeOpen( .unified_memtable_sync_interval_us = (uint64_t)unifiedMemtableSyncIntervalUs, .object_store = obj_store, .object_store_config = obj_store != NULL ? &os_cfg : NULL, - .max_concurrent_flushes = maxConcurrentFlushes}; + .max_concurrent_flushes = maxConcurrentFlushes, + .finish_compactions_on_close = finishCompactionsOnClose ? 1 : 0}; tidesdb_t *db = NULL; int result = tidesdb_open(&config, &db); @@ -632,7 +635,7 @@ JNIEXPORT jobject JNICALL Java_com_tidesdb_ColumnFamily_nativeGetStats(JNIEnv *e jclass statsClass = (*env)->FindClass(env, "com/tidesdb/Stats"); jmethodID constructor = (*env)->GetMethodID(env, statsClass, "", - "(IJ[J[ILcom/tidesdb/ColumnFamilyConfig;JJDD[JDDZJIDJD[JDI)V"); + "(IJ[J[ILcom/tidesdb/ColumnFamilyConfig;JJDD[JDDZJIDJD[JDIJJJJJJJ)V"); jobject statsObj = (*env)->NewObject(env, statsClass, constructor, stats->num_levels, (jlong)stats->memtable_size, levelSizes, levelNumSSTables, @@ -646,7 +649,14 @@ JNIEXPORT jobject JNICALL Java_com_tidesdb_ColumnFamily_nativeGetStats(JNIEnv *e (jdouble)stats->tombstone_ratio, levelTombstoneCounts, (jdouble)stats->max_sst_density, - (jint)stats->max_sst_density_level); + (jint)stats->max_sst_density_level, + (jlong)stats->wal_bytes_written, + (jlong)stats->flush_bytes_written, + (jlong)stats->compaction_bytes_written, + (jlong)stats->compaction_bytes_read, + (jlong)stats->user_bytes_written, + (jlong)stats->flush_count, + (jlong)stats->compaction_count); tidesdb_free_stats(stats); @@ -1323,7 +1333,7 @@ JNIEXPORT jobject JNICALL Java_com_tidesdb_TidesDB_nativeGetDbStats(JNIEnv *env, boolean, String, long, long, int, long, long, long, long, boolean */ jmethodID constructor = (*env)->GetMethodID( env, dbStatsClass, "", - "(IJJJIIJIIJIJJJJZJIZIJZLjava/lang/String;JJIJJJJZ)V"); + "(IJJJIIJIIJIJJJJZJIZIJZLjava/lang/String;JJIJJJJZJJJJJJJJ)V"); jstring connectorStr = NULL; if (db_stats.object_store_connector != NULL) @@ -1362,7 +1372,15 @@ JNIEXPORT jobject JNICALL Java_com_tidesdb_TidesDB_nativeGetDbStats(JNIEnv *env, (jlong)db_stats.upload_queue_depth, (jlong)db_stats.total_uploads, (jlong)db_stats.total_upload_failures, - db_stats.replica_mode != 0); + db_stats.replica_mode != 0, + (jlong)db_stats.uwal_bytes_written, + (jlong)db_stats.wal_bytes_written, + (jlong)db_stats.flush_bytes_written, + (jlong)db_stats.compaction_bytes_written, + (jlong)db_stats.compaction_bytes_read, + (jlong)db_stats.user_bytes_written, + (jlong)db_stats.flush_count, + (jlong)db_stats.compaction_count); } JNIEXPORT jdouble JNICALL Java_com_tidesdb_ColumnFamily_nativeRangeCost(JNIEnv *env, jclass cls, @@ -1422,6 +1440,25 @@ JNIEXPORT void JNICALL Java_com_tidesdb_TidesDB_nativePromoteToPrimary(JNIEnv *e } } +JNIEXPORT void JNICALL Java_com_tidesdb_TidesDB_nativeCancelBackgroundWork(JNIEnv *env, jclass cls, + jlong handle) +{ + tidesdb_t *db = (tidesdb_t *)(uintptr_t)handle; + + int result = tidesdb_cancel_background_work(db); + + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + } +} + +JNIEXPORT jlong JNICALL Java_com_tidesdb_TidesDB_nativeRaiseOpenFileLimit(JNIEnv *env, jclass cls, + jlong desired) +{ + return (jlong)tidesdb_raise_open_file_limit((long)desired); +} + JNIEXPORT jint JNICALL Java_com_tidesdb_Config_nativeDefaultMaxConcurrentFlushes(JNIEnv *env, jclass cls) { @@ -1443,6 +1480,159 @@ JNIEXPORT jlong JNICALL Java_com_tidesdb_ColumnFamilyConfig_nativeDefaultTombsto return (jlong)cfg.tombstone_density_min_entries; } +/** + * Builds a com.tidesdb.ColumnFamilyConfig from a native config struct via the + * ColumnFamilyConfig.fromNative static factory. Returns a local ref, or NULL on error. + */ +static jobject buildCfConfigObject(JNIEnv *env, const tidesdb_column_family_config_t *cfg) +{ + jclass cfConfigClass = (*env)->FindClass(env, "com/tidesdb/ColumnFamilyConfig"); + if (cfConfigClass == NULL) return NULL; + + jmethodID fromNative = (*env)->GetStaticMethodID( + env, cfConfigClass, "fromNative", + "(JJIIJIZDZIIIJLjava/lang/String;IFIJIIDJZZZ)Lcom/tidesdb/ColumnFamilyConfig;"); + if (fromNative == NULL) + { + (*env)->DeleteLocalRef(env, cfConfigClass); + return NULL; + } + + jstring comparatorName = (*env)->NewStringUTF(env, cfg->comparator_name); + + jobject obj = (*env)->CallStaticObjectMethod( + env, cfConfigClass, fromNative, + (jlong)cfg->write_buffer_size, (jlong)cfg->level_size_ratio, (jint)cfg->min_levels, + (jint)cfg->dividing_level_offset, (jlong)cfg->klog_value_threshold, + (jint)cfg->compression_algorithm, + cfg->enable_bloom_filter != 0 ? JNI_TRUE : JNI_FALSE, (jdouble)cfg->bloom_fpr, + cfg->enable_block_indexes != 0 ? JNI_TRUE : JNI_FALSE, (jint)cfg->index_sample_ratio, + (jint)cfg->block_index_prefix_len, (jint)cfg->sync_mode, (jlong)cfg->sync_interval_us, + comparatorName, (jint)cfg->skip_list_max_level, (jfloat)cfg->skip_list_probability, + (jint)cfg->default_isolation_level, (jlong)cfg->min_disk_space, + (jint)cfg->l1_file_count_trigger, (jint)cfg->l0_queue_stall_threshold, + (jdouble)cfg->tombstone_density_trigger, (jlong)cfg->tombstone_density_min_entries, + cfg->use_btree != 0 ? JNI_TRUE : JNI_FALSE, + cfg->object_lazy_compaction != 0 ? JNI_TRUE : JNI_FALSE, + cfg->object_prefetch_compaction != 0 ? JNI_TRUE : JNI_FALSE); + + (*env)->DeleteLocalRef(env, comparatorName); + (*env)->DeleteLocalRef(env, cfConfigClass); + return obj; +} + +JNIEXPORT void JNICALL Java_com_tidesdb_ColumnFamilyConfig_nativeSaveToIni( + JNIEnv *env, jclass cls, jstring iniFile, jstring sectionName, jlong writeBufferSize, + jlong levelSizeRatio, jint minLevels, jint dividingLevelOffset, jlong klogValueThreshold, + jint compressionAlgorithm, jboolean enableBloomFilter, jdouble bloomFPR, + jboolean enableBlockIndexes, jint indexSampleRatio, jint blockIndexPrefixLen, jint syncMode, + jlong syncIntervalUs, jstring comparatorName, jint skipListMaxLevel, jfloat skipListProbability, + jint defaultIsolationLevel, jlong minDiskSpace, jint l1FileCountTrigger, + jint l0QueueStallThreshold, jdouble tombstoneDensityTrigger, jlong tombstoneDensityMinEntries, + jboolean useBtree, jboolean objectLazyCompaction, jboolean objectPrefetchCompaction) +{ + const char *ini = (*env)->GetStringUTFChars(env, iniFile, NULL); + if (ini == NULL) + { + throwTidesDBException(env, TDB_ERR_MEMORY, "Failed to get INI file path"); + return; + } + const char *section = (*env)->GetStringUTFChars(env, sectionName, NULL); + if (section == NULL) + { + (*env)->ReleaseStringUTFChars(env, iniFile, ini); + throwTidesDBException(env, TDB_ERR_MEMORY, "Failed to get section name"); + return; + } + + const char *compName = NULL; + if (comparatorName != NULL) + { + compName = (*env)->GetStringUTFChars(env, comparatorName, NULL); + } + + tidesdb_column_family_config_t config = { + .write_buffer_size = (size_t)writeBufferSize, + .level_size_ratio = (size_t)levelSizeRatio, + .min_levels = minLevels, + .dividing_level_offset = dividingLevelOffset, + .klog_value_threshold = (size_t)klogValueThreshold, + .compression_algorithm = (compression_algorithm)compressionAlgorithm, + .enable_bloom_filter = enableBloomFilter ? 1 : 0, + .bloom_fpr = bloomFPR, + .enable_block_indexes = enableBlockIndexes ? 1 : 0, + .index_sample_ratio = indexSampleRatio, + .block_index_prefix_len = blockIndexPrefixLen, + .sync_mode = syncMode, + .sync_interval_us = (uint64_t)syncIntervalUs, + .skip_list_max_level = skipListMaxLevel, + .skip_list_probability = skipListProbability, + .default_isolation_level = (tidesdb_isolation_level_t)defaultIsolationLevel, + .min_disk_space = (uint64_t)minDiskSpace, + .l1_file_count_trigger = l1FileCountTrigger, + .l0_queue_stall_threshold = l0QueueStallThreshold, + .tombstone_density_trigger = tombstoneDensityTrigger, + .tombstone_density_min_entries = (uint64_t)tombstoneDensityMinEntries, + .use_btree = useBtree ? 1 : 0, + .object_lazy_compaction = objectLazyCompaction ? 1 : 0, + .object_prefetch_compaction = objectPrefetchCompaction ? 1 : 0}; + + memset(config.comparator_name, 0, TDB_MAX_COMPARATOR_NAME); + if (compName != NULL && strlen(compName) > 0) + { + strncpy(config.comparator_name, compName, TDB_MAX_COMPARATOR_NAME - 1); + } + memset(config.comparator_ctx_str, 0, TDB_MAX_COMPARATOR_CTX); + + int result = tidesdb_cf_config_save_to_ini(ini, section, &config); + + (*env)->ReleaseStringUTFChars(env, iniFile, ini); + (*env)->ReleaseStringUTFChars(env, sectionName, section); + if (compName != NULL) + { + (*env)->ReleaseStringUTFChars(env, comparatorName, compName); + } + + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + } +} + +JNIEXPORT jobject JNICALL Java_com_tidesdb_ColumnFamilyConfig_nativeLoadFromIni( + JNIEnv *env, jclass cls, jstring iniFile, jstring sectionName) +{ + const char *ini = (*env)->GetStringUTFChars(env, iniFile, NULL); + if (ini == NULL) + { + throwTidesDBException(env, TDB_ERR_MEMORY, "Failed to get INI file path"); + return NULL; + } + const char *section = (*env)->GetStringUTFChars(env, sectionName, NULL); + if (section == NULL) + { + (*env)->ReleaseStringUTFChars(env, iniFile, ini); + throwTidesDBException(env, TDB_ERR_MEMORY, "Failed to get section name"); + return NULL; + } + + /* start from engine defaults so fields absent from the INI section keep sane values */ + tidesdb_column_family_config_t config = tidesdb_default_column_family_config(); + + int result = tidesdb_cf_config_load_from_ini(ini, section, &config); + + (*env)->ReleaseStringUTFChars(env, iniFile, ini); + (*env)->ReleaseStringUTFChars(env, sectionName, section); + + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + return NULL; + } + + return buildCfConfigObject(env, &config); +} + JNIEXPORT jobject JNICALL Java_com_tidesdb_TidesDBIterator_nativeKeyValue(JNIEnv *env, jclass cls, jlong handle) { diff --git a/src/main/java/com/tidesdb/ColumnFamilyConfig.java b/src/main/java/com/tidesdb/ColumnFamilyConfig.java index 39079b0..ff7c4a6 100644 --- a/src/main/java/com/tidesdb/ColumnFamilyConfig.java +++ b/src/main/java/com/tidesdb/ColumnFamilyConfig.java @@ -199,8 +199,70 @@ static ColumnFamilyConfig fromNative(long writeBufferSize, long levelSizeRatio, public boolean isObjectLazyCompaction() { return objectLazyCompaction; } public boolean isObjectPrefetchCompaction() { return objectPrefetchCompaction; } + /** + * Saves this column family configuration to an INI file under the given section. + * If the file already exists it is overwritten. The written file can be read back + * with {@link #loadFromIni(String, String)}. + * + *

Note: not every field round-trips. The persisted fields are the ones the engine + * stores in a column family's {@code config.ini} (write buffer size, level ratios, + * compression, bloom/index settings, sync mode, skip list parameters, isolation level, + * compaction triggers, tombstone density, B+tree and object-store flags, and the + * comparator name). Runtime-only fields such as commit hooks are not persisted.

+ * + * @param iniFile path to the INI file to write + * @param sectionName section name to write the configuration under + * @throws TidesDBException if the file cannot be written + */ + public void saveToIni(String iniFile, String sectionName) throws TidesDBException { + if (iniFile == null || iniFile.isEmpty()) { + throw new IllegalArgumentException("INI file path cannot be null or empty"); + } + if (sectionName == null || sectionName.isEmpty()) { + throw new IllegalArgumentException("Section name cannot be null or empty"); + } + nativeSaveToIni(iniFile, sectionName, + writeBufferSize, levelSizeRatio, minLevels, dividingLevelOffset, klogValueThreshold, + compressionAlgorithm.getValue(), enableBloomFilter, bloomFPR, enableBlockIndexes, + indexSampleRatio, blockIndexPrefixLen, syncMode.getValue(), syncIntervalUs, + comparatorName, skipListMaxLevel, skipListProbability, + defaultIsolationLevel.getValue(), minDiskSpace, l1FileCountTrigger, + l0QueueStallThreshold, tombstoneDensityTrigger, tombstoneDensityMinEntries, + useBtree, objectLazyCompaction, objectPrefetchCompaction); + } + + /** + * Loads a column family configuration from an INI file section previously written by + * {@link #saveToIni(String, String)} (or produced by the engine for an existing column + * family). Fields absent from the section fall back to the engine defaults. + * + * @param iniFile path to the INI file to read + * @param sectionName section name to read the configuration from + * @return the loaded configuration + * @throws TidesDBException if the file cannot be read or the section is missing + */ + public static ColumnFamilyConfig loadFromIni(String iniFile, String sectionName) throws TidesDBException { + if (iniFile == null || iniFile.isEmpty()) { + throw new IllegalArgumentException("INI file path cannot be null or empty"); + } + if (sectionName == null || sectionName.isEmpty()) { + throw new IllegalArgumentException("Section name cannot be null or empty"); + } + return nativeLoadFromIni(iniFile, sectionName); + } + private static native double nativeDefaultTombstoneDensityTrigger(); private static native long nativeDefaultTombstoneDensityMinEntries(); + private static native void nativeSaveToIni(String iniFile, String sectionName, + long writeBufferSize, long levelSizeRatio, int minLevels, int dividingLevelOffset, + long klogValueThreshold, int compressionAlgorithm, boolean enableBloomFilter, + double bloomFPR, boolean enableBlockIndexes, int indexSampleRatio, int blockIndexPrefixLen, + int syncMode, long syncIntervalUs, String comparatorName, int skipListMaxLevel, + float skipListProbability, int defaultIsolationLevel, long minDiskSpace, + int l1FileCountTrigger, int l0QueueStallThreshold, double tombstoneDensityTrigger, + long tombstoneDensityMinEntries, boolean useBtree, boolean objectLazyCompaction, + boolean objectPrefetchCompaction) throws TidesDBException; + private static native ColumnFamilyConfig nativeLoadFromIni(String iniFile, String sectionName) throws TidesDBException; /** * Builder for ColumnFamilyConfig. diff --git a/src/main/java/com/tidesdb/Config.java b/src/main/java/com/tidesdb/Config.java index a6169e0..b72dbd2 100644 --- a/src/main/java/com/tidesdb/Config.java +++ b/src/main/java/com/tidesdb/Config.java @@ -45,6 +45,7 @@ public class Config { private String objectStoreFsPath; private ObjectStoreConfig objectStoreConfig; private int maxConcurrentFlushes; + private boolean finishCompactionsOnClose; private Config(Builder builder) { this.dbPath = builder.dbPath; @@ -65,6 +66,7 @@ private Config(Builder builder) { this.objectStoreFsPath = builder.objectStoreFsPath; this.objectStoreConfig = builder.objectStoreConfig; this.maxConcurrentFlushes = builder.maxConcurrentFlushes; + this.finishCompactionsOnClose = builder.finishCompactionsOnClose; } /** @@ -172,6 +174,16 @@ public int getMaxConcurrentFlushes() { return maxConcurrentFlushes; } + /** + * Returns the close behavior for in-flight compactions. + * + * @return true if {@code close()} waits for in-flight compactions to finish; + * false (default) cancels them at their next checkpoint for a fast shutdown + */ + public boolean isFinishCompactionsOnClose() { + return finishCompactionsOnClose; + } + /** * Builder for Config. */ @@ -194,6 +206,7 @@ public static class Builder { private String objectStoreFsPath = null; private ObjectStoreConfig objectStoreConfig = null; private int maxConcurrentFlushes = 0; + private boolean finishCompactionsOnClose = false; public Builder dbPath(String dbPath) { this.dbPath = dbPath; @@ -285,6 +298,20 @@ public Builder maxConcurrentFlushes(int maxConcurrentFlushes) { return this; } + /** + * Sets the close behavior for in-flight compactions. + * + * @param finishCompactionsOnClose false (default) cancels in-flight compactions at their + * next checkpoint for a fast shutdown (no data is lost; recovery handles a mid-merge + * state). true lets in-flight compactions run to completion before {@code close()} + * returns. + * @return this builder + */ + public Builder finishCompactionsOnClose(boolean finishCompactionsOnClose) { + this.finishCompactionsOnClose = finishCompactionsOnClose; + return this; + } + public Config build() { validate(); return new Config(this); diff --git a/src/main/java/com/tidesdb/DbStats.java b/src/main/java/com/tidesdb/DbStats.java index c5ae3b9..83d8718 100644 --- a/src/main/java/com/tidesdb/DbStats.java +++ b/src/main/java/com/tidesdb/DbStats.java @@ -54,6 +54,14 @@ public class DbStats { private final long totalUploads; private final long totalUploadFailures; private final boolean replicaMode; + private final long uwalBytesWritten; + private final long walBytesWritten; + private final long flushBytesWritten; + private final long compactionBytesWritten; + private final long compactionBytesRead; + private final long userBytesWritten; + private final long flushCount; + private final long compactionCount; public DbStats(int numColumnFamilies, long totalMemory, long availableMemory, long resolvedMemoryLimit, int memoryPressureLevel, int flushPendingCount, @@ -66,7 +74,10 @@ public DbStats(int numColumnFamilies, long totalMemory, long availableMemory, boolean objectStoreEnabled, String objectStoreConnector, long localCacheBytesUsed, long localCacheBytesMax, int localCacheNumFiles, long lastUploadedGeneration, long uploadQueueDepth, - long totalUploads, long totalUploadFailures, boolean replicaMode) { + long totalUploads, long totalUploadFailures, boolean replicaMode, + long uwalBytesWritten, long walBytesWritten, long flushBytesWritten, + long compactionBytesWritten, long compactionBytesRead, long userBytesWritten, + long flushCount, long compactionCount) { this.numColumnFamilies = numColumnFamilies; this.totalMemory = totalMemory; this.availableMemory = availableMemory; @@ -98,6 +109,14 @@ public DbStats(int numColumnFamilies, long totalMemory, long availableMemory, this.totalUploads = totalUploads; this.totalUploadFailures = totalUploadFailures; this.replicaMode = replicaMode; + this.uwalBytesWritten = uwalBytesWritten; + this.walBytesWritten = walBytesWritten; + this.flushBytesWritten = flushBytesWritten; + this.compactionBytesWritten = compactionBytesWritten; + this.compactionBytesRead = compactionBytesRead; + this.userBytesWritten = userBytesWritten; + this.flushCount = flushCount; + this.compactionCount = compactionCount; } public int getNumColumnFamilies() { @@ -224,6 +243,84 @@ public boolean isReplicaMode() { return replicaMode; } + /** + * Gets the framed bytes appended to the shared unified WAL (lifetime since open). + * Returns 0 when unified memtable mode is off. + * + * @return unified WAL bytes written + */ + public long getUwalBytesWritten() { + return uwalBytesWritten; + } + + /** + * Gets the per-column-family WAL bytes summed across all column families + * (lifetime since open). + * + * @return WAL bytes written across all CFs + */ + public long getWalBytesWritten() { + return walBytesWritten; + } + + /** + * Gets the flush output bytes summed across all column families (lifetime since open). + * + * @return flush output bytes written across all CFs + */ + public long getFlushBytesWritten() { + return flushBytesWritten; + } + + /** + * Gets the compaction output bytes summed across all column families (lifetime since open). + * + * @return compaction output bytes written across all CFs + */ + public long getCompactionBytesWritten() { + return compactionBytesWritten; + } + + /** + * Gets the compaction input bytes summed across all column families (lifetime since open). + * + * @return compaction input bytes read across all CFs + */ + public long getCompactionBytesRead() { + return compactionBytesRead; + } + + /** + * Gets the logical committed bytes summed across all column families (lifetime since open). + * This is the database-wide write-amplification denominator: + * {@code (uwal + wal + flush + compaction) / userBytesWritten}. + * + * @return user bytes written across all CFs + */ + public long getUserBytesWritten() { + return userBytesWritten; + } + + /** + * Gets the number of flushed SSTables summed across all column families + * (lifetime since open). + * + * @return flush count across all CFs + */ + public long getFlushCount() { + return flushCount; + } + + /** + * Gets the number of compaction output SSTables summed across all column families + * (lifetime since open). + * + * @return compaction count across all CFs + */ + public long getCompactionCount() { + return compactionCount; + } + @Override public String toString() { return "DbStats{" + @@ -258,6 +355,14 @@ public String toString() { ", totalUploads=" + totalUploads + ", totalUploadFailures=" + totalUploadFailures + ", replicaMode=" + replicaMode + + ", uwalBytesWritten=" + uwalBytesWritten + + ", walBytesWritten=" + walBytesWritten + + ", flushBytesWritten=" + flushBytesWritten + + ", compactionBytesWritten=" + compactionBytesWritten + + ", compactionBytesRead=" + compactionBytesRead + + ", userBytesWritten=" + userBytesWritten + + ", flushCount=" + flushCount + + ", compactionCount=" + compactionCount + '}'; } } diff --git a/src/main/java/com/tidesdb/Stats.java b/src/main/java/com/tidesdb/Stats.java index 37b6f5b..44aa721 100644 --- a/src/main/java/com/tidesdb/Stats.java +++ b/src/main/java/com/tidesdb/Stats.java @@ -44,6 +44,13 @@ public class Stats { private final long[] levelTombstoneCounts; private final double maxSstDensity; private final int maxSstDensityLevel; + private final long walBytesWritten; + private final long flushBytesWritten; + private final long compactionBytesWritten; + private final long compactionBytesRead; + private final long userBytesWritten; + private final long flushCount; + private final long compactionCount; public Stats(int numLevels, long memtableSize, long[] levelSizes, int[] levelNumSSTables, ColumnFamilyConfig config, long totalKeys, long totalDataSize, @@ -51,7 +58,10 @@ public Stats(int numLevels, long memtableSize, long[] levelSizes, int[] levelNum double readAmp, double hitRate, boolean useBtree, long btreeTotalNodes, int btreeMaxHeight, double btreeAvgHeight, long totalTombstones, double tombstoneRatio, long[] levelTombstoneCounts, - double maxSstDensity, int maxSstDensityLevel) { + double maxSstDensity, int maxSstDensityLevel, + long walBytesWritten, long flushBytesWritten, long compactionBytesWritten, + long compactionBytesRead, long userBytesWritten, long flushCount, + long compactionCount) { this.numLevels = numLevels; this.memtableSize = memtableSize; this.levelSizes = levelSizes; @@ -73,6 +83,13 @@ public Stats(int numLevels, long memtableSize, long[] levelSizes, int[] levelNum this.levelTombstoneCounts = levelTombstoneCounts; this.maxSstDensity = maxSstDensity; this.maxSstDensityLevel = maxSstDensityLevel; + this.walBytesWritten = walBytesWritten; + this.flushBytesWritten = flushBytesWritten; + this.compactionBytesWritten = compactionBytesWritten; + this.compactionBytesRead = compactionBytesRead; + this.userBytesWritten = userBytesWritten; + this.flushCount = flushCount; + this.compactionCount = compactionCount; } /** @@ -271,6 +288,76 @@ public int getMaxSstDensityLevel() { return maxSstDensityLevel; } + /** + * Gets the framed bytes appended to this column family's WAL (lifetime since open). + * Always 0 in unified memtable mode, where the shared WAL volume is reported db-wide + * via {@link DbStats#getUwalBytesWritten()}. + * + * @return WAL bytes written + */ + public long getWalBytesWritten() { + return walBytesWritten; + } + + /** + * Gets the on-disk bytes this column family's flushes wrote to L0 SSTables + * (lifetime since open). + * + * @return flush output bytes written + */ + public long getFlushBytesWritten() { + return flushBytesWritten; + } + + /** + * Gets the on-disk bytes this column family's compactions wrote (lifetime since open). + * + * @return compaction output bytes written + */ + public long getCompactionBytesWritten() { + return compactionBytesWritten; + } + + /** + * Gets the on-disk bytes this column family's compactions read as input + * (lifetime since open). + * + * @return compaction input bytes read + */ + public long getCompactionBytesRead() { + return compactionBytesRead; + } + + /** + * Gets the logical key+value bytes committed to this column family (lifetime since open). + * This is the write-amplification denominator: divide the WAL, flush, and compaction + * write totals by this value to compute write amplification. + * + * @return user bytes written + */ + public long getUserBytesWritten() { + return userBytesWritten; + } + + /** + * Gets the number of flushed SSTables produced by this column family (lifetime since open). + * + * @return flush count + */ + public long getFlushCount() { + return flushCount; + } + + /** + * Gets the number of compaction output SSTables produced by this column family + * (lifetime since open). + * + * @return compaction count + */ + public long getCompactionCount() { + return compactionCount; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -292,6 +379,13 @@ public String toString() { sb.append(", tombstoneRatio=").append(tombstoneRatio); sb.append(", maxSstDensity=").append(maxSstDensity); sb.append(", maxSstDensityLevel=").append(maxSstDensityLevel); + sb.append(", walBytesWritten=").append(walBytesWritten); + sb.append(", flushBytesWritten=").append(flushBytesWritten); + sb.append(", compactionBytesWritten=").append(compactionBytesWritten); + sb.append(", compactionBytesRead=").append(compactionBytesRead); + sb.append(", userBytesWritten=").append(userBytesWritten); + sb.append(", flushCount=").append(flushCount); + sb.append(", compactionCount=").append(compactionCount); if (levelSizes != null) { sb.append(", levelSizes=["); for (int i = 0; i < levelSizes.length; i++) { diff --git a/src/main/java/com/tidesdb/TidesDB.java b/src/main/java/com/tidesdb/TidesDB.java index 635d0b1..0000601 100644 --- a/src/main/java/com/tidesdb/TidesDB.java +++ b/src/main/java/com/tidesdb/TidesDB.java @@ -87,7 +87,8 @@ public static TidesDB open(Config config) throws TidesDBException { osc != null ? osc.isReplicaMode() : false, osc != null ? osc.getReplicaSyncIntervalUs() : 5000000, osc != null ? osc.isReplicaReplayWal() : true, - config.getMaxConcurrentFlushes() + config.getMaxConcurrentFlushes(), + config.isFinishCompactionsOnClose() ); return new TidesDB(handle); @@ -361,7 +362,40 @@ public DbStats getDbStats() throws TidesDBException { checkNotClosed(); return nativeGetDbStats(nativeHandle); } - + + /** + * Cancels background compaction database-wide. In-flight merges bail safely at their + * next checkpoint (their uncommitted output is discarded, inputs are left intact, so no + * data is lost) and any queued compaction is skipped. Flushes are unaffected, so + * durability is preserved. Blocks (bounded) until compaction is idle. + * + *

The cancellation is sticky for the session and is reset on the next open. It is + * intended to be called immediately before {@link #close()} for a fast shutdown.

+ * + * @throws TidesDBException if the operation fails + */ + public void cancelBackgroundWork() throws TidesDBException { + checkNotClosed(); + nativeCancelBackgroundWork(nativeHandle); + } + + /** + * Raises this process's open-file ceiling toward {@code desired} descriptors so a database + * can keep more SSTables open. The engine sizes {@code maxOpenSSTables} to fit this at open + * time, so call it before {@link #open(Config)}. This is an explicit, opt-in + * operator action; TidesDB never raises the limit itself. + * + *

On POSIX systems (Linux, macOS, the BSDs, illumos) this raises the {@code RLIMIT_NOFILE} + * soft limit toward the hard limit; on Windows it raises the CRT stdio cap (max 8192). A + * failed or partial raise is non-fatal.

+ * + * @param desired target descriptor count; values ≤ 0 just report the current ceiling + * @return the open-file ceiling in effect after the attempt + */ + public static long raiseOpenFileLimit(long desired) { + return nativeRaiseOpenFileLimit(desired); + } + private void checkNotClosed() { if (closed) { throw new IllegalStateException("TidesDB instance is closed"); @@ -391,7 +425,8 @@ private static native long nativeOpen(String dbPath, int numFlushThreads, int nu boolean oscWalSyncOnCommit, boolean oscReplicaMode, long oscReplicaSyncIntervalUs, boolean oscReplicaReplayWal, - int maxConcurrentFlushes) throws TidesDBException; + int maxConcurrentFlushes, + boolean finishCompactionsOnClose) throws TidesDBException; private static native void nativeClose(long handle); @@ -436,4 +471,8 @@ private static native void nativeCreateColumnFamily(long handle, String name, private static native void nativeDeleteColumnFamily(long handle, long cfHandle) throws TidesDBException; private static native void nativePromoteToPrimary(long handle) throws TidesDBException; + + private static native void nativeCancelBackgroundWork(long handle) throws TidesDBException; + + private static native long nativeRaiseOpenFileLimit(long desired); } diff --git a/src/main/java/com/tidesdb/TidesDBIterator.java b/src/main/java/com/tidesdb/TidesDBIterator.java index b75fdf5..66900c6 100644 --- a/src/main/java/com/tidesdb/TidesDBIterator.java +++ b/src/main/java/com/tidesdb/TidesDBIterator.java @@ -72,7 +72,7 @@ public void seek(byte[] key) throws TidesDBException { } /** - * Positions the iterator at the last key <= target key. + * Positions the iterator at the last key {@code <=} target key. * * @param key the target key * @throws TidesDBException if the seek fails diff --git a/src/test/java/com/tidesdb/TidesDBTest.java b/src/test/java/com/tidesdb/TidesDBTest.java index 24378b8..77b991e 100644 --- a/src/test/java/com/tidesdb/TidesDBTest.java +++ b/src/test/java/com/tidesdb/TidesDBTest.java @@ -1734,10 +1734,12 @@ void testMaxConcurrentFlushes() throws TidesDBException { cf.flushMemtable(); } - // defaultConfig() should source maxConcurrentFlushes from the C library + // defaultConfig() should source maxConcurrentFlushes from the C library. The engine's + // default is 0, which is the "auto" sentinel meaning "pin to the resolved + // num_flush_threads" -- so the only invariant we can assert is that it is non-negative. Config defaults = Config.defaultConfig(); - assertTrue(defaults.getMaxConcurrentFlushes() > 0, - "default maxConcurrentFlushes should be non-zero (sourced from tidesdb_default_config())"); + assertTrue(defaults.getMaxConcurrentFlushes() >= 0, + "default maxConcurrentFlushes should be sourced from tidesdb_default_config()"); } @Test @@ -1767,4 +1769,190 @@ void testTransactionSingleDeleteNullArgs() throws TidesDBException { } } } + + @Test + @Order(48) + void testRaiseOpenFileLimit() { + // Reporting-only call (desired <= 0) returns the current ceiling without changing it. + long current = TidesDB.raiseOpenFileLimit(0); + assertTrue(current > 0, "current open-file ceiling should be positive"); + + // A raise attempt is non-fatal and returns the ceiling in effect afterwards (>= current). + long after = TidesDB.raiseOpenFileLimit(current); + assertTrue(after >= current, "ceiling after a raise attempt should not be lower"); + } + + @Test + @Order(49) + void testCancelBackgroundWork() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb_cancel_bg").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + db.createColumnFamily("cancel_cf", ColumnFamilyConfig.defaultConfig()); + ColumnFamily cf = db.getColumnFamily("cancel_cf"); + + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, "k".getBytes(StandardCharsets.UTF_8), + "v".getBytes(StandardCharsets.UTF_8)); + txn.commit(); + } + + // Sticky db-wide cancel of background compaction; flushes are unaffected. + assertDoesNotThrow(db::cancelBackgroundWork); + } + } + + @Test + @Order(50) + void testFinishCompactionsOnClose() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb_finish_compactions").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .finishCompactionsOnClose(true) + .build(); + + assertTrue(config.isFinishCompactionsOnClose()); + + try (TidesDB db = TidesDB.open(config)) { + db.createColumnFamily("finish_cf", ColumnFamilyConfig.defaultConfig()); + ColumnFamily cf = db.getColumnFamily("finish_cf"); + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, "k".getBytes(StandardCharsets.UTF_8), + "v".getBytes(StandardCharsets.UTF_8)); + txn.commit(); + } + cf.flushMemtable(); + } + // close() returning without error is the observable contract for this flag. + } + + @Test + @Order(51) + void testCfConfigIniRoundTrip() throws TidesDBException { + String iniFile = tempDir.resolve("cf_config.ini").toString(); + String section = "round_trip_cf"; + + ColumnFamilyConfig original = ColumnFamilyConfig.builder() + .writeBufferSize(96 * 1024 * 1024) + .levelSizeRatio(8) + .minLevels(4) + .klogValueThreshold(1024) + .compressionAlgorithm(CompressionAlgorithm.ZSTD_COMPRESSION) + .enableBloomFilter(true) + .bloomFPR(0.02) + .enableBlockIndexes(true) + .indexSampleRatio(2) + .blockIndexPrefixLen(8) + .syncMode(SyncMode.SYNC_INTERVAL) + .syncIntervalUs(250000) + .defaultIsolationLevel(IsolationLevel.SNAPSHOT) + .l1FileCountTrigger(6) + .l0QueueStallThreshold(15) + .tombstoneDensityTrigger(0.4) + .tombstoneDensityMinEntries(2048) + .minDiskSpace(50 * 1024 * 1024) + .useBtree(true) + .objectLazyCompaction(true) + .objectPrefetchCompaction(false) + .build(); + + original.saveToIni(iniFile, section); + + ColumnFamilyConfig loaded = ColumnFamilyConfig.loadFromIni(iniFile, section); + + assertEquals(original.getWriteBufferSize(), loaded.getWriteBufferSize()); + assertEquals(original.getLevelSizeRatio(), loaded.getLevelSizeRatio()); + assertEquals(original.getMinLevels(), loaded.getMinLevels()); + assertEquals(original.getKlogValueThreshold(), loaded.getKlogValueThreshold()); + assertEquals(original.getCompressionAlgorithm(), loaded.getCompressionAlgorithm()); + assertEquals(original.isEnableBloomFilter(), loaded.isEnableBloomFilter()); + assertEquals(original.getBloomFPR(), loaded.getBloomFPR(), 1e-9); + assertEquals(original.isEnableBlockIndexes(), loaded.isEnableBlockIndexes()); + assertEquals(original.getIndexSampleRatio(), loaded.getIndexSampleRatio()); + assertEquals(original.getBlockIndexPrefixLen(), loaded.getBlockIndexPrefixLen()); + assertEquals(original.getSyncMode(), loaded.getSyncMode()); + assertEquals(original.getSyncIntervalUs(), loaded.getSyncIntervalUs()); + assertEquals(original.getDefaultIsolationLevel(), loaded.getDefaultIsolationLevel()); + assertEquals(original.getL1FileCountTrigger(), loaded.getL1FileCountTrigger()); + assertEquals(original.getL0QueueStallThreshold(), loaded.getL0QueueStallThreshold()); + assertEquals(original.getTombstoneDensityTrigger(), loaded.getTombstoneDensityTrigger(), 1e-9); + assertEquals(original.getTombstoneDensityMinEntries(), loaded.getTombstoneDensityMinEntries()); + assertEquals(original.getMinDiskSpace(), loaded.getMinDiskSpace()); + assertEquals(original.isUseBtree(), loaded.isUseBtree()); + assertEquals(original.isObjectLazyCompaction(), loaded.isObjectLazyCompaction()); + assertEquals(original.isObjectPrefetchCompaction(), loaded.isObjectPrefetchCompaction()); + } + + @Test + @Order(52) + void testCfConfigIniInvalidArgs() { + assertThrows(IllegalArgumentException.class, + () -> ColumnFamilyConfig.defaultConfig().saveToIni(null, "s")); + assertThrows(IllegalArgumentException.class, + () -> ColumnFamilyConfig.defaultConfig().saveToIni("f.ini", "")); + assertThrows(IllegalArgumentException.class, + () -> ColumnFamilyConfig.loadFromIni("", "s")); + assertThrows(IllegalArgumentException.class, + () -> ColumnFamilyConfig.loadFromIni("f.ini", null)); + // Reading a non-existent INI file surfaces as a TidesDBException, not a crash. + assertThrows(TidesDBException.class, + () -> ColumnFamilyConfig.loadFromIni( + tempDir.resolve("does_not_exist.ini").toString(), "nope")); + } + + @Test + @Order(53) + void testWriteAmplificationCounters() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb_write_amp").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + db.createColumnFamily("wa_cf", ColumnFamilyConfig.defaultConfig()); + ColumnFamily cf = db.getColumnFamily("wa_cf"); + + for (int i = 0; i < 200; i++) { + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, ("key" + i).getBytes(StandardCharsets.UTF_8), + ("value" + i).getBytes(StandardCharsets.UTF_8)); + txn.commit(); + } + } + cf.flushMemtable(); + cf.purge(); + + Stats stats = cf.getStats(); + // Counters are non-negative and user bytes reflect the committed payload. + assertTrue(stats.getUserBytesWritten() > 0, "user bytes should be recorded"); + assertTrue(stats.getWalBytesWritten() >= 0); + assertTrue(stats.getFlushBytesWritten() >= 0); + assertTrue(stats.getCompactionBytesWritten() >= 0); + assertTrue(stats.getCompactionBytesRead() >= 0); + assertTrue(stats.getFlushCount() >= 0); + assertTrue(stats.getCompactionCount() >= 0); + + DbStats dbStats = db.getDbStats(); + assertTrue(dbStats.getUserBytesWritten() > 0, "db-wide user bytes should be recorded"); + assertTrue(dbStats.getUwalBytesWritten() >= 0); + assertTrue(dbStats.getWalBytesWritten() >= 0); + assertTrue(dbStats.getFlushBytesWritten() >= 0); + assertTrue(dbStats.getCompactionBytesWritten() >= 0); + assertTrue(dbStats.getCompactionBytesRead() >= 0); + assertTrue(dbStats.getFlushCount() >= 0); + assertTrue(dbStats.getCompactionCount() >= 0); + } + } }