# redisvl/cli/migrate.py — implementation of the `rvl migrate` command group.
# Relies on module-level imports: argparse, sys, Optional,
# add_redis_connection_options / create_redis_url (redisvl.cli.utils),
# MigrationExecutor / MigrationPlanner / MigrationValidator (redisvl.migration),
# migration utils (detect_aof_enabled, estimate_disk_space, list_indexes,
# load_migration_plan, write_benchmark_report, write_migration_report),
# RedisConnectionFactory, and `logger`.
class Migrate:
    """Top-level handler for the ``rvl migrate`` CLI command.

    Each public method is one subcommand (helper, list, plan, apply,
    estimate, validate). Dispatch mirrors the other RedisVL CLI commands:
    ``sys.argv[2]`` selects the subcommand and ``sys.argv[3:]`` are parsed
    by the subcommand's own ArgumentParser.
    """

    usage = "\n".join(
        [
            "rvl migrate <command> [<args>]\n",
            "Commands:",
            "\thelper Show migration guidance and supported capabilities",
            "\tlist List all available indexes",
            "\tplan Generate a migration plan for a document-preserving drop/recreate migration",
            "\tapply Execute a reviewed drop/recreate migration plan",
            "\testimate Estimate disk space required for a migration plan (dry-run, no mutations)",
            "\tvalidate Validate a completed migration plan against the live index",
            "\n",
        ]
    )

    def __init__(self):
        parser = argparse.ArgumentParser(usage=self.usage)
        parser.add_argument("command", help="Subcommand to run")

        # Only consume argv[2]; each subcommand parses argv[3:] itself.
        args = parser.parse_args(sys.argv[2:3])
        # CLI subcommands use dashes; Python method names use underscores.
        command = args.command.replace("-", "_")
        if not hasattr(self, command):
            print(f"Unknown subcommand: {args.command}")
            parser.print_help()
            sys.exit(1)

        try:
            getattr(self, command)()
        except Exception as e:
            logger.error(e)
            sys.exit(1)

    def helper(self):
        """Print migration guidance: available indexes and supported changes."""
        parser = argparse.ArgumentParser(
            usage="rvl migrate helper [--host <host> --port <port> | --url <url>]"
        )
        parser = add_redis_connection_options(parser)
        args = parser.parse_args(sys.argv[3:])
        redis_url = create_redis_url(args)
        indexes = list_indexes(redis_url=redis_url)

        print("RedisVL Index Migrator\n\nAvailable indexes:")
        if indexes:
            for position, index_name in enumerate(indexes, start=1):
                print(f"  {position}. {index_name}")
        else:
            print("  (none found)")

        print(
            """\nSupported changes:
  - Adding or removing non-vector fields (text, tag, numeric, geo)
  - Changing field options (sortable, separator, weight)
  - Changing vector algorithm (FLAT, HNSW, SVS-VAMANA)
  - Changing distance metric (COSINE, L2, IP)
  - Tuning algorithm parameters (M, EF_CONSTRUCTION, EF_RUNTIME, EPSILON)
  - Quantizing vectors (float32 to float16/bfloat16/int8/uint8)
  - Changing key prefix (renames all keys)
  - Renaming fields (updates all documents)
  - Renaming the index

Not yet supported:
  - Changing vector dimensions
  - Changing storage type (hash to JSON)

Commands:
  rvl migrate list                                         List all indexes
  rvl migrate plan --index <name> --schema-patch <file>
  rvl migrate apply --plan <file>
  rvl migrate validate --plan <file>"""
        )

    def list(self):
        """List all searchable indexes reachable at the given connection."""
        parser = argparse.ArgumentParser(
            usage="rvl migrate list [--host <host> --port <port> | --url <url>]"
        )
        parser = add_redis_connection_options(parser)
        args = parser.parse_args(sys.argv[3:])
        redis_url = create_redis_url(args)
        indexes = list_indexes(redis_url=redis_url)
        print("Available indexes:")
        for position, index_name in enumerate(indexes, start=1):
            print(f"{position}. {index_name}")

    def plan(self):
        """Generate a migration plan file from a schema patch or target schema."""
        parser = argparse.ArgumentParser(
            usage=(
                "rvl migrate plan --index <name> "
                "(--schema-patch <file> | --target-schema <file>)"
            )
        )
        parser.add_argument("-i", "--index", help="Source index name", required=True)
        parser.add_argument("--schema-patch", help="Path to a schema patch file")
        parser.add_argument("--target-schema", help="Path to a target schema file")
        parser.add_argument(
            "--plan-out",
            help="Path to write migration_plan.yaml",
            default="migration_plan.yaml",
        )
        parser.add_argument(
            "--key-sample-limit",
            help="Maximum number of keys to sample from the index keyspace",
            type=int,
            default=10,
        )
        parser = add_redis_connection_options(parser)

        args = parser.parse_args(sys.argv[3:])
        redis_url = create_redis_url(args)
        planner = MigrationPlanner(key_sample_limit=args.key_sample_limit)
        plan = planner.create_plan(
            args.index,
            redis_url=redis_url,
            schema_patch_path=args.schema_patch,
            target_schema_path=args.target_schema,
        )
        planner.write_plan(plan, args.plan_out)
        self._print_plan_summary(args.plan_out, plan)

    def apply(self):
        """Execute a previously reviewed migration plan."""
        parser = argparse.ArgumentParser(
            usage=(
                "rvl migrate apply --plan <file> "
                "[--resume <checkpoint>] "
                "[--report-out <file>]"
            )
        )
        parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
        parser.add_argument(
            "--resume",
            dest="checkpoint_path",
            help="Path to quantization checkpoint file for crash-safe resume",
            default=None,
        )
        parser.add_argument(
            "--report-out",
            help="Path to write migration_report.yaml",
            default="migration_report.yaml",
        )
        parser.add_argument(
            "--benchmark-out",
            help="Optional path to write benchmark_report.yaml",
            default=None,
        )
        parser.add_argument(
            "--query-check-file",
            help="Optional YAML file containing fetch_ids and keys_exist checks",
            default=None,
        )
        parser = add_redis_connection_options(parser)
        args = parser.parse_args(sys.argv[3:])

        redis_url = create_redis_url(args)
        plan = load_migration_plan(args.plan)

        # Preflight: print a disk-space estimate for quantization migrations.
        # AOF detection is best-effort only; failure must not block apply.
        aof_enabled = False
        try:
            client = RedisConnectionFactory.get_redis_connection(redis_url=redis_url)
            try:
                aof_enabled = detect_aof_enabled(client)
            finally:
                client.close()
        except Exception as exc:
            logger.debug("Could not detect AOF for CLI preflight estimate: %s", exc)

        disk_estimate = estimate_disk_space(plan, aof_enabled=aof_enabled)
        if disk_estimate.has_quantization:
            print(f"\n{disk_estimate.summary()}\n")

        report = self._apply_sync(
            plan, redis_url, args.query_check_file, args.checkpoint_path
        )

        write_migration_report(report, args.report_out)
        if args.benchmark_out:
            write_benchmark_report(report, args.benchmark_out)
        self._print_report_summary(args.report_out, report, args.benchmark_out)

    def estimate(self):
        """Estimate disk space required for a migration plan (dry-run)."""
        parser = argparse.ArgumentParser(
            usage="rvl migrate estimate --plan <file>"
        )
        parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
        parser.add_argument(
            "--aof-enabled",
            action="store_true",
            help="Include AOF growth in the disk space estimate",
        )
        args = parser.parse_args(sys.argv[3:])

        plan = load_migration_plan(args.plan)
        disk_estimate = estimate_disk_space(plan, aof_enabled=args.aof_enabled)
        print(disk_estimate.summary())

    @staticmethod
    def _make_progress_callback():
        """Create a ``(step, detail)`` progress callback for migration apply.

        Intermediate updates are written with a carriage return so each step
        renders as a single self-updating line; terminal ("done ...") updates
        end with a newline so the line is preserved.
        """
        step_labels = {
            "enumerate": "[1/8] Enumerate keys",
            "bgsave": "[2/8] BGSAVE snapshot",
            "field_rename": "[3/8] Rename fields",
            "drop": "[4/8] Drop index",
            "key_rename": "[5/8] Rename keys",
            "quantize": "[6/8] Quantize vectors",
            "create": "[7/8] Create index",
            "index": "[8/8] Re-indexing",
            "validate": "Validate",
        }

        def progress_callback(step: str, detail: Optional[str]) -> None:
            label = step_labels.get(step, step)
            if detail is None:
                # BUGFIX: previously a None detail fell through to the final
                # print and rendered as "<label>: None". Print the label alone.
                print(f"  {label}")
            elif not detail.startswith("done"):
                print(f"  {label}: {detail} ", end="\r", flush=True)
            else:
                print(f"  {label}: {detail} ")

        return progress_callback

    def _apply_sync(
        self,
        plan,
        redis_url: str,
        query_check_file: Optional[str],
        checkpoint_path: Optional[str] = None,
    ):
        """Execute the migration synchronously and return its report."""
        executor = MigrationExecutor()

        print(f"\nApplying migration to '{plan.source.index_name}'...")

        report = executor.apply(
            plan,
            redis_url=redis_url,
            query_check_file=query_check_file,
            progress_callback=self._make_progress_callback(),
            checkpoint_path=checkpoint_path,
        )

        self._print_apply_result(report)
        return report

    def _print_apply_result(self, report) -> None:
        """Print the result summary after migration apply."""
        if report.result == "succeeded":
            # Timings may be unset on some paths; treat missing values as 0.
            total_time = report.timings.total_migration_duration_seconds or 0
            downtime = report.timings.downtime_duration_seconds or 0
            print(f"\nMigration completed in {total_time}s (downtime: {downtime}s)")
        else:
            print(f"\nMigration {report.result}")
            if report.validation.errors:
                for error in report.validation.errors:
                    print(f"  ERROR: {error}")

    def validate(self):
        """Validate a completed migration plan against the live target index."""
        parser = argparse.ArgumentParser(
            usage=(
                "rvl migrate validate --plan <file> "
                "[--report-out <file>]"
            )
        )
        parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
        parser.add_argument(
            "--report-out",
            help="Path to write migration_report.yaml",
            default="migration_report.yaml",
        )
        parser.add_argument(
            "--benchmark-out",
            help="Optional path to write benchmark_report.yaml",
            default=None,
        )
        parser.add_argument(
            "--query-check-file",
            help="Optional YAML file containing fetch_ids and keys_exist checks",
            default=None,
        )
        parser = add_redis_connection_options(parser)
        args = parser.parse_args(sys.argv[3:])

        redis_url = create_redis_url(args)
        plan = load_migration_plan(args.plan)
        validator = MigrationValidator()

        # Local imports keep the CLI module import-light; these pull in the
        # heavier migration model/util machinery only when actually validating.
        from redisvl.migration.utils import timestamp_utc

        started_at = timestamp_utc()
        validation, target_info, validation_duration = validator.validate(
            plan,
            redis_url=redis_url,
            query_check_file=args.query_check_file,
        )
        finished_at = timestamp_utc()

        from redisvl.migration.models import (
            MigrationBenchmarkSummary,
            MigrationReport,
            MigrationTimings,
        )

        source_size = float(
            plan.source.stats_snapshot.get("vector_index_sz_mb", 0) or 0
        )
        target_size = float(target_info.get("vector_index_sz_mb", 0) or 0)

        report = MigrationReport(
            source_index=plan.source.index_name,
            target_index=plan.merged_target_schema["index"]["name"],
            result="succeeded" if not validation.errors else "failed",
            started_at=started_at,
            finished_at=finished_at,
            timings=MigrationTimings(validation_duration_seconds=validation_duration),
            validation=validation,
            benchmark_summary=MigrationBenchmarkSummary(
                source_index_size_mb=round(source_size, 3),
                target_index_size_mb=round(target_size, 3),
                index_size_delta_mb=round(target_size - source_size, 3),
            ),
            warnings=list(plan.warnings),
            manual_actions=(
                ["Review validation errors before proceeding."]
                if validation.errors
                else []
            ),
        )
        write_migration_report(report, args.report_out)
        if args.benchmark_out:
            write_benchmark_report(report, args.benchmark_out)
        self._print_report_summary(args.report_out, report, args.benchmark_out)

    def _print_plan_summary(self, plan_out: str, plan) -> None:
        """Print a human-readable summary of a freshly written plan file."""
        import os

        abs_path = os.path.abspath(plan_out)
        print(
            f"""Migration plan written to {abs_path}
Mode: {plan.mode}
Supported: {plan.diff_classification.supported}"""
        )
        if plan.warnings:
            print("Warnings:")
            for warning in plan.warnings:
                print(f"- {warning}")
        if plan.diff_classification.blocked_reasons:
            print("Blocked reasons:")
            for reason in plan.diff_classification.blocked_reasons:
                print(f"- {reason}")

        print(
            f"""\nNext steps:
  Review the plan:       cat {plan_out}
  Apply the migration:   rvl migrate apply --plan {plan_out}
  Validate the result:   rvl migrate validate --plan {plan_out}
  To cancel:             rm {plan_out}"""
        )

    def _print_report_summary(
        self,
        report_out: str,
        report,
        benchmark_out: Optional[str],
    ) -> None:
        """Print a human-readable summary of a written migration report."""
        print(
            f"""Migration report written to {report_out}
Result: {report.result}
Schema match: {report.validation.schema_match}
Doc count match: {report.validation.doc_count_match}
Key sample exists: {report.validation.key_sample_exists}
Indexing failures delta: {report.validation.indexing_failures_delta}"""
        )
        if report.validation.errors:
            print("Errors:")
            for error in report.validation.errors:
                print(f"- {error}")
        if report.manual_actions:
            print("Manual actions:")
            for action in report.manual_actions:
                print(f"- {action}")
        if benchmark_out:
            print(f"Benchmark report written to {benchmark_out}")
