diff --git a/python/cuvs_bench/cuvs_bench/backends/__init__.py b/python/cuvs_bench/cuvs_bench/backends/__init__.py index 3e9688dbe4..a09e6fd1fe 100644 --- a/python/cuvs_bench/cuvs_bench/backends/__init__.py +++ b/python/cuvs_bench/cuvs_bench/backends/__init__.py @@ -25,10 +25,12 @@ ) from .cpp_gbench import CppGoogleBenchmarkBackend +from .opensearch import OpenSearchBackend # Auto-register built-in backends _registry = get_registry() _registry.register("cpp_gbench", CppGoogleBenchmarkBackend) +_registry.register("opensearch", OpenSearchBackend) __all__ = [ # Base classes and data structures @@ -43,4 +45,5 @@ "get_backend", # Built-in backends "CppGoogleBenchmarkBackend", + "OpenSearchBackend", ] diff --git a/python/cuvs_bench/cuvs_bench/backends/opensearch.py b/python/cuvs_bench/cuvs_bench/backends/opensearch.py new file mode 100644 index 0000000000..750cf961ae --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/backends/opensearch.py @@ -0,0 +1,941 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# + +""" +OpenSearch benchmark backend for cuvs-bench supporting faiss and lucene +engines for approximate nearest-neighbor search. + +It also supports the remote index build service (OpenSearch 3.0+), +which offloads Faiss HNSW graph construction to a GPU-accelerated external service. +https://docs.opensearch.org/latest/vector-search/remote-index-build/ +""" + +import os +import time +from typing import Any, Dict, List, Optional, Tuple, Union + +import numpy as np + +from .base import BenchmarkBackend, BuildResult, Dataset, SearchResult +from ..orchestrator.config_loaders import ( + ConfigLoader, + DatasetConfig, + BenchmarkConfig, + IndexConfig, +) + + +class OpenSearchConfigLoader(ConfigLoader): + """ + Configuration loader for the OpenSearch backend. + + Reads opensearch-prefixed algorithm YAML files from the standard config + directory. The shared :class:`ConfigLoader` base handles dataset loading + and parameter expansion before calling into this loader's OpenSearch- + specific hooks. This built-in loader is registered automatically when + :mod:`cuvs_bench.orchestrator` is imported. + """ + + def __init__(self, config_path: Optional[Union[str, os.PathLike]] = None): + """ + Initialize the config loader. + + Parameters + ---------- + config_path : Optional[Union[str, os.PathLike]] + Path to the config directory. If None, uses the default path + bundled with cuvs-bench. 
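+
+        Examples
+        --------
+        A minimal sketch of driving the loader directly; the dataset
+        name, path, and group here are illustrative::
+
+            loader = OpenSearchConfigLoader()
+            dataset_config, benchmark_configs = loader.load(
+                dataset="test-ds", dataset_path="/data", groups="base"
+            )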
+ """ + self.config_path = ( + os.fspath(config_path) + if config_path is not None + else os.path.join( + os.path.dirname(os.path.realpath(__file__)), "../config" + ) + ) + + @property + def backend_type(self) -> str: + return "opensearch" + + def _discover_algo_groups( + self, + dataset_conf: dict, + dataset: str, + dataset_path: str, + **kwargs, + ): + """Discover OpenSearch algorithm groups to benchmark.""" + algo_files = [ + algo_file + for algo_file in self.gather_algorithm_configs( + self.config_path, kwargs.get("algorithm_configuration") + ) + if os.path.basename(algo_file).startswith("opensearch") + ] + + algorithms = kwargs.get("algorithms") + allowed_algos = ( + [a.strip() for a in algorithms.split(",")] if algorithms else None + ) + allowed_groups = ( + [g.strip() for g in kwargs["groups"].split(",")] + if kwargs.get("groups") + else None + ) + + result = [] + + for algo_file in algo_files: + algo_yaml = self.load_yaml_file(algo_file) + algo_name = algo_yaml.get("name", "") + if allowed_algos and algo_name not in allowed_algos: + continue + + groups: Dict[str, Any] = algo_yaml.get("groups", {}) + if allowed_groups: + groups = { + k: v for k, v in groups.items() if k in allowed_groups + } + + for group_name, group_conf in groups.items(): + result.append((algo_name, group_name, group_conf, {})) + + return result + + def _build_benchmark_configs( + self, + dataset_config: DatasetConfig, + dataset_conf: dict, + dataset: str, + dataset_path: str, + expanded_groups: List[Tuple[str, str, dict, List, List, dict]], + **kwargs, + ) -> List[BenchmarkConfig]: + """ + Build OpenSearch benchmark configurations from expanded param combos. + + The base ConfigLoader has already expanded each group's build and + search parameter grids before calling this hook. 
+ """ + host = kwargs.get("host", "localhost") + port = kwargs.get("port", 9200) + + # Connection and remote-build kwargs forwarded verbatim to backend_config + _conn_keys = ( + "username", + "password", + "use_ssl", + "verify_certs", + "bulk_batch_size", + # Remote Index Build (OpenSearch 3.0+, faiss engine only) + "remote_index_build", + "remote_build_size_min", + "remote_build_timeout", + ) + conn_kwargs = {k: kwargs[k] for k in _conn_keys if k in kwargs} + + tune_mode = kwargs.get("_tune_mode", False) + tune_build_params = kwargs.get("_tune_build_params") + tune_search_params = kwargs.get("_tune_search_params") + + benchmark_configs: List[BenchmarkConfig] = [] + + for ( + algo_name, + group_name, + _group_conf, + build_combos, + search_combos, + _group_meta, + ) in expanded_groups: + if tune_mode and tune_build_params is not None: + actual_build = [tune_build_params] + actual_search = ( + [tune_search_params] if tune_search_params else [{}] + ) + else: + actual_build = build_combos + actual_search = search_combos + + for build_param in actual_build: + prefix = ( + algo_name + if group_name == "base" + else f"{algo_name}_{group_name}" + ) + parts = [prefix] + [f"{k}{v}" for k, v in build_param.items()] + index_label = ".".join(parts) + + # OpenSearch index names must be lowercase with no dots + os_index_name = index_label.replace(".", "_").lower() + index_file = os.path.join( + dataset_path, dataset, "index", index_label + ) + + index_cfg = IndexConfig( + name=index_label, + algo=algo_name, + build_param=build_param, + search_params=actual_search, + file=index_file, + ) + + engine = "lucene" + if "faiss" in algo_name: + engine = "faiss" + + backend_cfg: Dict[str, Any] = { + "name": index_label, + "host": host, + "port": port, + "index_name": os_index_name, + "engine": engine, + "algo": algo_name, + "requires_network": True, + **conn_kwargs, + } + + benchmark_configs.append( + BenchmarkConfig( + indexes=[index_cfg], + backend_config=backend_cfg, + ) + ) + + return benchmark_configs + + +# Mapping from cuvs-bench distance metric names to OpenSearch space_type +_DISTANCE_TO_SPACE_TYPE: Dict[str, str] = { + "euclidean": "l2", + "l2": "l2", + "inner_product": "innerproduct", + "innerproduct": "innerproduct", + "cosine": "cosinesimil", + "cosinesimil": "cosinesimil", + "angular": "cosinesimil", +} + +_DEFAULT_REMOTE_BUILD_TIMEOUT = 30 * 60 +_REMOTE_BUILD_START_TIMEOUT = 30.0 + +_REMOTE_BUILD_MERGE_OPS = "remote_index_build_current_merge_operations" +_REMOTE_BUILD_FLUSH_OPS = "remote_index_build_current_flush_operations" +_REMOTE_BUILD_REQUEST_SUCCESS_COUNT = "build_request_success_count" +_REMOTE_BUILD_REQUEST_FAILURE_COUNT = "build_request_failure_count" +_REMOTE_BUILD_SUCCESS_COUNT = "index_build_success_count" +_REMOTE_BUILD_FAILURE_COUNT = "index_build_failure_count" + +_REMOTE_BUILD_BUILD_STAT_KEYS = ( + _REMOTE_BUILD_MERGE_OPS, + _REMOTE_BUILD_FLUSH_OPS, +) +_REMOTE_BUILD_CLIENT_STAT_KEYS = ( + _REMOTE_BUILD_REQUEST_SUCCESS_COUNT, + _REMOTE_BUILD_REQUEST_FAILURE_COUNT, + _REMOTE_BUILD_SUCCESS_COUNT, + _REMOTE_BUILD_FAILURE_COUNT, +) +_REMOTE_BUILD_STAT_KEYS = ( + *_REMOTE_BUILD_CLIENT_STAT_KEYS, + *_REMOTE_BUILD_BUILD_STAT_KEYS, +) + + +class OpenSearchBackend(BenchmarkBackend): + """ + Benchmark backend for OpenSearch's k-NN plugin. + + Supports the faiss (HNSW / IVF) and lucene (HNSW) engines. Vectors are + bulk-indexed as ``knn_vector`` fields and retrieved via the standard + ``knn`` query type. + + Requires ``opensearch-py`` Python package. 
+ + Parameters + ---------- + config : Dict[str, Any] + Backend configuration produced by :class:`OpenSearchConfigLoader`. + Recognized keys: + + Required: + - ``name`` – index label (e.g. ``"opensearch_faiss_hnsw.m16.ef_construction100"``) + - ``index_name`` – OpenSearch index name (lowercase, no dots) + - ``engine`` – ``"faiss"`` or ``"lucene"`` + - ``algo`` – algorithm name (e.g. ``"opensearch_faiss_hnsw"``) + + Optional: + - ``host`` – hostname (default: ``"localhost"``) + - ``port`` – port (default: ``9200``) + - ``username`` – HTTP basic auth user (default: ``"admin"``) + - ``password`` – HTTP basic auth password (default: ``"admin"``) + - ``use_ssl`` – use HTTPS (default: ``False``) + - ``verify_certs`` – verify SSL certs (default: ``False``) + - ``bulk_batch_size`` – vectors per bulk request (default: ``500``) + - ``requires_network`` – trigger network pre-flight check (default: ``True``) + - ``remote_index_build`` – set ``index.knn.remote_index_build.enabled=true`` + on the index at creation time, opting it into the GPU build path (default: ``False``). + - ``remote_build_size_min`` – minimum segment size to trigger GPU build, e.g. ``"1kb"`` + (default: OpenSearch's default) + - ``remote_build_timeout`` – seconds to wait for GPU build (default: ``1800``) + + """ + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self.__client = None + + @property + def algo(self) -> str: + return self.config.get("algo", "opensearch") + + @property + def _client(self): + if self.__client is None: + try: + from opensearchpy import OpenSearch + except ImportError as e: + raise ImportError( + "`opensearch-py` is required for the OpenSearch backend in cuvs-bench.\n\n" + "Install it with: `pip install opensearch-py`" + ) from e + + host = self.config.get("host", "localhost") + port = self.config.get("port", 9200) + username = self.config.get("username", "admin") + password = self.config.get("password", "admin") + use_ssl = self.config.get("use_ssl", False) + verify_certs = self.config.get("verify_certs", False) + + self.__client = OpenSearch( + hosts=[{"host": host, "port": port}], + http_auth=(username, password) if username else None, + use_ssl=use_ssl, + verify_certs=verify_certs, + timeout=None, + ) + return self.__client + + def _check_network_available(self) -> bool: + try: + self._client.cluster.health(request_timeout=5) + return True + except ImportError: + return False + except Exception: + return False + + def initialize(self) -> None: + """Eagerly open the connection to OpenSearch.""" + _ = self._client + + def cleanup(self) -> None: + """Close the OpenSearch connection.""" + if self.__client is not None: + self.__client.close() + self.__client = None + + def _build_index_mapping( + self, + dims: int, + engine: str, + space_type: str, + build_param: Dict[str, Any], + remote_index_build: bool = False, + remote_build_size_min: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Construct the OpenSearch index mapping dict for k-NN. + + The mapping enables the k-NN plugin and configures the vector field + with the chosen engine and method parameters. + + When ``remote_index_build=True``, ``index.knn.remote_index_build.enabled`` + is set to ``true`` in the index settings, opting qualifying segments into + the GPU build path. If ``remote_build_size_min`` is provided it overrides + ``index.knn.remote_index_build.size.min``; otherwise OpenSearch's default + applies. The cluster-level infrastructure is assumed to be pre-configured + externally. 
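+
+        For ``engine="faiss"`` with the default HNSW parameters the
+        returned body looks roughly like (abridged)::
+
+            {
+                "settings": {"index": {"knn": True, ...}},
+                "mappings": {"properties": {"vector": {
+                    "type": "knn_vector",
+                    "dimension": dims,
+                    "method": {
+                        "name": "hnsw",
+                        "space_type": space_type,
+                        "engine": "faiss",
+                        "parameters": {"m": 16, "ef_construction": 100},
+                    },
+                }}},
+            }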
+ """ + m = build_param.get("m", 16) + ef_construction = build_param.get("ef_construction", 100) + + if engine == "lucene": + method_name = "hnsw" + method_params: Dict[str, Any] = { + "m": m, + "ef_construction": ef_construction, + } + elif engine == "faiss": + faiss_method = build_param.get("faiss_method", "hnsw") + method_name = faiss_method + if faiss_method == "hnsw": + method_params = { + "m": m, + "ef_construction": ef_construction, + } + elif faiss_method == "ivf": + method_params = {"nlist": build_param.get("nlist", 4)} + else: + raise ValueError( + f"Unsupported faiss_method {faiss_method!r}. " + "Use 'hnsw' or 'ivf'." + ) + else: + raise ValueError( + f"Unknown OpenSearch k-NN engine {engine!r}. " + "Use 'faiss' or 'lucene'." + ) + + index_settings = { + "knn": True, + "number_of_shards": build_param.get("number_of_shards", 1), + "number_of_replicas": build_param.get("number_of_replicas", 0), + } + if remote_index_build: + if engine != "faiss": + raise ValueError( + "Remote Index Build only supports the faiss engine " + f"(got engine={engine!r}). " + "Use algorithms='opensearch_faiss_hnsw'." + ) + index_settings["knn.remote_index_build.enabled"] = True + if remote_build_size_min: + index_settings["knn.remote_index_build.size.min"] = ( + remote_build_size_min + ) + + return { + "settings": {"index": index_settings}, + "mappings": { + "properties": { + "vector": { + "type": "knn_vector", + "dimension": dims, + "method": { + "name": method_name, + "space_type": space_type, + "engine": engine, + "parameters": method_params, + }, + } + } + }, + } + + def _bulk_index( + self, + index_name: str, + vectors: np.ndarray, + bulk_batch_size: int, + ) -> None: + """ + Bulk-index vectors into index_name using the helpers API. + + Vectors are stored under the ``"vector"`` field with their integer + row index as the document ``_id`` so they can be mapped back to + dataset and ground-truth neighbor IDs. + """ + from opensearchpy.helpers import streaming_bulk + + def _doc_generator(): + for i, vec in enumerate(vectors): + yield { + "_index": index_name, + "_id": str(i), + "vector": vec.tolist(), + } + + total = vectors.shape[0] + indexed = 0 + for ok, info in streaming_bulk( + self._client, + _doc_generator(), + chunk_size=bulk_batch_size, + request_timeout=120, + ): + if not ok: + raise RuntimeError(f"Failed to index document: {info}") + indexed += 1 + milestone = max(total // 10, 1) + if indexed % milestone == 0: + print( + f" Indexed {indexed} / {total} vectors ({100 * indexed // total}%)" + ) + print(f" Indexed all {total} vectors") + + def _get_knn_remote_build_stats(self) -> dict: + """ + Query the kNN stats API for cluster-wide remote build counters. 
+ + The remote build stats live under a nested structure in the response: + nodes..remote_vector_index_build_stats.build_stats.* + nodes..remote_vector_index_build_stats.client_stats.* + """ + resp = self._client.knn.stats() + totals = {key: 0 for key in _REMOTE_BUILD_STAT_KEYS} + for node_stats in resp.get("nodes", {}).values(): + remote = node_stats.get("remote_vector_index_build_stats", {}) + build = remote.get("build_stats", {}) + client = remote.get("client_stats", {}) + for key in _REMOTE_BUILD_BUILD_STAT_KEYS: + totals[key] += build.get(key, 0) + for key in _REMOTE_BUILD_CLIENT_STAT_KEYS: + totals[key] += client.get(key, 0) + return totals + + def _format_remote_build_stats(self, stats: Dict[str, int]) -> str: + return ", ".join( + f"{key}={stats.get(key, 0)}" for key in _REMOTE_BUILD_STAT_KEYS + ) + + def _remote_build_delta( + self, stats: Dict[str, int], initial_stats: Dict[str, int], key: str + ) -> int: + return stats.get(key, 0) - initial_stats.get(key, 0) + + def _flush_index(self, index_name: str) -> None: + resp = self._client.indices.flush( + index=index_name, request_timeout=None + ) + shards = resp.get("_shards", {}) + total = shards.get("total", 0) + successful = shards.get("successful", 0) + failed = shards.get("failed", 0) + + if failed or successful != total: + raise RuntimeError( + f"Flush did not complete on all shards for {index_name}: {resp}" + ) + + def _resolve_index_name(self, index_cfg: IndexConfig) -> str: + return self.config.get( + "index_name", index_cfg.name.replace(".", "_").lower() + ) + + def _failed_build_result( + self, error_message: str, build_params: Optional[Dict[str, Any]] = None + ) -> BuildResult: + return BuildResult( + index_path="", + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params=build_params or {}, + success=False, + error_message=error_message, + ) + + def _failed_search_result( + self, + k: int, + error_message: str, + search_params: Optional[List[Dict[str, Any]]] = None, + ) -> SearchResult: + return SearchResult( + neighbors=np.zeros((0, k), dtype=np.int64), + distances=np.zeros((0, k), dtype=np.float32), + search_time_ms=0.0, + queries_per_second=0.0, + recall=0.0, + algorithm=self.algo, + search_params=search_params or [], + success=False, + error_message=error_message, + ) + + def _wait_for_remote_build( + self, + initial_stats: Dict[str, int], + timeout: int = _DEFAULT_REMOTE_BUILD_TIMEOUT, + poll_interval: int = 5, + no_activity_timeout: float = _REMOTE_BUILD_START_TIMEOUT, + ) -> None: + """ + Poll the kNN stats API until all submitted remote GPU builds complete. + + *initial_stats* should be snapshotted from + ``_get_knn_remote_build_stats()`` immediately before ingestion starts. + + Raises ``RuntimeError`` if the stats report a failed remote build or if + no remote build activity starts before *no_activity_timeout*. Raises + ``TimeoutError`` if submitted builds do not complete within *timeout* + seconds. 
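+
+        Completion is declared once the count of newly confirmed builds
+        catches up with the count of newly submitted build requests and
+        no merge or flush operations remain in flight.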
+ """ + start = time.perf_counter() + deadline = start + timeout + no_activity_deadline = start + min(no_activity_timeout, timeout) + observed_activity = False + stats: Optional[Dict[str, int]] = None + + while time.perf_counter() < deadline: + stats = self._get_knn_remote_build_stats() + request_failure_delta = self._remote_build_delta( + stats, initial_stats, _REMOTE_BUILD_REQUEST_FAILURE_COUNT + ) + build_failure_delta = self._remote_build_delta( + stats, initial_stats, _REMOTE_BUILD_FAILURE_COUNT + ) + if request_failure_delta > 0 or build_failure_delta > 0: + raise RuntimeError( + "GPU build failed via kNN stats API: " + f"{self._format_remote_build_stats(stats)}" + ) + + submitted_delta = self._remote_build_delta( + stats, initial_stats, _REMOTE_BUILD_REQUEST_SUCCESS_COUNT + ) + completed_delta = self._remote_build_delta( + stats, initial_stats, _REMOTE_BUILD_SUCCESS_COUNT + ) + current_ops = ( + stats[_REMOTE_BUILD_MERGE_OPS] + stats[_REMOTE_BUILD_FLUSH_OPS] + ) + is_idle = current_ops == 0 + if submitted_delta > 0 or completed_delta > 0 or current_ops > 0: + observed_activity = True + + if ( + submitted_delta > 0 + and completed_delta >= submitted_delta + and is_idle + ): + return + + if ( + not observed_activity + and time.perf_counter() >= no_activity_deadline + ): + raise RuntimeError( + "No remote GPU build was observed via kNN stats API. " + "OpenSearch may not have scheduled one because the segment " + "size did not meet its default remote build threshold. " + "Set remote_build_size_min lower if this benchmark must " + "force remote builds. " + f"Stats: {self._format_remote_build_stats(stats)}" + ) + time.sleep(poll_interval) + + stats_msg = ( + self._format_remote_build_stats(stats) + if stats is not None + else "unavailable" + ) + raise TimeoutError( + f"GPU build not confirmed after {timeout}s: " + f"last stats: {stats_msg}" + ) + + def build( + self, + dataset: Dataset, + indexes: List[IndexConfig], + force: bool = False, + dry_run: bool = False, + ) -> BuildResult: + """ + Build an OpenSearch k-NN index from the dataset's training vectors. + + Creates an index with k-NN plugin mapping and bulk-indexes all training + vectors. If the index already exists and ``force=False`` the build is + skipped. + + Build time measures ingest and flush. When ``remote_index_build=True`` + it also includes waiting for GPU build confirmation via the kNN stats + API. The final index refresh runs after build timing is recorded. + + Parameters + ---------- + dataset : Dataset + Must have either non-empty ``training_vectors`` or a valid + ``base_file`` path. + indexes : List[IndexConfig] + The first element provides the build parameters. + force : bool + Delete and recreate the index if it already exists. + dry_run : bool + Log what would happen without making any changes. 
+ + Returns + ------- + BuildResult + """ + skip = self._pre_flight_check() + if skip: + return self._failed_build_result( + f"pre-flight check failed: {skip}" + ) + + if not indexes: + return self._failed_build_result("No indexes provided") + + index_cfg = indexes[0] + build_param = index_cfg.build_param + index_name = self._resolve_index_name(index_cfg) + engine = self.config.get("engine", "lucene") + bulk_batch_size = int(self.config.get("bulk_batch_size", 500)) + remote_index_build = bool(self.config.get("remote_index_build", False)) + remote_build_size_min = self.config.get("remote_build_size_min") + + if dry_run: + print( + f"[dry_run] Would build OpenSearch index '{index_name}' " + f"(engine={engine}, remote_index_build={remote_index_build}, build_param={build_param})" + ) + + return BuildResult( + index_path=index_name, + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params=build_param, + success=True, + ) + + # Handle existing index + if self._client.indices.exists(index=index_name): + if force: + self._client.indices.delete(index=index_name) + else: + return BuildResult( + index_path=index_name, + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params=build_param, + success=True, + ) + + # Dataset handles lazy vector loading from base_file when needed. + base_vectors = dataset.training_vectors + + if base_vectors.size == 0: + return self._failed_build_result( + "No training vectors available. Provide dataset.training_vectors " + "or a valid dataset.base_file path.", + build_params=build_param, + ) + + dims = base_vectors.shape[1] + space_type = _DISTANCE_TO_SPACE_TYPE.get(dataset.distance_metric, "l2") + + # Create index + mapping = self._build_index_mapping( + dims, + engine, + space_type, + build_param, + remote_index_build, + remote_build_size_min, + ) + self._client.indices.create(index=index_name, body=mapping) + + if remote_index_build: + remote_timeout = int( + self.config.get( + "remote_build_timeout", _DEFAULT_REMOTE_BUILD_TIMEOUT + ) + ) + pre_ingest_stats = self._get_knn_remote_build_stats() + + # Bulk index, then flush segments before timing build completion. + t0 = time.perf_counter() + self._bulk_index(index_name, base_vectors, bulk_batch_size) + self._flush_index(index_name) + if remote_index_build: + self._wait_for_remote_build( + initial_stats=pre_ingest_stats, + timeout=remote_timeout, + ) + build_time = time.perf_counter() - t0 + + self._client.indices.refresh(index=index_name, request_timeout=120) + + # Index size + stats = self._client.indices.stats(index=index_name) + index_size_bytes = int( + stats["_all"]["total"]["store"]["size_in_bytes"] + ) + + return BuildResult( + index_path=index_name, + build_time_seconds=build_time, + index_size_bytes=index_size_bytes, + algorithm=self.algo, + build_params=build_param, + metadata={ + "engine": engine, + "space_type": space_type, + "remote_index_build": remote_index_build, + }, + success=True, + ) + + def search( + self, + dataset: Dataset, + indexes: List[IndexConfig], + k: int, + batch_size: int = 10000, + mode: str = "latency", + force: bool = False, + search_threads: Optional[int] = None, + dry_run: bool = False, + ) -> SearchResult: + """ + Search the OpenSearch k-NN index for nearest neighbors. + + Iterates over every search-parameter combination defined in the index + config, updating the index-level ``ef_search`` setting between runs. 
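+        For the faiss engine the setting is applied via
+        ``index.knn.algo_param.ef_search``; the lucene engine path skips
+        the settings update.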
+ Metrics (QPS, latency) are collected per parameter set and stored in + ``SearchResult.metadata["per_search_param_results"]``. + + The *neighbors* and *distances* arrays in the returned result reflect + the **last** search-parameter combination (highest ef_search by + convention), while *queries_per_second* is the average across all + parameter combinations. This backend returns ``recall=0.0``; the + shared orchestrator path computes recall from the returned neighbors + and dataset ground truth. + + Parameters + ---------- + dataset : Dataset + Must have either non-empty ``query_vectors`` or a valid + ``query_file`` path. + indexes : List[IndexConfig] + The first element is used; its ``search_params`` list defines the + ef_search values to sweep. + k : int + Number of neighbors to retrieve per query. + batch_size : int + Unused; included for interface compatibility. + mode : str + ``"latency"`` or ``"throughput"``; informational for this backend. + force : bool + Unused; included for interface compatibility. + search_threads : Optional[int] + Unused; OpenSearch manages its own threading. + dry_run : bool + Log what would happen without making any OpenSearch calls. + + Returns + ------- + SearchResult + """ + skip = self._pre_flight_check() + if skip: + return self._failed_search_result( + k, f"pre-flight check failed: {skip}" + ) + + if not indexes: + return self._failed_search_result(k, "No indexes provided") + + index_cfg = indexes[0] + index_name = self._resolve_index_name(index_cfg) + engine = self.config.get("engine", "lucene") + search_params_list = index_cfg.search_params or [{}] + + if dry_run: + print( + f"[dry_run] Would search OpenSearch index '{index_name}' with {len(search_params_list)} param set(s) (k={k})" + ) + + return SearchResult( + neighbors=np.zeros((0, k), dtype=np.int64), + distances=np.zeros((0, k), dtype=np.float32), + search_time_ms=0.0, + queries_per_second=0.0, + recall=0.0, + algorithm=self.algo, + search_params=search_params_list, + success=True, + ) + + # Dataset handles lazy loading from query files when needed. + query_vectors = dataset.query_vectors + + if query_vectors.size == 0: + return self._failed_search_result( + k, + "No query vectors available. 
Provide dataset.query_vectors " + "or a valid dataset.query_file path.", + search_params=search_params_list, + ) + + n_queries = query_vectors.shape[0] + + # Run search for each search-parameter combination + per_param_results: List[Dict[str, Any]] = [] + last_neighbors = np.full((n_queries, k), -1, dtype=np.int64) + last_distances = np.zeros((n_queries, k), dtype=np.float32) + + for sp in search_params_list: + ef_search = sp.get("ef_search", 100) + + if engine == "faiss": + self._client.indices.put_settings( + index=index_name, + body={"index.knn.algo_param.ef_search": ef_search}, + ) + + neighbors = np.full((n_queries, k), -1, dtype=np.int64) + distances = np.zeros((n_queries, k), dtype=np.float32) + + t0 = time.perf_counter() + for i, q_vec in enumerate(query_vectors): + body: Dict[str, Any] = { + "size": k, + "query": { + "knn": { + "vector": { + "vector": q_vec.tolist(), + "k": k, + } + } + }, + } + resp = self._client.search(index=index_name, body=body) + hits = resp["hits"]["hits"] + for j, hit in enumerate(hits[:k]): + neighbors[i, j] = int(hit["_id"]) + distances[i, j] = float(hit["_score"]) + + elapsed = time.perf_counter() - t0 + qps = n_queries / elapsed if elapsed > 0 else 0.0 + + per_param_results.append( + { + "search_params": sp, + "search_time_ms": elapsed * 1000.0, + "queries_per_second": qps, + } + ) + last_neighbors = neighbors + last_distances = distances + + # Aggregate across all search-param combinations + avg_qps = float( + np.mean([r["queries_per_second"] for r in per_param_results]) + ) + total_search_time_ms = float( + sum(r["search_time_ms"] for r in per_param_results) + ) + + return SearchResult( + neighbors=last_neighbors, + distances=last_distances, + search_time_ms=total_search_time_ms, + queries_per_second=avg_qps, + recall=0.0, + algorithm=self.algo, + search_params=search_params_list, + metadata={ + "engine": engine, + "per_search_param_results": per_param_results, + }, + success=True, + ) diff --git a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py index 5a1848294c..bca48d792e 100644 --- a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py +++ b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py @@ -158,6 +158,18 @@ "ef": {"type": "int", "min": 10, "max": 1000}, }, }, + # ========================================================================= + # OpenSearch HNSW (faiss engine) + # ========================================================================= + "opensearch_faiss_hnsw": { + "build": { + "m": {"type": "int", "min": 4, "max": 64}, + "ef_construction": {"type": "int", "min": 32, "max": 1024}, + }, + "search": { + "ef_search": {"type": "int", "min": 10, "max": 2048}, + }, + }, } diff --git a/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml new file mode 100644 index 0000000000..b0093673f3 --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/config/algos/opensearch_faiss_hnsw.yaml @@ -0,0 +1,27 @@ +# OpenSearch k-NN benchmark configuration – faiss HNSW engine +# +# Uses OpenSearch's faiss engine with the HNSW method. +# +# Build parameters +# ---------------- +# m Number of bidirectional graph edges per node. +# ef_construction Candidate list size during construction. +# +# Search parameters +# ----------------- +# ef_search Candidate list size at query time (updated via index settings). 
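+#
+# The 'base' group sweeps the full 3 x 3 build cross-product (9 index
+# builds); 'test' is a single-build smoke-test configuration.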
+ +name: opensearch_faiss_hnsw +groups: + base: + build: + m: [32, 64, 96] + ef_construction: [64, 128, 256] + search: + ef_search: [10, 20, 40, 60, 80, 120, 200, 400, 600, 800] + test: + build: + m: [16] + ef_construction: [100] + search: + ef_search: [50, 100] diff --git a/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py b/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py index ecbcc94059..7600101439 100644 --- a/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py +++ b/python/cuvs_bench/cuvs_bench/orchestrator/__init__.py @@ -11,6 +11,7 @@ register_config_loader, get_config_loader, ) +from ..backends.opensearch import OpenSearchConfigLoader __all__ = [ # Main orchestrator @@ -20,6 +21,7 @@ "BenchmarkConfig", "DatasetConfig", "CppGBenchConfigLoader", + "OpenSearchConfigLoader", # Registry functions "get_backend_class", "list_backends", @@ -35,6 +37,7 @@ def _register_builtin_loaders(): """Register built-in config loaders.""" register_config_loader("cpp_gbench", CppGBenchConfigLoader) + register_config_loader("opensearch", OpenSearchConfigLoader) # Auto-register when module is imported diff --git a/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py b/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py index 8e4022b0f3..580291c2f0 100644 --- a/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py +++ b/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py @@ -223,57 +223,65 @@ def _run_sweep( for config in benchmark_configs: # Create backend instance backend = self.backend_class(config.backend_config) - - if build: - # Pass ALL indexes at once - ONE C++ command builds all - build_result = backend.build( - dataset=bench_dataset, - indexes=config.indexes, - force=force, - dry_run=dry_run, - ) - results.append(build_result) - - if not build_result.success: - print( - f"Build failed for {config.index_name}: {build_result.error_message}" + try: + backend.initialize() + + if build: + # Pass ALL indexes at once - ONE C++ command builds all + build_result = backend.build( + dataset=bench_dataset, + indexes=config.indexes, + force=force, + dry_run=dry_run, ) - continue - - if search: - # Pass ALL indexes at once - ONE C++ command searches all - # Each index has its own search_params list - # Total benchmarks = sum(len(idx.search_params) for idx in indexes) - search_result = backend.search( - dataset=bench_dataset, - indexes=config.indexes, - k=count, - batch_size=batch_size, - mode=search_mode, - force=force, - search_threads=search_threads, - dry_run=dry_run, - ) + results.append(build_result) - # Compute recall for backends that return actual neighbors. - # The C++ backend computes recall in the subprocess and returns - # empty neighbors, so this is skipped for it. - # Note: The recall computation relies on empty neighbors to - # distinguish backends that already computed recall. 
- if search_result.success and search_result.neighbors.size > 0: - gt = bench_dataset.groundtruth_neighbors - if gt is not None: - search_result.recall = compute_recall( - search_result.neighbors, gt, count + if not build_result.success: + print( + f"Build failed for {config.index_name}: {build_result.error_message}" ) - - results.append(search_result) - - if not search_result.success: - print( - f"Search failed for {config.index_name}: {search_result.error_message}" + continue + + if search: + # Pass ALL indexes at once - ONE C++ command searches all + # Each index has its own search_params list + # Total benchmarks = sum(len(idx.search_params) for idx in indexes) + search_result = backend.search( + dataset=bench_dataset, + indexes=config.indexes, + k=count, + batch_size=batch_size, + mode=search_mode, + force=force, + search_threads=search_threads, + dry_run=dry_run, ) + # Compute recall for backends that return actual neighbors. + # The C++ backend computes recall in the subprocess and returns + # empty neighbors, so this is skipped for it. + # Empty neighbors or nonzero recall indicate that the backend + # already handled recall itself. + if ( + search_result.success + and search_result.neighbors.size > 0 + and search_result.recall == 0.0 + ): + gt = bench_dataset.groundtruth_neighbors + if gt is not None: + search_result.recall = compute_recall( + search_result.neighbors, gt, count + ) + + results.append(search_result) + + if not search_result.success: + print( + f"Search failed for {config.index_name}: {search_result.error_message}" + ) + finally: + backend.cleanup() + return results def _run_tune( @@ -564,40 +572,50 @@ def _run_trial( "append_results": append_results, } backend = self.backend_class(backend_config) + try: + backend.initialize() - result = None + result = None - if build: - result = backend.build( - dataset=bench_dataset, - indexes=config.indexes, - force=force, - dry_run=dry_run, - ) - if not result.success: - return result + if build: + result = backend.build( + dataset=bench_dataset, + indexes=config.indexes, + force=force, + dry_run=dry_run, + ) + if not result.success: + return result - if search: - result = backend.search( - dataset=bench_dataset, - indexes=config.indexes, - k=count, - batch_size=batch_size, - mode=search_mode, - force=force, - search_threads=search_threads, - dry_run=dry_run, - ) + if search: + result = backend.search( + dataset=bench_dataset, + indexes=config.indexes, + k=count, + batch_size=batch_size, + mode=search_mode, + force=force, + search_threads=search_threads, + dry_run=dry_run, + ) - # Compute recall for backends that return actual neighbors. - # Note: The recall computation relies on empty neighbors to - # distinguish backends that already computed recall. - if result.success and result.neighbors.size > 0: - gt = bench_dataset.groundtruth_neighbors - if gt is not None: - result.recall = compute_recall(result.neighbors, gt, count) + # Compute recall for backends that return actual neighbors. + # Empty neighbors or nonzero recall indicate that the backend + # already handled recall itself. 
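+                # (The OpenSearch backend, for example, returns real
+                # neighbors with recall=0.0, so recall is computed here
+                # from the dataset ground truth.)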
+ if ( + result.success + and result.neighbors.size > 0 + and result.recall == 0.0 + ): + gt = bench_dataset.groundtruth_neighbors + if gt is not None: + result.recall = compute_recall( + result.neighbors, gt, count + ) - return result + return result + finally: + backend.cleanup() def _create_dataset(self, dataset_config: DatasetConfig) -> Dataset: """ diff --git a/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py new file mode 100644 index 0000000000..a4f1c300e0 --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/tests/test_opensearch.py @@ -0,0 +1,492 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +""" +Tests for the OpenSearch benchmark backend. +""" + +import os +from urllib.parse import urlparse + +import numpy as np +import pytest +import requests + +pytest.importorskip("opensearchpy") + +from cuvs_bench.backends.base import Dataset +from cuvs_bench.backends.opensearch import ( + OpenSearchBackend, + OpenSearchConfigLoader, +) +from cuvs_bench.orchestrator.config_loaders import IndexConfig + + +def _make_backend(config_overrides: dict = None) -> OpenSearchBackend: + """Backend with no network requirement so pre-flight passes without a server.""" + config = { + "name": "test_index", + "index_name": "test_index", + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + **(config_overrides or {}), + } + return OpenSearchBackend(config) + + +def _make_dataset( + n_base: int = 10, n_queries: int = 5, dims: int = 4, k: int = 10 +) -> Dataset: + rng = np.random.default_rng(0) + base = rng.random((n_base, dims), dtype=np.float32) + queries = rng.random((n_queries, dims), dtype=np.float32) + dists = np.sum((queries[:, None, :] - base[None, :, :]) ** 2, axis=2) + groundtruth = np.argsort(dists, axis=1)[:, :k].astype(np.int32) + return Dataset( + name="test", + training_vectors=base, + query_vectors=queries, + groundtruth_neighbors=groundtruth, + ) + + +def _make_index_cfg(search_params: list = None) -> IndexConfig: + return IndexConfig( + name="test_index", + algo="opensearch_faiss_hnsw", + build_param={"m": 16, "ef_construction": 100}, + search_params=search_params or [{"ef_search": 50}, {"ef_search": 100}], + file="", + ) + + +def _cleanup_backend(backend: OpenSearchBackend, index_name: str) -> None: + """Delete the test index and close the client connection.""" + try: + if backend._client.indices.exists(index=index_name): + backend._client.indices.delete(index=index_name) + except Exception: + pass + backend.cleanup() + + +@pytest.fixture(scope="session") +def opensearch_url(): + """Skip integration tests when no live OpenSearch node is reachable.""" + url = os.environ.get("OPENSEARCH_URL", "http://localhost:9200").rstrip("/") + try: + requests.get(f"{url}/_cluster/health", timeout=2).raise_for_status() + except Exception: + pytest.skip("No OpenSearch node reachable") + return url + + +@pytest.fixture +def live_backend(opensearch_url): + """Backend connected to a live OpenSearch node; cleans up the test index on teardown.""" + parsed = urlparse(opensearch_url) + index_name = "cuvs_test_index" + backend = OpenSearchBackend( + { + "name": index_name, + "index_name": index_name, + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + "host": parsed.hostname, + "port": parsed.port or 9200, + "use_ssl": parsed.scheme == "https", + "verify_certs": False, + "requires_network": True, + } + ) + try: + yield backend + finally: + _cleanup_backend(backend, index_name) + 
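+
+# Tests below run without a live server; classes marked ``integration``
+# need a reachable OpenSearch node (run with ``pytest -m integration``).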
+ +@pytest.fixture +def config_dir(tmp_path): + """Config directory with a minimal dataset and algo definition.""" + (tmp_path / "datasets").mkdir() + (tmp_path / "datasets" / "datasets.yaml").write_text( + """\ +- name: test-ds + distance: euclidean + dims: 4 +""" + ) + (tmp_path / "algos").mkdir() + (tmp_path / "algos" / "opensearch_faiss_hnsw.yaml").write_text( + """\ +name: opensearch_faiss_hnsw +groups: + test: + build: + m: [16, 32] + ef_construction: [100, 200] + search: + ef_search: [50, 100] +""" + ) + return tmp_path + + +class TestOpenSearchConfigLoader: + def test_load_produces_correct_configs(self, config_dir): + loader = OpenSearchConfigLoader(config_path=config_dir) + dataset_config, benchmark_configs = loader.load( + dataset="test-ds", dataset_path="/data", groups="test" + ) + + assert dataset_config.name == "test-ds" + assert dataset_config.distance == "euclidean" + # m=[16,32] x ef_construction=[100,200] = 4 build combos + assert len(benchmark_configs) == 4 + bc = benchmark_configs[0] + assert bc.backend_config["engine"] == "faiss" + assert len(bc.indexes[0].search_params) == 2 # ef_search: [50, 100] + + def test_load_forwards_remote_build_kwargs(self, config_dir): + loader = OpenSearchConfigLoader(config_path=config_dir) + _, configs = loader.load( + dataset="test-ds", + dataset_path="/data", + remote_index_build=True, + remote_build_size_min="2kb", + remote_build_timeout=123, + remote_build_s3_endpoint="http://s3:9000", + ) + + bc = configs[0].backend_config + assert bc["remote_index_build"] is True + assert bc["remote_build_size_min"] == "2kb" + assert bc["remote_build_timeout"] == 123 + assert "remote_build_s3_endpoint" not in bc + + +class TestOpenSearchBackend: + def test_build_dry_run(self): + backend = _make_backend() + result = backend.build( + _make_dataset(), [_make_index_cfg()], dry_run=True + ) + assert result.success + assert result.index_path == backend.config["index_name"] + + def test_search_dry_run(self): + result = _make_backend().search( + _make_dataset(), [_make_index_cfg()], k=3, dry_run=True + ) + assert result.success + assert len(result.search_params) == 2 + + def test_remote_build_requires_faiss_engine(self): + backend = _make_backend({"engine": "lucene"}) + with pytest.raises(ValueError, match="faiss engine"): + backend._build_index_mapping( + dims=4, + engine="lucene", + space_type="l2", + build_param={}, + remote_index_build=True, + ) + + def test_remote_build_uses_opensearch_default_size_when_unspecified(self): + backend = _make_backend() + mapping = backend._build_index_mapping( + dims=4, + engine="faiss", + space_type="l2", + build_param={}, + remote_index_build=True, + ) + + settings = mapping["settings"]["index"] + assert settings["knn.remote_index_build.enabled"] is True + assert "knn.remote_index_build.size.min" not in settings + + def test_remote_build_size_min_overrides_default(self): + backend = _make_backend() + mapping = backend._build_index_mapping( + dims=4, + engine="faiss", + space_type="l2", + build_param={}, + remote_index_build=True, + remote_build_size_min="2kb", + ) + + settings = mapping["settings"]["index"] + assert settings["knn.remote_index_build.size.min"] == "2kb" + + def test_wait_for_remote_build_raises_on_failure_count(self): + backend = _make_backend() + initial_stats = { + "build_request_success_count": 0, + "build_request_failure_count": 0, + "index_build_success_count": 0, + "index_build_failure_count": 0, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 0, + 
} + backend._get_knn_remote_build_stats = lambda: { + "build_request_success_count": 1, + "build_request_failure_count": 0, + "index_build_success_count": 0, + "index_build_failure_count": 1, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 0, + } + + with pytest.raises(RuntimeError, match="GPU build failed"): + backend._wait_for_remote_build( + initial_stats=initial_stats, + timeout=1, + poll_interval=0, + ) + + def test_wait_for_remote_build_raises_when_no_build_observed(self): + backend = _make_backend() + initial_stats = { + "build_request_success_count": 0, + "build_request_failure_count": 0, + "index_build_success_count": 0, + "index_build_failure_count": 0, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 0, + } + backend._get_knn_remote_build_stats = lambda: { + "build_request_success_count": 0, + "build_request_failure_count": 0, + "index_build_success_count": 0, + "index_build_failure_count": 0, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 0, + } + + with pytest.raises(RuntimeError, match="No remote GPU build"): + backend._wait_for_remote_build( + initial_stats=initial_stats, + timeout=1, + poll_interval=0, + no_activity_timeout=0, + ) + + def test_wait_for_remote_build_waits_for_all_submitted_builds(self): + backend = _make_backend() + initial_stats = { + "build_request_success_count": 0, + "build_request_failure_count": 0, + "index_build_success_count": 0, + "index_build_failure_count": 0, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 0, + } + stats_sequence = iter( + [ + { + "build_request_success_count": 2, + "build_request_failure_count": 0, + "index_build_success_count": 1, + "index_build_failure_count": 0, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 1, + }, + { + "build_request_success_count": 3, + "build_request_failure_count": 0, + "index_build_success_count": 2, + "index_build_failure_count": 0, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 1, + }, + { + "build_request_success_count": 3, + "build_request_failure_count": 0, + "index_build_success_count": 3, + "index_build_failure_count": 0, + "remote_index_build_current_merge_operations": 0, + "remote_index_build_current_flush_operations": 0, + }, + ] + ) + backend._get_knn_remote_build_stats = lambda: next(stats_sequence) + + backend._wait_for_remote_build( + initial_stats=initial_stats, + timeout=1, + poll_interval=0, + ) + + def test_search_fails_without_query_vectors(self): + dataset = Dataset( + name="empty", + training_vectors=np.empty((0, 4), dtype=np.float32), + query_vectors=np.empty((0, 4), dtype=np.float32), + ) + result = _make_backend().search(dataset, [_make_index_cfg()], k=3) + assert not result.success + assert "No query vectors" in result.error_message + + +@pytest.fixture(scope="session") +def remote_build_env(opensearch_url): + """ + Skip remote index build tests if required env vars aren't set or services aren't reachable. 
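+
+    When everything is reachable, the fixture registers an S3 snapshot
+    repository and enables remote index build via cluster settings, as
+    done below.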
+ + Required environment variables: + BUILDER_URL URL of the remote index builder service + S3_BUCKET Bucket name used by OpenSearch for vector staging + S3_ACCESS_KEY S3 access key ID + S3_SECRET_KEY S3 secret access key + + Optional environment variables: + S3_ENDPOINT Custom S3 endpoint URL (omit to use real AWS S3) + S3_SESSION_TOKEN STS session token (required for temporary credentials) + """ + builder_url = os.environ.get("BUILDER_URL") + # S3_ENDPOINT is optional — omit it (or leave unset) to use real AWS S3. + s3_endpoint = os.environ.get("S3_ENDPOINT") or None + s3_bucket = os.environ.get("S3_BUCKET") + s3_region = ( + os.environ.get("S3_REGION") + or os.environ.get("AWS_DEFAULT_REGION") + or "us-east-1" + ) + s3_access_key = os.environ.get("S3_ACCESS_KEY") + s3_secret_key = os.environ.get("S3_SECRET_KEY") + s3_session_token = os.environ.get("S3_SESSION_TOKEN") or None + s3_prefix = "knn-indexes/" + + missing = [ + name + for name, val in { + "BUILDER_URL": builder_url, + "S3_BUCKET": s3_bucket, + "S3_ACCESS_KEY": s3_access_key, + "S3_SECRET_KEY": s3_secret_key, + }.items() + if not val + ] + if missing: + pytest.skip( + f"Remote index build tests require environment variables: {', '.join(missing)}" + ) + + try: + requests.get(builder_url, timeout=2) + except requests.exceptions.ConnectionError: + pytest.skip(f"Remote index builder not reachable at {builder_url}") + + if s3_endpoint is not None: + try: + requests.get(s3_endpoint, timeout=2) + except requests.exceptions.ConnectionError: + pytest.skip(f"S3 endpoint not reachable at {s3_endpoint}") + + # Register the S3 snapshot repo and enable remote index build + session = requests.Session() + session.headers.update({"Content-Type": "application/json"}) + repo_name = "vector-repo" + + session.put( + f"{opensearch_url}/_snapshot/{repo_name}", + json={ + "type": "s3", + "settings": { + "bucket": s3_bucket, + "base_path": s3_prefix.rstrip("/"), + "region": s3_region, + }, + }, + ).raise_for_status() + + session.put( + f"{opensearch_url}/_cluster/settings", + json={ + "persistent": { + "knn.remote_index_build.enabled": True, + "knn.remote_index_build.repository": repo_name, + "knn.remote_index_build.service.endpoint": builder_url, + } + }, + ).raise_for_status() + + return { + "s3_endpoint": s3_endpoint, + "s3_bucket": s3_bucket, + "s3_prefix": s3_prefix, + "s3_access_key": s3_access_key, + "s3_secret_key": s3_secret_key, + "s3_session_token": s3_session_token, + } + + +@pytest.fixture +def live_remote_build_backend(opensearch_url, remote_build_env): + """Backend with remote_index_build=True pointing at a configured remote build stack.""" + parsed = urlparse(opensearch_url) + index_name = "cuvs_test_remote_index" + backend = OpenSearchBackend( + { + "name": index_name, + "index_name": index_name, + "engine": "faiss", + "algo": "opensearch_faiss_hnsw", + "host": parsed.hostname, + "port": parsed.port or 9200, + "use_ssl": parsed.scheme == "https", + "verify_certs": False, + "requires_network": True, + "remote_index_build": True, + # Keep the tiny integration dataset above the remote build threshold. 
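+            # (The cluster default for knn.remote_index_build.size.min
+            # would likely skip segments this small.)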
+ "remote_build_size_min": "1kb", + } + ) + try: + yield backend + finally: + _cleanup_backend(backend, index_name) + + +@pytest.mark.integration +class TestOpenSearchBackendIntegration: + def test_build_and_search(self, live_backend): + k = 3 + dataset = _make_dataset(n_base=100, n_queries=10, dims=4, k=k) + idx = _make_index_cfg(search_params=[{"ef_search": 50}]) + + build_result = live_backend.build(dataset, [idx], force=True) + assert build_result.success + assert build_result.build_time_seconds > 0 + assert build_result.index_size_bytes > 0 + + search_result = live_backend.search(dataset, [idx], k=k) + assert search_result.success + assert search_result.recall == 0.0 + assert search_result.queries_per_second > 0 + assert search_result.neighbors.shape == (10, k) + assert len(search_result.metadata["per_search_param_results"]) == 1 + + +@pytest.mark.integration +class TestOpenSearchRemoteIndexBuildIntegration: + def test_remote_build_and_search(self, live_remote_build_backend): + k = 3 + dataset = _make_dataset(n_base=100, n_queries=10, dims=4, k=k) + idx = _make_index_cfg(search_params=[{"ef_search": 50}]) + + build_result = live_remote_build_backend.build( + dataset, [idx], force=True + ) + assert build_result.success + assert build_result.build_time_seconds > 0 + assert build_result.metadata["remote_index_build"] is True + + search_result = live_remote_build_backend.search(dataset, [idx], k=k) + assert search_result.success + assert search_result.recall == 0.0 + assert search_result.queries_per_second > 0 + assert search_result.neighbors.shape == (10, k) diff --git a/python/cuvs_bench/pyproject.toml b/python/cuvs_bench/pyproject.toml index 9def6ab839..3ef80f5bff 100644 --- a/python/cuvs_bench/pyproject.toml +++ b/python/cuvs_bench/pyproject.toml @@ -38,12 +38,20 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] +[project.optional-dependencies] +opensearch = ["opensearch-py>=2.4.0"] + [project.urls] Homepage = "https://github.com/rapidsai/cuvs" [tool.setuptools.package-data] "*" = ["*.*", "VERSION"] +[tool.pytest.ini_options] +markers = [ + "integration: tests that require a live OpenSearch node (run with '-m integration')", +] + [tool.isort] line_length = 79 multi_line_output = 3