Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.jbellis.jvector.graph.disk;

/**
 * Observer of disk-write progress for an {@link OnDiskGraphIndexCompactor} run.
 *
 * <p>The compactor writes merged nodes to disk in batches. To keep callback overhead
 * negligible, {@link #onProgress} fires only on every tenth completed batch and once
 * more when the final batch of a level lands, which is frequent enough to drive a live
 * status display or a health-check endpoint.
 *
 * <p>When the graph has several hierarchical levels, progress is reported per level:
 * {@code totalBatches} is re-based to the new level's batch count each time a level
 * starts. Because level&nbsp;0 (the base layer) dwarfs the upper levels and dominates
 * total runtime, per-level progress is a reasonable stand-in for overall merge progress.
 *
 * <p>Thread-safety: implementations must tolerate invocation from whichever thread is
 * draining the completion service in
 * {@code OnDiskGraphIndexCompactor.runBatchesWithBackpressure}.
 */
@FunctionalInterface
public interface CompactionProgressListener {

    /**
     * Invoked periodically while batches are flushed to disk.
     *
     * @param completedBatches batches written so far within the current level; always a
     *                         positive multiple of 10, except for the last call of a
     *                         level where it equals {@code totalBatches}
     * @param totalBatches     batch count for the level currently being written
     */
    void onProgress(long completedBatches, long totalBatches);
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,24 @@ private void validateFeatures(List<OnDiskGraphIndex> sources) {
/**
 * Merges every source index into a single output index at {@code outputPath}, retraining
 * PQ when necessary and writing the header, all graph layers, and the footer.
 *
 * <p>Convenience overload with progress reporting disabled; behaves exactly like
 * {@code compact(outputPath, null)}.
 *
 * @param outputPath destination file for the merged graph
 * @throws FileNotFoundException propagated from the two-argument overload
 */
public void compact(Path outputPath) throws FileNotFoundException {
    // Delegate with a null listener: no progress callbacks are issued.
    this.compact(outputPath, null);
}

/**
* Main compaction entry point with optional progress reporting. Merges all source indexes into
* a single output index at the specified path, handling PQ retraining if needed, and writing
* header, all layers, and footer.
*
* @param outputPath destination file for the merged graph
* @param progressListener optional callback invoked every ten batches (and at completion of
* each level) so the caller can track I/O progress; pass {@code null}
* to disable progress reporting
*/
public void compact(Path outputPath, CompactionProgressListener progressListener) throws FileNotFoundException {
boolean fusedPQEnabled = hasFusedPQ();
boolean compressedPrecision = fusedPQEnabled;

Expand All @@ -230,7 +246,7 @@ public void compact(Path outputPath) throws FileNotFoundException {
numTotalNodes, maxOrdinal, dimension, maxDegrees.get(0));
try (CompactWriter writer = new CompactWriter(outputPath, maxOrdinal, numTotalNodes, 0, layerInfo, entryNode, dimension, maxDegrees, pq, pqLength, fusedPQEnabled)) {
writer.writeHeader();
compactLevels(writer, similarityFunction, fusedPQEnabled, compressedPrecision, pq);
compactLevels(writer, similarityFunction, fusedPQEnabled, compressedPrecision, pq, progressListener);

// When FusedPQ is enabled and there is no hierarchy (only L0), the reader expects
// to find the entry node's own PQ code written after the L0 block, just as
Expand Down Expand Up @@ -304,7 +320,8 @@ private void compactLevels(CompactWriter writer,
VectorSimilarityFunction similarityFunction,
boolean fusedPQEnabled,
boolean compressedPrecision,
ProductQuantization pq)
ProductQuantization pq,
CompactionProgressListener progressListener)
throws IOException, ExecutionException, InterruptedException {

int maxUpperDegree = 0;
Expand Down Expand Up @@ -362,7 +379,8 @@ private void compactLevels(CompactWriter writer,
} catch (IOException e) {
throw new RuntimeException(e);
}
}
},
progressListener
);
}

Expand Down Expand Up @@ -399,7 +417,8 @@ private void compactLevels(CompactWriter writer,
} catch (IOException e) {
throw new RuntimeException(e);
}
}
},
progressListener
);
}
}
Expand Down Expand Up @@ -740,12 +759,17 @@ private float rescore(OnDiskGraphIndex.View view,
* Executes batches with controlled concurrency using a sliding window approach. Prevents
* overwhelming memory by limiting the number of in-flight tasks while maintaining high
* throughput via the completion service.
*
* @param progressListener optional; when non-null, called every ten batches (and at the
* final batch) with {@code (completedBatches, totalBatches)} so
* the caller can expose live compaction progress
*/
private <T> void runBatchesWithBackpressure(
List<BatchSpec> batches,
ExecutorCompletionService<List<T>> ecs,
java.util.function.Consumer<BatchSpec> submitOne,
java.util.function.Consumer<List<T>> onComplete
java.util.function.Consumer<List<T>> onComplete,
CompactionProgressListener progressListener
) throws InterruptedException, ExecutionException {

final int total = batches.size();
Expand All @@ -770,8 +794,11 @@ private <T> void runBatchesWithBackpressure(
submitOne.accept(batches.get(nextToSubmit++));
inFlight++;
}
if (completed % 10 == 0) {
if (completed % 10 == 0 || completed == total) {
log.info("Compaction I/O progress: {}/{} batches written to disk", completed, total);
if (progressListener != null) {
progressListener.onProgress(completed, total);
}
}
}
}
Expand Down
Loading