# redisvl/migration/executor.py — key enumeration and rename primitives for
# drop/recreate index migrations. (The `apply` orchestration method that
# follows these helpers continues beyond this chunk and is not shown here.)
from __future__ import annotations

import time
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional

from redis.exceptions import ResponseError

from redisvl.index import SearchIndex
from redisvl.migration.models import (
    MigrationBenchmarkSummary,
    MigrationPlan,
    MigrationReport,
    MigrationTimings,
    MigrationValidation,
)
from redisvl.migration.planner import MigrationPlanner
from redisvl.migration.reliability import (
    BatchUndoBuffer,
    QuantizationCheckpoint,
    is_already_quantized,
    is_same_width_dtype_conversion,
    trigger_bgsave_and_wait,
)
from redisvl.migration.utils import (
    build_scan_match_patterns,
    current_source_matches_snapshot,
    detect_aof_enabled,
    estimate_disk_space,
    get_schema_field_path,
    normalize_keys,
    timestamp_utc,
    wait_for_index_ready,
)
from redisvl.migration.validation import MigrationValidator
from redisvl.redis.utils import array_to_buffer, buffer_to_array
from redisvl.types import SyncRedisClient
from redisvl.utils.log import get_logger

logger = get_logger(__name__)


class MigrationExecutor:
    """Executes migration plans against a live Redis deployment.

    The private helpers below implement the building blocks: enumerating the
    document keys behind an index (FT.AGGREGATE with a SCAN fallback) and
    renaming keys / hash fields / JSON fields in pipelined batches.
    """

    def __init__(self, validator: Optional[MigrationValidator] = None):
        # A validator may be injected for testing; defaults to a fresh one.
        self.validator = validator or MigrationValidator()

    def _enumerate_indexed_keys(
        self,
        client: SyncRedisClient,
        index_name: str,
        batch_size: int = 1000,
        key_separator: str = ":",
    ) -> Generator[str, None, None]:
        """Enumerate document keys using FT.AGGREGATE with SCAN fallback.

        Uses FT.AGGREGATE WITHCURSOR for efficient enumeration when the index
        has no indexing failures. Falls back to SCAN if:
        - Index has hash_indexing_failures > 0 (would miss failed docs)
        - FT.AGGREGATE command fails for any reason

        Args:
            client: Redis client
            index_name: Name of the index to enumerate
            batch_size: Number of keys per batch
            key_separator: Separator between prefix and key ID

        Yields:
            Document keys as strings
        """
        # Check for indexing failures - if any, fall back to SCAN
        try:
            info = client.ft(index_name).info()
            failures = int(info.get("hash_indexing_failures", 0) or 0)
            if failures > 0:
                logger.warning(
                    f"Index '{index_name}' has {failures} indexing failures. "
                    "Using SCAN for complete enumeration."
                )
                yield from self._enumerate_with_scan(
                    client, index_name, batch_size, key_separator
                )
                return
        except Exception as e:
            # NOTE: also triggers when FT.INFO returns the list format
            # (info.get would raise) — SCAN is the safe default either way.
            logger.warning(f"Failed to check index info: {e}. Using SCAN fallback.")
            yield from self._enumerate_with_scan(
                client, index_name, batch_size, key_separator
            )
            return

        # Try FT.AGGREGATE enumeration
        try:
            yield from self._enumerate_with_aggregate(client, index_name, batch_size)
        except ResponseError as e:
            logger.warning(
                f"FT.AGGREGATE failed: {e}. Falling back to SCAN enumeration."
            )
            yield from self._enumerate_with_scan(
                client, index_name, batch_size, key_separator
            )

    def _enumerate_with_aggregate(
        self,
        client: SyncRedisClient,
        index_name: str,
        batch_size: int = 1000,
    ) -> Generator[str, None, None]:
        """Enumerate keys using FT.AGGREGATE WITHCURSOR.

        More efficient than SCAN for sparse indexes (only returns indexed docs).
        Requires LOAD 1 __key to retrieve document keys.
        """
        cursor_id: Optional[int] = None

        try:
            # Initial aggregate call with LOAD 1 __key (not LOAD 0!)
            result = client.execute_command(
                "FT.AGGREGATE",
                index_name,
                "*",
                "LOAD",
                "1",
                "__key",
                "WITHCURSOR",
                "COUNT",
                str(batch_size),
            )

            while True:
                # Cursored replies come back as (rows, cursor_id).
                results_data, cursor_id = result

                # Extract keys from results (skip first element which is count)
                for item in results_data[1:]:
                    if isinstance(item, (list, tuple)) and len(item) >= 2:
                        key = item[1]
                        yield key.decode() if isinstance(key, bytes) else str(key)

                # Check if done (cursor_id == 0)
                if cursor_id == 0:
                    break

                # Read next batch
                result = client.execute_command(
                    "FT.CURSOR",
                    "READ",
                    index_name,
                    str(cursor_id),
                    "COUNT",
                    str(batch_size),
                )
        finally:
            # Clean up cursor if interrupted (e.g. caller stops consuming
            # the generator mid-way) so the server doesn't hold it open.
            if cursor_id and cursor_id != 0:
                try:
                    client.execute_command(
                        "FT.CURSOR", "DEL", index_name, str(cursor_id)
                    )
                except Exception:
                    pass  # Cursor may have expired

    def _enumerate_with_scan(
        self,
        client: SyncRedisClient,
        index_name: str,
        batch_size: int = 1000,
        key_separator: str = ":",
    ) -> Generator[str, None, None]:
        """Enumerate keys using SCAN with prefix matching.

        Fallback method that scans all keys matching the index prefix.
        Less efficient but more complete (includes failed-to-index docs).
        """
        # Get prefix from index info
        try:
            info = client.ft(index_name).info()
            # Handle both dict and list formats from FT.INFO
            if isinstance(info, dict):
                prefixes = info.get("index_definition", {}).get("prefixes", [])
            else:
                # List format - find index_definition
                prefixes = []
                for i, item in enumerate(info):
                    if item == b"index_definition" or item == "index_definition":
                        defn = info[i + 1]
                        if isinstance(defn, dict):
                            prefixes = defn.get("prefixes", [])
                        elif isinstance(defn, list):
                            # Nested list format: [..., "prefixes", [...], ...]
                            for j, d in enumerate(defn):
                                if d in (b"prefixes", "prefixes") and j + 1 < len(defn):
                                    prefixes = defn[j + 1]
                                    break
            normalized_prefixes = [
                p.decode() if isinstance(p, bytes) else str(p) for p in prefixes
            ]
        except Exception as e:
            logger.warning(f"Failed to get prefix from index info: {e}")
            normalized_prefixes = []

        # Dedupe across overlapping match patterns so each key yields once.
        seen_keys: set[str] = set()
        for match_pattern in build_scan_match_patterns(
            normalized_prefixes, key_separator
        ):
            cursor = 0
            while True:
                cursor, keys = client.scan(  # type: ignore[misc]
                    cursor=cursor,
                    match=match_pattern,
                    count=batch_size,
                )
                for key in keys:
                    key_str = key.decode() if isinstance(key, bytes) else str(key)
                    if key_str not in seen_keys:
                        seen_keys.add(key_str)
                        yield key_str

                if cursor == 0:
                    break

    def _rename_keys(
        self,
        client: SyncRedisClient,
        keys: List[str],
        old_prefix: str,
        new_prefix: str,
        progress_callback: Optional[Callable[[int, int], None]] = None,
    ) -> int:
        """Rename keys from old prefix to new prefix.

        Uses RENAMENX to avoid overwriting existing destination keys.
        Raises on collision to prevent silent data loss.

        Args:
            client: Redis client
            keys: List of keys to rename
            old_prefix: Current prefix (e.g., "doc:")
            new_prefix: New prefix (e.g., "article:")
            progress_callback: Optional callback(done, total)

        Returns:
            Number of keys successfully renamed
        """
        renamed = 0
        total = len(keys)
        pipeline_size = 100  # Process in batches
        collisions: List[str] = []

        for i in range(0, total, pipeline_size):
            batch = keys[i : i + pipeline_size]
            pipe = client.pipeline(transaction=False)
            # Tracks destination names in pipeline order so results can be
            # mapped back to the key that collided.
            batch_new_keys: List[str] = []

            for key in batch:
                # Compute new key name
                if key.startswith(old_prefix):
                    new_key = new_prefix + key[len(old_prefix) :]
                else:
                    # Key doesn't match expected prefix, skip
                    logger.warning(
                        f"Key '{key}' does not start with prefix '{old_prefix}'"
                    )
                    continue
                pipe.renamenx(key, new_key)
                batch_new_keys.append(new_key)

            try:
                results = pipe.execute()
                for j, r in enumerate(results):
                    # RENAMENX replies 1/True on success, 0 on collision.
                    if r is True or r == 1:
                        renamed += 1
                    else:
                        collisions.append(batch_new_keys[j])
            except Exception as e:
                logger.warning(f"Error in rename batch: {e}")
                raise

            # Fail fast on collisions to avoid partial renames across batches.
            # Keys already renamed in THIS batch are not rolled back -- caller
            # can inspect the error to understand which keys moved.
            if collisions:
                raise RuntimeError(
                    f"Prefix rename aborted after {renamed} successful rename(s): "
                    f"{len(collisions)} destination key(s) already exist "
                    f"(first 5: {collisions[:5]}). This would overwrite existing data. "
                    f"Remove conflicting keys or choose a different prefix."
                )

            if progress_callback:
                progress_callback(min(i + pipeline_size, total), total)

        return renamed

    def _rename_field_in_hash(
        self,
        client: SyncRedisClient,
        keys: List[str],
        old_name: str,
        new_name: str,
        progress_callback: Optional[Callable[[int, int], None]] = None,
    ) -> int:
        """Rename a field in hash documents.

        For each document:
        1. HGET key old_name -> value
        2. HSET key new_name value
        3. HDEL key old_name
        """
        renamed = 0
        total = len(keys)
        pipeline_size = 100

        for i in range(0, total, pipeline_size):
            batch = keys[i : i + pipeline_size]

            # First, get old field values AND check if destination exists
            pipe = client.pipeline(transaction=False)
            for key in batch:
                pipe.hget(key, old_name)
                pipe.hexists(key, new_name)
            raw_results = pipe.execute()
            # Interleaved: [hget_0, hexists_0, hget_1, hexists_1, ...]
            values = raw_results[0::2]
            dest_exists = raw_results[1::2]

            # Now set new field and delete old
            pipe = client.pipeline(transaction=False)
            batch_ops = 0
            for key, value, exists in zip(batch, values, dest_exists):
                if value is not None:
                    if exists:
                        # Destination field already populated: overwrite,
                        # but leave an audit trail in the logs.
                        logger.warning(
                            "Field '%s' already exists in key '%s'; "
                            "overwriting with value from '%s'",
                            new_name,
                            key,
                            old_name,
                        )
                    pipe.hset(key, new_name, value)
                    pipe.hdel(key, old_name)
                    batch_ops += 1

            try:
                pipe.execute()
                # Count by number of keys that had old field values,
                # not by HSET return (HSET returns 0 for existing field updates)
                renamed += batch_ops
            except Exception as e:
                logger.warning(f"Error in field rename batch: {e}")
                raise

            if progress_callback:
                progress_callback(min(i + pipeline_size, total), total)

        return renamed

    def _rename_field_in_json(
        self,
        client: SyncRedisClient,
        keys: List[str],
        old_path: str,
        new_path: str,
        progress_callback: Optional[Callable[[int, int], None]] = None,
    ) -> int:
        """Rename a field in JSON documents.

        For each document:
        1. JSON.GET key old_path -> value
        2. JSON.SET key new_path value
        3. JSON.DEL key old_path
        """
        renamed = 0
        total = len(keys)
        pipeline_size = 100

        for i in range(0, total, pipeline_size):
            batch = keys[i : i + pipeline_size]

            # First, get all old field values
            pipe = client.pipeline(transaction=False)
            for key in batch:
                pipe.json().get(key, old_path)
            values = pipe.execute()

            # Now set new field and delete old
            # JSONPath GET returns results as a list; unwrap single-element
            # results to preserve the original document shape.
            # Missing paths return None or [] depending on Redis version.
            pipe = client.pipeline(transaction=False)
            batch_ops = 0
            for key, value in zip(batch, values):
                if value is None or value == []:
                    continue
                if isinstance(value, list) and len(value) == 1:
                    value = value[0]
                pipe.json().set(key, new_path, value)
                pipe.json().delete(key, old_path)
                batch_ops += 1
            try:
                pipe.execute()
                # Count by number of keys that had old field values,
                # not by JSON.SET return value
                renamed += batch_ops
            except Exception as e:
                logger.warning(f"Error in JSON field rename batch: {e}")
                raise

            if progress_callback:
                progress_callback(min(i + pipeline_size, total), total)

        return renamed
