diff --git a/python/DEVELOPMENT.md b/python/DEVELOPMENT.md index 12c56549608..21dba0bdddd 100644 --- a/python/DEVELOPMENT.md +++ b/python/DEVELOPMENT.md @@ -8,6 +8,22 @@ uv sync --extra tests --extra dev Add extras such as `benchmarks`, `torch`, or `geo` only when you need them. After the environment is initialized, either activate it or use `uv run ...` for commands. +`accelerator="cuvs"` does not have a normal project extra today. cuVS Python +packages are published per CUDA major version and are typically installed from +NVIDIA's package index, for example: + +```shell +uv pip install --extra-index-url https://pypi.nvidia.com cuvs-cu12 +``` + +or: + +```shell +uv pip install --extra-index-url https://pypi.nvidia.com cuvs-cu13 +``` + +Pick the package that matches the CUDA version in your environment. + `uv sync` is not just downloading Python packages here. It also builds the local `pylance` Rust extension as part of the editable environment, so the first run, cache misses, or Rust dependency changes can make it noticeably slow. This is expected; let the build finish instead of interrupting it and switching to a different environment setup. 
## Building the project diff --git a/python/python/lance/cuvs.py b/python/python/lance/cuvs.py new file mode 100644 index 00000000000..ed8cad83907 --- /dev/null +++ b/python/python/lance/cuvs.py @@ -0,0 +1,534 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +from __future__ import annotations + +import json +import os +import re +import tempfile +from importlib import import_module +from typing import TYPE_CHECKING, Iterator, Tuple + +import pyarrow as pa +import pyarrow.compute as pc + +from .dependencies import numpy as np +from .file import LanceFileWriter +from .log import LOGGER +from .util import _normalize_metric_type + +if TYPE_CHECKING: + from pathlib import Path + + +def is_cuvs_accelerator(accelerator: object) -> bool: + return accelerator == "cuvs" + + +def _require_cuvs(): + try: + return import_module("cuvs.neighbors.ivf_pq") + except ModuleNotFoundError as exc: + raise ModuleNotFoundError( + "accelerator='cuvs' requires cuVS Python bindings to be installed. " + "Install a CUDA-matched package such as 'cuvs-cu12' or 'cuvs-cu13' " + "from https://pypi.nvidia.com." 
+ ) from exc + + +def _optional_cupy(): + try: + return import_module("cupy") + except ModuleNotFoundError: + return None + + +def _make_progress(total: int): + try: + from tqdm.auto import tqdm + + return tqdm(total=total) + except ModuleNotFoundError: + + class _NoOpProgress: + def set_description(self, _description: str): + return None + + def update(self, _count: int): + return None + + def close(self): + return None + + return _NoOpProgress() + + +def _metric_to_cuvs(metric_type: str) -> str: + metric_type = _normalize_metric_type(metric_type).lower() + if metric_type in {"l2", "euclidean"}: + return "sqeuclidean" + if metric_type == "dot": + return "inner_product" + if metric_type == "cosine": + return "cosine" + raise ValueError(f"Metric '{metric_type}' is not supported by cuVS IVF_PQ") + + +def _coerce_float_matrix(matrix: np.ndarray, *, column: str) -> np.ndarray: + if matrix.ndim != 2: + raise ValueError( + f"Expected a 2D training matrix for column '{column}', got {matrix.shape}" + ) + if matrix.dtype == np.float64: + matrix = matrix.astype(np.float32) + elif matrix.dtype not in (np.float16, np.float32): + matrix = matrix.astype(np.float32) + return matrix + + +def _column_to_numpy(table: pa.Table | pa.RecordBatch, column: str) -> np.ndarray: + array = table.column(column) + if isinstance(array, pa.ChunkedArray): + array = array.combine_chunks() + if len(array) == 0: + raise ValueError("cuVS training requires at least one training vector") + + if pa.types.is_fixed_size_list(array.type): + values = array.values.to_numpy(zero_copy_only=False) + matrix = values.reshape(len(array), array.type.list_size) + return _coerce_float_matrix(matrix, column=column) + + values = array.to_pylist() + return _coerce_float_matrix(np.asarray(values), column=column) + + +def _as_numpy(array_like) -> np.ndarray: + if isinstance(array_like, np.ndarray): + return array_like + + if hasattr(array_like, "copy_to_host"): + return np.asarray(array_like.copy_to_host()) + + try: + 
array = np.asarray(array_like) + if isinstance(array, np.ndarray): + return array + except Exception: + pass + + if hasattr(array_like, "get"): + return np.asarray(array_like.get()) + + cupy = _optional_cupy() + if cupy is not None: + return cupy.asnumpy(array_like) + + raise TypeError("Unable to convert cuVS output to numpy") + + +def _to_cuvs_transform_input(matrix: np.ndarray): + cupy = _optional_cupy() + if cupy is None: + raise ModuleNotFoundError( + "accelerator='cuvs' full index build requires the 'cupy' package " + "to pass transform batches in device memory" + ) + return cupy.asarray(matrix) + + +def _normalize_centroids(index, num_partitions: int, dimension: int) -> np.ndarray: + centroids = _as_numpy(index.centers) + if centroids.shape != (num_partitions, dimension): + raise ValueError( + "cuVS returned incompatible IVF centroids shape: " + f"expected {(num_partitions, dimension)}, got {centroids.shape}" + ) + return centroids + + +def _normalize_pq_codebook( + index, num_sub_vectors: int, num_bits: int, dimension: int +) -> np.ndarray: + pq_book_size = 1 << num_bits + subvector_dim = dimension // num_sub_vectors + pq_centers = _as_numpy(index.pq_centers) + + expected_shapes = { + (num_sub_vectors, subvector_dim, pq_book_size): (0, 2, 1), + (num_sub_vectors, pq_book_size, subvector_dim): None, + } + transpose = expected_shapes.get(pq_centers.shape) + if transpose is None and pq_centers.shape not in expected_shapes: + raise ValueError( + "cuVS returned incompatible PQ codebook shape: expected one of " + f"{list(expected_shapes.keys())}, got {pq_centers.shape}" + ) + if transpose is not None: + pq_centers = np.transpose(pq_centers, transpose) + return pq_centers + + +def _estimate_trainset_fraction( + num_rows: int, num_partitions: int, sample_rate: int +) -> float: + if num_rows <= 0: + raise ValueError("cuVS training requires a non-empty dataset") + desired_rows = max(num_partitions * sample_rate, 256 * 256) + return min(1.0, desired_rows / num_rows) + + 
+def _sample_training_table( + dataset, column: str, train_rows: int, filt: str | None +) -> pa.Table: + if filt is None: + return dataset.sample(train_rows, columns=[column], randomize_order=True) + + total_rows = dataset.count_rows() + sample_rows = min(total_rows, max(train_rows * 2, train_rows + 1024)) + trainset = dataset.sample(sample_rows, columns=[column], randomize_order=True) + trainset = trainset.filter(pc.is_valid(trainset.column(column))) + if len(trainset) >= train_rows or sample_rows == total_rows: + return trainset.slice(0, min(train_rows, len(trainset))) + + return dataset.to_table(columns=[column], filter=filt, limit=train_rows) + + +def _make_shuffle_batch( + row_ids: np.ndarray, + partitions: np.ndarray, + pq_codes: np.ndarray, + num_partitions: int, + num_sub_vectors: int, +) -> tuple[pa.RecordBatch, pa.RecordBatch]: + sort_indices = np.argsort(partitions, kind="stable") + row_ids = row_ids[sort_indices] + partitions = partitions[sort_indices] + pq_codes = pq_codes[sort_indices] + + pq_values = pa.array(pq_codes.reshape(-1), type=pa.uint8()) + pq_code_array = pa.FixedSizeListArray.from_arrays(pq_values, num_sub_vectors) + partition_counts = np.bincount(partitions, minlength=num_partitions).astype( + np.uint64, copy=False + ) + offsets = np.cumsum(partition_counts, dtype=np.uint64) + data_batch = pa.RecordBatch.from_arrays( + [ + pa.array(row_ids, type=pa.uint64()), + pq_code_array, + ], + schema=pa.schema( + [ + pa.field("_rowid", pa.uint64()), + pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), + ] + ), + ) + offsets_batch = pa.RecordBatch.from_arrays( + [pa.array(offsets, type=pa.uint64())], + schema=pa.schema([pa.field("offset", pa.uint64())]), + ) + return data_batch, offsets_batch + + +def _shuffle_metadata( + num_partitions: int, num_batches: int, partition_counts +) -> dict[str, str]: + return { + "lance:shuffle:num_partitions": str(num_partitions), + "lance:shuffle:num_batches": str(num_batches), + 
"lance:shuffle:partition_counts": json.dumps(list(partition_counts)), + "lance:shuffle:total_loss": "0.0", + } + + +def _write_v3_shuffle_files( + output_root: str, + batches: Iterator[tuple[pa.RecordBatch, pa.RecordBatch]], + *, + num_partitions: int, + num_sub_vectors: int, +) -> list[str]: + os.makedirs(output_root, exist_ok=True) + data_path = os.path.join(output_root, "shuffle_data.lance") + offsets_path = os.path.join(output_root, "shuffle_offsets.lance") + + data_schema = pa.schema( + [ + pa.field("_rowid", pa.uint64()), + pa.field("__pq_code", pa.list_(pa.uint8(), list_size=num_sub_vectors)), + ] + ) + offsets_schema = pa.schema([pa.field("offset", pa.uint64())]) + + data_writer = None + offsets_writer = LanceFileWriter(offsets_path, offsets_schema) + total_partition_counts = np.zeros(num_partitions, dtype=np.uint64) + global_row_count = np.uint64(0) + num_batches = 0 + + for data_batch, offsets_batch in batches: + if data_writer is None: + data_writer = LanceFileWriter(data_path, data_batch.schema) + data_writer.write_batch(data_batch) + + offsets = offsets_batch.column(0).to_numpy() + adjusted_offsets = offsets + global_row_count + offsets_writer.write_batch( + pa.RecordBatch.from_arrays( + [pa.array(adjusted_offsets, type=pa.uint64())], + schema=offsets_schema, + ) + ) + last_offset = np.uint64(0) + for idx, offset in enumerate(offsets): + total_partition_counts[idx] += np.uint64(offset) - last_offset + last_offset = np.uint64(offset) + global_row_count += np.uint64(data_batch.num_rows) + num_batches += 1 + + if data_writer is None: + data_writer = LanceFileWriter(data_path, data_schema) + + metadata = _shuffle_metadata( + num_partitions, num_batches, total_partition_counts.tolist() + ) + for key, value in metadata.items(): + data_writer.add_schema_metadata(key, value) + offsets_writer.add_schema_metadata(key, value) + + data_writer.close() + offsets_writer.close() + return ["shuffle_data.lance", "shuffle_offsets.lance"] + + +def 
_train_ivf_pq_index_on_cuvs( + dataset, + column: str, + num_partitions: int, + metric_type: str, + accelerator: str, + num_sub_vectors: int, + *, + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, + filter_nan: bool = True, +): + if accelerator != "cuvs": + raise ValueError("cuVS acceleration only supports accelerator='cuvs'") + if num_bits != 8: + raise ValueError("cuVS IVF_PQ integration currently supports only num_bits=8") + + dimension = dataset.schema.field(column).type.list_size + if dimension % num_sub_vectors != 0: + raise ValueError( + "cuVS IVF_PQ integration requires vector dimension to be divisible by " + "num_sub_vectors" + ) + + if dataset.schema.field(column).nullable and filter_nan: + filt = f"{column} is not null" + else: + filt = None + + num_rows = dataset.count_rows(filter=filt) + if num_rows == 0: + raise ValueError("cuVS training requires at least one non-null training vector") + + train_rows = max(1, min(num_rows, max(num_partitions * sample_rate, 256 * 256))) + trainset = _sample_training_table(dataset, column, train_rows, filt) + matrix = _column_to_numpy(trainset, column) + + ivf_pq = _require_cuvs() + build_params = ivf_pq.IndexParams( + n_lists=num_partitions, + metric=_metric_to_cuvs(metric_type), + kmeans_n_iters=max_iters, + kmeans_trainset_fraction=_estimate_trainset_fraction( + matrix.shape[0], num_partitions, sample_rate + ), + pq_bits=num_bits, + pq_dim=num_sub_vectors, + codebook_kind="subspace", + force_random_rotation=False, + add_data_on_build=False, + ) + + index = ivf_pq.build(build_params, matrix) + centroids = _normalize_centroids(index, num_partitions, dimension) + pq_codebook = _normalize_pq_codebook(index, num_sub_vectors, num_bits, dimension) + return index, centroids, pq_codebook + + +def one_pass_assign_ivf_pq_on_cuvs( + dataset, + column: str, + metric_type: str, + accelerator: str, + ivf_centroids: np.ndarray, + pq_codebook: np.ndarray, + trained_index=None, + dst_dataset_uri: str | Path | None 
= None, + batch_size: int = 1024 * 128, + *, + filter_nan: bool = True, +): + if accelerator != "cuvs": + raise ValueError("cuVS acceleration only supports accelerator='cuvs'") + + num_rows = dataset.count_rows() + if dataset.schema.field(column).nullable and filter_nan: + filt = f"{column} is not null" + else: + filt = None + + num_sub_vectors = pq_codebook.shape[0] + ivf_pq = _require_cuvs() + + if trained_index is None: + raise ValueError( + "one_pass_assign_ivf_pq_on_cuvs requires a trained cuVS index for " + "single-node transform" + ) + + progress = _make_progress(num_rows) + progress.set_description("Assigning partitions and computing pq codes") + + def _partition_and_pq_codes_assignment() -> Iterator[ + tuple[pa.RecordBatch, pa.RecordBatch] + ]: + for batch in dataset.to_batches( + columns=[column], + filter=filt, + with_row_id=True, + batch_size=batch_size, + ): + vectors = _column_to_numpy(batch, column) + row_ids = batch.column("_rowid").to_numpy() + valid_mask = np.isfinite(vectors).all(axis=1) + if not np.all(valid_mask): + LOGGER.warning( + "%s vectors are ignored during partition assignment", + len(valid_mask) - int(valid_mask.sum()), + ) + row_ids = row_ids[valid_mask] + vectors = vectors[valid_mask] + if len(row_ids) == 0: + continue + partitions, pq_codes = ivf_pq.transform( + trained_index, _to_cuvs_transform_input(vectors) + ) + partitions = _as_numpy(partitions).astype(np.uint32, copy=False) + pq_codes = _as_numpy(pq_codes).astype(np.uint8, copy=False) + if pq_codes.shape != (len(row_ids), num_sub_vectors): + raise ValueError( + "cuVS transform returned incompatible PQ codes shape: " + f"expected {(len(row_ids), num_sub_vectors)}, got {pq_codes.shape}" + ) + + partition_batch = _make_shuffle_batch( + row_ids, + partitions, + pq_codes, + ivf_centroids.shape[0], + num_sub_vectors, + ) + progress.update(len(row_ids)) + yield partition_batch + + if dst_dataset_uri is None: + dst_dataset_uri = tempfile.mkdtemp() + if re.search(r".:\\", 
dst_dataset_uri) is not None: + dst_dataset_uri = dst_dataset_uri.replace("\\", "/", 1) + shuffle_buffers = _write_v3_shuffle_files( + str(dst_dataset_uri), + _partition_and_pq_codes_assignment(), + num_partitions=ivf_centroids.shape[0], + num_sub_vectors=num_sub_vectors, + ) + + progress.close() + LOGGER.info("Saved precomputed pq_codes to %s", dst_dataset_uri) + return str(dst_dataset_uri), shuffle_buffers + + +def train_ivf_pq_on_cuvs( + dataset, + column: str, + num_partitions: int, + metric_type: str, + accelerator: str, + num_sub_vectors: int, + *, + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, + filter_nan: bool = True, +) -> Tuple[np.ndarray, np.ndarray]: + _, centroids, pq_codebook = _train_ivf_pq_index_on_cuvs( + dataset, + column, + num_partitions, + metric_type, + accelerator, + num_sub_vectors, + sample_rate=sample_rate, + max_iters=max_iters, + num_bits=num_bits, + filter_nan=filter_nan, + ) + return centroids, pq_codebook + + +def one_pass_train_ivf_pq_on_cuvs( + dataset, + column: str, + num_partitions: int, + metric_type: str, + accelerator: str, + num_sub_vectors: int, + *, + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, + filter_nan: bool = True, +): + return train_ivf_pq_on_cuvs( + dataset, + column, + num_partitions, + metric_type, + accelerator, + num_sub_vectors, + sample_rate=sample_rate, + max_iters=max_iters, + num_bits=num_bits, + filter_nan=filter_nan, + ) + + +def prepare_global_ivf_pq_on_cuvs( + dataset, + column: str, + num_partitions: int, + num_sub_vectors: int, + *, + distance_type: str = "l2", + accelerator: str = "cuvs", + sample_rate: int = 256, + max_iters: int = 50, + num_bits: int = 8, +): + centroids, pq_codebook = train_ivf_pq_on_cuvs( + dataset, + column, + num_partitions, + distance_type, + accelerator, + num_sub_vectors, + sample_rate=sample_rate, + max_iters=max_iters, + num_bits=num_bits, + ) + return {"ivf_centroids": centroids, "pq_codebook": pq_codebook} diff --git 
a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7496746285a..5dee7767918 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -39,6 +39,7 @@ from lance.log import LOGGER from .blob import BlobFile +from .cuvs import is_cuvs_accelerator from .dependencies import ( _check_for_numpy, _check_for_torch, @@ -2899,20 +2900,24 @@ def _create_index_impl( # Handle timing for various parts of accelerated builds timers = {} + use_cuvs = is_cuvs_accelerator(accelerator) if accelerator is not None and index_type != "IVF_PQ": + if use_cuvs: + raise ValueError( + f"accelerator='{accelerator}' only supports IVF_PQ index builds" + ) LOGGER.warning( "Index type %s does not support GPU acceleration; falling back to CPU", index_type, ) accelerator = None + use_cuvs = False # IMPORTANT: Distributed indexing is CPU-only. Enforce single-node when - # accelerator or torch-related paths are detected. - torch_detected = False + # any Python-side accelerator path is selected. + accelerated_build_detected = accelerator is not None try: - if accelerator is not None: - torch_detected = True - else: + if accelerator is None: impl = kwargs.get("implementation") use_torch_flag = kwargs.get("use_torch") is True one_pass_flag = kwargs.get("one_pass_ivfpq") is True @@ -2925,16 +2930,16 @@ def _create_index_impl( or torch_centroids or torch_codebook ): - torch_detected = True + accelerated_build_detected = True except Exception: # Be conservative: if detection fails, do not modify behavior pass - if torch_detected: + if accelerated_build_detected: if require_commit: if fragment_ids is not None or index_uuid is not None: LOGGER.info( - "Torch detected; " + "Accelerated build detected; " "enforce single-node indexing (distributed is CPU-only)." 
) fragment_ids = None @@ -2942,63 +2947,117 @@ def _create_index_impl( else: if index_uuid is not None: LOGGER.info( - "Torch detected; " + "Accelerated build detected; " "enforce single-node indexing (distributed is CPU-only)." ) index_uuid = None if accelerator is not None: - from .vector import ( - one_pass_assign_ivf_pq_on_accelerator, - one_pass_train_ivf_pq_on_accelerator, - ) - - LOGGER.info("Doing one-pass ivfpq accelerated computations") if num_partitions is None: num_rows = self.count_rows() num_partitions = _target_partition_size_to_num_partitions( num_rows, target_partition_size ) - timers["ivf+pq_train:start"] = time.time() - ( - ivf_centroids, - ivf_kmeans, - pq_codebook, - pq_kmeans_list, - ) = one_pass_train_ivf_pq_on_accelerator( - self, - column[0], - num_partitions, - metric, - accelerator, - num_sub_vectors=num_sub_vectors, - batch_size=20480, - filter_nan=filter_nan, - ) - timers["ivf+pq_train:end"] = time.time() - ivfpq_train_time = timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] - LOGGER.info("ivf+pq training time: %ss", ivfpq_train_time) - timers["ivf+pq_assign:start"] = time.time() - shuffle_output_dir, shuffle_buffers = one_pass_assign_ivf_pq_on_accelerator( - self, - column[0], - metric, - accelerator, - ivf_kmeans, - pq_kmeans_list, - batch_size=20480, - filter_nan=filter_nan, - ) - timers["ivf+pq_assign:end"] = time.time() - ivfpq_assign_time = ( - timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] - ) - LOGGER.info("ivf+pq transform time: %ss", ivfpq_assign_time) - kwargs["precomputed_shuffle_buffers"] = shuffle_buffers - kwargs["precomputed_shuffle_buffers_path"] = os.path.join( - shuffle_output_dir, "data" - ) + if use_cuvs: + from .cuvs import ( + _train_ivf_pq_index_on_cuvs, + one_pass_assign_ivf_pq_on_cuvs, + ) + + LOGGER.info("Doing one-pass ivfpq cuVS training") + timers["ivf+pq_train:start"] = time.time() + trained_index, ivf_centroids, pq_codebook = _train_ivf_pq_index_on_cuvs( + self, + column[0], + 
num_partitions, + metric, + accelerator, + num_sub_vectors=num_sub_vectors, + sample_rate=kwargs.get("sample_rate", 256), + max_iters=kwargs.get("max_iters", 50), + num_bits=kwargs.get("num_bits", 8), + filter_nan=filter_nan, + ) + timers["ivf+pq_train:end"] = time.time() + ivfpq_train_time = ( + timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] + ) + LOGGER.info("cuVS ivf+pq training time: %ss", ivfpq_train_time) + timers["ivf+pq_assign:start"] = time.time() + ( + shuffle_output_dir, + shuffle_buffers, + ) = one_pass_assign_ivf_pq_on_cuvs( + self, + column[0], + metric, + accelerator, + ivf_centroids, + pq_codebook, + trained_index=trained_index, + batch_size=1024 * 128, + filter_nan=filter_nan, + ) + timers["ivf+pq_assign:end"] = time.time() + ivfpq_assign_time = ( + timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] + ) + LOGGER.info("cuVS ivf+pq transform time: %ss", ivfpq_assign_time) + kwargs["precomputed_shuffle_buffers"] = shuffle_buffers + kwargs["precomputed_shuffle_buffers_path"] = shuffle_output_dir + else: + from .vector import ( + one_pass_assign_ivf_pq_on_accelerator, + one_pass_train_ivf_pq_on_accelerator, + ) + + LOGGER.info("Doing one-pass ivfpq accelerated computations") + timers["ivf+pq_train:start"] = time.time() + ( + ivf_centroids, + ivf_kmeans, + pq_codebook, + pq_kmeans_list, + ) = one_pass_train_ivf_pq_on_accelerator( + self, + column[0], + num_partitions, + metric, + accelerator, + num_sub_vectors=num_sub_vectors, + batch_size=20480, + filter_nan=filter_nan, + ) + timers["ivf+pq_train:end"] = time.time() + ivfpq_train_time = ( + timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"] + ) + LOGGER.info("ivf+pq training time: %ss", ivfpq_train_time) + timers["ivf+pq_assign:start"] = time.time() + ( + shuffle_output_dir, + shuffle_buffers, + ) = one_pass_assign_ivf_pq_on_accelerator( + self, + column[0], + metric, + accelerator, + ivf_kmeans, + pq_kmeans_list, + batch_size=20480, + filter_nan=filter_nan, + ) + 
timers["ivf+pq_assign:end"] = time.time() + ivfpq_assign_time = ( + timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"] + ) + LOGGER.info("ivf+pq transform time: %ss", ivfpq_assign_time) + + kwargs["precomputed_shuffle_buffers"] = shuffle_buffers + kwargs["precomputed_shuffle_buffers_path"] = os.path.join( + shuffle_output_dir, "data" + ) if index_type.startswith("IVF"): if (ivf_centroids is not None) and (ivf_centroids_file is not None): raise ValueError( @@ -3229,7 +3288,17 @@ def create_index( The number of sub-vectors for PQ (Product Quantization). accelerator : str or ``torch.Device``, optional If set, use an accelerator to speed up the training process. - Accepted accelerator: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU). + Accepted accelerator: + + - "cuda" or ``torch.device(...)`` for the existing torch-based path + on NVIDIA GPUs + - "mps" for Apple Silicon GPU + - "cuvs" for the explicit cuVS-based IVF_PQ training path on NVIDIA + GPUs + + The cuVS path also requires the cuVS Python bindings to be installed + separately. + If not set, use the CPU. index_cache_size : int, optional The size of the index cache in number of entries. Default value is 256. @@ -3338,8 +3407,13 @@ def create_index( Experimental Accelerator (GPU) support: - *accelerate*: use GPU to train IVF partitions. - Only supports CUDA (Nvidia) or MPS (Apple) currently. - Requires PyTorch being installed. + `accelerator="cuda"` and `accelerator="mps"` use the existing torch + path. `accelerator="cuda"` runs on NVIDIA GPUs and `accelerator="mps"` + runs on Apple Silicon GPUs. `accelerator="cuvs"` uses cuVS for IVF_PQ + training only and requires an NVIDIA GPU. + + The torch path requires PyTorch. The cuVS path requires the cuVS + Python bindings to be installed separately. .. code-block:: python @@ -3354,9 +3428,10 @@ def create_index( accelerator="cuda" ) - Note: GPU acceleration is currently supported only for the ``IVF_PQ`` index - type. 
Providing an accelerator for other index types will fall back to CPU - index building. + Note: accelerator support is currently limited to the ``IVF_PQ`` index type. + Providing ``accelerator="cuda"`` for other index types will fall back to CPU + index building. Providing ``accelerator="cuvs"`` for other index types will + raise an error. References ---------- diff --git a/python/python/lance/indices/builder.py b/python/python/lance/indices/builder.py index c31ea0a7a0c..00591ead934 100644 --- a/python/python/lance/indices/builder.py +++ b/python/python/lance/indices/builder.py @@ -9,6 +9,7 @@ import numpy as np import pyarrow as pa +from lance.cuvs import is_cuvs_accelerator, prepare_global_ivf_pq_on_cuvs from lance.indices.ivf import IvfModel from lance.indices.pq import PqModel @@ -115,6 +116,11 @@ def train_ivf( self._verify_ivf_sample_rate(sample_rate, num_partitions, num_rows) distance_type = self._normalize_distance_type(distance_type) self._verify_ivf_params(num_partitions) + if is_cuvs_accelerator(accelerator): + raise NotImplementedError( + "IndicesBuilder.train_ivf does not support accelerator='cuvs'; " + "use prepare_global_ivf_pq instead" + ) if accelerator is None: from lance.lance import indices @@ -250,6 +256,25 @@ def prepare_global_ivf_pq( `IndicesBuilder.train_pq` (indices.train_pq_model). No public method names elsewhere are changed. 
""" + if is_cuvs_accelerator(accelerator): + if fragment_ids is not None: + raise NotImplementedError( + "fragment_ids is not supported with accelerator='cuvs'" + ) + num_rows = self._count_rows() + num_partitions = self._determine_num_partitions(num_partitions, num_rows) + num_subvectors = self._normalize_pq_params(num_subvectors, self.dimension) + return prepare_global_ivf_pq_on_cuvs( + self.dataset, + self.column[0], + num_partitions, + num_subvectors, + distance_type=distance_type, + accelerator=accelerator, + sample_rate=sample_rate, + max_iters=max_iters, + ) + # Global IVF training ivf_model = self.train_ivf( num_partitions, diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index b20ffc8cf7a..cf0bfe6e2bf 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -9,10 +9,12 @@ import string import tempfile import time +from importlib import import_module from pathlib import Path from typing import Optional import lance +import lance.cuvs as lance_cuvs import numpy as np import pyarrow as pa import pyarrow.compute as pc @@ -505,6 +507,15 @@ def test_create_index_unsupported_accelerator(tmp_path): accelerator="cuda:abc", ) + with pytest.raises(ValueError): + dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=4, + num_sub_vectors=16, + accelerator="cuvs:0", + ) + def test_create_index_accelerator_fallback(tmp_path, caplog): tbl = create_table() @@ -526,6 +537,287 @@ def test_create_index_accelerator_fallback(tmp_path, caplog): ) +def test_create_index_cuvs_dispatch(tmp_path, monkeypatch): + tbl = create_table(nvec=512, ndim=128) + dataset = lance.write_dataset(tbl, tmp_path) + calls = {} + + class FakeIndex: + pq_dim = 16 + pq_bits = 8 + + def fake_train( + dataset_arg, + column, + num_partitions, + metric_type, + accelerator, + num_sub_vectors, + *, + sample_rate, + max_iters, + num_bits, + filter_nan, + ): + calls["dataset"] = dataset_arg + 
calls["column"] = column + calls["num_partitions"] = num_partitions + calls["metric_type"] = metric_type + calls["accelerator"] = accelerator + calls["num_sub_vectors"] = num_sub_vectors + calls["sample_rate"] = sample_rate + calls["max_iters"] = max_iters + calls["num_bits"] = num_bits + calls["filter_nan"] = filter_nan + return ( + FakeIndex(), + np.random.randn(num_partitions, 128).astype(np.float32), + np.random.randn(num_sub_vectors, 256, 128 // num_sub_vectors).astype( + np.float32 + ), + ) + + def fake_assign( + dataset_arg, + column, + metric_type, + accelerator, + ivf_centroids, + pq_codebook, + trained_index=None, + dst_dataset_uri=None, + batch_size=20480, + *, + filter_nan, + ): + calls["assign_dataset"] = dataset_arg + calls["assign_column"] = column + calls["assign_metric_type"] = metric_type + calls["assign_accelerator"] = accelerator + calls["assign_trained_index"] = trained_index + calls["assign_batch_size"] = batch_size + calls["assign_filter_nan"] = filter_nan + + row_ids = dataset_arg.to_table(columns=[], with_row_id=True)[ + "_rowid" + ].to_numpy() + part_ids = pa.array(np.zeros(len(row_ids), dtype=np.uint32)) + pq_values = pa.array(np.zeros(len(row_ids) * 16, dtype=np.uint8)) + pq_codes = pa.FixedSizeListArray.from_arrays(pq_values, 16) + shuffle_ds_uri = str(tmp_path / "cuvs_shuffle_buffers") + shuffle_ds = lance.write_dataset( + pa.Table.from_arrays( + [pa.array(row_ids), part_ids, pq_codes], + names=["row_id", "__ivf_part_id", "__pq_code"], + ), + shuffle_ds_uri, + ) + shuffle_buffers = [ + data_file.path + for frag in shuffle_ds.get_fragments() + for data_file in frag.data_files() + ] + return shuffle_ds_uri, shuffle_buffers + + monkeypatch.setattr(lance_cuvs, "_train_ivf_pq_index_on_cuvs", fake_train) + monkeypatch.setattr(lance_cuvs, "one_pass_assign_ivf_pq_on_cuvs", fake_assign) + + dataset = dataset.create_index( + "vector", + index_type="IVF_PQ", + num_partitions=4, + num_sub_vectors=16, + accelerator="cuvs", + ) + + assert 
calls["column"] == "vector" + assert calls["num_partitions"] == 4 + assert calls["metric_type"] == "L2" + assert calls["accelerator"] == "cuvs" + assert calls["num_sub_vectors"] == 16 + assert calls["assign_column"] == "vector" + assert calls["assign_metric_type"] == "L2" + assert calls["assign_accelerator"] == "cuvs" + assert isinstance(calls["assign_trained_index"], FakeIndex) + assert dataset.stats.index_stats("vector_idx")["index_type"] == "IVF_PQ" + + +def test_create_index_cuvs_rejects_non_ivf_pq(tmp_path): + tbl = create_table() + dataset = lance.write_dataset(tbl, tmp_path) + + with pytest.raises(ValueError, match="only supports IVF_PQ"): + dataset.create_index( + "vector", + index_type="IVF_FLAT", + num_partitions=4, + accelerator="cuvs", + ) + + +def test_prepare_global_ivf_pq_cuvs_dispatch(tmp_path, monkeypatch): + ds = _make_sample_dataset_base(tmp_path, "cuvs_prepare_ds", 512, 128) + builder = IndicesBuilder(ds, "vector") + builder_module = import_module("lance.indices.builder") + calls = {} + + def fake_prepare( + dataset_arg, + column, + num_partitions, + num_sub_vectors, + *, + distance_type, + accelerator, + sample_rate, + max_iters, + ): + calls["dataset"] = dataset_arg + calls["column"] = column + calls["num_partitions"] = num_partitions + calls["num_sub_vectors"] = num_sub_vectors + calls["distance_type"] = distance_type + calls["accelerator"] = accelerator + calls["sample_rate"] = sample_rate + calls["max_iters"] = max_iters + return { + "ivf_centroids": np.random.randn(num_partitions, 128).astype(np.float32), + "pq_codebook": np.random.randn( + num_sub_vectors, 256, 128 // num_sub_vectors + ).astype(np.float32), + } + + monkeypatch.setattr(builder_module, "prepare_global_ivf_pq_on_cuvs", fake_prepare) + + prepared = builder.prepare_global_ivf_pq( + num_partitions=4, + num_subvectors=16, + distance_type="l2", + accelerator="cuvs", + sample_rate=7, + max_iters=20, + ) + + assert calls["column"] == "vector" + assert calls["num_partitions"] == 4 + 
assert calls["num_sub_vectors"] == 16 + assert calls["distance_type"] == "l2" + assert calls["accelerator"] == "cuvs" + assert prepared["ivf_centroids"].shape == (4, 128) + assert prepared["pq_codebook"].shape == (16, 256, 8) + + +def test_train_ivf_pq_on_cuvs_nullable_vectors(tmp_path, monkeypatch): + tbl = create_table(nvec=32, ndim=16, nullify=True) + dataset = lance.write_dataset(tbl, tmp_path) + + class FakeIndex: + centers = np.random.randn(4, 16).astype(np.float32) + pq_centers = np.random.randn(4, 256, 4).astype(np.float32) + + class FakeIvfPqModule: + class IndexParams: + def __init__(self, **kwargs): + self.kwargs = kwargs + + @staticmethod + def build(build_params, matrix): + assert build_params.kwargs["n_lists"] == 4 + assert matrix.shape[1] == 16 + assert matrix.dtype == np.float32 + return FakeIndex() + + monkeypatch.setattr(lance_cuvs, "_require_cuvs", lambda: FakeIvfPqModule()) + + centroids, pq_codebook = lance_cuvs.train_ivf_pq_on_cuvs( + dataset, + "vector", + 4, + "L2", + "cuvs", + 4, + sample_rate=4, + ) + + assert centroids.shape == (4, 16) + assert pq_codebook.shape == (4, 256, 4) + + +def test_cuvs_as_numpy_prefers_copy_to_host(): + class FakeDeviceTensor: + def copy_to_host(self): + return np.arange(6, dtype=np.float32).reshape(2, 3) + + array = lance_cuvs._as_numpy(FakeDeviceTensor()) + + assert isinstance(array, np.ndarray) + assert array.shape == (2, 3) + assert array.dtype == np.float32 + + +def test_one_pass_assign_ivf_pq_on_cuvs_writes_shuffle_buffers(tmp_path, monkeypatch): + tbl = create_table(nvec=32, ndim=16) + dataset = lance.write_dataset(tbl, tmp_path / "cuvs_assign_src") + + ivf_centroids = np.random.randn(4, 16).astype(np.float32) + pq_codebook = np.random.randn(4, 256, 4).astype(np.float32) + + class FakeDeviceTensor: + def __init__(self, array): + self._array = array + + def copy_to_host(self): + return self._array + + class FakeCupyArray: + def __init__(self, array): + self.array = array + + class FakeCupyModule: + 
@staticmethod + def asarray(array): + return FakeCupyArray(array) + + class FakeIndex: + pq_dim = 4 + pq_bits = 8 + + class FakeIvfPqModule: + @staticmethod + def transform(index, vectors): + assert isinstance(index, FakeIndex) + assert isinstance(vectors, FakeCupyArray) + labels = np.arange(len(vectors.array), dtype=np.uint32) % 4 + pq_codes = np.full((len(vectors.array), 4), 7, dtype=np.uint8) + return FakeDeviceTensor(labels), FakeDeviceTensor(pq_codes) + + monkeypatch.setattr(lance_cuvs, "_require_cuvs", lambda: FakeIvfPqModule()) + monkeypatch.setattr(lance_cuvs, "_optional_cupy", lambda: FakeCupyModule()) + + shuffle_uri, shuffle_buffers = lance_cuvs.one_pass_assign_ivf_pq_on_cuvs( + dataset, + "vector", + "l2", + "cuvs", + ivf_centroids, + pq_codebook, + trained_index=FakeIndex(), + batch_size=8, + ) + + from lance.file import LanceFileReader + + data_reader = LanceFileReader(str(Path(shuffle_uri) / "shuffle_data.lance")) + offsets_reader = LanceFileReader(str(Path(shuffle_uri) / "shuffle_offsets.lance")) + data_batch = next(data_reader.read_all(batch_size=1024).to_batches()) + offsets_batch = next(offsets_reader.read_all(batch_size=1024).to_batches()) + + assert shuffle_buffers == ["shuffle_data.lance", "shuffle_offsets.lance"] + assert data_batch.column("_rowid").type == pa.uint64() + assert data_batch.column("__pq_code").type == pa.list_(pa.uint8(), 4) + assert offsets_batch.column("offset").type == pa.uint64() + + def test_use_index(dataset, tmp_path): ann_ds = lance.write_dataset(dataset.to_table(), tmp_path / "indexed.lance") ann_ds = ann_ds.create_index( diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index 0bf714df237..45c719d523a 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -4,6 +4,7 @@ //! Shuffler is a component that takes a stream of record batches and shuffles them into //! the corresponding IVF partitions. 
+use std::collections::HashMap; use std::ops::Range; use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; @@ -36,6 +37,13 @@ use object_store::path::Path; use crate::vector::{LOSS_METADATA_KEY, PART_ID_COLUMN}; +const SHUFFLE_NUM_PARTITIONS_METADATA_KEY: &str = "lance:shuffle:num_partitions"; +const SHUFFLE_NUM_BATCHES_METADATA_KEY: &str = "lance:shuffle:num_batches"; +const SHUFFLE_PARTITION_COUNTS_METADATA_KEY: &str = "lance:shuffle:partition_counts"; +const SHUFFLE_TOTAL_LOSS_METADATA_KEY: &str = "lance:shuffle:total_loss"; +pub const SHUFFLE_DATA_FILE_NAME: &str = "shuffle_data.lance"; +pub const SHUFFLE_OFFSETS_FILE_NAME: &str = "shuffle_offsets.lance"; + #[async_trait::async_trait] /// A reader that can read the shuffled partitions. pub trait ShuffleReader: Send + Sync { @@ -435,7 +443,7 @@ impl Shuffler for TwoFileShuffler { ); // Create data file writer - let data_path = self.output_dir.child("shuffle_data.lance"); + let data_path = self.output_dir.child(SHUFFLE_DATA_FILE_NAME); let spill_path = self.output_dir.child("shuffle_data.spill"); let writer = self.object_store.create(&data_path).await?; let mut file_writer = FileWriter::try_new( @@ -446,7 +454,7 @@ impl Shuffler for TwoFileShuffler { .with_page_metadata_spill(self.object_store.clone(), spill_path); // Create offsets file writer - let offsets_path = self.output_dir.child("shuffle_offsets.lance"); + let offsets_path = self.output_dir.child(SHUFFLE_OFFSETS_FILE_NAME); let spill_path = self.output_dir.child("shuffle_offsets.spill"); let writer = self.object_store.create(&offsets_path).await?; let mut offsets_writer = FileWriter::try_new( @@ -527,13 +535,37 @@ impl Shuffler for TwoFileShuffler { .await?; } + let partition_counts_json = serde_json::to_string(&partition_counts).map_err(|e| { + Error::invalid_input(format!("Failed to serialize shuffle partition counts: {e}")) + })?; + let num_partitions_str = num_partitions.to_string(); + let num_batches_str = num_batches + 
.load(std::sync::atomic::Ordering::Relaxed) + .to_string(); + let total_loss_str = total_loss.lock().unwrap().to_string(); + for writer in [&mut file_writer, &mut offsets_writer] { + writer.add_schema_metadata( + SHUFFLE_NUM_PARTITIONS_METADATA_KEY, + num_partitions_str.clone(), + ); + writer.add_schema_metadata(SHUFFLE_NUM_BATCHES_METADATA_KEY, num_batches_str.clone()); + writer.add_schema_metadata( + SHUFFLE_PARTITION_COUNTS_METADATA_KEY, + partition_counts_json.clone(), + ); + writer.add_schema_metadata(SHUFFLE_TOTAL_LOSS_METADATA_KEY, total_loss_str.clone()); + } + // Finish files file_writer.finish().await?; offsets_writer.finish().await?; - let num_batches = num_batches.load(std::sync::atomic::Ordering::Relaxed); - - let total_loss_val = *total_loss.lock().unwrap(); + let num_batches = num_batches_str + .parse::<u64>() + .expect("num_batches string was produced from u64"); + let total_loss_val = total_loss_str + .parse::<f64>() + .expect("total_loss string was produced from f64"); TwoFileShuffleReader::try_new( self.object_store.clone(), @@ -558,6 +590,46 @@ pub struct TwoFileShuffleReader { } impl TwoFileShuffleReader { + pub async fn try_open_existing( + object_store: Arc<ObjectStore>, + output_dir: Path, + data_file: impl AsRef<str>, + offsets_file: impl AsRef<str>, + ) -> Result<Box<dyn ShuffleReader>> { + let scheduler_config = SchedulerConfig::max_bandwidth(&object_store); + let scheduler = ScanScheduler::new(object_store, scheduler_config); + + let file_reader = FileReader::try_open( + scheduler + .open_file( + &output_dir.child(data_file.as_ref()), + &CachedFileSize::unknown(), + ) + .await?, + None, + Arc::<DecoderPlugins>::default(), + &LanceCache::no_cache(), + FileReaderOptions::default(), + ) + .await?; + + let offsets_reader = FileReader::try_open( + scheduler + .open_file( + &output_dir.child(offsets_file.as_ref()), + &CachedFileSize::unknown(), + ) + .await?, + None, + Arc::<DecoderPlugins>::default(), + &LanceCache::no_cache(), + FileReaderOptions::default(), + ) + .await?; + + Self::from_existing_readers(scheduler, file_reader, 
offsets_reader) + } + async fn try_new( object_store: Arc<ObjectStore>, output_dir: Path, @@ -573,7 +645,7 @@ impl TwoFileShuffleReader { let scheduler_config = SchedulerConfig::max_bandwidth(&object_store); let scheduler = ScanScheduler::new(object_store, scheduler_config); - let data_path = output_dir.child("shuffle_data.lance"); + let data_path = output_dir.child(SHUFFLE_DATA_FILE_NAME); let file_reader = FileReader::try_open( scheduler .open_file(&data_path, &CachedFileSize::unknown()) @@ -585,7 +657,7 @@ impl TwoFileShuffleReader { ) .await?; - let offsets_path = output_dir.child("shuffle_offsets.lance"); + let offsets_path = output_dir.child(SHUFFLE_OFFSETS_FILE_NAME); let offsets_reader = FileReader::try_open( scheduler .open_file(&offsets_path, &CachedFileSize::unknown()) @@ -608,6 +680,87 @@ impl TwoFileShuffleReader { })) } + fn from_existing_readers( + scheduler: Arc<ScanScheduler>, + file_reader: FileReader, + offsets_reader: FileReader, + ) -> Result<Box<dyn ShuffleReader>> { + let metadata: &HashMap<String, String> = &offsets_reader.schema().metadata; + + let num_partitions = metadata + .get(SHUFFLE_NUM_PARTITIONS_METADATA_KEY) + .ok_or_else(|| { + Error::invalid_input(format!( + "Missing required metadata key {SHUFFLE_NUM_PARTITIONS_METADATA_KEY} in precomputed V3 shuffle offsets file" + )) + })? + .parse::<usize>() + .map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_NUM_PARTITIONS_METADATA_KEY}: {e}" + )) + })?; + let num_batches = metadata + .get(SHUFFLE_NUM_BATCHES_METADATA_KEY) + .ok_or_else(|| { + Error::invalid_input(format!( + "Missing required metadata key {SHUFFLE_NUM_BATCHES_METADATA_KEY} in precomputed V3 shuffle offsets file" + )) + })? 
.parse::<u64>() + .map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_NUM_BATCHES_METADATA_KEY}: {e}" + )) + })?; + let partition_counts = serde_json::from_str::<Vec<u64>>( + metadata + .get(SHUFFLE_PARTITION_COUNTS_METADATA_KEY) + .ok_or_else(|| { + Error::invalid_input(format!( + "Missing required metadata key {SHUFFLE_PARTITION_COUNTS_METADATA_KEY} in precomputed V3 shuffle offsets file" + )) + })?, + ) + .map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_PARTITION_COUNTS_METADATA_KEY}: {e}" + )) + })?; + if partition_counts.len() != num_partitions { + return Err(Error::invalid_input(format!( + "Precomputed V3 shuffle partition count length {} does not match num_partitions {}", + partition_counts.len(), + num_partitions + ))); + } + let total_loss = metadata + .get(SHUFFLE_TOTAL_LOSS_METADATA_KEY) + .map(|value| { + value.parse::<f64>().map_err(|e| { + Error::invalid_input(format!( + "Invalid value for {SHUFFLE_TOTAL_LOSS_METADATA_KEY}: {e}" + )) + }) + }) + .transpose()? 
+ .unwrap_or(0.0); + + if num_batches == 0 { + return Ok(Box::new(EmptyReader)); + } + + Ok(Box::new(Self { + _scheduler: scheduler, + file_reader, + offsets_reader, + num_partitions, + num_batches, + partition_counts, + total_loss, + })) + } + async fn partition_ranges(&self, partition_id: usize) -> Result<Vec<Range<u64>>> { let mut positions = Vec::with_capacity(self.num_batches as usize * 2); for batch_idx in 0..self.num_batches { @@ -844,6 +997,42 @@ mod tests { assert!((loss - 4.25).abs() < 1e-10, "expected 4.25, got {}", loss); } + #[tokio::test] + async fn test_two_file_shuffler_reopen_existing_files() { + let dir = TempStrDir::default(); + let output_dir = Path::from(dir.as_ref()); + let num_partitions = 3; + + let batch1 = make_batch(&[0, 1, 2], &[10, 20, 30], Some(1.5)); + let batch2 = make_batch(&[2, 0, 1, 0], &[40, 50, 60, 70], Some(2.0)); + + let shuffler = TwoFileShuffler::new(output_dir.clone(), num_partitions); + let stream = batches_to_stream(vec![batch1, batch2]); + let _ = shuffler.shuffle(stream).await.unwrap(); + + let reopened = TwoFileShuffleReader::try_open_existing( + Arc::new(ObjectStore::local()), + output_dir, + SHUFFLE_DATA_FILE_NAME, + SHUFFLE_OFFSETS_FILE_NAME, + ) + .await + .unwrap(); + + assert_eq!(reopened.partition_size(0).unwrap(), 3); + assert_eq!(reopened.partition_size(1).unwrap(), 2); + assert_eq!(reopened.partition_size(2).unwrap(), 2); + + let p0 = collect_partition(reopened.as_ref(), 0).await.unwrap(); + let vals: &Int32Array = p0.column_by_name("val").unwrap().as_primitive(); + let mut v: Vec<i32> = vals.iter().map(|x| x.unwrap()).collect(); + v.sort(); + assert_eq!(v, vec![10, 50, 70]); + + let loss = reopened.total_loss().unwrap(); + assert!((loss - 3.5).abs() < 1e-10, "expected 3.5, got {}", loss); + } + #[tokio::test] async fn test_two_file_shuffler_single_batch() { let dir = TempStrDir::default(); diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 24298cbba18..9a7001834d0 100644 --- 
a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::future; +use std::path::Path as StdPath; use std::sync::Arc; use std::{collections::HashMap, pin::Pin}; @@ -44,7 +45,10 @@ use lance_index::vector::quantizer::{QuantizerMetadata, QuantizerStorage}; use lance_index::vector::shared::{SupportedIvfIndexType, write_unified_ivf_and_index_metadata}; use lance_index::vector::storage::STORAGE_METADATA_KEY; use lance_index::vector::transform::Flatten; -use lance_index::vector::v3::shuffler::{EmptyReader, IvfShufflerReader}; +use lance_index::vector::v3::shuffler::{ + EmptyReader, IvfShufflerReader, SHUFFLE_DATA_FILE_NAME, SHUFFLE_OFFSETS_FILE_NAME, + TwoFileShuffleReader, +}; use lance_index::vector::v3::subindex::SubIndexType; use lance_index::vector::{LOSS_METADATA_KEY, PART_ID_COLUMN, PQ_CODE_COLUMN, VectorIndex}; use lance_index::vector::{PART_ID_FIELD, ivf::storage::IvfModel}; @@ -141,6 +145,43 @@ type BuildStream<S: IvfSubIndex, Q: Quantization> = Pin<Box<dyn Stream<Item = Result<Option<(<Q as Quantization>::Storage, S, f64)>>> + Send>>; impl<S: IvfSubIndex, Q: Quantization> IvfIndexBuilder<S, Q> { + async fn try_open_precomputed_v3_shuffle_reader( + &self, + root: &Path, + files: &[String], + ) -> Result<Option<Arc<dyn ShuffleReader>>> { + if files.len() != 2 { + return Ok(None); + } + + let mut data_file = None; + let mut offsets_file = None; + for file in files { + let Some(file_name) = StdPath::new(file).file_name() else { + return Ok(None); + }; + match file_name.to_string_lossy().as_ref() { + SHUFFLE_DATA_FILE_NAME => data_file = Some(SHUFFLE_DATA_FILE_NAME), + SHUFFLE_OFFSETS_FILE_NAME => offsets_file = Some(SHUFFLE_OFFSETS_FILE_NAME), + _ => return Ok(None), + } + } + let (Some(data_file), Some(offsets_file)) = (data_file, offsets_file) else { + return Ok(None); + }; + + Ok(Some( + TwoFileShuffleReader::try_open_existing( + Arc::new(ObjectStore::local()), + root.clone(), + data_file, + offsets_file, + ) + .await? 
+ .into(), + )) + } + #[allow(clippy::too_many_arguments)] pub fn new( dataset: Dataset, @@ -528,13 +569,30 @@ impl IvfIndexBuilder .as_ref() .and_then(|p| p.precomputed_shuffle_buffers.as_ref()) { - Some((uri, _)) => { + Some((uri, files)) => { + if let Some(reader) = self + .try_open_precomputed_v3_shuffle_reader(uri, files) + .await? + { + log::info!("shuffle with precomputed V3 shuffle files from {}", uri); + self.shuffle_reader = Some(reader); + return Ok(()); + } + let uri = to_local_path(uri); - // the uri points to data directory, - // so need to trim the "data" suffix for reading the dataset - let uri = uri.trim_end_matches("data"); + let uri = if StdPath::new(&uri) + .file_name() + .is_some_and(|name| name == "data") + { + StdPath::new(&uri) + .parent() + .map(|path| path.to_string_lossy().to_string()) + .unwrap_or(uri) + } else { + uri + }; log::info!("shuffle with precomputed shuffle buffers from {}", uri); - let ds = Dataset::open(uri).await?; + let ds = Dataset::open(&uri).await?; ds.scan().try_into_stream().await? } _ => {