diff --git a/paimon-lumina/README.md b/paimon-lumina/README.md
new file mode 100644
index 000000000000..d25a549e9dc7
--- /dev/null
+++ b/paimon-lumina/README.md
@@ -0,0 +1,35 @@
+## Paimon Lumina
+
+This module integrates [Lumina](https://github.com/alibaba/paimon-cpp/tree/main/third_party/lumina)
+as a vector index for Apache Paimon's global index framework.
+
+Lumina vector search library is derived from an internal repository maintained by
+Alibaba Storage Service Team. It is accessed via JNI through the `lumina-jni` artifact.
+
+### Supported Index Types
+
+| Index Type | Description |
+|------------|-------------|
+| **DISKANN** | DiskANN graph-based index (default) |
+
+### Supported Vector Metrics
+
+| Metric | Description |
+|--------|-------------|
+| **L2** | Euclidean distance (default) |
+| **COSINE** | Cosine distance |
+| **INNER_PRODUCT** | Dot product |
+
+### Configuration Options
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `vector.dim` | int | 128 | Vector dimension |
+| `vector.metric` | enum | L2 | Distance metric |
+| `vector.index-type` | enum | DISKANN | Index type |
+| `vector.encoding-type` | string | rawf32 | Encoding type (rawf32, sq8, pq) |
+| `vector.size-per-index` | int | 2,000,000 | Max vectors per index file |
+| `vector.training-size` | int | 500,000 | Vectors used for pretraining |
+| `vector.search-factor` | int | 10 | Multiplier for search limit when filtering |
+| `vector.diskann.search-list-size` | int | 100 | DiskANN search list size |
+| `vector.pretrain-sample-ratio` | double | 1.0 | Pretrain sample ratio |
diff --git a/paimon-lumina/pom.xml b/paimon-lumina/pom.xml
new file mode 100644
index 000000000000..007ebdd06bf3
--- /dev/null
+++ b/paimon-lumina/pom.xml
@@ -0,0 +1,146 @@
+
+
+
+ 4.0.0
+
+
+ paimon-parent
+ org.apache.paimon
+ 1.4-SNAPSHOT
+
+
+ paimon-lumina
+ Paimon : Lumina Index
+
+
+
+ lumina
+ https://lumina-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/
+
+
+ jindodata
+ https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/
+
+
+
+
+
+ org.apache.paimon
+ paimon-common
+ ${project.version}
+ provided
+
+
+
+ org.aliyun.lumina
+ lumina-jni
+ 0.1.0
+
+
+
+ org.apache.paimon
+ paimon-shade-jackson-2
+ ${paimon.shade.jackson.version}-${paimon.shade.version}
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit5.version}
+ test
+
+
+
+ org.apache.paimon
+ paimon-core
+ ${project.version}
+ test
+
+
+
+ org.apache.paimon
+ paimon-format
+ ${project.version}
+ test
+
+
+
+ org.apache.paimon
+ paimon-test-utils
+ ${project.version}
+ test
+
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.version}
+ test
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+ org.apache.paimon
+ paimon-jindo
+ ${project.version}
+ test
+
+
+
+ com.aliyun.jindodata
+ jindo-core
+ 6.9.1
+ test
+
+
+
+ com.aliyun.jindodata
+ jindo-sdk
+ 6.9.1
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ **/*Benchmark*
+
+
+
+
+
+
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaIndex.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaIndex.java
new file mode 100644
index 000000000000..c89e0f019ac1
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaIndex.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.aliyun.lumina.LuminaBuilder;
+import org.aliyun.lumina.LuminaDataset;
+import org.aliyun.lumina.LuminaFileInput;
+import org.aliyun.lumina.LuminaFileOutput;
+import org.aliyun.lumina.LuminaSearcher;
+import org.aliyun.lumina.MetricType;
+
+import java.io.Closeable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A high-level wrapper for Lumina index operations (build and search).
+ *
+ *
This class provides a safe Java API for building and searching Lumina vector indices. It
+ * manages the lifecycle of native LuminaBuilder and LuminaSearcher objects.
+ */
+public class LuminaIndex implements Closeable {
+
+ private LuminaBuilder builder;
+ private LuminaSearcher searcher;
+ private final int dimension;
+ private final LuminaVectorMetric metric;
+ private volatile boolean closed = false;
+
+ /** All lumina options (prefix-stripped), stored for building search options at query time. */
+ private Map allOptions;
+
+ private LuminaIndex(int dimension, LuminaVectorMetric metric) {
+ this.dimension = dimension;
+ this.metric = metric;
+ }
+
+ /** Create a new index for building. */
+ public static LuminaIndex createForBuild(
+ int dimension, LuminaVectorMetric metric, Map extraOptions) {
+ LuminaIndex index = new LuminaIndex(dimension, metric);
+
+ Map opts = new LinkedHashMap<>(extraOptions);
+ index.builder =
+ LuminaBuilder.create(
+ LuminaVectorIndexOptions.INDEX_TYPE.defaultValue(),
+ dimension,
+ toMetricType(metric),
+ opts);
+ return index;
+ }
+
+ /**
+ * Open an existing index from a streaming file input for searching.
+ *
+ * The native searcher reads on-demand from the provided input. The caller must keep the
+ * underlying stream open until this index is closed.
+ */
+ public static LuminaIndex fromStream(
+ LuminaFileInput fileInput,
+ long fileSize,
+ int dimension,
+ LuminaVectorMetric metric,
+ Map extraOptions) {
+ LuminaIndex index = new LuminaIndex(dimension, metric);
+ index.searcher =
+ LuminaSearcher.create(
+ LuminaVectorIndexOptions.INDEX_TYPE.defaultValue(),
+ dimension,
+ toMetricType(metric),
+ extraOptions);
+ index.searcher.open(fileInput, fileSize);
+ index.allOptions = extraOptions;
+ return index;
+ }
+
+ /** Pretrain using a streaming dataset (no contiguous buffer needed). */
+ public void pretrainFrom(LuminaDataset dataset) {
+ ensureOpen();
+ ensureBuilder();
+ builder.pretrainFrom(dataset);
+ }
+
+ /** Insert vectors using a streaming dataset (no contiguous buffer needed). */
+ public void insertFrom(LuminaDataset dataset) {
+ ensureOpen();
+ ensureBuilder();
+ builder.insertFrom(dataset);
+ }
+
+ /** Dump (serialize) the built index to a streaming file output. */
+ public void dump(LuminaFileOutput fileOutput) {
+ ensureOpen();
+ ensureBuilder();
+ builder.dump(fileOutput);
+ }
+
+ /** Search for k nearest neighbors. */
+ public void search(
+ float[] queryVectors,
+ int n,
+ int k,
+ float[] distances,
+ long[] labels,
+ Map searchOptions) {
+ ensureOpen();
+ ensureSearcher();
+ searcher.search(n, queryVectors, k, distances, labels, filterSearchOptions(searchOptions));
+ }
+
+ /** Search for k nearest neighbors with native pre-filtering on vector IDs. */
+ public void searchWithFilter(
+ float[] queryVectors,
+ int n,
+ int k,
+ float[] distances,
+ long[] labels,
+ long[] filterIds,
+ Map searchOptions) {
+ ensureOpen();
+ ensureSearcher();
+ searcher.searchWithFilter(
+ n,
+ queryVectors,
+ k,
+ distances,
+ labels,
+ filterIds,
+ filterSearchOptions(searchOptions));
+ }
+
+ /** Get the number of vectors (searcher mode). */
+ public long size() {
+ ensureOpen();
+ ensureSearcher();
+ return searcher.getCount();
+ }
+
+ public int dimension() {
+ return dimension;
+ }
+
+ public LuminaVectorMetric metric() {
+ return metric;
+ }
+
+ /**
+ * Filters an options map to only include keys valid for Lumina SearchOptions. This mirrors
+ * paimon-cpp's {@code NormalizeSearchOptions} which extracts only search-relevant keys.
+ *
+ * Valid search option prefixes: {@code search.*} (core search options) and {@code
+ * diskann.search.*} (DiskANN-specific search options).
+ */
+ private static Map filterSearchOptions(Map options) {
+ Map searchOpts = new LinkedHashMap<>();
+ for (Map.Entry entry : options.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith("search.") || key.startsWith("diskann.search.")) {
+ searchOpts.put(key, entry.getValue());
+ }
+ }
+ return searchOpts;
+ }
+
+ private void ensureOpen() {
+ if (closed) {
+ throw new IllegalStateException("Index has been closed");
+ }
+ }
+
+ private void ensureBuilder() {
+ if (builder == null) {
+ throw new IllegalStateException("Index was not opened for building");
+ }
+ }
+
+ private void ensureSearcher() {
+ if (searcher == null) {
+ throw new IllegalStateException("Index was not opened for searching");
+ }
+ }
+
+ private static MetricType toMetricType(LuminaVectorMetric metric) {
+ switch (metric) {
+ case L2:
+ return MetricType.L2;
+ case COSINE:
+ return MetricType.COSINE;
+ case INNER_PRODUCT:
+ return MetricType.INNER_PRODUCT;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown metric: %s", metric));
+ }
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ synchronized (this) {
+ if (!closed) {
+ if (builder != null) {
+ builder.close();
+ builder = null;
+ }
+ if (searcher != null) {
+ searcher.close();
+ searcher = null;
+ }
+ closed = true;
+ }
+ }
+ }
+ }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaIndexMeta.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaIndexMeta.java
new file mode 100644
index 000000000000..44df020f6230
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaIndexMeta.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Metadata for a Lumina vector index file.
+ *
+ * <p>Serialized as a flat JSON {@code Map<String, String>} whose keys are lumina native option keys
+ * (with the {@code lumina.} prefix stripped). This matches paimon-cpp's metadata format exactly, so
+ * that indexes built by either paimon-cpp or paimon-lumina can be read by both implementations.
+ *
+ * <p>Standard keys include:
+ *
+ * <ul>
+ *   <li>{@code index.dimension} – vector dimension
+ *   <li>{@code index.type} – index algorithm (e.g. "diskann")
+ *   <li>{@code distance.metric} – distance metric (e.g. "l2", "cosine", "inner_product")
+ *   <li>{@code encoding.type} – vector encoding (e.g. "rawf32", "pq", "sq8")
+ * </ul>
+ */
+public class LuminaIndexMeta implements Serializable {
+
+    private static final long serialVersionUID = 3L;
+
+    private static final String KEY_DIMENSION =
+            LuminaVectorIndexOptions.toLuminaKey(LuminaVectorIndexOptions.DIMENSION);
+    private static final String KEY_DISTANCE_METRIC =
+            LuminaVectorIndexOptions.toLuminaKey(LuminaVectorIndexOptions.DISTANCE_METRIC);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final TypeReference<Map<String, String>> MAP_TYPE_REF =
+            new TypeReference<Map<String, String>>() {};
+
+    private final Map<String, String> options;
+
+    public LuminaIndexMeta(Map<String, String> options) {
+        this.options = new LinkedHashMap<>(options);
+    }
+
+    /** Returns the full options map. */
+    public Map<String, String> options() {
+        return options;
+    }
+
+    public int dim() {
+        return Integer.parseInt(options.get(KEY_DIMENSION));
+    }
+
+    public String distanceMetric() {
+        return options.get(KEY_DISTANCE_METRIC);
+    }
+
+    public LuminaVectorMetric metric() {
+        return LuminaVectorMetric.fromLuminaName(distanceMetric());
+    }
+
+    /** Serializes this metadata as a UTF-8 encoded JSON string (flat key-value map). */
+    public byte[] serialize() throws IOException {
+        return OBJECT_MAPPER.writeValueAsBytes(options);
+    }
+
+    /** Deserializes metadata from a UTF-8 encoded JSON byte array. */
+    public static LuminaIndexMeta deserialize(byte[] data) throws IOException {
+        Map<String, String> map = OBJECT_MAPPER.readValue(data, MAP_TYPE_REF);
+        if (!map.containsKey(KEY_DIMENSION)) {
+            throw new IOException(
+                    "Missing required key in Lumina index metadata: " + KEY_DIMENSION);
+        }
+        if (!map.containsKey(KEY_DISTANCE_METRIC)) {
+            throw new IOException(
+                    "Missing required key in Lumina index metadata: " + KEY_DISTANCE_METRIC);
+        }
+        return new LuminaIndexMeta(map);
+    }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaScoredGlobalIndexResult.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaScoredGlobalIndexResult.java
new file mode 100644
index 000000000000..1769ee2eb25c
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaScoredGlobalIndexResult.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.globalindex.ScoreGetter;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import java.util.HashMap;
+
+/** Vector search global index result for Lumina vector index. */
+public class LuminaScoredGlobalIndexResult implements ScoredGlobalIndexResult {
+
+ private final HashMap id2scores;
+ private final RoaringNavigableMap64 results;
+
+ public LuminaScoredGlobalIndexResult(
+ RoaringNavigableMap64 results, HashMap id2scores) {
+ this.id2scores = id2scores;
+ this.results = results;
+ }
+
+ @Override
+ public ScoreGetter scoreGetter() {
+ return rowId -> {
+ Float score = id2scores.get(rowId);
+ if (score == null) {
+ throw new IllegalArgumentException(
+ "No score found for rowId: "
+ + rowId
+ + ". Only rowIds present in results() are valid.");
+ }
+ return score;
+ };
+ }
+
+ @Override
+ public RoaringNavigableMap64 results() {
+ return this.results;
+ }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
new file mode 100644
index 000000000000..02bfce598861
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexReader.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.aliyun.lumina.LuminaFileInput;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Vector global index reader using Lumina.
+ *
+ * Each shard has exactly one Lumina index file. This reader loads the single index and performs
+ * vector similarity search.
+ */
+public class LuminaVectorGlobalIndexReader implements GlobalIndexReader {
+
+ private final GlobalIndexIOMeta ioMeta;
+ private final GlobalIndexFileReader fileReader;
+ private final DataType fieldType;
+ private final LuminaVectorIndexOptions options;
+
+ private volatile LuminaIndexMeta indexMeta;
+ private volatile LuminaIndex index;
+ private SeekableInputStream openStream;
+
+ public LuminaVectorGlobalIndexReader(
+ GlobalIndexFileReader fileReader,
+ List ioMetas,
+ DataType fieldType,
+ LuminaVectorIndexOptions options) {
+ checkArgument(ioMetas.size() == 1, "Expected exactly one index file per shard");
+ this.fileReader = fileReader;
+ this.ioMeta = ioMetas.get(0);
+ this.fieldType = fieldType;
+ this.options = options;
+ }
+
+ @Override
+ public Optional visitVectorSearch(VectorSearch vectorSearch) {
+ try {
+ ensureLoaded();
+ return Optional.ofNullable(search(vectorSearch));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to search Lumina vector index with fieldName=%s, limit=%d",
+ vectorSearch.fieldName(), vectorSearch.limit()),
+ e);
+ }
+ }
+
+ private GlobalIndexResult search(VectorSearch vectorSearch) throws IOException {
+ validateSearchVector(vectorSearch.vector());
+ float[] queryVector = ((float[]) vectorSearch.vector()).clone();
+ int limit = vectorSearch.limit();
+ LuminaVectorMetric indexMetric = indexMeta.metric();
+
+ int effectiveK = (int) Math.min(limit, index.size());
+ if (effectiveK <= 0) {
+ return null;
+ }
+
+ RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+ float[] distances;
+ long[] labels;
+
+ if (includeRowIds != null) {
+ long[] scopedIds = new long[(int) includeRowIds.getLongCardinality()];
+ Iterator iter = includeRowIds.iterator();
+ for (int i = 0; i < scopedIds.length; i++) {
+ scopedIds[i] = iter.next();
+ }
+ if (scopedIds.length == 0) {
+ return null;
+ }
+ effectiveK = Math.min(effectiveK, scopedIds.length);
+ distances = new float[effectiveK];
+ labels = new long[effectiveK];
+ Map searchOptions = options.toLuminaOptions();
+ searchOptions.putAll(indexMeta.options());
+ searchOptions.put("search.thread_safe_filter", "true");
+ index.searchWithFilter(
+ queryVector, 1, effectiveK, distances, labels, scopedIds, searchOptions);
+ } else {
+ distances = new float[effectiveK];
+ labels = new long[effectiveK];
+ Map searchOptions = options.toLuminaOptions();
+ searchOptions.putAll(indexMeta.options());
+ index.search(queryVector, 1, effectiveK, distances, labels, searchOptions);
+ }
+
+ // Min-heap: smallest score at head, so we can evict the weakest candidate efficiently.
+ PriorityQueue topK =
+ new PriorityQueue<>(limit + 1, Comparator.comparingDouble(s -> s.score));
+ collectResults(distances, labels, effectiveK, limit, topK, indexMetric);
+
+ RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
+ HashMap id2scores = new HashMap<>(topK.size());
+ for (ScoredRow row : topK) {
+ roaringBitmap64.add(row.rowId);
+ id2scores.put(row.rowId, row.score);
+ }
+ return new LuminaScoredGlobalIndexResult(roaringBitmap64, id2scores);
+ }
+
+ private static void collectResults(
+ float[] distances,
+ long[] labels,
+ int count,
+ int limit,
+ PriorityQueue topK,
+ LuminaVectorMetric metric) {
+ for (int i = 0; i < count; i++) {
+ long rowId = labels[i];
+ if (rowId < 0) {
+ continue;
+ }
+ float score = convertDistanceToScore(distances[i], metric);
+ if (topK.size() < limit) {
+ topK.offer(new ScoredRow(rowId, score));
+ } else if (score > topK.peek().score) {
+ topK.poll();
+ topK.offer(new ScoredRow(rowId, score));
+ }
+ }
+ }
+
+ private static float convertDistanceToScore(float distance, LuminaVectorMetric metric) {
+ if (metric == LuminaVectorMetric.L2) {
+ return 1.0f / (1.0f + distance);
+ } else if (metric == LuminaVectorMetric.COSINE) {
+ return 1.0f - distance;
+ } else {
+ // Inner product is already a similarity
+ return distance;
+ }
+ }
+
+ private void validateSearchVector(Object vector) {
+ if (!(vector instanceof float[])) {
+ throw new IllegalArgumentException(
+ "Expected float[] vector but got: " + vector.getClass());
+ }
+ if (!(fieldType instanceof ArrayType)
+ || !(((ArrayType) fieldType).getElementType() instanceof FloatType)) {
+ throw new IllegalArgumentException(
+ "Lumina currently only supports float arrays, but field type is: " + fieldType);
+ }
+ int queryDim = ((float[]) vector).length;
+ if (queryDim != indexMeta.dim()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Query vector dimension mismatch: index expects %d, but got %d",
+ indexMeta.dim(), queryDim));
+ }
+ }
+
+ private void ensureLoaded() throws IOException {
+ if (index == null) {
+ synchronized (this) {
+ if (index == null) {
+ indexMeta = LuminaIndexMeta.deserialize(ioMeta.metadata());
+ SeekableInputStream in = fileReader.getInputStream(ioMeta);
+ try {
+ LuminaFileInput fileInput = new InputStreamFileInput(in);
+ Map searcherOptions = options.toLuminaOptions();
+ searcherOptions.putAll(indexMeta.options());
+ index =
+ LuminaIndex.fromStream(
+ fileInput,
+ ioMeta.fileSize(),
+ indexMeta.dim(),
+ indexMeta.metric(),
+ searcherOptions);
+ openStream = in;
+ } catch (Exception e) {
+ IOUtils.closeQuietly(in);
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ Throwable firstException = null;
+
+ if (index != null) {
+ try {
+ index.close();
+ } catch (Throwable t) {
+ firstException = t;
+ }
+ index = null;
+ }
+
+ if (openStream != null) {
+ try {
+ openStream.close();
+ } catch (Throwable t) {
+ if (firstException == null) {
+ firstException = t;
+ } else {
+ firstException.addSuppressed(t);
+ }
+ }
+ openStream = null;
+ }
+
+ if (firstException != null) {
+ if (firstException instanceof IOException) {
+ throw (IOException) firstException;
+ } else if (firstException instanceof RuntimeException) {
+ throw (RuntimeException) firstException;
+ } else {
+ throw new RuntimeException(
+ "Failed to close Lumina vector global index reader", firstException);
+ }
+ }
+ }
+
+ // =================== unsupported =====================
+
+ @Override
+ public Optional visitIsNotNull(FieldRef fieldRef) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitIsNull(FieldRef fieldRef) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitStartsWith(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitEndsWith(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitContains(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitLike(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitLessThan(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitNotEqual(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitLessOrEqual(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitEqual(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitGreaterThan(FieldRef fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitIn(FieldRef fieldRef, List literals) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional visitNotIn(FieldRef fieldRef, List literals) {
+ return Optional.empty();
+ }
+
+ /**
+ * Adapts a {@link SeekableInputStream} to the {@link LuminaFileInput} JNI callback API.
+ *
+ * This mirrors the C++ {@code LuminaFileReader} adapter that bridges Paimon's {@code
+ * InputStream} to Lumina's {@code FileReader} interface. The stream lifecycle is managed by the
+ * enclosing reader, not by this adapter.
+ */
+ static class InputStreamFileInput implements LuminaFileInput {
+ private final SeekableInputStream in;
+
+ InputStreamFileInput(SeekableInputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public void seek(long position) throws IOException {
+ in.seek(position);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return in.getPos();
+ }
+
+ @Override
+ public void close() {
+ // Stream lifecycle is managed by the enclosing Reader.
+ }
+ }
+
+ /** A row ID paired with its similarity score, used in the top-k min-heap. */
+ private static class ScoredRow {
+ final long rowId;
+ final float score;
+
+ ScoredRow(long rowId, float score) {
+ this.rowId = rowId;
+ this.score = score;
+ }
+ }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
new file mode 100644
index 000000000000..16e5cac4706f
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexWriter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import org.aliyun.lumina.LuminaDataset;
+import org.aliyun.lumina.LuminaFileOutput;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Vector global index writer using Lumina. Builds a single index file per shard.
+ *
+ *
Vectors are accumulated in chunked float buffers and then streamed to the Lumina builder via
+ * the {@link LuminaDataset} callback API. The chunked design avoids the 2 GB single Java array
+ * limit and supports up to 1 billion+ vectors regardless of dimension.
+ */
+public class LuminaVectorGlobalIndexWriter implements GlobalIndexSingletonWriter, Closeable {
+
+ private static final String FILE_NAME_PREFIX = "lumina";
+
+ /** Buffer size for wrapping PositionOutputStream when dumping index (8 MB). */
+ private static final int DUMP_BUFFER_SIZE = 8 * 1024 * 1024;
+
+ /** Target chunk size in floats (~64 MB of float data). */
+ private static final int TARGET_CHUNK_FLOATS = 16 * 1024 * 1024;
+
+ private final GlobalIndexFileWriter fileWriter;
+ private final LuminaVectorIndexOptions options;
+ private final int dim;
+
+ /** Number of vectors each chunk can hold. */
+ private final int chunkVectors;
+
+ /** Chunked float buffer to break the 2 GB single-array limit. */
+ private List chunks;
+
+ private int count;
+ private boolean finished;
+ private boolean closed;
+
+ public LuminaVectorGlobalIndexWriter(
+ GlobalIndexFileWriter fileWriter,
+ DataType fieldType,
+ LuminaVectorIndexOptions options) {
+ this.fileWriter = fileWriter;
+ this.options = options;
+ this.dim = options.dimension();
+ this.chunkVectors = Math.max(1, TARGET_CHUNK_FLOATS / dim);
+ this.chunks = new ArrayList<>();
+ this.count = 0;
+ this.finished = false;
+ this.closed = false;
+
+ validateFieldType(fieldType);
+ }
+
+ private void validateFieldType(DataType dataType) {
+ if (!(dataType instanceof ArrayType)) {
+ throw new IllegalArgumentException(
+ "Lumina vector index requires ArrayType, but got: " + dataType);
+ }
+ DataType elementType = ((ArrayType) dataType).getElementType();
+ if (!(elementType instanceof FloatType)) {
+ throw new IllegalArgumentException(
+ "Lumina vector index requires float array, but got: " + elementType);
+ }
+ }
+
+ @Override
+ public void write(Object fieldData) {
+ float[] vector;
+ if (fieldData == null) {
+ throw new IllegalArgumentException("Field data must not be null");
+ }
+ if (fieldData instanceof float[]) {
+ vector = (float[]) fieldData;
+ } else if (fieldData instanceof InternalArray) {
+ vector = ((InternalArray) fieldData).toFloatArray();
+ } else {
+ throw new RuntimeException(
+ "Unsupported vector type: " + fieldData.getClass().getName());
+ }
+ checkDimension(vector);
+ int chunkIndex = count / chunkVectors;
+ int offsetInChunk = count % chunkVectors;
+ if (chunkIndex >= chunks.size()) {
+ chunks.add(new float[chunkVectors * dim]);
+ }
+ System.arraycopy(vector, 0, chunks.get(chunkIndex), offsetInChunk * dim, dim);
+ count++;
+ }
+
+ @Override
+ public List finish() {
+ if (finished) {
+ throw new IllegalStateException("finish() has already been called");
+ }
+ finished = true;
+ try {
+ if (count == 0) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(buildIndex());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to write Lumina vector global index", e);
+ }
+ }
+
+ private ResultEntry buildIndex() throws IOException {
+ Map luminaOptions = options.toLuminaOptions();
+ boolean needsPretrain = !"rawf32".equalsIgnoreCase(luminaOptions.get("encoding.type"));
+
+ try (LuminaIndex index = LuminaIndex.createForBuild(dim, options.metric(), luminaOptions)) {
+
+ if (needsPretrain) {
+ index.pretrainFrom(new ChunkedDataset(chunks, dim, chunkVectors, count));
+ }
+ index.insertFrom(new ChunkedDataset(chunks, dim, chunkVectors, count));
+
+ // Free vector memory after insertion
+ chunks = null;
+
+ String fileName = fileWriter.newFileName(FILE_NAME_PREFIX);
+ try (PositionOutputStream out = fileWriter.newOutputStream(fileName)) {
+ BufferedFileOutput bufferedOut = new BufferedFileOutput(out, DUMP_BUFFER_SIZE);
+ index.dump(bufferedOut);
+ bufferedOut.flush();
+ out.flush();
+ }
+
+ LuminaIndexMeta meta = new LuminaIndexMeta(luminaOptions);
+ return new ResultEntry(fileName, count, meta.serialize());
+ }
+ }
+
+ private void checkDimension(float[] vector) {
+ if (vector.length != dim) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Vector dimension mismatch: expected %d, but got %d",
+ dim, vector.length));
+ }
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ closed = true;
+ chunks = null;
+ }
+ }
+
+ /**
+ * A {@link LuminaDataset} backed by chunked float arrays. Supports vector counts beyond the 2
+ * GB single-array limit by splitting data across multiple chunks.
+ */
+ static class ChunkedDataset implements LuminaDataset {
+ private final List chunks;
+ private final int dim;
+ private final int chunkVectors;
+ private final int totalCount;
+ private int cursor;
+
+ ChunkedDataset(List chunks, int dim, int chunkVectors, int totalCount) {
+ this.chunks = chunks;
+ this.dim = dim;
+ this.chunkVectors = chunkVectors;
+ this.totalCount = totalCount;
+ this.cursor = 0;
+ }
+
+ @Override
+ public int dim() {
+ return dim;
+ }
+
+ @Override
+ public long totalSize() {
+ return totalCount;
+ }
+
+ @Override
+ public long getNextBatch(float[] vectorBuf, long[] idBuf) {
+ if (cursor >= totalCount) {
+ return 0;
+ }
+ int remaining = totalCount - cursor;
+ int batchSize = Math.min(idBuf.length, remaining);
+ int destOffset = 0;
+ int copied = 0;
+ while (copied < batchSize) {
+ int chunkIndex = (cursor + copied) / chunkVectors;
+ int offsetInChunk = (cursor + copied) % chunkVectors;
+ int availableInChunk = chunkVectors - offsetInChunk;
+ int toCopy = Math.min(batchSize - copied, availableInChunk);
+ System.arraycopy(
+ chunks.get(chunkIndex),
+ offsetInChunk * dim,
+ vectorBuf,
+ destOffset,
+ toCopy * dim);
+ destOffset += toCopy * dim;
+ copied += toCopy;
+ }
+ for (int i = 0; i < batchSize; i++) {
+ idBuf[i] = cursor + i;
+ }
+ cursor += batchSize;
+ return batchSize;
+ }
+ }
+
+ /**
+ * Adapts a {@link PositionOutputStream} to the {@link LuminaFileOutput} JNI callback API with
+ * buffering for better I/O throughput (especially important for OSS writes).
+ */
+ static class BufferedFileOutput implements LuminaFileOutput {
+ private final PositionOutputStream posOut;
+ private final BufferedOutputStream bufOut;
+
+ BufferedFileOutput(PositionOutputStream posOut, int bufferSize) {
+ this.posOut = posOut;
+ this.bufOut = new BufferedOutputStream(posOut, bufferSize);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ bufOut.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ bufOut.flush();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ bufOut.flush();
+ return posOut.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Flush buffered data before the caller closes the underlying stream.
+ bufOut.flush();
+ }
+ }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexer.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexer.java
new file mode 100644
index 000000000000..00ea925e86c7
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+
+import java.util.List;
+
+/** Lumina vector global indexer. */
+public class LuminaVectorGlobalIndexer implements GlobalIndexer {
+
+ private final DataType fieldType;
+ private final LuminaVectorIndexOptions options;
+
+ public LuminaVectorGlobalIndexer(DataType fieldType, Options options) {
+ this.fieldType = fieldType;
+ this.options = new LuminaVectorIndexOptions(options);
+ }
+
+ @Override
+ public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) {
+ return new LuminaVectorGlobalIndexWriter(fileWriter, fieldType, options);
+ }
+
+ @Override
+ public GlobalIndexReader createReader(
+ GlobalIndexFileReader fileReader, List files) {
+ return new LuminaVectorGlobalIndexReader(fileReader, files, fieldType, options);
+ }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexerFactory.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexerFactory.java
new file mode 100644
index 000000000000..9326a94b1c15
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+
+/** Factory for creating Lumina vector index. */
+public class LuminaVectorGlobalIndexerFactory implements GlobalIndexerFactory {
+
+ public static final String IDENTIFIER = "lumina-vector-ann";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public GlobalIndexer create(DataField field, Options options) {
+ return new LuminaVectorGlobalIndexer(field.type(), options);
+ }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorIndexOptions.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorIndexOptions.java
new file mode 100644
index 000000000000..3fa4a110373c
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorIndexOptions.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Options for the Lumina vector index.
+ *
+ * All option keys use the {@code lumina.} prefix so that both paimon-cpp and paimon-lumina share
+ * the same table properties. For example:
+ *
+ *
+ * lumina.index.dimension = 1024
+ * lumina.distance.metric = inner_product
+ * lumina.encoding.type = pq
+ * lumina.diskann.build.ef_construction = 128
+ * lumina.diskann.search.list_size = 100
+ *
+ *
+ * Use {@link #toLuminaOptions()} to obtain a {@code Map} with the {@code
+ * lumina.} prefix stripped, suitable for passing directly to the native Lumina API. This mirrors
+ * paimon-cpp's {@code FetchOptionsWithPrefix("lumina.", options)}.
+ */
+public class LuminaVectorIndexOptions {
+
+ /** The common prefix for all Lumina options. */
+ public static final String LUMINA_PREFIX = "lumina.";
+
+ public static final ConfigOption DIMENSION =
+ ConfigOptions.key("lumina.index.dimension")
+ .intType()
+ .defaultValue(128)
+ .withDescription("The dimension of the vector.");
+
+ public static final ConfigOption INDEX_TYPE =
+ ConfigOptions.key("lumina.index.type")
+ .stringType()
+ .defaultValue("diskann")
+ .withDescription("The index type for vector search diskann.");
+
+ public static final ConfigOption DISTANCE_METRIC =
+ ConfigOptions.key("lumina.distance.metric")
+ .stringType()
+ .defaultValue("inner_product")
+ .withDescription(
+ "The distance metric for vector search (l2, cosine, inner_product).");
+
+ public static final ConfigOption ENCODING_TYPE =
+ ConfigOptions.key("lumina.encoding.type")
+ .stringType()
+ .defaultValue("pq")
+ .withDescription("The encoding type for vectors (rawf32, sq8, pq).");
+
+ public static final ConfigOption PRETRAIN_SAMPLE_RATIO =
+ ConfigOptions.key("lumina.pretrain.sample_ratio")
+ .doubleType()
+ .defaultValue(0.2)
+ .withDescription("The sample ratio for pretraining.");
+
+ public static final ConfigOption DISKANN_BUILD_EF_CONSTRUCTION =
+ ConfigOptions.key("lumina.diskann.build.ef_construction")
+ .intType()
+ .defaultValue(1024)
+ .withDescription(
+ "Controls the size of the dynamic candidate list during graph construction.");
+
+ public static final ConfigOption DISKANN_BUILD_NEIGHBOR_COUNT =
+ ConfigOptions.key("lumina.diskann.build.neighbor_count")
+ .intType()
+ .defaultValue(64)
+ .withDescription("Maximum number of neighbors per node in the graph.");
+
+ public static final ConfigOption DISKANN_BUILD_THREAD_COUNT =
+ ConfigOptions.key("lumina.diskann.build.thread_count")
+ .intType()
+ .defaultValue(64)
+ .withDescription("Number of threads used for DiskANN index building.");
+
+ public static final ConfigOption DISKANN_SEARCH_LIST_SIZE =
+ ConfigOptions.key("lumina.diskann.search.list_size")
+ .intType()
+ .defaultValue(1024)
+ .withDescription("The search list size for DiskANN search.");
+
+ public static final ConfigOption DISKANN_SEARCH_BEAM_WIDTH =
+ ConfigOptions.key("lumina.diskann.search.beam_width")
+ .intType()
+ .defaultValue(4)
+ .withDescription("The beam width for DiskANN search.");
+
+ public static final ConfigOption ENCODING_PQ_M =
+ ConfigOptions.key("lumina.encoding.pq.m")
+ .intType()
+ .defaultValue(64)
+ .withDescription("Number of sub-quantizers for PQ encoding.");
+
+ public static final ConfigOption DISKANN_DISK_ENCODING_PQ_THREAD_COUNT =
+ ConfigOptions.key("lumina.diskann.disk_encoding.encoding.pq.thread_count")
+ .intType()
+ .defaultValue(64)
+ .withDescription(
+ "Number of threads used for PQ training when DiskANN disk encoding type is PQ.");
+
+ public static final ConfigOption SEARCH_PARALLEL_NUMBER =
+ ConfigOptions.key("lumina.search.parallel_number")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The parallel number for search.");
+
+ private final int dimension;
+ private final LuminaVectorMetric metric;
+ private final Map luminaOptions;
+
+ public LuminaVectorIndexOptions(Options options) {
+ this.dimension = validatePositive(options.get(DIMENSION), DIMENSION.key());
+ this.metric = parseMetric(options.get(DISTANCE_METRIC));
+ validateEncodingMetricCombination(options.get(ENCODING_TYPE), this.metric);
+ this.luminaOptions = buildLuminaOptions(options, this.dimension);
+ }
+
+ /**
+ * Returns all {@code lumina.*} options with the prefix stripped, producing native Lumina keys.
+ * For example, {@code lumina.diskann.build.ef_construction} becomes {@code
+ * diskann.build.ef_construction}.
+ */
+ public Map toLuminaOptions() {
+ return new LinkedHashMap<>(luminaOptions);
+ }
+
+ public int dimension() {
+ return dimension;
+ }
+
+ public LuminaVectorMetric metric() {
+ return metric;
+ }
+
+ /**
+ * Converts a {@link ConfigOption} key (e.g. {@code lumina.index.dimension}) to the native
+ * Lumina key (e.g. {@code index.dimension}) by stripping the {@link #LUMINA_PREFIX}.
+ */
+ public static String toLuminaKey(ConfigOption> option) {
+ String key = option.key();
+ if (key.startsWith(LUMINA_PREFIX)) {
+ return key.substring(LUMINA_PREFIX.length());
+ }
+ return key;
+ }
+
+ /** All ConfigOptions with defaults that should always appear in lumina options. */
+ @SuppressWarnings("rawtypes")
+ private static final List ALL_OPTIONS =
+ Arrays.asList(
+ DIMENSION,
+ INDEX_TYPE,
+ DISTANCE_METRIC,
+ ENCODING_TYPE,
+ PRETRAIN_SAMPLE_RATIO,
+ DISKANN_BUILD_EF_CONSTRUCTION,
+ DISKANN_BUILD_NEIGHBOR_COUNT,
+ DISKANN_BUILD_THREAD_COUNT,
+ DISKANN_SEARCH_LIST_SIZE,
+ DISKANN_SEARCH_BEAM_WIDTH,
+ ENCODING_PQ_M,
+ DISKANN_DISK_ENCODING_PQ_THREAD_COUNT,
+ SEARCH_PARALLEL_NUMBER);
+
+ /**
+ * Builds native Lumina options by first populating all known ConfigOptions (with defaults) and
+ * then overlaying any user-specified {@code lumina.*} options. This ensures that required keys
+ * like {@code index.type} are always present in the metadata, matching paimon-cpp behavior.
+ */
+ @SuppressWarnings("unchecked")
+ private static Map buildLuminaOptions(Options options, int dimension) {
+ Map result = new LinkedHashMap<>();
+ // Populate all known options with their resolved values (user-set or default).
+ for (ConfigOption> opt : ALL_OPTIONS) {
+ Object value = options.get(opt);
+ if (value != null) {
+ result.put(toLuminaKey(opt), String.valueOf(value));
+ }
+ }
+ // Overlay any extra user-specified lumina.* options not in ALL_OPTIONS.
+ for (Map.Entry entry : options.toMap().entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(LUMINA_PREFIX)) {
+ result.putIfAbsent(key.substring(LUMINA_PREFIX.length()), entry.getValue());
+ }
+ }
+ // PQ encoding requires pq.m <= dimension; auto-cap to avoid native init failures.
+ capPqM(result, dimension);
+ return result;
+ }
+
+ /**
+ * Ensures {@code encoding.pq.m} does not exceed the vector dimension. Lumina's QuantizerTrainer
+ * requires numChunks (pq.m) to be > 0 and ≤ dimension.
+ */
+ private static void capPqM(Map opts, int dimension) {
+ String encoding = opts.get(toLuminaKey(ENCODING_TYPE));
+ if (!"pq".equalsIgnoreCase(encoding)) {
+ return;
+ }
+ String pqMKey = toLuminaKey(ENCODING_PQ_M);
+ String pqMStr = opts.get(pqMKey);
+ if (pqMStr != null) {
+ int pqM = Integer.parseInt(pqMStr);
+ if (pqM > dimension) {
+ opts.put(pqMKey, String.valueOf(dimension));
+ }
+ }
+ }
+
+ /**
+ * Parses the distance metric string, accepting both lumina native names (l2, cosine,
+ * inner_product) and enum names (L2, COSINE, INNER_PRODUCT).
+ */
+ private static LuminaVectorMetric parseMetric(String value) {
+ try {
+ return LuminaVectorMetric.fromLuminaName(value);
+ } catch (IllegalArgumentException e) {
+ return LuminaVectorMetric.fromString(value);
+ }
+ }
+
+ private static void validateEncodingMetricCombination(
+ String encoding, LuminaVectorMetric metric) {
+ if ("pq".equalsIgnoreCase(encoding) && metric == LuminaVectorMetric.COSINE) {
+ throw new IllegalArgumentException(
+ "Lumina does not support PQ encoding with cosine metric. "
+ + "Please use 'rawf32' or 'sq8' encoding, or switch to 'l2' or 'inner_product' metric.");
+ }
+ }
+
+ private static int validatePositive(int value, String key) {
+ if (value <= 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid value for '%s': %d. Must be a positive integer.", key, value));
+ }
+ return value;
+ }
+}
diff --git a/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorMetric.java b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorMetric.java
new file mode 100644
index 000000000000..8f17c649a8f9
--- /dev/null
+++ b/paimon-lumina/src/main/java/org/apache/paimon/lumina/index/LuminaVectorMetric.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
/** Enumeration of supported Lumina vector similarity metrics. */
public enum LuminaVectorMetric {

    /** L2 (Euclidean) distance metric. Lower values indicate more similar vectors. */
    L2(0, "l2"),

    /** Cosine distance metric. Lower values indicate more similar vectors. */
    COSINE(1, "cosine"),

    /**
     * Inner product (dot product) metric. Higher values indicate more similar vectors. For
     * normalized vectors, this is equivalent to cosine similarity.
     */
    INNER_PRODUCT(2, "inner_product");

    /** Stable integer id used when the metric is persisted. */
    private final int value;

    /** Lumina native metric name. */
    private final String luminaName;

    LuminaVectorMetric(int value, String luminaName) {
        this.value = value;
        this.luminaName = luminaName;
    }

    public int getValue() {
        return value;
    }

    /** Returns the Lumina native distance metric name (e.g. "l2", "cosine", "inner_product"). */
    public String getLuminaName() {
        return luminaName;
    }

    /**
     * Resolves a metric from its enum constant name, case-insensitively.
     *
     * <p>Uses {@code Locale.ROOT} for upper-casing so parsing is independent of the JVM default
     * locale (e.g. the Turkish dotless-i would otherwise turn "inner_product" into a name that
     * does not match any constant).
     *
     * @throws IllegalArgumentException if the name matches no constant
     */
    public static LuminaVectorMetric fromString(String name) {
        return valueOf(name.toUpperCase(java.util.Locale.ROOT));
    }

    /** Resolves the persisted integer id back to the enum constant. */
    public static LuminaVectorMetric fromValue(int value) {
        for (LuminaVectorMetric metric : values()) {
            if (metric.value == value) {
                return metric;
            }
        }
        throw new IllegalArgumentException(String.format("Unknown metric value: %s", value));
    }

    /** Resolves a Lumina native metric name (e.g. "l2") to the corresponding enum constant. */
    public static LuminaVectorMetric fromLuminaName(String luminaName) {
        for (LuminaVectorMetric metric : values()) {
            if (metric.luminaName.equals(luminaName)) {
                return metric;
            }
        }
        throw new IllegalArgumentException(
                String.format("Unknown lumina metric name: %s", luminaName));
    }
}
diff --git a/paimon-lumina/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-lumina/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
new file mode 100644
index 000000000000..17aa3bd887f3
--- /dev/null
+++ b/paimon-lumina/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.lumina.index.LuminaVectorGlobalIndexerFactory
diff --git a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorBenchmark.java b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorBenchmark.java
new file mode 100644
index 000000000000..faa1662e3756
--- /dev/null
+++ b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorBenchmark.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+
+import org.aliyun.lumina.Lumina;
+import org.aliyun.lumina.LuminaException;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.UUID;
+
+/**
+ * Benchmark for Lumina vector index using {@link LuminaVectorGlobalIndexWriter} and {@link
+ * LuminaVectorGlobalIndexReader}.
+ *
+ * Build and query benchmarks are separate tests so they can be run independently.
+ *
+ * <p>All parameters are passed via {@code -DextraJavaTestArgs} as JVM system properties ({@code
+ * -DKEY=value}). They also fall back to environment variables.
+ *
+ * <p>Parameters:
+ *
+ *
+ * {@code BENCHMARK_PATH} (required) — base path for index files (local or oss://).
+ * {@code BENCHMARK_NUM_VECTORS} — number of vectors (default 10,000,000).
+ * {@code BENCHMARK_DIMENSION} — vector dimension (default 1024).
+ * {@code BENCHMARK_ENCODING_TYPE} — encoding type: rawf32, pq, sq8 (default pq).
+ * {@code BENCHMARK_DISTANCE_METRIC} — distance metric: l2, cosine, inner_product (default
+ * l2).
+ * {@code BENCHMARK_NUM_QUERIES} — number of queries (default 1000).
+ * {@code BENCHMARK_BUILD_THREADS} — DiskANN build thread count (default CPU cores).
+ * {@code BENCHMARK_EF_CONSTRUCTION} — DiskANN ef_construction (default 128).
+ * {@code BENCHMARK_NEIGHBOR_COUNT} — DiskANN neighbor count (default 64).
+ * {@code BENCHMARK_PQ_M} — PQ sub-quantizer count (default 64).
+ * {@code BENCHMARK_PQ_MAX_EPOCH} — PQ training max epoch.
+ * {@code BENCHMARK_PQ_THREAD_COUNT} — PQ training thread count.
+ * {@code BENCHMARK_SEARCH_LIST_SIZE} — DiskANN search list size.
+ * {@code BENCHMARK_INDEX_FILE} — existing index file path (required for benchmarkQuery).
+ * {@code BENCHMARK_KEEP_INDEX} — {@code true} to keep index file after build benchmark.
+ * {@code OSS_ENDPOINT}, {@code OSS_ACCESS_KEY_ID}, {@code OSS_ACCESS_KEY_SECRET} — for OSS.
+ *
+ *
+ * Example: Build (Local)
+ *
+ * {@code
+ * mvn test -pl paimon-lumina -Dtest=LuminaVectorBenchmark#benchmarkBuild \
+ * -DextraJavaTestArgs='-Xmx64g -DBENCHMARK_PATH=/tmp/lumina-benchmark -DBENCHMARK_NUM_VECTORS=100000 -DBENCHMARK_DIMENSION=128 -DBENCHMARK_ENCODING_TYPE=rawf32 -DBENCHMARK_EF_CONSTRUCTION=32 -DBENCHMARK_NEIGHBOR_COUNT=16 -DBENCHMARK_BUILD_THREADS=16 -DBENCHMARK_KEEP_INDEX=true'
+ * }
+ *
+ * Example: Query (Local, using index from build)
+ *
+ * {@code
+ * mvn test -pl paimon-lumina -Dtest=LuminaVectorBenchmark#benchmarkQuery \
+ * -DextraJavaTestArgs='-Xmx64g -DBENCHMARK_PATH=/tmp/lumina-benchmark -DBENCHMARK_INDEX_FILE=/tmp/lumina-benchmark/<uuid>/lumina-<uuid> -DBENCHMARK_DIMENSION=128 -DBENCHMARK_ENCODING_TYPE=rawf32 -DBENCHMARK_NUM_QUERIES=1000'
+ * }
+ *
+ * Example: Build (OSS)
+ *
+ * {@code
+ * mvn test -pl paimon-lumina -Dtest=LuminaVectorBenchmark#benchmarkBuild \
+ * -DextraJavaTestArgs='-Xmx64g -DBENCHMARK_PATH=oss://your-bucket/lumina-benchmark -DOSS_ENDPOINT=oss-cn-hangzhou-internal.aliyuncs.com -DOSS_ACCESS_KEY_ID=your-access-key-id -DOSS_ACCESS_KEY_SECRET=your-access-key-secret -DBENCHMARK_NUM_VECTORS=100000 -DBENCHMARK_DIMENSION=128 -DBENCHMARK_ENCODING_TYPE=pq -DBENCHMARK_EF_CONSTRUCTION=32 -DBENCHMARK_NEIGHBOR_COUNT=16 -DBENCHMARK_BUILD_THREADS=16 -DBENCHMARK_KEEP_INDEX=true'
+ * }
+ *
+ * Example: Query (OSS)
+ *
+ * {@code
+ * mvn test -pl paimon-lumina -Dtest=LuminaVectorBenchmark#benchmarkQuery \
+ * -DextraJavaTestArgs='-Xmx64g -DBENCHMARK_PATH=oss://your-bucket/lumina-benchmark -DOSS_ENDPOINT=oss-cn-hangzhou-internal.aliyuncs.com -DOSS_ACCESS_KEY_ID=your-access-key-id -DOSS_ACCESS_KEY_SECRET=your-access-key-secret -DBENCHMARK_INDEX_FILE=oss://your-bucket/lumina-benchmark/<uuid>/lumina-<uuid> -DBENCHMARK_DIMENSION=128 -DBENCHMARK_ENCODING_TYPE=pq -DBENCHMARK_NUM_QUERIES=1000'
+ * }
+ */
+public class LuminaVectorBenchmark {
+
+ private static final int DEFAULT_NUM_VECTORS = 10_000_000;
+ private static final int DEFAULT_DIMENSION = 1024;
+ private static final int TOP_K = 10;
+
+    /**
+     * Returns the RSS (Resident Set Size) of the current process in MB, or -1 if unavailable
+     * (e.g. on non-Linux platforms where /proc/self/status does not exist).
+     */
+    private static long getRssMb() {
+        // Read from /proc/self/status (Linux) — works on JDK 8+. The file is ASCII; an explicit
+        // charset avoids depending on the platform default (pre-JDK-18 behavior).
+        try (BufferedReader reader =
+                new BufferedReader(
+                        new InputStreamReader(
+                                new java.io.FileInputStream("/proc/self/status"),
+                                StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (line.startsWith("VmRSS:")) {
+                    // Format: "VmRSS: 123456 kB" — parts[1] is the size in kB.
+                    String[] parts = line.split("\\s+");
+                    return Long.parseLong(parts[1]) / 1024; // kB -> MB
+                }
+            }
+        } catch (Exception ignored) {
+            // Best effort only: memory reporting is informational for the benchmark.
+        }
+        return -1;
+    }
+
+    /** Prints current JVM heap usage and process RSS under the given label. */
+    private static void printMemory(String label) {
+        Runtime runtime = Runtime.getRuntime();
+        long usedHeapMb = (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024);
+        long maxHeapMb = runtime.maxMemory() / (1024 * 1024);
+        long rss = getRssMb();
+        String rssText = rss >= 0 ? rss + " MB" : "N/A";
+        System.out.printf(
+                "[Memory] %-30s Heap: %,d / %,d MB RSS: %s%n",
+                label, usedHeapMb, maxHeapMb, rssText);
+    }
+
+    /**
+     * Reads a config value: JVM system property first, then environment variable.
+     *
+     * @return the first non-empty value found, or {@code null} if neither source has one
+     */
+    private static String getEnv(String key) {
+        String fromProperty = System.getProperty(key);
+        if (fromProperty != null && !fromProperty.isEmpty()) {
+            return fromProperty;
+        }
+        String fromEnv = System.getenv(key);
+        return fromEnv != null && !fromEnv.isEmpty() ? fromEnv : null;
+    }
+
+    /** Reads an integer config value, falling back to {@code defaultValue} when unset. */
+    private static int getEnvInt(String key, int defaultValue) {
+        String raw = getEnv(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return Integer.parseInt(raw);
+    }
+
+ private static final int UPLOAD_BUFFER_SIZE = 8 * 1024 * 1024;
+
+    /** Copies {@code src} on {@code srcIO} to {@code dst} on {@code dstIO} in buffered chunks. */
+    private static void uploadFile(FileIO srcIO, Path src, FileIO dstIO, Path dst)
+            throws IOException {
+        try (SeekableInputStream in = srcIO.newInputStream(src);
+                PositionOutputStream out = dstIO.newOutputStream(dst, false)) {
+            byte[] chunk = new byte[UPLOAD_BUFFER_SIZE];
+            for (int n = in.read(chunk); n != -1; n = in.read(chunk)) {
+                out.write(chunk, 0, n);
+            }
+            out.flush();
+        }
+    }
+
+    /** Loads the Lumina native library, or skips the current test when it is unavailable. */
+    private static void ensureLumina() {
+        if (Lumina.isLibraryLoaded()) {
+            return;
+        }
+        try {
+            Lumina.loadLibrary();
+        } catch (LuminaException e) {
+            Assumptions.assumeTrue(
+                    false, "Lumina native library not available: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Creates the {@link FileIO} for the benchmark path: for {@code oss://} paths an OSS-backed
+     * file IO with credentials read from OSS_* properties/env vars, otherwise a local file IO.
+     * Skips the test (JUnit assumption) when OSS is requested but credentials are missing.
+     */
+    private static FileIO createFileIO(String benchmarkPath) throws IOException {
+        if (benchmarkPath.startsWith("oss://")) {
+            String endpoint = getEnv("OSS_ENDPOINT");
+            String accessKeyId = getEnv("OSS_ACCESS_KEY_ID");
+            String accessKeySecret = getEnv("OSS_ACCESS_KEY_SECRET");
+            Assumptions.assumeTrue(
+                    endpoint != null && accessKeyId != null && accessKeySecret != null,
+                    "OSS mode requires OSS_ENDPOINT, OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET.");
+
+            // FileIO.get resolves the concrete OSS implementation from the path scheme.
+            Options fsOptions = new Options();
+            fsOptions.setString("fs.oss.endpoint", endpoint);
+            fsOptions.setString("fs.oss.accessKeyId", accessKeyId);
+            fsOptions.setString("fs.oss.accessKeySecret", accessKeySecret);
+            CatalogContext context = CatalogContext.create(fsOptions);
+            return FileIO.get(new Path(benchmarkPath), context);
+        } else {
+            return new LocalFileIO();
+        }
+    }
+
+    /**
+     * Builds {@link LuminaVectorIndexOptions} for the benchmark from BENCHMARK_* overrides.
+     *
+     * <p>The PQ max-epoch/thread-count and DiskANN search-list-size overrides are only applied
+     * when positive, so Lumina's own defaults remain in effect otherwise.
+     */
+    private static LuminaVectorIndexOptions createIndexOptions(
+            int dimension, String distanceMetric, String encodingType) {
+        // Default the build parallelism to the number of available cores.
+        int buildThreads =
+                getEnvInt("BENCHMARK_BUILD_THREADS", Runtime.getRuntime().availableProcessors());
+        int efConstruction = getEnvInt("BENCHMARK_EF_CONSTRUCTION", 128);
+        int neighborCount = getEnvInt("BENCHMARK_NEIGHBOR_COUNT", 64);
+        int pqMaxEpoch = getEnvInt("BENCHMARK_PQ_MAX_EPOCH", 0);
+        int pqThreadCount = getEnvInt("BENCHMARK_PQ_THREAD_COUNT", 0);
+        int pqM = getEnvInt("BENCHMARK_PQ_M", 64);
+        int searchListSize = getEnvInt("BENCHMARK_SEARCH_LIST_SIZE", 0);
+
+        Options luminaOpts = new Options();
+        luminaOpts.setInteger(LuminaVectorIndexOptions.DIMENSION.key(), dimension);
+        luminaOpts.setString(LuminaVectorIndexOptions.DISTANCE_METRIC.key(), distanceMetric);
+        luminaOpts.setString(LuminaVectorIndexOptions.ENCODING_TYPE.key(), encodingType);
+        luminaOpts.setInteger(
+                LuminaVectorIndexOptions.DISKANN_BUILD_THREAD_COUNT.key(), buildThreads);
+        luminaOpts.setInteger(
+                LuminaVectorIndexOptions.DISKANN_BUILD_EF_CONSTRUCTION.key(), efConstruction);
+        luminaOpts.setInteger(
+                LuminaVectorIndexOptions.DISKANN_BUILD_NEIGHBOR_COUNT.key(), neighborCount);
+        if (pqMaxEpoch > 0) {
+            // NOTE(review): raw key string — presumably no ConfigOption constant exists for this
+            // option; confirm against LuminaVectorIndexOptions.
+            luminaOpts.setString("lumina.encoding.pq.max_epoch", String.valueOf(pqMaxEpoch));
+        }
+        if (pqThreadCount > 0) {
+            luminaOpts.setInteger(
+                    LuminaVectorIndexOptions.DISKANN_DISK_ENCODING_PQ_THREAD_COUNT.key(),
+                    pqThreadCount);
+        }
+        luminaOpts.setInteger(LuminaVectorIndexOptions.ENCODING_PQ_M.key(), pqM);
+        if (searchListSize > 0) {
+            luminaOpts.setString(
+                    LuminaVectorIndexOptions.DISKANN_SEARCH_LIST_SIZE.key(),
+                    String.valueOf(searchListSize));
+        }
+        return new LuminaVectorIndexOptions(luminaOpts);
+    }
+
+ private static final String BUILD_PARAMS_FILE = "build_params.json";
+
+    /**
+     * Writes build parameters to a JSON file in the index directory so that query benchmarks can
+     * reference the exact configuration used during build.
+     *
+     * <p>Keys and values are emitted verbatim (option keys and simple scalars), so no JSON
+     * string escaping is applied.
+     */
+    private static void writeBuildParams(
+            FileIO fileIO,
+            Path indexDir,
+            int numVectors,
+            int dimension,
+            String distanceMetric,
+            String encodingType,
+            LuminaVectorIndexOptions indexOptions)
+            throws IOException {
+        // LinkedHashMap keeps insertion order so the JSON output is stable and readable.
+        Map<String, String> params = new LinkedHashMap<>();
+        params.put("num_vectors", String.valueOf(numVectors));
+        params.put("dimension", String.valueOf(dimension));
+        params.put("distance_metric", distanceMetric);
+        params.put("encoding_type", encodingType);
+        params.putAll(indexOptions.toLuminaOptions());
+
+        Path paramsPath = new Path(indexDir, BUILD_PARAMS_FILE);
+        try (Writer writer =
+                new OutputStreamWriter(
+                        fileIO.newOutputStream(paramsPath, false), StandardCharsets.UTF_8)) {
+            writer.write("{\n");
+            int i = 0;
+            for (Map.Entry<String, String> entry : params.entrySet()) {
+                // Trailing comma on every entry except the last keeps the output valid JSON.
+                writer.write(
+                        String.format(
+                                "  \"%s\": \"%s\"%s\n",
+                                entry.getKey(),
+                                entry.getValue(),
+                                i < params.size() - 1 ? "," : ""));
+                i++;
+            }
+            writer.write("}\n");
+        }
+        System.out.printf("Build params written to: %s%n", paramsPath);
+    }
+
+    /** Prints the effective build configuration (re-reading BENCHMARK_* overrides) to stdout. */
+    private static void printBuildConfig(
+            String storageMode,
+            Path indexDir,
+            int numVectors,
+            int dimension,
+            String distanceMetric,
+            String encodingType,
+            boolean keepIndex) {
+        // Re-resolve the same defaults used by createIndexOptions so the printout matches.
+        int buildThreads =
+                getEnvInt("BENCHMARK_BUILD_THREADS", Runtime.getRuntime().availableProcessors());
+        int efConstruction = getEnvInt("BENCHMARK_EF_CONSTRUCTION", 128);
+        int neighborCount = getEnvInt("BENCHMARK_NEIGHBOR_COUNT", 64);
+        int pqM = getEnvInt("BENCHMARK_PQ_M", 64);
+        int pqMaxEpoch = getEnvInt("BENCHMARK_PQ_MAX_EPOCH", 0);
+        int pqThreadCount = getEnvInt("BENCHMARK_PQ_THREAD_COUNT", 0);
+
+        System.out.println("=== Lumina Build Benchmark ===");
+        System.out.printf("Storage: %s%n", storageMode);
+        System.out.printf("Path: %s%n", indexDir);
+        System.out.printf("Vectors: %,d Dimension: %d%n", numVectors, dimension);
+        System.out.printf("Metric: %s Encoding: %s%n", distanceMetric, encodingType);
+        System.out.printf(
+                "Build: threads=%d ef_construction=%d neighbor_count=%d%n",
+                buildThreads, efConstruction, neighborCount);
+        System.out.printf(
+                "PQ: m=%d max_epoch=%s thread_count=%s%n",
+                pqM,
+                pqMaxEpoch > 0 ? String.valueOf(pqMaxEpoch) : "default",
+                pqThreadCount > 0 ? String.valueOf(pqThreadCount) : "default");
+        System.out.printf("Keep index: %s%n", keepIndex);
+        System.out.println();
+    }
+
+    /** Benchmark: build index only. Use {@code -Dtest=LuminaVectorBenchmark#benchmarkBuild}. */
+    @Test
+    public void benchmarkBuild() throws Exception {
+        ensureLumina();
+
+        String benchmarkPath = getEnv("BENCHMARK_PATH");
+        Assumptions.assumeTrue(
+                benchmarkPath != null && !benchmarkPath.isEmpty(), "BENCHMARK_PATH not set.");
+
+        boolean keepIndex = "true".equalsIgnoreCase(getEnv("BENCHMARK_KEEP_INDEX"));
+        int numVectors = getEnvInt("BENCHMARK_NUM_VECTORS", DEFAULT_NUM_VECTORS);
+        int dimension = getEnvInt("BENCHMARK_DIMENSION", DEFAULT_DIMENSION);
+        // NOTE(review): the class javadoc documents l2 as the default metric, but the code
+        // defaults to inner_product — confirm which is intended.
+        String distanceMetric =
+                getEnv("BENCHMARK_DISTANCE_METRIC") != null
+                        ? getEnv("BENCHMARK_DISTANCE_METRIC")
+                        : "inner_product";
+        String encodingType =
+                getEnv("BENCHMARK_ENCODING_TYPE") != null
+                        ? getEnv("BENCHMARK_ENCODING_TYPE")
+                        : "pq";
+
+        boolean isOss = benchmarkPath.startsWith("oss://");
+        String storageMode = isOss ? "OSS (Jindo)" : "Local";
+        FileIO fileIO = createFileIO(benchmarkPath);
+        LuminaVectorIndexOptions indexOptions =
+                createIndexOptions(dimension, distanceMetric, encodingType);
+        DataType vectorType = new ArrayType(new FloatType());
+
+        // Each run builds into a fresh random sub-directory of the benchmark path.
+        Path indexDir = new Path(benchmarkPath, UUID.randomUUID().toString());
+        if (!isOss) {
+            fileIO.mkdirs(indexDir);
+        }
+
+        printBuildConfig(
+                storageMode,
+                indexDir,
+                numVectors,
+                dimension,
+                distanceMetric,
+                encodingType,
+                keepIndex);
+
+        writeBuildParams(
+                fileIO,
+                indexDir,
+                numVectors,
+                dimension,
+                distanceMetric,
+                encodingType,
+                indexOptions);
+
+        printMemory("Before build");
+        long buildStartTime = System.currentTimeMillis();
+
+        // When targeting OSS, build to a local temp dir first, then upload
+        Path localTempDir = null;
+        FileIO localFileIO = new LocalFileIO();
+        final FileIO buildFileIO;
+        final Path buildDir;
+        if (isOss) {
+            localTempDir =
+                    new Path(
+                            System.getProperty("java.io.tmpdir"),
+                            "lumina-bench-" + UUID.randomUUID());
+            localFileIO.mkdirs(localTempDir);
+            buildFileIO = localFileIO;
+            buildDir = localTempDir;
+            System.out.printf("Building to local temp: %s%n", localTempDir);
+        } else {
+            buildFileIO = fileIO;
+            buildDir = indexDir;
+        }
+
+        String indexFileName;
+        try {
+            final Path finalBuildDir = buildDir;
+            GlobalIndexFileWriter gFileWriter =
+                    new GlobalIndexFileWriter() {
+                        @Override
+                        public String newFileName(String prefix) {
+                            return prefix + "-" + UUID.randomUUID();
+                        }
+
+                        @Override
+                        public PositionOutputStream newOutputStream(String fileName)
+                                throws IOException {
+                            return buildFileIO.newOutputStream(
+                                    new Path(finalBuildDir, fileName), false);
+                        }
+                    };
+
+            List<ResultEntry> results;
+            try (LuminaVectorGlobalIndexWriter writer =
+                    new LuminaVectorGlobalIndexWriter(gFileWriter, vectorType, indexOptions)) {
+                System.out.printf("Writing %,d vectors...%n", numVectors);
+                long writeStart = System.currentTimeMillis();
+                // Fixed seed so repeated runs build over the same data distribution.
+                SplittableRandom insertRandom = new SplittableRandom(12345);
+                float[] vec = new float[dimension];
+                for (int i = 0; i < numVectors; i++) {
+                    for (int d = 0; d < dimension; d++) {
+                        vec[d] = (float) insertRandom.nextDouble() * 2 - 1;
+                    }
+                    writer.write(vec);
+                }
+                long writeEnd = System.currentTimeMillis();
+                System.out.printf(
+                        "Write done in %.2f s (%.0f vectors/s)%n",
+                        (writeEnd - writeStart) / 1000.0,
+                        numVectors / ((writeEnd - writeStart) / 1000.0));
+                printMemory("After vector write");
+
+                System.out.println("Building index (pretrain + insert + dump)...");
+                long finishStart = System.currentTimeMillis();
+                results = writer.finish();
+                long finishEnd = System.currentTimeMillis();
+                System.out.printf(
+                        "Index build done in %.2f s%n", (finishEnd - finishStart) / 1000.0);
+                printMemory("After index build");
+            }
+
+            indexFileName = results.get(0).fileName();
+
+            // Upload to OSS if needed
+            if (isOss) {
+                Path localFile = new Path(buildDir, indexFileName);
+                Path remoteFile = new Path(indexDir, indexFileName);
+                long localSize = localFileIO.getFileSize(localFile);
+                System.out.printf(
+                        "Uploading to OSS: %s (%,d bytes, %.2f GB)...%n",
+                        remoteFile, localSize, localSize / (1024.0 * 1024 * 1024));
+                long uploadStart = System.currentTimeMillis();
+                uploadFile(localFileIO, localFile, fileIO, remoteFile);
+                long uploadEnd = System.currentTimeMillis();
+                double uploadSec = (uploadEnd - uploadStart) / 1000.0;
+                System.out.printf(
+                        "Upload done in %.2f s (%.2f MB/s)%n",
+                        uploadSec, localSize / (1024.0 * 1024) / uploadSec);
+            }
+
+            Path indexFilePath = new Path(indexDir, indexFileName);
+            long buildFileSize = fileIO.getFileSize(indexFilePath);
+            long buildEndTime = System.currentTimeMillis();
+
+            System.out.println();
+            System.out.println("=== Build Benchmark Results ===");
+            System.out.printf(
+                    "Total build time: %.2f s%n", (buildEndTime - buildStartTime) / 1000.0);
+            System.out.printf(
+                    "Index file: %s (%,d bytes, %.2f GB)%n",
+                    indexFilePath, buildFileSize, buildFileSize / (1024.0 * 1024 * 1024));
+            System.out.printf(
+                    "%nUse this for query benchmark:%n -DBENCHMARK_INDEX_FILE=%s%n",
+                    indexFilePath);
+            System.out.println("===============================");
+
+        } finally {
+            if (localTempDir != null) {
+                localFileIO.delete(localTempDir, true);
+            }
+            if (!keepIndex) {
+                System.out.println("\nCleaning up...");
+                fileIO.delete(indexDir, true);
+                System.out.println("Done.");
+            }
+        }
+    }
+
+    /**
+     * Benchmark: query only. Requires {@code BENCHMARK_INDEX_FILE}. Use {@code
+     * -Dtest=LuminaVectorBenchmark#benchmarkQuery}.
+     */
+    @Test
+    public void benchmarkQuery() throws Exception {
+        ensureLumina();
+
+        String benchmarkPath = getEnv("BENCHMARK_PATH");
+        Assumptions.assumeTrue(
+                benchmarkPath != null && !benchmarkPath.isEmpty(), "BENCHMARK_PATH not set.");
+
+        String existingIndexFile = getEnv("BENCHMARK_INDEX_FILE");
+        Assumptions.assumeTrue(
+                existingIndexFile != null && !existingIndexFile.isEmpty(),
+                "BENCHMARK_INDEX_FILE not set. Run benchmarkBuild first.");
+
+        int dimension = getEnvInt("BENCHMARK_DIMENSION", DEFAULT_DIMENSION);
+        // NOTE(review): these query-side options must match the ones used at build time; see
+        // the build_params.json written next to the index.
+        String distanceMetric =
+                getEnv("BENCHMARK_DISTANCE_METRIC") != null
+                        ? getEnv("BENCHMARK_DISTANCE_METRIC")
+                        : "inner_product";
+        String encodingType =
+                getEnv("BENCHMARK_ENCODING_TYPE") != null
+                        ? getEnv("BENCHMARK_ENCODING_TYPE")
+                        : "pq";
+        int numQueries = getEnvInt("BENCHMARK_NUM_QUERIES", 1000);
+
+        boolean isOss = benchmarkPath.startsWith("oss://");
+        String storageMode = isOss ? "OSS (Jindo)" : "Local";
+        FileIO fileIO = createFileIO(benchmarkPath);
+        LuminaVectorIndexOptions indexOptions =
+                createIndexOptions(dimension, distanceMetric, encodingType);
+        DataType vectorType = new ArrayType(new FloatType());
+
+        Path existingPath = new Path(existingIndexFile);
+        Path indexDir = existingPath.getParent();
+        String indexFileName = existingPath.getName();
+
+        Path indexFilePath = new Path(indexDir, indexFileName);
+        long fileSize = fileIO.getFileSize(indexFilePath);
+
+        System.out.println("=== Lumina Query Benchmark ===");
+        System.out.printf("Storage: %s%n", storageMode);
+        System.out.printf("Index file: %s%n", indexFilePath);
+        System.out.printf(
+                "Index size: %,d bytes (%.2f MB)%n", fileSize, fileSize / (1024.0 * 1024));
+        System.out.printf("Dimension: %d TopK: %d%n", dimension, TOP_K);
+        System.out.printf("Queries: %,d%n", numQueries);
+        System.out.printf("Metric: %s Encoding: %s%n", distanceMetric, encodingType);
+        int searchListSize = getEnvInt("BENCHMARK_SEARCH_LIST_SIZE", 0);
+        if (searchListSize > 0) {
+            System.out.printf("Search: list_size=%d%n", searchListSize);
+        }
+        System.out.println();
+
+        // Generate random query vectors (fixed seed for reproducibility).
+        SplittableRandom queryRandom = new SplittableRandom(99999);
+        float[][] queryVectors = new float[numQueries][dimension];
+        for (int i = 0; i < numQueries; i++) {
+            for (int d = 0; d < dimension; d++) {
+                queryVectors[i][d] = (float) queryRandom.nextDouble() * 2 - 1;
+            }
+        }
+
+        LuminaIndexMeta meta = new LuminaIndexMeta(indexOptions.toLuminaOptions());
+        GlobalIndexIOMeta ioMeta = new GlobalIndexIOMeta(indexFilePath, fileSize, meta.serialize());
+        String fieldName = "vec";
+        List<GlobalIndexIOMeta> ioMetas = Collections.singletonList(ioMeta);
+
+        final FileIO benchFileIO = fileIO;
+        final Path benchIndexDir = indexDir;
+
+        printMemory("Before queries");
+        System.out.printf(
+                "Running %,d queries (top-%d), each with fresh index load...%n", numQueries, TOP_K);
+
+        long[] queryLatencies = new long[numQueries];
+        long totalQueryStart = System.currentTimeMillis();
+
+        for (int i = 0; i < numQueries; i++) {
+            VectorSearch vs = new VectorSearch(queryVectors[i], TOP_K, fieldName);
+            long queryStart = System.nanoTime();
+
+            // A new reader per query deliberately includes index load time in the latency.
+            GlobalIndexFileReader gFileReader =
+                    ioMetaArg ->
+                            benchFileIO.newInputStream(
+                                    new Path(benchIndexDir, ioMetaArg.filePath().getName()));
+            try (LuminaVectorGlobalIndexReader reader =
+                    new LuminaVectorGlobalIndexReader(
+                            gFileReader, ioMetas, vectorType, indexOptions)) {
+                reader.visitVectorSearch(vs);
+            }
+
+            long queryEnd = System.nanoTime();
+            queryLatencies[i] = queryEnd - queryStart;
+        }
+
+        long totalQueryEnd = System.currentTimeMillis();
+        double totalQueryTimeSec = (totalQueryEnd - totalQueryStart) / 1000.0;
+
+        // Sort once, then read percentiles by index.
+        Arrays.sort(queryLatencies);
+        double avgLatencyMs = 0;
+        for (long lat : queryLatencies) {
+            avgLatencyMs += lat / 1_000_000.0;
+        }
+        avgLatencyMs /= numQueries;
+
+        double p50Ms = queryLatencies[numQueries / 2] / 1_000_000.0;
+        double p90Ms = queryLatencies[(int) (numQueries * 0.9)] / 1_000_000.0;
+        double p95Ms = queryLatencies[(int) (numQueries * 0.95)] / 1_000_000.0;
+        double p99Ms = queryLatencies[(int) (numQueries * 0.99)] / 1_000_000.0;
+        double minMs = queryLatencies[0] / 1_000_000.0;
+        double maxMs = queryLatencies[numQueries - 1] / 1_000_000.0;
+        double rps = numQueries / totalQueryTimeSec;
+
+        printMemory("After queries");
+
+        System.out.println();
+        System.out.println("=== Query Benchmark Results ===");
+        System.out.printf("Storage: %s%n", storageMode);
+        System.out.printf(
+                "Index file size: %,d bytes (%.2f MB)%n", fileSize, fileSize / (1024.0 * 1024));
+        System.out.printf("Total queries: %,d%n", numQueries);
+        System.out.printf("Total time: %.2f s%n", totalQueryTimeSec);
+        System.out.printf("RPS: %.2f queries/s%n", rps);
+        System.out.println();
+        System.out.println("Latency (ms):");
+        System.out.printf(" Min: %.3f%n", minMs);
+        System.out.printf(" Avg: %.3f%n", avgLatencyMs);
+        System.out.printf(" P50: %.3f%n", p50Ms);
+        System.out.printf(" P90: %.3f%n", p90Ms);
+        System.out.printf(" P95: %.3f%n", p95Ms);
+        System.out.printf(" P99: %.3f%n", p99Ms);
+        System.out.printf(" Max: %.3f%n", maxMs);
+        System.out.println("===============================");
+    }
+}
diff --git a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexScanTest.java b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexScanTest.java
new file mode 100644
index 000000000000..2e0696c9ff3f
--- /dev/null
+++ b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexScanTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.aliyun.lumina.Lumina;
+import org.aliyun.lumina.LuminaException;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for scanning Lumina vector global index. */
+public class LuminaVectorGlobalIndexScanTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private FileStoreTable table;
+ private String commitUser;
+ private FileIO fileIO;
+ private RowType rowType;
+ private final String vectorFieldName = "vec";
+
+    /**
+     * Loads the Lumina native library (skipping all tests when unavailable) and creates a
+     * bucket=-1 table with an int id column and a float-array vector column, with row tracking
+     * and data evolution enabled so global index scans can map results back to rows.
+     */
+    @BeforeEach
+    public void before() throws Exception {
+        if (!Lumina.isLibraryLoaded()) {
+            try {
+                Lumina.loadLibrary();
+            } catch (LuminaException e) {
+                // Build a detailed skip message so CI logs explain why the suite did not run.
+                StringBuilder errorMsg = new StringBuilder("Lumina native library not available.");
+                errorMsg.append("\nError: ").append(e.getMessage());
+                if (e.getCause() != null) {
+                    errorMsg.append("\nCause: ").append(e.getCause().getMessage());
+                }
+                errorMsg.append(
+                        "\n\nTo run Lumina tests, ensure the paimon-lumina-jni JAR"
+                                + " with native libraries is available in the classpath.");
+                Assumptions.assumeTrue(false, errorMsg.toString());
+            }
+        }
+
+        Path tablePath = new Path(tempDir.toString());
+        fileIO = new LocalFileIO();
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+
+        String similarityMetric = "l2";
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column(vectorFieldName, new ArrayType(DataTypes.FLOAT()))
+                        .option(CoreOptions.BUCKET.key(), "-1")
+                        .option("lumina.index.dimension", "2")
+                        .option("lumina.distance.metric", similarityMetric)
+                        .option("data-evolution.enabled", "true")
+                        .option("row-tracking.enabled", "true")
+                        .build();
+
+        TableSchema tableSchema = schemaManager.createTable(schema);
+        table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+        rowType = table.rowType();
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    /** End-to-end: write vectors, build the index manually, commit it, and scan via top-k. */
+    @Test
+    public void testVectorIndexScanEndToEnd() throws Exception {
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f}, new float[] {0.95f, 0.1f}, new float[] {0.1f, 0.95f},
+                    new float[] {0.98f, 0.05f}, new float[] {0.0f, 1.0f}, new float[] {0.05f, 0.98f}
+                };
+
+        writeVectors(vectors);
+
+        List<IndexFileMeta> indexFiles = buildIndexManually(vectors);
+
+        commitIndex(indexFiles);
+
+        // Query near the (1, 0) cluster; top-3 should surface some of ids 0/1/3.
+        float[] queryVector = new float[] {0.85f, 0.15f};
+        VectorSearch vectorSearch = new VectorSearch(queryVector, 3, vectorFieldName);
+        ReadBuilder readBuilder = table.newReadBuilder().withVectorSearch(vectorSearch);
+        TableScan scan = readBuilder.newScan();
+        List<Integer> ids = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(scan.plan())
+                .forEachRemaining(
+                        row -> {
+                            ids.add(row.getInt(0));
+                        });
+        assertThat(ids).isNotEmpty();
+    }
+
+    /** Same end-to-end flow as above, but with the inner_product distance metric. */
+    @Test
+    public void testVectorIndexScanWithDifferentMetrics() throws Exception {
+        // A separate table under its own sub-directory so options differ from the shared table.
+        Path tablePath = new Path(tempDir.toString(), "inner_product");
+        fileIO.mkdirs(tablePath);
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column(vectorFieldName, new ArrayType(DataTypes.FLOAT()))
+                        .option(CoreOptions.BUCKET.key(), "-1")
+                        .option("lumina.index.dimension", "2")
+                        .option("lumina.distance.metric", "inner_product")
+                        .option("data-evolution.enabled", "true")
+                        .option("row-tracking.enabled", "true")
+                        .build();
+
+        TableSchema tableSchema = schemaManager.createTable(schema);
+        FileStoreTable ipTable = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+        String ipCommitUser = UUID.randomUUID().toString();
+
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f},
+                    new float[] {0.707f, 0.707f},
+                    new float[] {0.0f, 1.0f},
+                };
+
+        StreamTableWrite write = ipTable.newWrite(ipCommitUser);
+        for (int i = 0; i < vectors.length; i++) {
+            write.write(GenericRow.of(i, new GenericArray(vectors[i])));
+        }
+        List<CommitMessage> messages = write.prepareCommit(false, 0);
+        StreamTableCommit commit = ipTable.newCommit(ipCommitUser);
+        commit.commit(0, messages);
+        write.close();
+
+        Options options = new Options(ipTable.options());
+        LuminaVectorIndexOptions indexOptions = new LuminaVectorIndexOptions(options);
+        Path indexDir = ipTable.store().pathFactory().indexPath();
+        if (!fileIO.exists(indexDir)) {
+            fileIO.mkdirs(indexDir);
+        }
+
+        GlobalIndexFileWriter fileWriter =
+                new GlobalIndexFileWriter() {
+                    @Override
+                    public String newFileName(String prefix) {
+                        return prefix + "-" + UUID.randomUUID();
+                    }
+
+                    @Override
+                    public PositionOutputStream newOutputStream(String fileName)
+                            throws IOException {
+                        return fileIO.newOutputStream(new Path(indexDir, fileName), false);
+                    }
+                };
+
+        LuminaVectorGlobalIndexWriter indexWriter =
+                new LuminaVectorGlobalIndexWriter(
+                        fileWriter, new ArrayType(DataTypes.FLOAT()), indexOptions);
+        for (float[] vec : vectors) {
+            indexWriter.write(vec);
+        }
+
+        List<ResultEntry> entries = indexWriter.finish();
+        List<IndexFileMeta> metas = new ArrayList<>();
+        int fieldId = ipTable.rowType().getFieldIndex(vectorFieldName);
+
+        for (ResultEntry entry : entries) {
+            long fileSize = fileIO.getFileSize(new Path(indexDir, entry.fileName()));
+            // Row range [0, n-1] for the single data increment written above.
+            GlobalIndexMeta globalMeta =
+                    new GlobalIndexMeta(0, vectors.length - 1, fieldId, null, entry.meta());
+
+            metas.add(
+                    new IndexFileMeta(
+                            LuminaVectorGlobalIndexerFactory.IDENTIFIER,
+                            entry.fileName(),
+                            fileSize,
+                            entry.rowCount(),
+                            globalMeta,
+                            (String) null));
+        }
+
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(metas);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        1,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        ipTable.newCommit(ipCommitUser).commit(1, Collections.singletonList(message));
+
+        float[] queryVector = new float[] {1.0f, 0.0f};
+        VectorSearch vectorSearch = new VectorSearch(queryVector, 3, vectorFieldName);
+        ReadBuilder readBuilder = ipTable.newReadBuilder().withVectorSearch(vectorSearch);
+        TableScan scan = readBuilder.newScan();
+        List<Integer> ids = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(scan.plan())
+                .forEachRemaining(
+                        row -> {
+                            ids.add(row.getInt(0));
+                        });
+        assertThat(ids).isNotEmpty();
+    }
+
+    /** Writes one row per vector (id = array index) and commits them as snapshot 0. */
+    private void writeVectors(float[][] vectors) throws Exception {
+        StreamTableWrite write = table.newWrite(commitUser);
+        for (int i = 0; i < vectors.length; i++) {
+            write.write(GenericRow.of(i, new GenericArray(vectors[i])));
+        }
+        List<CommitMessage> messages = write.prepareCommit(false, 0);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        commit.commit(0, messages);
+        write.close();
+    }
+
+    /**
+     * Builds the Lumina index files for the given vectors directly (bypassing compaction) and
+     * returns the index file metas ready to be committed.
+     */
+    private List<IndexFileMeta> buildIndexManually(float[][] vectors) throws Exception {
+        Options options = new Options(table.options());
+        LuminaVectorIndexOptions indexOptions = new LuminaVectorIndexOptions(options);
+        Path indexDir = table.store().pathFactory().indexPath();
+        if (!fileIO.exists(indexDir)) {
+            fileIO.mkdirs(indexDir);
+        }
+
+        GlobalIndexFileWriter fileWriter =
+                new GlobalIndexFileWriter() {
+                    @Override
+                    public String newFileName(String prefix) {
+                        return prefix + "-" + UUID.randomUUID();
+                    }
+
+                    @Override
+                    public PositionOutputStream newOutputStream(String fileName)
+                            throws IOException {
+                        return fileIO.newOutputStream(new Path(indexDir, fileName), false);
+                    }
+                };
+
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(
+                        fileWriter, new ArrayType(DataTypes.FLOAT()), indexOptions);
+        for (float[] vec : vectors) {
+            writer.write(vec);
+        }
+
+        List<ResultEntry> entries = writer.finish();
+
+        List<IndexFileMeta> metas = new ArrayList<>();
+        int fieldId = rowType.getFieldIndex(vectorFieldName);
+
+        for (ResultEntry entry : entries) {
+            long fileSize = fileIO.getFileSize(new Path(indexDir, entry.fileName()));
+            // Row range [0, n-1] matches the single increment written by writeVectors.
+            GlobalIndexMeta globalMeta =
+                    new GlobalIndexMeta(0, vectors.length - 1, fieldId, null, entry.meta());
+
+            metas.add(
+                    new IndexFileMeta(
+                            LuminaVectorGlobalIndexerFactory.IDENTIFIER,
+                            entry.fileName(),
+                            fileSize,
+                            entry.rowCount(),
+                            globalMeta,
+                            (String) null));
+        }
+        return metas;
+    }
+
+    /** Commits the given index file metas to the table as an index-only increment. */
+    private void commitIndex(List<IndexFileMeta> indexFiles) {
+        StreamTableCommit commit = table.newCommit(commitUser);
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
+        CommitMessage message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        1,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement());
+        commit.commit(1, Collections.singletonList(message));
+    }
+}
diff --git a/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
new file mode 100644
index 000000000000..c2a6a50677be
--- /dev/null
+++ b/paimon-lumina/src/test/java/org/apache/paimon/lumina/index/LuminaVectorGlobalIndexTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lumina.index;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.aliyun.lumina.Lumina;
+import org.aliyun.lumina.LuminaException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LuminaVectorGlobalIndexWriter} and {@link LuminaVectorGlobalIndexReader}. */
+public class LuminaVectorGlobalIndexTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private FileIO fileIO;
+ private Path indexPath;
+ private DataType vectorType;
+ private final String fieldName = "vec";
+
+    @BeforeEach
+    public void setup() {
+        // Abort (skip) the whole test class when the Lumina native library is absent.
+        if (!Lumina.isLibraryLoaded()) {
+            try {
+                Lumina.loadLibrary();
+            } catch (LuminaException e) {
+                StringBuilder message = new StringBuilder("Lumina native library not available.");
+                message.append("\nError: ").append(e.getMessage());
+                if (e.getCause() != null) {
+                    message.append("\nCause: ").append(e.getCause().getMessage());
+                }
+                message.append(
+                        "\n\nTo run Lumina tests, ensure the paimon-lumina-jni JAR"
+                                + " with native libraries is available in the classpath.");
+                Assumptions.assumeTrue(false, message.toString());
+            }
+        }
+
+        // Fresh local file IO and index directory for every test.
+        fileIO = new LocalFileIO();
+        indexPath = new Path(tempDir.toString());
+        vectorType = new ArrayType(new FloatType());
+    }
+
+    @AfterEach
+    public void cleanup() throws IOException {
+        // Recursively remove everything written under the index directory.
+        if (fileIO == null) {
+            return;
+        }
+        fileIO.delete(indexPath, true);
+    }
+
+    /** Creates a writer that materializes index files under {@code path}. */
+    private GlobalIndexFileWriter createFileWriter(Path path) {
+        return new GlobalIndexFileWriter() {
+            @Override
+            public String newFileName(String prefix) {
+                // Random suffix keeps names unique across invocations.
+                return String.format("%s-%s", prefix, UUID.randomUUID());
+            }
+
+            @Override
+            public PositionOutputStream newOutputStream(String fileName) throws IOException {
+                return fileIO.newOutputStream(new Path(path, fileName), false);
+            }
+        };
+    }
+
+    /** Creates a reader that resolves index file names relative to {@code path}. */
+    private GlobalIndexFileReader createFileReader(Path path) {
+        return meta -> {
+            Path file = new Path(path, meta.filePath());
+            return fileIO.newInputStream(file);
+        };
+    }
+
+    /**
+     * Converts writer results into IO metas for the reader. The small test data sets
+     * always fit in a single index file, hence the hard single-entry assertion.
+     */
+    private List<GlobalIndexIOMeta> toIOMetas(List<ResultEntry> results, Path path)
+            throws IOException {
+        assertThat(results).hasSize(1);
+        ResultEntry result = results.get(0);
+        Path filePath = new Path(path, result.fileName());
+        return Collections.singletonList(
+                new GlobalIndexIOMeta(filePath, fileIO.getFileSize(filePath), result.meta()));
+    }
+
+    @Test
+    public void testDifferentMetrics() throws IOException {
+        int dimension = 32;
+        int numVectors = 20;
+
+        String[] metrics = {"l2", "cosine", "inner_product"};
+
+        for (String metric : metrics) {
+            Options options = createDefaultOptions(dimension);
+            options.setString("lumina.distance.metric", metric);
+            if ("cosine".equals(metric)) {
+                // Lumina v0.1.0 does not support PQ + cosine combination
+                options.setString(LuminaVectorIndexOptions.ENCODING_TYPE.key(), "rawf32");
+            }
+            LuminaVectorIndexOptions indexOptions = new LuminaVectorIndexOptions(options);
+            Path metricIndexPath = new Path(indexPath, metric.toLowerCase());
+            GlobalIndexFileWriter fileWriter = createFileWriter(metricIndexPath);
+            LuminaVectorGlobalIndexWriter writer =
+                    new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, indexOptions);
+
+            List<float[]> testVectors = generateRandomVectors(numVectors, dimension);
+            testVectors.forEach(writer::write);
+
+            List<ResultEntry> results = writer.finish();
+            List<GlobalIndexIOMeta> metas = toIOMetas(results, metricIndexPath);
+
+            GlobalIndexFileReader fileReader = createFileReader(metricIndexPath);
+            try (LuminaVectorGlobalIndexReader reader =
+                    new LuminaVectorGlobalIndexReader(
+                            fileReader, metas, vectorType, indexOptions)) {
+                // Querying with an indexed vector must return that vector itself (row 0)
+                // regardless of the metric in use.
+                VectorSearch vectorSearch = new VectorSearch(testVectors.get(0), 3, fieldName);
+                LuminaScoredGlobalIndexResult searchResult =
+                        (LuminaScoredGlobalIndexResult)
+                                reader.visitVectorSearch(vectorSearch).get();
+                assertThat(searchResult.results().getLongCardinality()).isEqualTo(3);
+                assertThat(searchResult.results().contains(0L)).isTrue();
+                float score = searchResult.scoreGetter().score(0L);
+                assertThat(score).isNotNaN();
+            }
+        }
+    }
+
+    @Test
+    public void testDifferentDimensions() throws IOException {
+        int[] dimensions = {8, 32, 128, 256};
+
+        for (int dimension : dimensions) {
+            Options options = createDefaultOptions(dimension);
+            LuminaVectorIndexOptions indexOptions = new LuminaVectorIndexOptions(options);
+            Path dimIndexPath = new Path(indexPath, "dim_" + dimension);
+            GlobalIndexFileWriter fileWriter = createFileWriter(dimIndexPath);
+            LuminaVectorGlobalIndexWriter writer =
+                    new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, indexOptions);
+
+            int numVectors = 10;
+            List<float[]> testVectors = generateRandomVectors(numVectors, dimension);
+            testVectors.forEach(writer::write);
+
+            List<ResultEntry> results = writer.finish();
+            List<GlobalIndexIOMeta> metas = toIOMetas(results, dimIndexPath);
+
+            GlobalIndexFileReader fileReader = createFileReader(dimIndexPath);
+            try (LuminaVectorGlobalIndexReader reader =
+                    new LuminaVectorGlobalIndexReader(
+                            fileReader, metas, vectorType, indexOptions)) {
+                // The indexed query vector itself (row 0) must be among the top hits.
+                VectorSearch vectorSearch = new VectorSearch(testVectors.get(0), 5, fieldName);
+                LuminaScoredGlobalIndexResult searchResult =
+                        (LuminaScoredGlobalIndexResult)
+                                reader.visitVectorSearch(vectorSearch).get();
+                assertThat(searchResult.results().getLongCardinality()).isEqualTo(5);
+                assertThat(searchResult.results().contains(0L)).isTrue();
+                float score = searchResult.scoreGetter().score(0L);
+                assertThat(score).isNotNaN();
+            }
+        }
+    }
+
+    @Test
+    public void testDimensionMismatch() {
+        // Writer is configured for 64-dimensional vectors.
+        LuminaVectorIndexOptions indexOptions =
+                new LuminaVectorIndexOptions(createDefaultOptions(64));
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(
+                        createFileWriter(indexPath), vectorType, indexOptions);
+
+        // Feeding a 32-dimensional vector must be rejected up front.
+        float[] wrongDimVector = new float[32];
+        assertThatThrownBy(() -> writer.write(wrongDimVector))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("dimension mismatch");
+    }
+
+    @Test
+    public void testFloatVectorIndexEndToEnd() throws IOException {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        // Two clusters near (1, 0) and (0, 1) so nearest-neighbor results are predictable.
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f}, new float[] {0.95f, 0.1f}, new float[] {0.1f, 0.95f},
+                    new float[] {0.98f, 0.05f}, new float[] {0.0f, 1.0f}, new float[] {0.05f, 0.98f}
+                };
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, indexOptions);
+        Arrays.stream(vectors).forEach(writer::write);
+
+        List<ResultEntry> results = writer.finish();
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(fileReader, metas, vectorType, indexOptions)) {
+            // Query vector[0] = (1.0, 0.0); nearest neighbors by L2 should be
+            // row 0 (1.0, 0.0), row 3 (0.98, 0.05), row 1 (0.95, 0.1).
+            VectorSearch vectorSearch = new VectorSearch(vectors[0], 3, fieldName);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(vectorSearch).get();
+            assertThat(result.results().getLongCardinality()).isEqualTo(3);
+            assertThat(result.results().contains(0L)).isTrue();
+            assertThat(result.results().contains(3L)).isTrue();
+            float scoreRow0 = result.scoreGetter().score(0L);
+            float scoreRow3 = result.scoreGetter().score(3L);
+            assertThat(scoreRow0).isGreaterThanOrEqualTo(scoreRow3);
+
+            // Test with filter: only row 1
+            long expectedRowId = 1;
+            RoaringNavigableMap64 filterResults = new RoaringNavigableMap64();
+            filterResults.add(expectedRowId);
+            vectorSearch =
+                    new VectorSearch(vectors[0], 3, fieldName).withIncludeRowIds(filterResults);
+            result = (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(vectorSearch).get();
+            assertThat(result.results().getLongCardinality()).isEqualTo(1);
+            assertThat(result.results().contains(expectedRowId)).isTrue();
+
+            // Test with multiple results
+            float[] queryVector = new float[] {0.85f, 0.15f};
+            vectorSearch = new VectorSearch(queryVector, 2, fieldName);
+            result = (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(vectorSearch).get();
+            assertThat(result.results().getLongCardinality()).isEqualTo(2);
+        }
+    }
+
+    @Test
+    public void testSearchWithFilter() throws IOException {
+        int dimension = 2;
+        Options options = createDefaultOptions(dimension);
+
+        // Rows 0-2 cluster near (1, 0); rows 3-5 cluster near (-1, 0).
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f},
+                    new float[] {0.95f, 0.1f},
+                    new float[] {0.9f, 0.2f},
+                    new float[] {-1.0f, 0.0f},
+                    new float[] {-0.95f, 0.1f},
+                    new float[] {-0.9f, 0.2f}
+                };
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, indexOptions);
+        Arrays.stream(vectors).forEach(writer::write);
+        List<ResultEntry> results = writer.finish();
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(fileReader, metas, vectorType, indexOptions)) {
+
+            // Unfiltered: query (1,0) top-3 should come from the first cluster (rows 0,1,2).
+            VectorSearch search = new VectorSearch(vectors[0], 3, fieldName);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(search).get();
+            assertThat(result.results().contains(0L)).isTrue();
+            assertThat(result.results().contains(1L)).isTrue();
+            assertThat(result.results().contains(2L)).isTrue();
+
+            // Filter to row 3 only.
+            RoaringNavigableMap64 filter = new RoaringNavigableMap64();
+            filter.add(3L);
+            search = new VectorSearch(vectors[0], 3, fieldName).withIncludeRowIds(filter);
+            result = (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(search).get();
+            assertThat(result.results().contains(3L)).isTrue();
+            assertThat(result.results().getLongCardinality()).isEqualTo(1);
+
+            // Filter spanning multiple rows: {1, 4}.
+            RoaringNavigableMap64 crossFilter = new RoaringNavigableMap64();
+            crossFilter.add(1L);
+            crossFilter.add(4L);
+            search = new VectorSearch(vectors[0], 6, fieldName).withIncludeRowIds(crossFilter);
+            result = (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(search).get();
+            assertThat(result.results().contains(1L)).isTrue();
+            assertThat(result.results().contains(4L)).isTrue();
+            assertThat(result.results().getLongCardinality()).isEqualTo(2);
+        }
+    }
+
+    @Test
+    public void testPQWithCosineRejected() {
+        // Lumina v0.1.0 cannot combine PQ encoding with the cosine metric; the
+        // options validation must reject this combination eagerly.
+        Options opts = new Options();
+        opts.setInteger(LuminaVectorIndexOptions.DIMENSION.key(), 32);
+        opts.setString(LuminaVectorIndexOptions.DISTANCE_METRIC.key(), "cosine");
+        opts.setString(LuminaVectorIndexOptions.ENCODING_TYPE.key(), "pq");
+        assertThatThrownBy(() -> new LuminaVectorIndexOptions(opts))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("PQ encoding with cosine metric");
+    }
+
+ @Test
+ public void testInvalidTopK() {
+ assertThatThrownBy(() -> new VectorSearch(new float[] {0.1f}, 0, fieldName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Limit must be positive");
+ }
+
+    @Test
+    public void testLargeVectorSet() throws IOException {
+        int dimension = 32;
+        Options options = createDefaultOptions(dimension);
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorIndexOptions indexOptions = new LuminaVectorIndexOptions(options);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, indexOptions);
+
+        int numVectors = 350;
+        List<float[]> testVectors = generateRandomVectors(numVectors, dimension);
+        testVectors.forEach(writer::write);
+
+        List<ResultEntry> results = writer.finish();
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+
+        // The index file must exist and be non-empty on disk.
+        Path filePath = new Path(indexPath, results.get(0).fileName());
+        assertThat(fileIO.exists(filePath)).isTrue();
+        assertThat(fileIO.getFileSize(filePath)).isGreaterThan(0);
+
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(fileReader, metas, vectorType, indexOptions)) {
+            // Probe rows from the beginning, middle, and end of the data set; each
+            // indexed vector must find itself among the top hits with a valid score.
+            for (int queryIdx : new int[] {50, 150, 320}) {
+                VectorSearch vectorSearch =
+                        new VectorSearch(testVectors.get(queryIdx), 3, fieldName);
+                LuminaScoredGlobalIndexResult searchResult =
+                        (LuminaScoredGlobalIndexResult)
+                                reader.visitVectorSearch(vectorSearch).get();
+                assertThat(searchResult.results().getLongCardinality()).isEqualTo(3);
+                assertThat(searchResult.results().contains((long) queryIdx)).isTrue();
+                assertThat(searchResult.scoreGetter().score((long) queryIdx)).isNotNaN();
+            }
+
+            VectorSearch vectorSearch = new VectorSearch(testVectors.get(200), 5, fieldName);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(vectorSearch).get();
+            assertThat(result.results().getLongCardinality()).isEqualTo(5);
+            assertThat(result.results().contains(200L)).isTrue();
+        }
+    }
+
+    @Test
+    public void testReaderMetaOptionsOverrideDefaultOptions() throws IOException {
+        // Write index with dimension=2
+        int dimension = 2;
+        Options writeOptions = createDefaultOptions(dimension);
+        LuminaVectorIndexOptions writeIndexOptions = new LuminaVectorIndexOptions(writeOptions);
+
+        float[][] vectors =
+                new float[][] {
+                    new float[] {1.0f, 0.0f},
+                    new float[] {0.95f, 0.1f},
+                    new float[] {0.1f, 0.95f},
+                    new float[] {0.0f, 1.0f}
+                };
+
+        GlobalIndexFileWriter fileWriter = createFileWriter(indexPath);
+        LuminaVectorGlobalIndexWriter writer =
+                new LuminaVectorGlobalIndexWriter(fileWriter, vectorType, writeIndexOptions);
+        Arrays.stream(vectors).forEach(writer::write);
+        List<ResultEntry> results = writer.finish();
+        List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath);
+
+        // Read with default options (dimension=128, the default) — simulates
+        // the case where table options do not contain lumina.index.dimension.
+        // The reader should still work because meta options written at build time
+        // override the stale default dimension.
+        Options readOptions = new Options();
+        readOptions.setString(LuminaVectorIndexOptions.DISTANCE_METRIC.key(), "l2");
+        // Do NOT set dimension — it defaults to 128
+        LuminaVectorIndexOptions readIndexOptions = new LuminaVectorIndexOptions(readOptions);
+        assertThat(readIndexOptions.dimension()).isEqualTo(128);
+
+        GlobalIndexFileReader fileReader = createFileReader(indexPath);
+        try (LuminaVectorGlobalIndexReader reader =
+                new LuminaVectorGlobalIndexReader(
+                        fileReader, metas, vectorType, readIndexOptions)) {
+            VectorSearch vectorSearch = new VectorSearch(vectors[0], 3, fieldName);
+            LuminaScoredGlobalIndexResult result =
+                    (LuminaScoredGlobalIndexResult) reader.visitVectorSearch(vectorSearch).get();
+            assertThat(result.results().getLongCardinality()).isEqualTo(3);
+            assertThat(result.results().contains(0L)).isTrue();
+        }
+    }
+
+    /** Builds minimal index options: the given dimension plus the L2 metric. */
+    private Options createDefaultOptions(int dimension) {
+        Options opts = new Options();
+        opts.setInteger(LuminaVectorIndexOptions.DIMENSION.key(), dimension);
+        opts.setString(LuminaVectorIndexOptions.DISTANCE_METRIC.key(), "l2");
+        return opts;
+    }
+
+    /**
+     * Generates {@code count} random vectors of the given dimension, each L2-normalized
+     * to unit length. A fixed seed keeps the tests deterministic.
+     */
+    private List<float[]> generateRandomVectors(int count, int dimension) {
+        Random random = new Random(42);
+        List<float[]> vectors = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            float[] vector = new float[dimension];
+            for (int j = 0; j < dimension; j++) {
+                // Uniform in [-1, 1).
+                vector[j] = random.nextFloat() * 2 - 1;
+            }
+            float norm = 0;
+            for (float v : vector) {
+                norm += v * v;
+            }
+            norm = (float) Math.sqrt(norm);
+            // Guard against the (theoretical) all-zero vector before normalizing.
+            if (norm > 0) {
+                for (int m = 0; m < vector.length; m++) {
+                    vector[m] /= norm;
+                }
+            }
+            vectors.add(vector);
+        }
+        return vectors;
+    }
+}
diff --git a/paimon-spark/paimon-spark-ut/pom.xml b/paimon-spark/paimon-spark-ut/pom.xml
index e6eab99f1737..fbaed106e6df 100644
--- a/paimon-spark/paimon-spark-ut/pom.xml
+++ b/paimon-spark/paimon-spark-ut/pom.xml
@@ -88,6 +88,13 @@ under the License.
${spark.version}
test
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-lumina</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala
new file mode 100644
index 000000000000..211ef23a5eef
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import scala.collection.JavaConverters._
+
+/** Tests for Lumina vector index read/write operations. */
+class LuminaVectorIndexTest extends PaimonSparkTestBase {
+
+ private val indexType = "lumina-vector-ann"
+ private val defaultOptions = "lumina.index.dimension=3"
+
+ // ========== Index Creation Tests ==========
+
+  test("create lumina vector index - basic") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 100)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      // Every inserted row must be covered by the newly created index files.
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      assert(indexEntries.nonEmpty)
+      val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 100L)
+    }
+  }
+
+  test("create lumina vector index - with partitioned table") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>, pt STRING)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true')
+                  | PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      // Two partitions with different row counts: 500 in p0, 300 in p1.
+      var values = (0 until 500)
+        .map(
+          i =>
+            s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)), 'p0')")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 300)
+        .map(
+          i =>
+            s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)), 'p1')")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      // Index row counts across partitions must sum to all 800 inserted rows.
+      assert(indexEntries.nonEmpty)
+      val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 800L)
+    }
+  }
+
+ // ========== Index Write Tests ==========
+
+  test("write vectors - large dataset") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      // Insert 10k rows via the DataFrame API instead of a giant VALUES literal.
+      val df = spark
+        .range(0, 10000)
+        .selectExpr(
+          "cast(id as int) as id",
+          "array(cast(id as float), cast(id + 1 as float), cast(id + 2 as float)) as v")
+      df.write.insertInto("T")
+
+      val output = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+
+      val totalRowCount = indexEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 10000L)
+    }
+  }
+
+ // ========== Index Read/Search Tests ==========
+
+  test("read vectors - basic search") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 100)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+
+      // Top-5 search around row 50's vector must return exactly 5 rows.
+      val result = spark
+        .sql("""
+               |SELECT * FROM vector_search('T', 'v', array(50.0f, 51.0f, 52.0f), 5)
+               |""".stripMargin)
+        .collect()
+      assert(result.length == 5)
+    }
+  }
+
+  test("read vectors - top-k search with different k values") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 200)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+
+      // Test with k=1
+      var result = spark
+        .sql("""
+               |SELECT * FROM vector_search('T', 'v', array(100.0f, 101.0f, 102.0f), 1)
+               |""".stripMargin)
+        .collect()
+      assert(result.length == 1)
+
+      // Test with k=10
+      result = spark
+        .sql("""
+               |SELECT * FROM vector_search('T', 'v', array(100.0f, 101.0f, 102.0f), 10)
+               |""".stripMargin)
+        .collect()
+      assert(result.length == 10)
+    }
+  }
+
+  test("read vectors - normalized vectors search") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      // Every row is the unit vector (1,1,1)/sqrt(3); only the id differs.
+      val values = (1 to 100)
+        .map {
+          i =>
+            val v = math.sqrt(3.0 * i * i)
+            val normalized = i.toFloat / v.toFloat
+            s"($i, array($normalized, $normalized, $normalized))"
+        }
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      spark.sql(
+        s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
+
+      val result = spark
+        .sql("""
+               |SELECT * FROM vector_search('T', 'v', array(0.577f, 0.577f, 0.577f), 10)
+               |""".stripMargin)
+        .collect()
+
+      assert(result.length == 10)
+    }
+  }
+
+ // ========== Integration Tests ==========
+
+  test("end-to-end: write, index, read cycle") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING, embedding ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  | 'bucket' = '-1',
+                  | 'global-index.row-count-per-shard' = '10000',
+                  | 'row-tracking.enabled' = 'true',
+                  | 'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 1000)
+        .map(
+          i =>
+            s"($i, 'item_$i', array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val indexResult = spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 'embedding', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+        .head
+      assert(indexResult.getBoolean(0))
+
+      val table = loadTable("T")
+      val indexEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == indexType)
+      assert(indexEntries.nonEmpty)
+
+      // Search near row 500's embedding and project non-vector columns.
+      val searchResult = spark
+        .sql(
+          """
+            |SELECT id, name FROM vector_search('T', 'embedding', array(500.0f, 501.0f, 502.0f), 10)
+            |""".stripMargin)
+        .collect()
+
+      assert(searchResult.length == 10)
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 176a5a82eabd..159d3593f7d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@ under the License.
paimon-arrow
tools/ci/paimon-ci-tools
paimon-api
+        <module>paimon-lumina</module>