+ When provided, enables crash-safe resume for vector re-encoding. + """ + started_at = timestamp_utc() + started = time.perf_counter() + + report = MigrationReport( + source_index=plan.source.index_name, + target_index=plan.merged_target_schema["index"]["name"], + result="failed", + started_at=started_at, + finished_at=started_at, + warnings=list(plan.warnings), + ) + + if not plan.diff_classification.supported: + report.validation.errors.extend(plan.diff_classification.blocked_reasons) + report.manual_actions.append( + "This change requires document migration, which is not yet supported." + ) + report.finished_at = timestamp_utc() + return report + + # Check if we are resuming from a checkpoint (post-drop crash). + # If so, the source index may no longer exist in Redis, so we + # skip live schema validation and construct from the plan snapshot. + resuming_from_checkpoint = False + if checkpoint_path: + existing_checkpoint = QuantizationCheckpoint.load(checkpoint_path) + if existing_checkpoint is not None: + # Validate checkpoint belongs to this migration and is incomplete + if existing_checkpoint.index_name != plan.source.index_name: + logger.warning( + "Checkpoint index '%s' does not match plan index '%s', ignoring", + existing_checkpoint.index_name, + plan.source.index_name, + ) + elif existing_checkpoint.status == "completed": + # Quantization completed previously. Only resume if + # the source index is actually gone (post-drop crash). + # If the source still exists, this is a fresh run and + # the stale checkpoint should be ignored. + source_still_exists = current_source_matches_snapshot( + plan.source.index_name, + plan.source.schema_snapshot, + redis_url=redis_url, + redis_client=redis_client, + ) + if source_still_exists: + logger.info( + "Checkpoint at %s is completed and source index " + "still exists; treating as fresh run", + checkpoint_path, + ) + # Remove the stale checkpoint so that downstream + # steps (e.g. _quantize_vectors) don't skip work. 
+ Path(checkpoint_path).unlink(missing_ok=True) + else: + resuming_from_checkpoint = True + logger.info( + "Checkpoint at %s is already completed; resuming " + "index recreation from post-drop state", + checkpoint_path, + ) + else: + resuming_from_checkpoint = True + logger.info( + "Checkpoint found at %s, skipping source index validation " + "(index may have been dropped before crash)", + checkpoint_path, + ) + + if not resuming_from_checkpoint: + if not current_source_matches_snapshot( + plan.source.index_name, + plan.source.schema_snapshot, + redis_url=redis_url, + redis_client=redis_client, + ): + report.validation.errors.append( + "The current live source schema no longer matches the saved source snapshot." + ) + report.manual_actions.append( + "Re-run `rvl migrate plan` to refresh the migration plan before applying." + ) + report.finished_at = timestamp_utc() + return report + + source_index = SearchIndex.from_existing( + plan.source.index_name, + redis_url=redis_url, + redis_client=redis_client, + ) + else: + # Source index was dropped before crash; reconstruct from snapshot + # to get a valid SearchIndex with a Redis client attached. 
+ source_index = SearchIndex.from_dict( + plan.source.schema_snapshot, + redis_url=redis_url, + redis_client=redis_client, + ) + + target_index = SearchIndex.from_dict( + plan.merged_target_schema, + redis_url=redis_url, + redis_client=redis_client, + ) + + enumerate_duration = 0.0 + drop_duration = 0.0 + quantize_duration = 0.0 + field_rename_duration = 0.0 + key_rename_duration = 0.0 + recreate_duration = 0.0 + indexing_duration = 0.0 + target_info: Dict[str, Any] = {} + docs_quantized = 0 + keys_to_process: List[str] = [] + storage_type = plan.source.keyspace.storage_type + + # Check if we need to re-encode vectors for datatype changes + datatype_changes = MigrationPlanner.get_vector_datatype_changes( + plan.source.schema_snapshot, + plan.merged_target_schema, + rename_operations=plan.rename_operations, + ) + + # Check for rename operations + rename_ops = plan.rename_operations + has_prefix_change = rename_ops.change_prefix is not None + has_field_renames = bool(rename_ops.rename_fields) + needs_quantization = bool(datatype_changes) and storage_type != "json" + needs_enumeration = needs_quantization or has_prefix_change or has_field_renames + has_same_width_quantization = any( + is_same_width_dtype_conversion(change["source"], change["target"]) + for change in datatype_changes.values() + ) + + if checkpoint_path and has_same_width_quantization: + report.validation.errors.append( + "Crash-safe resume is not supported for same-width datatype " + "changes (float16<->bfloat16 or int8<->uint8)." + ) + report.manual_actions.append( + "Re-run without --resume for same-width vector conversions, or " + "split the migration to avoid same-width datatype changes." 
+ ) + report.finished_at = timestamp_utc() + return report + + def _notify(step: str, detail: Optional[str] = None) -> None: + if progress_callback: + progress_callback(step, detail) + + try: + client = source_index._redis_client + aof_enabled = detect_aof_enabled(client) + disk_estimate = estimate_disk_space(plan, aof_enabled=aof_enabled) + if disk_estimate.has_quantization: + logger.info( + "Disk space estimate: RDB ~%d bytes, AOF ~%d bytes, total ~%d bytes", + disk_estimate.rdb_snapshot_disk_bytes, + disk_estimate.aof_growth_bytes, + disk_estimate.total_new_disk_bytes, + ) + report.disk_space_estimate = disk_estimate + + if resuming_from_checkpoint: + # On resume after a post-drop crash, the index no longer + # exists. Enumerate keys via SCAN using the plan prefix, + # and skip BGSAVE / field renames / drop (already done). + if needs_enumeration: + _notify("enumerate", "Enumerating documents via SCAN (resume)...") + enumerate_started = time.perf_counter() + prefixes = list(plan.source.keyspace.prefixes) + # If a prefix change was part of the migration, keys + # were already renamed before the crash, so scan with + # the new prefix instead. 
+ if has_prefix_change and rename_ops.change_prefix: + prefixes = [rename_ops.change_prefix] + seen_keys: set[str] = set() + for match_pattern in build_scan_match_patterns( + prefixes, plan.source.keyspace.key_separator + ): + cursor: int = 0 + while True: + cursor, scanned = client.scan( # type: ignore[misc] + cursor=cursor, + match=match_pattern, + count=1000, + ) + for k in scanned: + key = k.decode() if isinstance(k, bytes) else str(k) + if key not in seen_keys: + seen_keys.add(key) + keys_to_process.append(key) + if cursor == 0: + break + keys_to_process = normalize_keys(keys_to_process) + enumerate_duration = round( + time.perf_counter() - enumerate_started, 3 + ) + _notify( + "enumerate", + f"found {len(keys_to_process):,} documents ({enumerate_duration}s)", + ) + + _notify("bgsave", "skipped (resume)") + _notify("drop", "skipped (already dropped)") + else: + # Normal (non-resume) path + # STEP 1: Enumerate keys BEFORE any modifications + # Needed for: quantization, prefix change, or field renames + if needs_enumeration: + _notify("enumerate", "Enumerating indexed documents...") + enumerate_started = time.perf_counter() + keys_to_process = list( + self._enumerate_indexed_keys( + client, + plan.source.index_name, + batch_size=1000, + key_separator=plan.source.keyspace.key_separator, + ) + ) + keys_to_process = normalize_keys(keys_to_process) + enumerate_duration = round( + time.perf_counter() - enumerate_started, 3 + ) + _notify( + "enumerate", + f"found {len(keys_to_process):,} documents ({enumerate_duration}s)", + ) + + # BGSAVE safety net: snapshot data before mutations begin + if needs_enumeration and keys_to_process: + _notify("bgsave", "Triggering BGSAVE safety snapshot...") + try: + trigger_bgsave_and_wait(client) + _notify("bgsave", "done") + except Exception as e: + logger.warning("BGSAVE safety snapshot failed: %s", e) + _notify("bgsave", f"skipped ({e})") + + # STEP 2: Field renames (before dropping index) + if has_field_renames and 
keys_to_process: + _notify("field_rename", "Renaming fields in documents...") + field_rename_started = time.perf_counter() + for field_rename in rename_ops.rename_fields: + if storage_type == "json": + old_path = get_schema_field_path( + plan.source.schema_snapshot, field_rename.old_name + ) + new_path = get_schema_field_path( + plan.merged_target_schema, field_rename.new_name + ) + if not old_path or not new_path or old_path == new_path: + continue + self._rename_field_in_json( + client, + keys_to_process, + old_path, + new_path, + progress_callback=lambda done, total: _notify( + "field_rename", + f"{field_rename.old_name} -> {field_rename.new_name}: {done:,}/{total:,}", + ), + ) + else: + self._rename_field_in_hash( + client, + keys_to_process, + field_rename.old_name, + field_rename.new_name, + progress_callback=lambda done, total: _notify( + "field_rename", + f"{field_rename.old_name} -> {field_rename.new_name}: {done:,}/{total:,}", + ), + ) + field_rename_duration = round( + time.perf_counter() - field_rename_started, 3 + ) + _notify("field_rename", f"done ({field_rename_duration}s)") + + # STEP 3: Drop the index + _notify("drop", "Dropping index definition...") + drop_started = time.perf_counter() + source_index.delete(drop=False) + drop_duration = round(time.perf_counter() - drop_started, 3) + _notify("drop", f"done ({drop_duration}s)") + + # STEP 4: Key renames (after drop, before recreate) + # On resume, key renames were already done before the crash. 
+ if has_prefix_change and keys_to_process and not resuming_from_checkpoint: + _notify("key_rename", "Renaming keys...") + key_rename_started = time.perf_counter() + old_prefix = plan.source.keyspace.prefixes[0] + new_prefix = rename_ops.change_prefix + assert new_prefix is not None # For type checker + renamed_count = self._rename_keys( + client, + keys_to_process, + old_prefix, + new_prefix, + progress_callback=lambda done, total: _notify( + "key_rename", f"{done:,}/{total:,} keys" + ), + ) + key_rename_duration = round(time.perf_counter() - key_rename_started, 3) + _notify( + "key_rename", + f"done ({renamed_count:,} keys in {key_rename_duration}s)", + ) + + # STEP 5: Re-encode vectors using pre-enumerated keys + if needs_quantization and keys_to_process: + _notify("quantize", "Re-encoding vectors...") + quantize_started = time.perf_counter() + # If we renamed keys (non-resume), update keys_to_process + if ( + has_prefix_change + and rename_ops.change_prefix + and not resuming_from_checkpoint + ): + old_prefix = plan.source.keyspace.prefixes[0] + new_prefix = rename_ops.change_prefix + keys_to_process = [ + ( + new_prefix + k[len(old_prefix) :] + if k.startswith(old_prefix) + else k + ) + for k in keys_to_process + ] + keys_to_process = normalize_keys(keys_to_process) + # Remap datatype_changes keys from source to target field + # names when field renames exist, since quantization runs + # after field renames (step 2). The plan always stores + # datatype_changes keyed by source field names, so the + # remap is needed regardless of whether we are resuming. 
+ effective_changes = datatype_changes + if has_field_renames: + field_rename_map = { + fr.old_name: fr.new_name for fr in rename_ops.rename_fields + } + effective_changes = { + field_rename_map.get(k, k): v + for k, v in datatype_changes.items() + } + docs_quantized = self._quantize_vectors( + source_index, + effective_changes, + keys_to_process, + progress_callback=lambda done, total: _notify( + "quantize", f"{done:,}/{total:,} docs" + ), + checkpoint_path=checkpoint_path, + ) + quantize_duration = round(time.perf_counter() - quantize_started, 3) + _notify( + "quantize", + f"done ({docs_quantized:,} docs in {quantize_duration}s)", + ) + report.warnings.append( + f"Re-encoded {docs_quantized} documents for vector quantization: " + f"{datatype_changes}" + ) + elif datatype_changes and storage_type == "json": + # No checkpoint for JSON: vectors are re-indexed on recreate, + # so there is nothing to resume. Creating one would leave a + # stale in-progress checkpoint that misleads future runs. 
+ _notify("quantize", "skipped (JSON vectors are re-indexed on recreate)") + + _notify("create", "Creating index with new schema...") + recreate_started = time.perf_counter() + target_index.create() + recreate_duration = round(time.perf_counter() - recreate_started, 3) + _notify("create", f"done ({recreate_duration}s)") + + _notify("index", "Waiting for re-indexing...") + + def _index_progress(indexed: int, total: int, pct: float) -> None: + _notify("index", f"{indexed:,}/{total:,} docs ({pct:.0f}%)") + + target_info, indexing_duration = wait_for_index_ready( + target_index, progress_callback=_index_progress + ) + _notify("index", f"done ({indexing_duration}s)") + + _notify("validate", "Validating migration...") + validation, target_info, validation_duration = self.validator.validate( + plan, + redis_url=redis_url, + redis_client=redis_client, + query_check_file=query_check_file, + ) + _notify("validate", f"done ({validation_duration}s)") + report.validation = validation + total_duration = round(time.perf_counter() - started, 3) + report.timings = MigrationTimings( + total_migration_duration_seconds=total_duration, + drop_duration_seconds=drop_duration, + quantize_duration_seconds=( + quantize_duration if quantize_duration else None + ), + field_rename_duration_seconds=( + field_rename_duration if field_rename_duration else None + ), + key_rename_duration_seconds=( + key_rename_duration if key_rename_duration else None + ), + recreate_duration_seconds=recreate_duration, + initial_indexing_duration_seconds=indexing_duration, + validation_duration_seconds=validation_duration, + downtime_duration_seconds=round( + drop_duration + + field_rename_duration + + key_rename_duration + + quantize_duration + + recreate_duration + + indexing_duration, + 3, + ), + ) + report.benchmark_summary = self._build_benchmark_summary( + plan, + target_info, + report.timings, + ) + report.result = "succeeded" if not validation.errors else "failed" + if validation.errors: + 
report.manual_actions.append( + "Review validation errors before treating the migration as complete." + ) + except Exception as exc: + total_duration = round(time.perf_counter() - started, 3) + report.timings = MigrationTimings( + total_migration_duration_seconds=total_duration, + drop_duration_seconds=drop_duration or None, + quantize_duration_seconds=quantize_duration or None, + field_rename_duration_seconds=field_rename_duration or None, + key_rename_duration_seconds=key_rename_duration or None, + recreate_duration_seconds=recreate_duration or None, + initial_indexing_duration_seconds=indexing_duration or None, + downtime_duration_seconds=( + round( + drop_duration + + field_rename_duration + + key_rename_duration + + quantize_duration + + recreate_duration + + indexing_duration, + 3, + ) + if drop_duration + or field_rename_duration + or key_rename_duration + or quantize_duration + or recreate_duration + or indexing_duration + else None + ), + ) + report.validation = MigrationValidation( + errors=[f"Migration execution failed: {exc}"] + ) + report.manual_actions.extend( + [ + "Inspect the Redis index state before retrying.", + "If the source index was dropped, recreate it from the saved migration plan.", + ] + ) + finally: + report.finished_at = timestamp_utc() + + return report + + def _quantize_vectors( + self, + source_index: SearchIndex, + datatype_changes: Dict[str, Dict[str, Any]], + keys: List[str], + progress_callback: Optional[Callable[[int, int], None]] = None, + checkpoint_path: Optional[str] = None, + ) -> int: + """Re-encode vectors in documents for datatype changes (quantization). + + Uses pre-enumerated keys (from _enumerate_indexed_keys) to process + only the documents that were in the index, avoiding full keyspace scan. + Includes idempotent skip (already-quantized vectors), bounded undo + buffer for per-batch rollback, and optional checkpointing for resume. 
+ + Args: + source_index: The source SearchIndex (already dropped but client available) + datatype_changes: Dict mapping field_name -> {"source", "target", "dims"} + keys: Pre-enumerated list of document keys to process + progress_callback: Optional callback(docs_done, total_docs) + checkpoint_path: Optional path for checkpoint file (enables resume) + + Returns: + Number of documents quantized + """ + client = source_index._redis_client + total_keys = len(keys) + docs_processed = 0 + docs_quantized = 0 + skipped = 0 + batch_size = 500 + + # Load or create checkpoint for resume support + checkpoint: Optional[QuantizationCheckpoint] = None + if checkpoint_path: + checkpoint = QuantizationCheckpoint.load(checkpoint_path) + if checkpoint: + # Validate checkpoint matches current migration BEFORE + # checking completion status to avoid skipping quantization + # for an unrelated completed checkpoint. + if checkpoint.index_name != source_index.name: + raise ValueError( + f"Checkpoint index '{checkpoint.index_name}' does not match " + f"source index '{source_index.name}'. " + f"Use the correct checkpoint file or remove it to start fresh." + ) + # Skip if checkpoint shows a completed migration + if checkpoint.status == "completed": + logger.info( + "Checkpoint already marked as completed for index '%s'. " + "Skipping quantization. Remove the checkpoint file to force re-run.", + checkpoint.index_name, + ) + return 0 + if checkpoint.total_keys != total_keys: + if checkpoint.processed_keys: + current_keys = set(keys) + missing_processed = [ + key + for key in checkpoint.processed_keys + if key not in current_keys + ] + if missing_processed or total_keys < checkpoint.total_keys: + raise ValueError( + f"Checkpoint total_keys={checkpoint.total_keys} does not match " + f"the current key set ({total_keys}). " + "Use the correct checkpoint file or remove it to start fresh." + ) + logger.warning( + "Checkpoint total_keys=%d differs from current key set size=%d. 
" + "Proceeding because all legacy processed keys are present.", + checkpoint.total_keys, + total_keys, + ) + else: + raise ValueError( + f"Checkpoint total_keys={checkpoint.total_keys} does not match " + f"the current key set ({total_keys}). " + "Use the correct checkpoint file or remove it to start fresh." + ) + remaining = checkpoint.get_remaining_keys(keys) + logger.info( + "Resuming from checkpoint: %d/%d keys already processed", + total_keys - len(remaining), + total_keys, + ) + docs_processed = total_keys - len(remaining) + keys = remaining + total_keys_for_progress = total_keys + else: + checkpoint = QuantizationCheckpoint( + index_name=source_index.name, + total_keys=total_keys, + checkpoint_path=checkpoint_path, + ) + checkpoint.save() + total_keys_for_progress = total_keys + else: + total_keys_for_progress = total_keys + + remaining_keys = len(keys) + + for i in range(0, remaining_keys, batch_size): + batch = keys[i : i + batch_size] + pipe = client.pipeline(transaction=False) + undo = BatchUndoBuffer() + keys_updated_in_batch: set[str] = set() + + try: + for key in batch: + for field_name, change in datatype_changes.items(): + field_data: bytes | None = client.hget(key, field_name) # type: ignore[misc,assignment] + if not field_data: + continue + + # Idempotent: skip if already converted to target dtype + dims = change.get("dims", 0) + if dims and is_already_quantized( + field_data, dims, change["source"], change["target"] + ): + skipped += 1 + continue + + undo.store(key, field_name, field_data) + array = buffer_to_array(field_data, change["source"]) + new_bytes = array_to_buffer(array, change["target"]) + pipe.hset(key, field_name, new_bytes) # type: ignore[arg-type] + keys_updated_in_batch.add(key) + + if keys_updated_in_batch: + pipe.execute() + except Exception: + logger.warning( + "Batch %d failed, rolling back %d entries", + i // batch_size, + undo.size, + ) + rollback_pipe = client.pipeline() + undo.rollback(rollback_pipe) + if checkpoint: + 
checkpoint.save() + raise + finally: + undo.clear() + + docs_quantized += len(keys_updated_in_batch) + docs_processed += len(batch) + + if checkpoint: + # Record all keys in batch (including skipped) so they + # are not re-scanned on resume + checkpoint.record_batch(batch) + checkpoint.save() + + if progress_callback: + progress_callback(docs_processed, total_keys_for_progress) + + if checkpoint: + checkpoint.mark_complete() + checkpoint.save() + + if skipped: + logger.info("Skipped %d already-quantized vector fields", skipped) + logger.info( + "Quantized %d documents across %d fields", + docs_quantized, + len(datatype_changes), + ) + return docs_quantized + + def _build_benchmark_summary( + self, + plan: MigrationPlan, + target_info: dict, + timings: MigrationTimings, + ) -> MigrationBenchmarkSummary: + source_index_size = float( + plan.source.stats_snapshot.get("vector_index_sz_mb", 0) or 0 + ) + target_index_size = float(target_info.get("vector_index_sz_mb", 0) or 0) + source_num_docs = int(plan.source.stats_snapshot.get("num_docs", 0) or 0) + indexed_per_second = None + indexing_time = timings.initial_indexing_duration_seconds + if indexing_time and indexing_time > 0: + indexed_per_second = round(source_num_docs / indexing_time, 3) + + return MigrationBenchmarkSummary( + documents_indexed_per_second=indexed_per_second, + source_index_size_mb=round(source_index_size, 3), + target_index_size_mb=round(target_index_size, 3), + index_size_delta_mb=round(target_index_size - source_index_size, 3), + ) diff --git a/redisvl/migration/reliability.py b/redisvl/migration/reliability.py new file mode 100644 index 000000000..71f6e672e --- /dev/null +++ b/redisvl/migration/reliability.py @@ -0,0 +1,340 @@ +"""Crash-safe quantization utilities for index migration. + +Provides idempotent dtype detection, checkpointing, BGSAVE safety, +and bounded undo buffering for reliable vector re-encoding. 
+""" + +import asyncio +import os +import tempfile +import time +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import yaml +from pydantic import BaseModel, Field + +from redisvl.migration.models import DTYPE_BYTES +from redisvl.utils.log import get_logger + +logger = get_logger(__name__) + +# Dtypes that share byte widths and are functionally interchangeable +# for idempotent detection purposes (same byte length per element). +_DTYPE_FAMILY: Dict[str, str] = { + "float64": "8byte", + "float32": "4byte", + "float16": "2byte", + "bfloat16": "2byte", + "int8": "1byte", + "uint8": "1byte", +} + + +def is_same_width_dtype_conversion(source_dtype: str, target_dtype: str) -> bool: + """Return True when two dtypes share byte width but differ in encoding.""" + if source_dtype == target_dtype: + return False + source_family = _DTYPE_FAMILY.get(source_dtype) + target_family = _DTYPE_FAMILY.get(target_dtype) + if source_family is None or target_family is None: + return False + return source_family == target_family + + +# --------------------------------------------------------------------------- +# Idempotent Dtype Detection +# --------------------------------------------------------------------------- + + +def detect_vector_dtype(data: bytes, expected_dims: int) -> Optional[str]: + """Inspect raw vector bytes and infer the storage dtype. + + Uses byte length and expected dimensions to determine which dtype + the vector is currently stored as. Returns the canonical representative + for each byte-width family (float16 for 2-byte, int8 for 1-byte), + since dtypes within a family cannot be distinguished by length alone. + + Args: + data: Raw vector bytes from Redis. + expected_dims: Number of dimensions expected for this vector field. + + Returns: + Detected dtype string (e.g. "float32", "float16", "int8") or None + if the size does not match any known dtype. 
+ """ + if not data or expected_dims <= 0: + return None + + nbytes = len(data) + + # Check each dtype in decreasing element size to avoid ambiguity. + # Only canonical representatives are checked (float16 covers bfloat16, + # int8 covers uint8) since they share byte widths. + for dtype in ("float64", "float32", "float16", "int8"): + if nbytes == expected_dims * DTYPE_BYTES[dtype]: + return dtype + + return None + + +def is_already_quantized( + data: bytes, + expected_dims: int, + source_dtype: str, + target_dtype: str, +) -> bool: + """Check whether a vector has already been converted to the target dtype. + + Uses byte-width families to handle ambiguous dtypes. For example, + if source is float32 and target is float16, a vector detected as + 2-bytes-per-element is considered already quantized (the byte width + shrank from 4 to 2, so conversion already happened). + + However, same-width conversions (e.g. float16 -> bfloat16 or + int8 -> uint8) are NOT skipped because the encoding semantics + differ even though the byte length is identical. We cannot + distinguish these by length, so we must always re-encode. + + Args: + data: Raw vector bytes. + expected_dims: Number of dimensions. + source_dtype: The dtype the vector was originally stored as. + target_dtype: The dtype we want to convert to. + + Returns: + True if the vector already matches the target dtype (skip conversion). + """ + detected = detect_vector_dtype(data, expected_dims) + if detected is None: + return False + + detected_family = _DTYPE_FAMILY.get(detected) + target_family = _DTYPE_FAMILY.get(target_dtype) + source_family = _DTYPE_FAMILY.get(source_dtype) + + # If detected byte-width matches target family, the vector looks converted. + # But if source and target share the same byte-width family (e.g. + # float16 -> bfloat16), we cannot tell whether conversion happened, + # so we must NOT skip -- always re-encode for same-width migrations. 
+ if source_family == target_family: + return False + + return detected_family == target_family + + +# --------------------------------------------------------------------------- +# Quantization Checkpoint +# --------------------------------------------------------------------------- + + +class QuantizationCheckpoint(BaseModel): + """Tracks migration progress for crash-safe resume.""" + + index_name: str + total_keys: int + completed_keys: int = 0 + completed_batches: int = 0 + last_batch_keys: List[str] = Field(default_factory=list) + # Retained for backward compatibility with older checkpoint files. + # New checkpoints rely on completed_keys with deterministic key ordering + # instead of rewriting an ever-growing processed key list on every batch. + processed_keys: List[str] = Field(default_factory=list) + status: str = "in_progress" + checkpoint_path: str = "" + + def record_batch(self, keys: List[str]) -> None: + """Record a successfully processed batch. + + Does not auto-save to disk. Call save() after record_batch() + to persist the checkpoint for crash recovery. + """ + self.completed_keys += len(keys) + self.completed_batches += 1 + self.last_batch_keys = list(keys) + if self.processed_keys: + self.processed_keys.extend(keys) + + def mark_complete(self) -> None: + """Mark the migration as completed.""" + self.status = "completed" + + def save(self) -> None: + """Persist checkpoint to disk atomically. + + Writes to a temporary file first, then renames. This ensures a + crash mid-write does not corrupt the checkpoint file. 
+ """ + path = Path(self.checkpoint_path) + path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_path = tempfile.mkstemp( + dir=path.parent, suffix=".tmp", prefix=".checkpoint_" + ) + try: + exclude = set() + if not self.processed_keys: + exclude.add("processed_keys") + with os.fdopen(fd, "w") as f: + yaml.safe_dump( + self.model_dump(exclude=exclude), + f, + sort_keys=False, + ) + os.replace(tmp_path, str(path)) + except BaseException: + # Clean up temp file on any failure + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + @classmethod + def load(cls, path: str) -> Optional["QuantizationCheckpoint"]: + """Load a checkpoint from disk. Returns None if file does not exist. + + Always sets checkpoint_path to the path used to load, not the + value stored in the file. This ensures subsequent save() calls + write to the correct location even if the file was moved. + """ + p = Path(path) + if not p.exists(): + return None + with open(p, "r") as f: + data = yaml.safe_load(f) + if not data: + return None + checkpoint = cls.model_validate(data) + if checkpoint.processed_keys and checkpoint.completed_keys < len( + checkpoint.processed_keys + ): + checkpoint.completed_keys = len(checkpoint.processed_keys) + checkpoint.checkpoint_path = str(p) + return checkpoint + + def get_remaining_keys(self, all_keys: List[str]) -> List[str]: + """Return keys that have not yet been processed.""" + if self.processed_keys: + done = set(self.processed_keys) + return [k for k in all_keys if k not in done] + + if self.completed_keys <= 0: + return list(all_keys) + + return all_keys[self.completed_keys :] + + +# --------------------------------------------------------------------------- +# BGSAVE Safety Net +# --------------------------------------------------------------------------- + + +def trigger_bgsave_and_wait( + client: Any, + *, + timeout_seconds: int = 300, + poll_interval: float = 1.0, +) -> bool: + """Trigger a Redis BGSAVE and wait for it to complete. 
+ + If a BGSAVE is already in progress, waits for it instead. + + Args: + client: Sync Redis client. + timeout_seconds: Max seconds to wait for BGSAVE to finish. + poll_interval: Seconds between status polls. + + Returns: + True if BGSAVE completed successfully. + """ + try: + client.bgsave() + except Exception as exc: + if "already in progress" not in str(exc).lower(): + raise + logger.info("BGSAVE already in progress, waiting for it to finish.") + + deadline = time.monotonic() + timeout_seconds + while time.monotonic() < deadline: + info = client.info("persistence") + if isinstance(info, dict) and not info.get("rdb_bgsave_in_progress", 0): + status = info.get("rdb_last_bgsave_status", "ok") + if status != "ok": + logger.warning("BGSAVE completed with status: %s", status) + return False + return True + time.sleep(poll_interval) + + raise TimeoutError(f"BGSAVE did not complete within {timeout_seconds}s") + + +async def async_trigger_bgsave_and_wait( + client: Any, + *, + timeout_seconds: int = 300, + poll_interval: float = 1.0, +) -> bool: + """Async version of trigger_bgsave_and_wait.""" + try: + await client.bgsave() + except Exception as exc: + if "already in progress" not in str(exc).lower(): + raise + logger.info("BGSAVE already in progress, waiting for it to finish.") + + deadline = time.monotonic() + timeout_seconds + while time.monotonic() < deadline: + info = await client.info("persistence") + if isinstance(info, dict) and not info.get("rdb_bgsave_in_progress", 0): + status = info.get("rdb_last_bgsave_status", "ok") + if status != "ok": + logger.warning("BGSAVE completed with status: %s", status) + return False + return True + await asyncio.sleep(poll_interval) + + raise TimeoutError(f"BGSAVE did not complete within {timeout_seconds}s") + + +# --------------------------------------------------------------------------- +# Bounded Undo Buffer +# --------------------------------------------------------------------------- + + +class BatchUndoBuffer: + 
"""Stores original vector values for the current batch to allow rollback. + + Memory-bounded: only holds data for one batch at a time. Call clear() + after each successful batch commit. + """ + + def __init__(self) -> None: + self._entries: List[Tuple[str, str, bytes]] = [] + + @property + def size(self) -> int: + return len(self._entries) + + def store(self, key: str, field: str, original_value: bytes) -> None: + """Record the original value of a field before mutation.""" + self._entries.append((key, field, original_value)) + + def rollback(self, pipe: Any) -> None: + """Restore all stored originals via the given pipeline (sync).""" + if not self._entries: + return + for key, field, value in self._entries: + pipe.hset(key, field, value) + pipe.execute() + + async def async_rollback(self, pipe: Any) -> None: + """Restore all stored originals via the given pipeline (async).""" + if not self._entries: + return + for key, field, value in self._entries: + pipe.hset(key, field, value) + await pipe.execute() + + def clear(self) -> None: + """Discard all stored entries.""" + self._entries.clear() diff --git a/tests/integration/test_field_modifier_ordering_integration.py b/tests/integration/test_field_modifier_ordering_integration.py index b26463df0..b9d609674 100644 --- a/tests/integration/test_field_modifier_ordering_integration.py +++ b/tests/integration/test_field_modifier_ordering_integration.py @@ -399,6 +399,241 @@ def test_indexmissing_enables_ismissing_query(self, client, redis_url, worker_id index.delete(drop=True) +class TestIndexEmptyIntegration: + """Integration tests for INDEXEMPTY functionality.""" + + def test_text_field_index_empty_creates_successfully( + self, client, redis_url, worker_id + ): + """Test that INDEXEMPTY on text field allows index creation.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_text_empty_{worker_id}", + "prefix": f"textempty_{worker_id}:", + "storage_type": "hash", + }, + 
"fields": [ + { + "name": "description", + "type": "text", + "attrs": {"index_empty": True}, + } + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + index.create(overwrite=True) + + # Verify index was created + info = client.execute_command("FT.INFO", f"test_text_empty_{worker_id}") + assert info is not None + + # Create documents with empty and non-empty values + client.hset(f"textempty_{worker_id}:1", "description", "has content") + client.hset(f"textempty_{worker_id}:2", "description", "") + client.hset(f"textempty_{worker_id}:3", "description", "more content") + + # Search should work, empty string doc should be indexed + result = client.execute_command( + "FT.SEARCH", + f"test_text_empty_{worker_id}", + "*", + ) + # All 3 docs should be found + assert result[0] == 3 + + # Cleanup + client.delete( + f"textempty_{worker_id}:1", + f"textempty_{worker_id}:2", + f"textempty_{worker_id}:3", + ) + index.delete(drop=True) + + def test_tag_field_index_empty_creates_successfully( + self, client, redis_url, worker_id + ): + """Test that INDEXEMPTY on tag field allows index creation.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_tag_empty_{worker_id}", + "prefix": f"tagempty_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "category", + "type": "tag", + "attrs": {"index_empty": True}, + } + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + index.create(overwrite=True) + + # Verify index was created + info = client.execute_command("FT.INFO", f"test_tag_empty_{worker_id}") + assert info is not None + + # Create documents with empty and non-empty values + client.hset(f"tagempty_{worker_id}:1", "category", "electronics") + client.hset(f"tagempty_{worker_id}:2", "category", "") + client.hset(f"tagempty_{worker_id}:3", "category", "books") + + # Search should work + 
result = client.execute_command( + "FT.SEARCH", + f"test_tag_empty_{worker_id}", + "*", + ) + # All 3 docs should be found + assert result[0] == 3 + + # Cleanup + client.delete( + f"tagempty_{worker_id}:1", + f"tagempty_{worker_id}:2", + f"tagempty_{worker_id}:3", + ) + index.delete(drop=True) + + +class TestUnfModifierIntegration: + """Integration tests for UNF (un-normalized form) modifier.""" + + def test_text_field_unf_requires_sortable(self, client, redis_url, worker_id): + """Test that UNF on text field works only when sortable is also True.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_text_unf_{worker_id}", + "prefix": f"textunf_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "title", + "type": "text", + "attrs": {"sortable": True, "unf": True}, + } + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + + # Should create successfully + index.create(overwrite=True) + + info = client.execute_command("FT.INFO", f"test_text_unf_{worker_id}") + assert info is not None + + index.delete(drop=True) + + def test_numeric_field_unf_with_sortable(self, client, redis_url, worker_id): + """Test that UNF on numeric field works when sortable is True.""" + skip_if_search_version_below_for_indexmissing(client) + schema_dict = { + "index": { + "name": f"test_num_unf_{worker_id}", + "prefix": f"numunf_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "price", + "type": "numeric", + "attrs": {"sortable": True, "unf": True}, + } + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + + # Should create successfully + index.create(overwrite=True) + + info = client.execute_command("FT.INFO", f"test_num_unf_{worker_id}") + assert info is not None + + index.delete(drop=True) + + +class TestNoIndexModifierIntegration: + """Integration tests for NOINDEX 
modifier.""" + + def test_noindex_with_sortable_allows_sorting_not_searching( + self, client, redis_url, worker_id + ): + """Test that NOINDEX field can be sorted but not searched.""" + schema_dict = { + "index": { + "name": f"test_noindex_{worker_id}", + "prefix": f"noindex_{worker_id}:", + "storage_type": "hash", + }, + "fields": [ + { + "name": "searchable", + "type": "text", + }, + { + "name": "sort_only", + "type": "numeric", + "attrs": {"sortable": True, "no_index": True}, + }, + ], + } + + schema = IndexSchema.from_dict(schema_dict) + index = SearchIndex(schema=schema, redis_url=redis_url) + index.create(overwrite=True) + + # Add test documents + client.hset( + f"noindex_{worker_id}:1", mapping={"searchable": "hello", "sort_only": 10} + ) + client.hset( + f"noindex_{worker_id}:2", mapping={"searchable": "world", "sort_only": 5} + ) + client.hset( + f"noindex_{worker_id}:3", mapping={"searchable": "test", "sort_only": 15} + ) + + # Sorting by no_index field should work + result = client.execute_command( + "FT.SEARCH", + f"test_noindex_{worker_id}", + "*", + "SORTBY", + "sort_only", + "ASC", + ) + assert result[0] == 3 + + # Filtering by NOINDEX field should return no results + filter_result = client.execute_command( + "FT.SEARCH", + f"test_noindex_{worker_id}", + "@sort_only:[5 10]", + ) + assert filter_result[0] == 0 + + # Cleanup + client.delete( + f"noindex_{worker_id}:1", + f"noindex_{worker_id}:2", + f"noindex_{worker_id}:3", + ) + index.delete(drop=True) + + class TestFieldTypeModifierSupport: """Test that field types only support their documented modifiers.""" diff --git a/tests/integration/test_migration_comprehensive.py b/tests/integration/test_migration_comprehensive.py new file mode 100644 index 000000000..370546c68 --- /dev/null +++ b/tests/integration/test_migration_comprehensive.py @@ -0,0 +1,1369 @@ +""" +Comprehensive integration tests for all 38 supported migration operations. 
+ +This test suite validates migrations against real Redis with a tiered validation approach: +- L1: Execution (plan.supported == True) +- L2: Data Integrity (doc_count_match == True) +- L3: Key Existence (key_sample_exists == True) +- L4: Schema Match (schema_match == True) + +Test Categories: +1. Index-Level (2): rename index, change prefix +2. Field Add (4): text, tag, numeric, geo +3. Field Remove (5): text, tag, numeric, geo, vector +4. Field Rename (5): text, tag, numeric, geo, vector +5. Base Attrs (3): sortable, no_index, index_missing +6. Text Attrs (5): weight, no_stem, phonetic_matcher, index_empty, unf +7. Tag Attrs (3): separator, case_sensitive, index_empty +8. Numeric Attrs (1): unf +9. Vector Attrs (8): algorithm, distance_metric, initial_cap, m, ef_construction, + ef_runtime, epsilon, datatype +10. JSON Storage (2): add field, rename field + +Some tests use L2-only validation due to Redis FT.INFO limitations: +- prefix change (keys renamed), HNSW params, initial_cap, phonetic_matcher, numeric unf + +Run: pytest tests/integration/test_migration_comprehensive.py -v +Spec: nitin_docs/index_migrator/32_integration_test_spec.md +""" + +import uuid +from typing import Any, Dict, List + +import pytest +import yaml + +from redisvl.index import SearchIndex +from redisvl.migration import MigrationExecutor, MigrationPlanner +from redisvl.migration.utils import load_migration_plan, schemas_equal +from redisvl.redis.utils import array_to_buffer + +# ============================================================================== +# Fixtures +# ============================================================================== + + +@pytest.fixture +def unique_ids(worker_id): + """Generate unique identifiers for test isolation.""" + uid = str(uuid.uuid4())[:8] + return { + "name": f"mig_test_{worker_id}_{uid}", + "prefix": f"mig_test:{worker_id}:{uid}", + } + + +@pytest.fixture +def base_schema(unique_ids): + """Base schema with all field types for testing.""" + return 
{ + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [ + {"name": "doc_id", "type": "tag"}, + {"name": "title", "type": "text"}, + {"name": "description", "type": "text"}, + {"name": "category", "type": "tag"}, + {"name": "price", "type": "numeric"}, + {"name": "location", "type": "geo"}, + { + "name": "embedding", + "type": "vector", + "attrs": { + "algorithm": "hnsw", + "dims": 4, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + } + + +@pytest.fixture +def sample_docs(): + """Sample documents with all field types.""" + return [ + { + "doc_id": "1", + "title": "Alpha Product", + "description": "First product description", + "category": "electronics", + "price": 99.99, + "location": "37.7749,-122.4194", # SF coordinates (lat,lon) + "embedding": array_to_buffer([0.1, 0.2, 0.3, 0.4], "float32"), + }, + { + "doc_id": "2", + "title": "Beta Service", + "description": "Second service description", + "category": "software", + "price": 149.99, + "location": "40.7484,-73.9857", # NYC coordinates (lat,lon) + "embedding": array_to_buffer([0.2, 0.3, 0.4, 0.5], "float32"), + }, + { + "doc_id": "3", + "title": "Gamma Item", + "description": "", # Empty for index_empty tests + "category": "", # Empty for index_empty tests + "price": 0, + "location": "34.0522,-118.2437", # LA coordinates (lat,lon) + "embedding": array_to_buffer([0.3, 0.4, 0.5, 0.6], "float32"), + }, + ] + + +def run_migration( + redis_url: str, + tmp_path, + index_name: str, + patch: Dict[str, Any], +) -> Dict[str, Any]: + """Helper to run a migration and return results.""" + patch_path = tmp_path / "patch.yaml" + patch_path.write_text(yaml.safe_dump(patch, sort_keys=False)) + + plan_path = tmp_path / "plan.yaml" + planner = MigrationPlanner() + plan = planner.create_plan( + index_name, + redis_url=redis_url, + schema_patch_path=str(patch_path), + ) + planner.write_plan(plan, str(plan_path)) + + executor = 
MigrationExecutor() + report = executor.apply( + load_migration_plan(str(plan_path)), + redis_url=redis_url, + ) + + return { + "plan": plan, + "report": report, + "supported": plan.diff_classification.supported, + "succeeded": report.result == "succeeded", + # Additional validation fields for granular checks + "doc_count_match": report.validation.doc_count_match, + "schema_match": report.validation.schema_match, + "key_sample_exists": report.validation.key_sample_exists, + "validation_errors": report.validation.errors, + } + + +def setup_index(redis_url: str, schema: Dict, docs: List[Dict]) -> SearchIndex: + """Create index and load documents.""" + index = SearchIndex.from_dict(schema, redis_url=redis_url) + index.create(overwrite=True) + index.load(docs, id_field="doc_id") + return index + + +def cleanup_index(index: SearchIndex): + """Clean up index after test.""" + try: + index.delete(drop=True) + except Exception: + pass + + +# ============================================================================== +# 1. 
Index-Level Changes +# ============================================================================== + + +class TestIndexLevelChanges: + """Tests for index-level migration operations.""" + + def test_rename_index(self, redis_url, tmp_path, base_schema, sample_docs): + """Test renaming an index.""" + index = setup_index(redis_url, base_schema, sample_docs) + old_name = base_schema["index"]["name"] + new_name = f"{old_name}_renamed" + + try: + result = run_migration( + redis_url, + tmp_path, + old_name, + {"version": 1, "changes": {"index": {"name": new_name}}}, + ) + + assert result["supported"], "Rename index should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + + # Verify new index exists + live_index = SearchIndex.from_existing(new_name, redis_url=redis_url) + assert live_index.schema.index.name == new_name + cleanup_index(live_index) + except Exception: + cleanup_index(index) + raise + + def test_change_prefix(self, redis_url, tmp_path, base_schema, sample_docs): + """Test changing the key prefix (requires key renames).""" + index = setup_index(redis_url, base_schema, sample_docs) + old_prefix = base_schema["index"]["prefix"] + new_prefix = f"{old_prefix}_newprefix" + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + {"version": 1, "changes": {"index": {"prefix": new_prefix}}}, + ) + + assert result["supported"], "Change prefix should be supported" + # Validation now handles prefix change by transforming key_sample to new prefix + assert result["succeeded"], f"Migration failed: {result['report']}" + + # Verify keys were renamed + live_index = SearchIndex.from_existing( + base_schema["index"]["name"], redis_url=redis_url + ) + assert live_index.schema.index.prefix == new_prefix + cleanup_index(live_index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 2. 
Field Operations - Add Fields +# ============================================================================== + + +class TestAddFields: + """Tests for adding fields of different types.""" + + def test_add_text_field(self, redis_url, tmp_path, unique_ids, sample_docs): + """Test adding a text field.""" + schema = { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [{"name": "doc_id", "type": "tag"}], + } + index = setup_index(redis_url, schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "add_fields": [{"name": "title", "type": "text"}], + }, + }, + ) + + assert result["supported"], "Add text field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_tag_field(self, redis_url, tmp_path, unique_ids, sample_docs): + """Test adding a tag field.""" + schema = { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [{"name": "doc_id", "type": "tag"}], + } + index = setup_index(redis_url, schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "add_fields": [ + { + "name": "category", + "type": "tag", + "attrs": {"separator": ","}, + } + ], + }, + }, + ) + + assert result["supported"], "Add tag field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_numeric_field(self, redis_url, tmp_path, unique_ids, sample_docs): + """Test adding a numeric field.""" + schema = { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [{"name": "doc_id", "type": 
"tag"}], + } + index = setup_index(redis_url, schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "add_fields": [{"name": "price", "type": "numeric"}], + }, + }, + ) + + assert result["supported"], "Add numeric field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_geo_field(self, redis_url, tmp_path, unique_ids, sample_docs): + """Test adding a geo field.""" + schema = { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [{"name": "doc_id", "type": "tag"}], + } + index = setup_index(redis_url, schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "add_fields": [{"name": "location", "type": "geo"}], + }, + }, + ) + + assert result["supported"], "Add geo field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 2. 
Field Operations - Remove Fields +# ============================================================================== + + +class TestRemoveFields: + """Tests for removing fields of different types.""" + + def test_remove_text_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test removing a text field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + {"version": 1, "changes": {"remove_fields": ["description"]}}, + ) + + assert result["supported"], "Remove text field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_remove_tag_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test removing a tag field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + {"version": 1, "changes": {"remove_fields": ["category"]}}, + ) + + assert result["supported"], "Remove tag field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_remove_numeric_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test removing a numeric field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + {"version": 1, "changes": {"remove_fields": ["price"]}}, + ) + + assert result["supported"], "Remove numeric field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_remove_geo_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test removing a geo field.""" + index = 
setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + {"version": 1, "changes": {"remove_fields": ["location"]}}, + ) + + assert result["supported"], "Remove geo field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_remove_vector_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test removing a vector field (allowed but warned).""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + {"version": 1, "changes": {"remove_fields": ["embedding"]}}, + ) + + assert result["supported"], "Remove vector field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 2. 
Field Operations - Rename Fields +# ============================================================================== + + +class TestRenameFields: + """Tests for renaming fields of different types.""" + + def test_rename_text_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test renaming a text field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "rename_fields": [ + {"old_name": "title", "new_name": "headline"} + ], + }, + }, + ) + + assert result["supported"], "Rename text field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_rename_tag_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test renaming a tag field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "rename_fields": [{"old_name": "category", "new_name": "tags"}], + }, + }, + ) + + assert result["supported"], "Rename tag field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_rename_numeric_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test renaming a numeric field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "rename_fields": [{"old_name": "price", "new_name": "cost"}], + }, + }, + ) + + assert result["supported"], "Rename numeric field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + 
cleanup_index(index) + raise + + def test_rename_geo_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test renaming a geo field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "rename_fields": [ + {"old_name": "location", "new_name": "coordinates"} + ], + }, + }, + ) + + assert result["supported"], "Rename geo field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_rename_vector_field(self, redis_url, tmp_path, base_schema, sample_docs): + """Test renaming a vector field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "rename_fields": [ + {"old_name": "embedding", "new_name": "vector"} + ], + }, + }, + ) + + assert result["supported"], "Rename vector field should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 3. 
Base Attributes (All Non-Vector Types) +# ============================================================================== + + +class TestBaseAttributes: + """Tests for base attributes: sortable, no_index, index_missing.""" + + def test_add_sortable(self, redis_url, tmp_path, base_schema, sample_docs): + """Test adding sortable attribute to a field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "title", "attrs": {"sortable": True}} + ], + }, + }, + ) + + assert result["supported"], "Add sortable should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_no_index(self, redis_url, tmp_path, unique_ids, sample_docs): + """Test adding no_index attribute (store only, no searching).""" + # Need a sortable field first + schema = { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [ + {"name": "doc_id", "type": "tag"}, + {"name": "title", "type": "text", "attrs": {"sortable": True}}, + ], + } + index = setup_index(redis_url, schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "title", "attrs": {"no_index": True}} + ], + }, + }, + ) + + assert result["supported"], "Add no_index should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_index_missing(self, redis_url, tmp_path, base_schema, sample_docs): + """Test adding index_missing attribute.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + 
base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "title", "attrs": {"index_missing": True}} + ], + }, + }, + ) + + assert result["supported"], "Add index_missing should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 4. Text Field Attributes +# ============================================================================== + + +class TestTextAttributes: + """Tests for text field attributes: weight, no_stem, phonetic_matcher, etc.""" + + def test_change_weight(self, redis_url, tmp_path, base_schema, sample_docs): + """Test changing text field weight.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [{"name": "title", "attrs": {"weight": 2.0}}], + }, + }, + ) + + assert result["supported"], "Change weight should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_no_stem(self, redis_url, tmp_path, base_schema, sample_docs): + """Test adding no_stem attribute.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "title", "attrs": {"no_stem": True}} + ], + }, + }, + ) + + assert result["supported"], "Add no_stem should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_phonetic_matcher(self, redis_url, tmp_path, base_schema, sample_docs): + """Test adding 
phonetic_matcher attribute.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "title", "attrs": {"phonetic_matcher": "dm:en"}} + ], + }, + }, + ) + + assert result["supported"], "Add phonetic_matcher should be supported" + # phonetic_matcher is stripped from schema comparison (FT.INFO doesn't return it) + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_index_empty_text(self, redis_url, tmp_path, base_schema, sample_docs): + """Test adding index_empty to text field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "title", "attrs": {"index_empty": True}} + ], + }, + }, + ) + + assert result["supported"], "Add index_empty should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_unf_text(self, redis_url, tmp_path, unique_ids, sample_docs): + """Test adding unf (un-normalized form) to text field.""" + # UNF requires sortable + schema = { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [ + {"name": "doc_id", "type": "tag"}, + {"name": "title", "type": "text", "attrs": {"sortable": True}}, + ], + } + index = setup_index(redis_url, schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "update_fields": [{"name": "title", "attrs": {"unf": True}}], + }, + }, + ) + + assert result["supported"], "Add UNF should be supported" + assert result["succeeded"], 
f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 5. Tag Field Attributes +# ============================================================================== + + +class TestTagAttributes: + """Tests for tag field attributes: separator, case_sensitive, withsuffixtrie, etc.""" + + def test_change_separator(self, redis_url, tmp_path, base_schema, sample_docs): + """Test changing tag separator.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "category", "attrs": {"separator": "|"}} + ], + }, + }, + ) + + assert result["supported"], "Change separator should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_case_sensitive(self, redis_url, tmp_path, base_schema, sample_docs): + """Test adding case_sensitive attribute.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "category", "attrs": {"case_sensitive": True}} + ], + }, + }, + ) + + assert result["supported"], "Add case_sensitive should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_add_index_empty_tag(self, redis_url, tmp_path, base_schema, sample_docs): + """Test adding index_empty to tag field.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + 
"update_fields": [ + {"name": "category", "attrs": {"index_empty": True}} + ], + }, + }, + ) + + assert result["supported"], "Add index_empty should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 6. Numeric Field Attributes +# ============================================================================== + + +class TestNumericAttributes: + """Tests for numeric field attributes: unf.""" + + def test_add_unf_numeric(self, redis_url, tmp_path, unique_ids, sample_docs): + """Test adding unf (un-normalized form) to numeric field.""" + # UNF requires sortable + schema = { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "hash", + }, + "fields": [ + {"name": "doc_id", "type": "tag"}, + {"name": "price", "type": "numeric", "attrs": {"sortable": True}}, + ], + } + index = setup_index(redis_url, schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "update_fields": [{"name": "price", "attrs": {"unf": True}}], + }, + }, + ) + + assert result["supported"], "Add UNF to numeric should be supported" + # Redis auto-applies UNF with SORTABLE on numeric fields, so both should match + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 7. 
Vector Field Attributes (Index-Only Changes) +# ============================================================================== + + +class TestVectorAttributes: + """Tests for vector field attributes: algorithm, distance_metric, HNSW params, etc.""" + + def test_change_algorithm_hnsw_to_flat( + self, redis_url, tmp_path, base_schema, sample_docs + ): + """Test changing vector algorithm from HNSW to FLAT.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "embedding", "attrs": {"algorithm": "flat"}} + ], + }, + }, + ) + + assert result["supported"], "Change algorithm should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_change_distance_metric( + self, redis_url, tmp_path, base_schema, sample_docs + ): + """Test changing distance metric.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "embedding", "attrs": {"distance_metric": "l2"}} + ], + }, + }, + ) + + assert result["supported"], "Change distance_metric should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_change_initial_cap(self, redis_url, tmp_path, base_schema, sample_docs): + """Test changing initial_cap.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "embedding", "attrs": {"initial_cap": 1000}} + ], + }, + }, + ) + + assert result["supported"], "Change 
initial_cap should be supported" + # Redis may not return initial_cap accurately in FT.INFO. + # Check doc_count_match to confirm the migration executed successfully. + assert result[ + "doc_count_match" + ], f"Migration failed: {result['validation_errors']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_change_hnsw_m(self, redis_url, tmp_path, base_schema, sample_docs): + """Test changing HNSW m parameter.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [{"name": "embedding", "attrs": {"m": 32}}], + }, + }, + ) + + assert result["supported"], "Change HNSW m should be supported" + # Redis may not return m accurately in FT.INFO. + # Check doc_count_match to confirm the migration executed successfully. + assert result[ + "doc_count_match" + ], f"Migration failed: {result['validation_errors']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_change_hnsw_ef_construction( + self, redis_url, tmp_path, base_schema, sample_docs + ): + """Test changing HNSW ef_construction parameter.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "embedding", "attrs": {"ef_construction": 400}} + ], + }, + }, + ) + + assert result["supported"], "Change ef_construction should be supported" + # Redis may not return ef_construction accurately in FT.INFO. + # Check doc_count_match to confirm the migration executed successfully. 
+ assert result[ + "doc_count_match" + ], f"Migration failed: {result['validation_errors']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_change_hnsw_ef_runtime( + self, redis_url, tmp_path, base_schema, sample_docs + ): + """Test changing HNSW ef_runtime parameter.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "embedding", "attrs": {"ef_runtime": 20}} + ], + }, + }, + ) + + assert result["supported"], "Change ef_runtime should be supported" + # Redis may not return ef_runtime accurately in FT.INFO (often returns defaults). + # Check doc_count_match to confirm the migration executed successfully. + assert result[ + "doc_count_match" + ], f"Migration failed: {result['validation_errors']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_change_hnsw_epsilon(self, redis_url, tmp_path, base_schema, sample_docs): + """Test changing HNSW epsilon parameter.""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "embedding", "attrs": {"epsilon": 0.05}} + ], + }, + }, + ) + + assert result["supported"], "Change epsilon should be supported" + # Redis may not return epsilon accurately in FT.INFO (often returns defaults). + # Check doc_count_match to confirm the migration executed successfully. 
+ assert result[ + "doc_count_match" + ], f"Migration failed: {result['validation_errors']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_change_datatype_quantization( + self, redis_url, tmp_path, base_schema, sample_docs + ): + """Test changing vector datatype (quantization).""" + index = setup_index(redis_url, base_schema, sample_docs) + + try: + result = run_migration( + redis_url, + tmp_path, + base_schema["index"]["name"], + { + "version": 1, + "changes": { + "update_fields": [ + {"name": "embedding", "attrs": {"datatype": "float16"}} + ], + }, + }, + ) + + assert result["supported"], "Change datatype should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + +# ============================================================================== +# 8. JSON Storage Type Tests +# ============================================================================== + + +class TestJsonStorageType: + """Tests for migrations with JSON storage type.""" + + @pytest.fixture + def json_schema(self, unique_ids): + """Schema using JSON storage type.""" + return { + "index": { + "name": unique_ids["name"], + "prefix": unique_ids["prefix"], + "storage_type": "json", + }, + "fields": [ + {"name": "doc_id", "type": "tag", "path": "$.doc_id"}, + {"name": "title", "type": "text", "path": "$.title"}, + {"name": "category", "type": "tag", "path": "$.category"}, + {"name": "price", "type": "numeric", "path": "$.price"}, + { + "name": "embedding", + "type": "vector", + "path": "$.embedding", + "attrs": { + "algorithm": "hnsw", + "dims": 4, + "distance_metric": "cosine", + "datatype": "float32", + }, + }, + ], + } + + @pytest.fixture + def json_sample_docs(self): + """Sample JSON documents (as dicts for RedisJSON).""" + return [ + { + "doc_id": "1", + "title": "Alpha Product", + "category": "electronics", + "price": 99.99, + "embedding": [0.1, 
0.2, 0.3, 0.4], + }, + { + "doc_id": "2", + "title": "Beta Service", + "category": "software", + "price": 149.99, + "embedding": [0.2, 0.3, 0.4, 0.5], + }, + ] + + def test_json_add_field( + self, redis_url, tmp_path, unique_ids, json_schema, json_sample_docs, client + ): + """Test adding a field with JSON storage.""" + index = SearchIndex.from_dict(json_schema, redis_url=redis_url) + index.create(overwrite=True) + + # Load JSON docs directly + for i, doc in enumerate(json_sample_docs): + key = f"{unique_ids['prefix']}:{i+1}" + client.json().set(key, "$", json_sample_docs[i]) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "add_fields": [ + { + "name": "status", + "type": "tag", + "path": "$.status", + } + ], + }, + }, + ) + + assert result["supported"], "Add field with JSON should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise + + def test_json_rename_field( + self, redis_url, tmp_path, unique_ids, json_schema, json_sample_docs, client + ): + """Test renaming a field with JSON storage.""" + index = SearchIndex.from_dict(json_schema, redis_url=redis_url) + index.create(overwrite=True) + + # Load JSON docs + for i, doc in enumerate(json_sample_docs): + key = f"{unique_ids['prefix']}:{i+1}" + client.json().set(key, "$", doc) + + try: + result = run_migration( + redis_url, + tmp_path, + unique_ids["name"], + { + "version": 1, + "changes": { + "rename_fields": [ + {"old_name": "title", "new_name": "headline"} + ], + }, + }, + ) + + assert result["supported"], "Rename field with JSON should be supported" + assert result["succeeded"], f"Migration failed: {result['report']}" + cleanup_index(index) + except Exception: + cleanup_index(index) + raise diff --git a/tests/integration/test_migration_routes.py b/tests/integration/test_migration_routes.py new file mode 100644 index 000000000..5d897d010 
# --- new file: tests/integration/test_migration_routes.py ---
"""
Integration tests for migration routes.

Tests the full Apply+Validate flow for all supported migration operations.
Requires Redis 8.0+ for INT8/UINT8 datatype tests.
"""

import uuid

import pytest
from redis import Redis

from redisvl.index import SearchIndex
from redisvl.migration import MigrationExecutor, MigrationPlanner
from redisvl.migration.models import FieldUpdate, SchemaPatch
from tests.conftest import skip_if_redis_version_below


def create_source_index(redis_url, worker_id, source_attrs):
    """Helper to create a source index with specified vector attributes."""
    # Unique name/prefix per call so parallel test workers never collide.
    unique_id = str(uuid.uuid4())[:8]
    index_name = f"mig_route_{worker_id}_{unique_id}"
    prefix = f"mig_route:{worker_id}:{unique_id}"

    base_attrs = {
        "dims": 128,
        "datatype": "float32",
        "distance_metric": "cosine",
        "algorithm": "flat",
    }
    # Caller-supplied attrs override the defaults above.
    base_attrs.update(source_attrs)

    index = SearchIndex.from_dict(
        {
            "index": {"name": index_name, "prefix": prefix, "storage_type": "json"},
            "fields": [
                {"name": "title", "type": "text", "path": "$.title"},
                {
                    "name": "embedding",
                    "type": "vector",
                    "path": "$.embedding",
                    "attrs": base_attrs,
                },
            ],
        },
        redis_url=redis_url,
    )
    index.create(overwrite=True)
    return index, index_name


def run_migration(redis_url, index_name, patch_attrs):
    """Helper to run a migration with the given patch attributes."""
    # Only the "embedding" vector field is patched; every other change list is
    # deliberately empty so the migration exercises a single route at a time.
    patch = SchemaPatch(
        version=1,
        changes={
            "add_fields": [],
            "remove_fields": [],
            "update_fields": [FieldUpdate(name="embedding", attrs=patch_attrs)],
            "rename_fields": [],
            "index": {},
        },
    )

    planner = MigrationPlanner()
    plan = planner.create_plan_from_patch(
        index_name, schema_patch=patch, redis_url=redis_url
    )

    executor = MigrationExecutor()
    report = executor.apply(plan, redis_url=redis_url)
    return report, plan


class TestAlgorithmChanges:
    """Test algorithm migration routes."""

    def test_hnsw_to_flat(self, redis_url, worker_id):
        """Migrate a vector field from HNSW to FLAT."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw"}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"algorithm": "flat"})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True

            # Re-read the live index to confirm the algorithm actually changed.
            live = SearchIndex.from_existing(index_name, redis_url=redis_url)
            assert str(live.schema.fields["embedding"].attrs.algorithm).endswith("FLAT")
        finally:
            # Always drop the index (and its docs) so reruns start clean.
            index.delete(drop=True)

    def test_flat_to_hnsw_with_params(self, redis_url, worker_id):
        """Migrate FLAT -> HNSW while setting M and EF_CONSTRUCTION."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "flat"}
        )
        try:
            report, _ = run_migration(
                redis_url,
                index_name,
                {"algorithm": "hnsw", "m": 32, "ef_construction": 200},
            )
            assert report.result == "succeeded"
            assert report.validation.schema_match is True

            live = SearchIndex.from_existing(index_name, redis_url=redis_url)
            attrs = live.schema.fields["embedding"].attrs
            assert str(attrs.algorithm).endswith("HNSW")
            assert attrs.m == 32
            assert attrs.ef_construction == 200
        finally:
            index.delete(drop=True)


class TestDatatypeChanges:
    """Test datatype migration routes."""

    @pytest.mark.parametrize(
        "source_dtype,target_dtype",
        [
            ("float32", "float16"),
            ("float32", "bfloat16"),
            ("float16", "float32"),
        ],
    )
    def test_flat_datatype_change(
        self, redis_url, worker_id, source_dtype, target_dtype
    ):
        """Change vector datatype on a FLAT index."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "flat", "datatype": source_dtype}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"datatype": target_dtype})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)

    @pytest.mark.parametrize("target_dtype", ["int8", "uint8"])
    def test_flat_quantized_datatype(self, redis_url, worker_id, target_dtype):
        """Test INT8/UINT8 datatypes (requires Redis 8.0+)."""
        # NOTE(review): this Redis client is only used for the version probe and
        # is not explicitly closed — acceptable for a short-lived test.
        client = Redis.from_url(redis_url)
        skip_if_redis_version_below(client, "8.0.0", "INT8/UINT8 requires Redis 8.0+")
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "flat"}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"datatype": target_dtype})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)

    @pytest.mark.parametrize(
        "source_dtype,target_dtype",
        [
            ("float32", "float16"),
            ("float32", "bfloat16"),
        ],
    )
    def test_hnsw_datatype_change(
        self, redis_url, worker_id, source_dtype, target_dtype
    ):
        """Change vector datatype on an HNSW index."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw", "datatype": source_dtype}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"datatype": target_dtype})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)

    @pytest.mark.parametrize("target_dtype", ["int8", "uint8"])
    def test_hnsw_quantized_datatype(self, redis_url, worker_id, target_dtype):
        """Test INT8/UINT8 datatypes with HNSW (requires Redis 8.0+)."""
        client = Redis.from_url(redis_url)
        skip_if_redis_version_below(client, "8.0.0", "INT8/UINT8 requires Redis 8.0+")
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw"}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"datatype": target_dtype})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)


class TestDistanceMetricChanges:
    """Test distance metric migration routes."""

    @pytest.mark.parametrize(
        "source_metric,target_metric",
        [
            ("cosine", "l2"),
            ("cosine", "ip"),
            ("l2", "cosine"),
            ("ip", "l2"),
        ],
    )
    def test_distance_metric_change(
        self, redis_url, worker_id, source_metric, target_metric
    ):
        """Change the distance metric on a FLAT index."""
        index, index_name = create_source_index(
            redis_url,
            worker_id,
            {"algorithm": "flat", "distance_metric": source_metric},
        )
        try:
            report, _ = run_migration(
                redis_url, index_name, {"distance_metric": target_metric}
            )
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)


class TestHNSWTuningParameters:
    """Test HNSW parameter tuning routes."""

    def test_hnsw_m_parameter(self, redis_url, worker_id):
        """Tune the HNSW M parameter and verify it on the live index."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw"}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"m": 64})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True

            live = SearchIndex.from_existing(index_name, redis_url=redis_url)
            assert live.schema.fields["embedding"].attrs.m == 64
        finally:
            index.delete(drop=True)

    def test_hnsw_ef_construction_parameter(self, redis_url, worker_id):
        """Tune EF_CONSTRUCTION and verify it on the live index."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw"}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"ef_construction": 500})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True

            live = SearchIndex.from_existing(index_name, redis_url=redis_url)
            assert live.schema.fields["embedding"].attrs.ef_construction == 500
        finally:
            index.delete(drop=True)

    def test_hnsw_ef_runtime_parameter(self, redis_url, worker_id):
        """Tune EF_RUNTIME (no live read-back assertion)."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw"}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"ef_runtime": 50})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)

    def test_hnsw_epsilon_parameter(self, redis_url, worker_id):
        """Tune EPSILON (no live read-back assertion)."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw"}
        )
        try:
            report, _ = run_migration(redis_url, index_name, {"epsilon": 0.1})
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)

    def test_hnsw_all_params_combined(self, redis_url, worker_id):
        """Tune M, EF_CONSTRUCTION, EF_RUNTIME and EPSILON in one migration."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "hnsw"}
        )
        try:
            report, _ = run_migration(
                redis_url,
                index_name,
                {"m": 48, "ef_construction": 300, "ef_runtime": 75, "epsilon": 0.05},
            )
            assert report.result == "succeeded"
            assert report.validation.schema_match is True

            # Only M and EF_CONSTRUCTION are asserted on read-back.
            live = SearchIndex.from_existing(index_name, redis_url=redis_url)
            attrs = live.schema.fields["embedding"].attrs
            assert attrs.m == 48
            assert attrs.ef_construction == 300
        finally:
            index.delete(drop=True)


class TestCombinedChanges:
    """Test combined migration routes (multiple changes at once)."""

    def test_flat_to_hnsw_with_datatype_and_metric(self, redis_url, worker_id):
        """Change algorithm, datatype and distance metric in a single migration."""
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "flat"}
        )
        try:
            report, _ = run_migration(
                redis_url,
                index_name,
                {"algorithm": "hnsw", "datatype": "float16", "distance_metric": "l2"},
            )
            assert report.result == "succeeded"
            assert report.validation.schema_match is True

            live = SearchIndex.from_existing(index_name, redis_url=redis_url)
            attrs = live.schema.fields["embedding"].attrs
            assert str(attrs.algorithm).endswith("HNSW")
        finally:
            index.delete(drop=True)

    def test_flat_to_hnsw_with_int8(self, redis_url, worker_id):
        """Combined algorithm + quantized datatype (requires Redis 8.0+)."""
        client = Redis.from_url(redis_url)
        skip_if_redis_version_below(client, "8.0.0", "INT8 requires Redis 8.0+")
        index, index_name = create_source_index(
            redis_url, worker_id, {"algorithm": "flat"}
        )
        try:
            report, _ = run_migration(
                redis_url,
                index_name,
                {"algorithm": "hnsw", "datatype": "int8"},
            )
            assert report.result == "succeeded"
            assert report.validation.schema_match is True
        finally:
            index.delete(drop=True)
# --- new file: tests/integration/test_migration_v1.py ---
import uuid

import yaml

from redisvl.index import SearchIndex
from redisvl.migration import MigrationExecutor, MigrationPlanner, MigrationValidator
from redisvl.migration.utils import load_migration_plan, schemas_equal
from redisvl.redis.utils import array_to_buffer


def test_drop_recreate_plan_apply_validate_flow(redis_url, worker_id, tmp_path):
    """End-to-end drop/recreate migration: plan -> apply -> validate.

    Builds a hash index with an HNSW vector field, applies a schema patch that
    adds `category`, removes `price` and makes `title` sortable, then checks
    the executor's report and re-validates against the live index.
    """
    # Unique index name/prefix per test worker to avoid collisions.
    unique_id = str(uuid.uuid4())[:8]
    index_name = f"migration_v1_{worker_id}_{unique_id}"
    prefix = f"migration_v1:{worker_id}:{unique_id}"

    source_index = SearchIndex.from_dict(
        {
            "index": {
                "name": index_name,
                "prefix": prefix,
                "storage_type": "hash",
            },
            "fields": [
                {"name": "doc_id", "type": "tag"},
                {"name": "title", "type": "text"},
                {"name": "price", "type": "numeric"},
                {
                    "name": "embedding",
                    "type": "vector",
                    "attrs": {
                        "algorithm": "hnsw",
                        "dims": 3,
                        "distance_metric": "cosine",
                        "datatype": "float32",
                    },
                },
            ],
        },
        redis_url=redis_url,
    )

    # Documents already carry a `category` attribute, so the field added by the
    # patch below can index existing data without rewriting documents.
    docs = [
        {
            "doc_id": "1",
            "title": "alpha",
            "price": 1,
            "category": "news",
            "embedding": array_to_buffer([0.1, 0.2, 0.3], "float32"),
        },
        {
            "doc_id": "2",
            "title": "beta",
            "price": 2,
            "category": "sports",
            "embedding": array_to_buffer([0.2, 0.1, 0.4], "float32"),
        },
    ]

    source_index.create(overwrite=True)
    source_index.load(docs, id_field="doc_id")

    # Schema patch: add `category` (tag), drop `price`, make `title` sortable.
    patch_path = tmp_path / "schema_patch.yaml"
    patch_path.write_text(
        yaml.safe_dump(
            {
                "version": 1,
                "changes": {
                    "add_fields": [
                        {
                            "name": "category",
                            "type": "tag",
                            "attrs": {"separator": ","},
                        }
                    ],
                    "remove_fields": ["price"],
                    "update_fields": [{"name": "title", "attrs": {"sortable": True}}],
                },
            },
            sort_keys=False,
        )
    )

    # Plan: the diff must be classified as supported before writing it out.
    plan_path = tmp_path / "migration_plan.yaml"
    planner = MigrationPlanner()
    plan = planner.create_plan(
        index_name,
        redis_url=redis_url,
        schema_patch_path=str(patch_path),
    )
    assert plan.diff_classification.supported is True
    planner.write_plan(plan, str(plan_path))

    # Post-migration query checks: both loaded docs must be fetchable by id.
    query_check_path = tmp_path / "query_checks.yaml"
    query_check_path.write_text(
        yaml.safe_dump({"fetch_ids": ["1", "2"]}, sort_keys=False)
    )

    # Apply the plan from its serialized form (round-trips the YAML on disk).
    executor = MigrationExecutor()
    report = executor.apply(
        load_migration_plan(str(plan_path)),
        redis_url=redis_url,
        query_check_file=str(query_check_path),
    )

    try:
        # Executor report: success, schema/doc-count/key checks all clean.
        assert report.result == "succeeded"
        assert report.validation.schema_match is True
        assert report.validation.doc_count_match is True
        assert report.validation.key_sample_exists is True
        assert report.validation.indexing_failures_delta == 0
        assert not report.validation.errors
        assert report.benchmark_summary.documents_indexed_per_second is not None

        # Live index must now match the plan's merged target schema.
        live_index = SearchIndex.from_existing(index_name, redis_url=redis_url)
        assert schemas_equal(live_index.schema.to_dict(), plan.merged_target_schema)

        # Standalone validator re-run must agree with the executor's report.
        validator = MigrationValidator()
        validation, _target_info, _duration = validator.validate(
            load_migration_plan(str(plan_path)),
            redis_url=redis_url,
            query_check_file=str(query_check_path),
        )
        assert validation.schema_match is True
        assert validation.doc_count_match is True
        assert validation.key_sample_exists is True
        assert not validation.errors
    finally:
        # Drop the migrated index (and its docs) even if assertions fail.
        live_index = SearchIndex.from_existing(index_name, redis_url=redis_url)
        live_index.delete(drop=True)