diff --git a/.gitignore b/.gitignore index cd8005815..274ab5e13 100644 --- a/.gitignore +++ b/.gitignore @@ -234,3 +234,4 @@ tests/data # Local working directory (personal scripts, docs, tools) local/ +local_docs/ diff --git a/CLAUDE.md b/CLAUDE.md index 09ab66439..9f22e9b93 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -48,6 +48,9 @@ index = SearchIndex(schema, redis_url="redis://localhost:6379") token.strip().strip(",").replace(""", "").replace(""", "").lower() ``` +### Protected Directories +**CRITICAL**: NEVER delete the `local_docs/` directory or any files within it. + ### Git Operations **CRITICAL**: NEVER use `git push` or attempt to push to remote repositories. The user will handle all git push operations. diff --git a/docs/api/cli.rst b/docs/api/cli.rst new file mode 100644 index 000000000..d13ff5c9b --- /dev/null +++ b/docs/api/cli.rst @@ -0,0 +1,614 @@ +********************** +Command Line Interface +********************** + +RedisVL provides a command line interface (CLI) called ``rvl`` for managing vector search indices. The CLI enables you to create, inspect, and delete indices directly from your terminal without writing Python code. + +Installation +============ + +The ``rvl`` command is included when you install RedisVL. + +.. code-block:: bash + + pip install redisvl + +Verify the installation by running: + +.. code-block:: bash + + rvl version + +Connection Configuration +======================== + +The CLI connects to Redis using the following resolution order: + +1. The ``REDIS_URL`` environment variable, if set +2. Explicit connection flags (``--host``, ``--port``, ``--url``) +3. Default values (``localhost:6379``) + +**Connection Flags** + +All commands that interact with Redis accept these optional flags: + +.. 
list-table:: + :widths: 20 15 50 15 + :header-rows: 1 + + * - Flag + - Type + - Description + - Default + * - ``-u``, ``--url`` + - string + - Full Redis URL (e.g., ``redis://localhost:6379``) + - None + * - ``--host`` + - string + - Redis server hostname + - ``localhost`` + * - ``-p``, ``--port`` + - integer + - Redis server port + - ``6379`` + * - ``--user`` + - string + - Redis username for authentication + - ``default`` + * - ``-a``, ``--password`` + - string + - Redis password for authentication + - Empty + * - ``--ssl`` + - flag + - Enable SSL/TLS encryption + - Disabled + +**Examples** + +Connect using environment variable: + +.. code-block:: bash + + export REDIS_URL="redis://localhost:6379" + rvl index listall + +Connect with explicit host and port: + +.. code-block:: bash + + rvl index listall --host myredis.example.com --port 6380 + +Connect with authentication and SSL: + +.. code-block:: bash + + rvl index listall --user admin --password secret --ssl + +Getting Help +============ + +All commands support the ``-h`` and ``--help`` flags to display usage information. + +.. list-table:: + :widths: 25 75 + :header-rows: 1 + + * - Flag + - Description + * - ``-h``, ``--help`` + - Display usage information for the command + +**Examples** + +.. code-block:: bash + + # Display top-level help + rvl --help + + # Display help for a command group + rvl index --help + + # Display help for a specific subcommand + rvl index create --help + +Running ``rvl`` without any arguments also displays the top-level help message. + +.. tip:: + + For a hands-on tutorial with practical examples, see the :doc:`/user_guide/cli`. + +Commands +======== + +rvl version +----------- + +Display the installed RedisVL version. + +**Syntax** + +.. code-block:: bash + + rvl version [OPTIONS] + +**Options** + +.. 
list-table:: + :widths: 25 75 + :header-rows: 1 + + * - Option + - Description + * - ``-s``, ``--short`` + - Print only the version number without additional formatting + +**Examples** + +.. code-block:: bash + + # Full version output + rvl version + + # Version number only + rvl version --short + +rvl index +--------- + +Manage vector search indices. This command group provides subcommands for creating, inspecting, listing, and removing indices. + +**Syntax** + +.. code-block:: bash + + rvl index <subcommand> [OPTIONS] + +**Subcommands** + +.. list-table:: + :widths: 15 85 + :header-rows: 1 + + * - Subcommand + - Description + * - ``create`` + - Create a new index from a YAML schema file + * - ``info`` + - Display detailed information about an index + * - ``listall`` + - List all existing indices in the Redis instance + * - ``delete`` + - Remove an index while preserving the underlying data + * - ``destroy`` + - Remove an index and delete all associated data + +rvl index create +^^^^^^^^^^^^^^^^ + +Create a new vector search index from a YAML schema definition. + +**Syntax** + +.. code-block:: bash + + rvl index create -s <schema file> [CONNECTION_OPTIONS] + +**Required Options** + +.. list-table:: + :widths: 25 75 + :header-rows: 1 + + * - Option + - Description + * - ``-s``, ``--schema`` + - Path to the YAML schema file defining the index structure + +**Example** + +.. code-block:: bash + + rvl index create -s schema.yaml + +**Schema File Format** + +The schema file must be valid YAML with the following structure: + +.. code-block:: yaml + + version: '0.1.0' + + index: + name: my_index + prefix: doc + storage_type: hash + + fields: + - name: content + type: text + - name: embedding + type: vector + attrs: + dims: 768 + algorithm: hnsw + distance_metric: cosine + +rvl index info +^^^^^^^^^^^^^^ + +Display detailed information about an existing index, including field definitions and index options. + +**Syntax** + +.. 
code-block:: bash + + rvl index info (-i <index name> | -s <schema file>) [OPTIONS] + +**Options** + +.. list-table:: + :widths: 25 75 + :header-rows: 1 + + * - Option + - Description + * - ``-i``, ``--index`` + - Name of the index to inspect + * - ``-s``, ``--schema`` + - Path to the schema file (alternative to specifying index name) + +**Example** + +.. code-block:: bash + + rvl index info -i my_index + +**Output** + +The command displays two tables: + +1. **Index Information** containing the index name, storage type, key prefixes, index options, and indexing status +2. **Index Fields** listing each field with its name, attribute, type, and any additional field options + +rvl index listall +^^^^^^^^^^^^^^^^^ + +List all vector search indices in the connected Redis instance. + +**Syntax** + +.. code-block:: bash + + rvl index listall [CONNECTION_OPTIONS] + +**Example** + +.. code-block:: bash + + rvl index listall + +**Output** + +Returns a numbered list of all index names: + +.. code-block:: text + + Indices: + 1. products_index + 2. documents_index + 3. embeddings_index + +rvl index delete +^^^^^^^^^^^^^^^^ + +Remove an index from Redis while preserving the underlying data. Use this when you want to rebuild an index with a different schema without losing your data. + +**Syntax** + +.. code-block:: bash + + rvl index delete (-i <index name> | -s <schema file>) [CONNECTION_OPTIONS] + +**Options** + +.. list-table:: + :widths: 25 75 + :header-rows: 1 + + * - Option + - Description + * - ``-i``, ``--index`` + - Name of the index to delete + * - ``-s``, ``--schema`` + - Path to the schema file (alternative to specifying index name) + +**Example** + +.. code-block:: bash + + rvl index delete -i my_index + +rvl index destroy +^^^^^^^^^^^^^^^^^ + +Remove an index and permanently delete all associated data from Redis. This operation cannot be undone. + +**Syntax** + +.. code-block:: bash + + rvl index destroy (-i <index name> | -s <schema file>) [CONNECTION_OPTIONS] + +**Options** + +.. 
list-table:: + :widths: 25 75 + :header-rows: 1 + + * - Option + - Description + * - ``-i``, ``--index`` + - Name of the index to destroy + * - ``-s``, ``--schema`` + - Path to the schema file (alternative to specifying index name) + +**Example** + +.. code-block:: bash + + rvl index destroy -i my_index + +.. warning:: + + This command permanently deletes both the index and all documents stored with the index prefix. Ensure you have backups before running this command. + +rvl stats +--------- + +Display statistics about an existing index, including document counts, memory usage, and indexing performance metrics. + +**Syntax** + +.. code-block:: bash + + rvl stats (-i <index name> | -s <schema file>) [OPTIONS] + +**Options** + +.. list-table:: + :widths: 25 75 + :header-rows: 1 + + * - Option + - Description + * - ``-i``, ``--index`` + - Name of the index to query + * - ``-s``, ``--schema`` + - Path to the schema file (alternative to specifying index name) + +**Example** + +.. code-block:: bash + + rvl stats -i my_index + +**Statistics Reference** + +The command returns the following metrics: + +.. 
list-table:: + :widths: 35 65 + :header-rows: 1 + + * - Metric + - Description + * - ``num_docs`` + - Total number of indexed documents + * - ``num_terms`` + - Number of distinct terms in text fields + * - ``max_doc_id`` + - Highest internal document ID + * - ``num_records`` + - Total number of index records + * - ``percent_indexed`` + - Percentage of documents fully indexed + * - ``hash_indexing_failures`` + - Number of documents that failed to index + * - ``number_of_uses`` + - Number of times the index has been queried + * - ``bytes_per_record_avg`` + - Average bytes per index record + * - ``doc_table_size_mb`` + - Document table size in megabytes + * - ``inverted_sz_mb`` + - Inverted index size in megabytes + * - ``key_table_size_mb`` + - Key table size in megabytes + * - ``offset_bits_per_record_avg`` + - Average offset bits per record + * - ``offset_vectors_sz_mb`` + - Offset vectors size in megabytes + * - ``offsets_per_term_avg`` + - Average offsets per term + * - ``records_per_doc_avg`` + - Average records per document + * - ``sortable_values_size_mb`` + - Sortable values size in megabytes + * - ``total_indexing_time`` + - Total time spent indexing in milliseconds + * - ``total_inverted_index_blocks`` + - Number of inverted index blocks + * - ``vector_index_sz_mb`` + - Vector index size in megabytes + +rvl migrate +----------- + +Manage document-preserving index migrations. This command group provides subcommands for planning, executing, and validating schema migrations that preserve existing data. + +**Syntax** + +.. code-block:: bash + + rvl migrate [OPTIONS] + +**Subcommands** + +.. 
list-table:: + :widths: 20 80 + :header-rows: 1 + + * - Subcommand + - Description + * - ``helper`` + - Show migration guidance and supported capabilities + * - ``list`` + - List all available indexes + * - ``plan`` + - Generate a migration plan from a schema patch or target schema + * - ``wizard`` + - Interactively build a migration plan and schema patch + * - ``apply`` + - Execute a reviewed drop/recreate migration plan + * - ``estimate`` + - Estimate disk space required for a migration (dry-run) + * - ``validate`` + - Validate a completed migration against the live index + * - ``batch-plan`` + - Generate a batch migration plan for multiple indexes + * - ``batch-apply`` + - Execute a batch migration plan with checkpointing + * - ``batch-resume`` + - Resume an interrupted batch migration + * - ``batch-status`` + - Show status of an in-progress or completed batch migration + +rvl migrate plan +^^^^^^^^^^^^^^^^ + +Generate a migration plan for a document-preserving drop/recreate migration. + +**Syntax** + +.. code-block:: bash + + rvl migrate plan --index <index name> (--schema-patch <patch file> | --target-schema <schema file>) [OPTIONS] + +**Required Options** + +.. list-table:: + :widths: 30 70 + :header-rows: 1 + + * - Option + - Description + * - ``--index``, ``-i`` + - Name of the source index to migrate + * - ``--schema-patch`` + - Path to a YAML schema patch file (mutually exclusive with ``--target-schema``) + * - ``--target-schema`` + - Path to a full target schema YAML file (mutually exclusive with ``--schema-patch``) + +**Optional Options** + +.. list-table:: + :widths: 30 70 + :header-rows: 1 + + * - Option + - Description + * - ``--plan-out`` + - Output path for the migration plan YAML (default: ``migration_plan.yaml``) + +**Example** + +.. code-block:: bash + + rvl migrate plan -i my_index --schema-patch changes.yaml --plan-out plan.yaml + +rvl migrate apply +^^^^^^^^^^^^^^^^^ + +Execute a reviewed drop/recreate migration plan. 
Use ``--async`` for large migrations involving vector quantization. + +**Syntax** + +.. code-block:: bash + + rvl migrate apply --plan <plan file> [OPTIONS] + +**Required Options** + +.. list-table:: + :widths: 30 70 + :header-rows: 1 + + * - Option + - Description + * - ``--plan`` + - Path to the migration plan YAML file + +**Optional Options** + +.. list-table:: + :widths: 30 70 + :header-rows: 1 + + * - Option + - Description + * - ``--async`` + - Run migration asynchronously (recommended for large quantization jobs) + * - ``--query-check-file`` + - Path to a YAML file with post-migration query checks + * - ``--resume`` + - Path to a checkpoint file for crash-safe recovery + +**Example** + +.. code-block:: bash + + rvl migrate apply --plan plan.yaml + rvl migrate apply --plan plan.yaml --async --resume checkpoint.yaml + +rvl migrate wizard +^^^^^^^^^^^^^^^^^^ + +Interactively build a schema patch and migration plan through a guided wizard. + +**Syntax** + +.. code-block:: bash + + rvl migrate wizard [--index <index name>] [OPTIONS] + +**Example** + +.. code-block:: bash + + rvl migrate wizard -i my_index --plan-out plan.yaml + +Exit Codes +========== + +The CLI returns the following exit codes: + +.. 
list-table:: + :widths: 15 85 + :header-rows: 1 + + * - Code + - Description + * - ``0`` + - Command completed successfully + * - ``1`` + - Command failed due to missing required arguments or invalid input + +Related Resources +================= + +- :doc:`/user_guide/cli` for a tutorial-style walkthrough +- :doc:`schema` for YAML schema format details +- :doc:`searchindex` for the Python ``SearchIndex`` API + diff --git a/docs/concepts/field-attributes.md b/docs/concepts/field-attributes.md index c7764a4a7..96060d2fb 100644 --- a/docs/concepts/field-attributes.md +++ b/docs/concepts/field-attributes.md @@ -267,7 +267,7 @@ Key vector attributes: - `dims`: Vector dimensionality (required) - `algorithm`: `flat`, `hnsw`, or `svs-vamana` - `distance_metric`: `COSINE`, `L2`, or `IP` -- `datatype`: `float16`, `float32`, `float64`, or `bfloat16` +- `datatype`: Vector precision (see table below) - `index_missing`: Allow searching for documents without vectors ```yaml @@ -281,6 +281,48 @@ Key vector attributes: index_missing: true # Handle documents without embeddings ``` +### Vector Datatypes + +The `datatype` attribute controls how vector components are stored. Smaller datatypes reduce memory usage but may affect precision. + +| Datatype | Bits | Memory (768 dims) | Use Case | +|----------|------|-------------------|----------| +| `float32` | 32 | 3 KB | Default. Best precision for most applications. | +| `float16` | 16 | 1.5 KB | Good balance of memory and precision. Recommended for large-scale deployments. | +| `bfloat16` | 16 | 1.5 KB | Better dynamic range than float16. Useful when embeddings have large value ranges. | +| `float64` | 64 | 6 KB | Maximum precision. Rarely needed. | +| `int8` | 8 | 768 B | Integer quantization. Significant memory savings with some precision loss. | +| `uint8` | 8 | 768 B | Unsigned integer quantization. For embeddings with non-negative values. 
| + +**Algorithm Compatibility:** + +| Datatype | FLAT | HNSW | SVS-VAMANA | +|----------|------|------|------------| +| `float32` | Yes | Yes | Yes | +| `float16` | Yes | Yes | Yes | +| `bfloat16` | Yes | Yes | No | +| `float64` | Yes | Yes | No | +| `int8` | Yes | Yes | No | +| `uint8` | Yes | Yes | No | + +**Choosing a Datatype:** + +- **Start with `float32`** unless you have memory constraints +- **Use `float16`** for production systems with millions of vectors (50% memory savings, minimal precision loss) +- **Use `int8`/`uint8`** only after benchmarking recall on your specific dataset +- **SVS-VAMANA users**: Must use `float16` or `float32` + +**Quantization with the Migrator:** + +You can change vector datatypes on existing indexes using the migration wizard: + +```bash +rvl migrate wizard --index my_index --url redis://localhost:6379 +# Select "Update field" > choose vector field > change datatype +``` + +The migrator automatically re-encodes stored vectors to the new precision. See {doc}`/user_guide/how_to_guides/migrate-indexes` for details. + ## Redis-Specific Subtleties ### Modifier Ordering @@ -304,6 +346,53 @@ Not all attributes work with all field types: | `unf` | ✓ | ✗ | ✓ | ✗ | ✗ | | `withsuffixtrie` | ✓ | ✓ | ✗ | ✗ | ✗ | +### Migration Support + +The migration wizard (`rvl migrate wizard`) supports updating field attributes on existing indexes. The table below shows which attributes can be updated via the wizard vs requiring manual schema patch editing. 
+ +**Wizard Prompts:** + +| Attribute | Text | Tag | Numeric | Geo | Vector | +|-----------|------|-----|---------|-----|--------| +| `sortable` | Wizard | Wizard | Wizard | Wizard | N/A | +| `index_missing` | Wizard | Wizard | Wizard | Wizard | N/A | +| `index_empty` | Wizard | Wizard | N/A | N/A | N/A | +| `no_index` | Wizard | Wizard | Wizard | Wizard | N/A | +| `unf` | Wizard* | N/A | Wizard* | N/A | N/A | +| `separator` | N/A | Wizard | N/A | N/A | N/A | +| `case_sensitive` | N/A | Wizard | N/A | N/A | N/A | +| `no_stem` | Wizard | N/A | N/A | N/A | N/A | +| `weight` | Wizard | N/A | N/A | N/A | N/A | +| `algorithm` | N/A | N/A | N/A | N/A | Wizard | +| `datatype` | N/A | N/A | N/A | N/A | Wizard | +| `distance_metric` | N/A | N/A | N/A | N/A | Wizard | +| `m`, `ef_construction` | N/A | N/A | N/A | N/A | Wizard | + +*\* `unf` is only prompted when `sortable` is enabled.* + +**Manual Schema Patch Required:** + +| Attribute | Notes | +|-----------|-------| +| `phonetic_matcher` | Enable phonetic search | +| `withsuffixtrie` | Suffix/contains search optimization | + +**Example manual patch** for adding `index_missing` to a field: + +```yaml +# schema_patch.yaml +version: 1 +changes: + update_fields: + - name: category + attrs: + index_missing: true +``` + +```bash +rvl migrate plan --index my_index --schema-patch schema_patch.yaml +``` + ### JSON Path for Nested Fields When using JSON storage, use the `path` attribute to index nested fields: diff --git a/docs/concepts/index-migrations.md b/docs/concepts/index-migrations.md new file mode 100644 index 000000000..fb12d75ca --- /dev/null +++ b/docs/concepts/index-migrations.md @@ -0,0 +1,255 @@ +--- +myst: + html_meta: + "description lang=en": | + Learn how RedisVL index migrations work and which schema changes are supported. +--- + +# Index Migrations + +Redis Search indexes are immutable. To change an index schema, you must drop the existing index and create a new one. 
RedisVL provides a migration workflow that automates this process while preserving your data. + +This page explains how migrations work and which changes are supported. For step by step instructions, see the [migration guide](../user_guide/how_to_guides/migrate-indexes.md). + +## Supported and blocked changes + +The migrator classifies schema changes into two categories: + +| Change | Status | +|--------|--------| +| Add or remove a field | Supported | +| Rename a field | Supported | +| Change field options (sortable, separator) | Supported | +| Change key prefix | Supported | +| Rename the index | Supported | +| Change vector algorithm (FLAT, HNSW, SVS-VAMANA) | Supported | +| Change distance metric (COSINE, L2, IP) | Supported | +| Tune algorithm parameters (M, EF_CONSTRUCTION) | Supported | +| Quantize vectors (float32 to float16/bfloat16/int8/uint8) | Supported | +| Change vector dimensions | Blocked | +| Change storage type (hash to JSON) | Blocked | +| Add a new vector field | Blocked | + +**Note:** INT8 and UINT8 vector datatypes require Redis 8.0+. SVS-VAMANA algorithm requires Redis 8.2+ and Intel AVX-512 hardware. + +**Supported** changes can be applied automatically using `rvl migrate`. The migrator handles the index rebuild and any necessary data transformations. + +**Blocked** changes require manual intervention because they involve incompatible data formats or missing data. The migrator will reject these changes and explain why. + +## How the migrator works + +The migrator uses a plan first workflow: + +1. **Plan**: Capture the current schema, classify your changes, and generate a migration plan +2. **Review**: Inspect the plan before making any changes +3. **Apply**: Drop the index, transform data if needed, and recreate with the new schema +4. **Validate**: Verify the result matches expectations + +This separation ensures you always know what will happen before any changes are made. 
+ +## Migration mode: drop_recreate + +The `drop_recreate` mode rebuilds the index in place while preserving your documents. + +The process: + +1. Drop only the index structure (documents remain in Redis) +2. For datatype changes, re-encode vectors to the target precision +3. Recreate the index with the new schema +4. Wait for Redis to re-index the existing documents +5. Validate the result + +**Tradeoff**: The index is unavailable during the rebuild. Review the migration plan carefully before applying. + +## Index only vs document dependent changes + +Schema changes fall into two categories based on whether they require modifying stored data. + +**Index only changes** affect how Redis Search indexes data, not the data itself: + +- Algorithm changes: The stored vector bytes are identical. Only the index structure differs. +- Distance metric changes: Same vectors, different similarity calculation. +- Adding or removing fields: The documents already contain the data. The index just starts or stops indexing it. + +These changes complete quickly because they only require rebuilding the index. + +**Document dependent changes** require modifying the stored data: + +- Datatype changes (float32 to float16): Stored vector bytes must be re-encoded. +- Field renames: Stored field names must be updated in every document. +- Dimension changes: Vectors must be re-embedded with a different model. + +The migrator handles datatype changes and field renames automatically. Dimension changes are blocked because they require re-embedding with a different model (application level logic). + +## Vector quantization + +Changing vector precision from float32 to float16 reduces memory usage at the cost of slight precision loss. The migrator handles this automatically by: + +1. Reading all vectors from Redis +2. Converting to the target precision +3. Writing updated vectors back +4. 
Recreating the index with the new schema + +Typical reductions: + +| Metric | Value | +|--------|-------| +| Index size reduction | ~50% | +| Memory reduction | ~35% | + +Quantization time is proportional to document count. Plan for downtime accordingly. + +## Why some changes are blocked + +### Vector dimension changes + +Vector dimensions are determined by your embedding model. A 384 dimensional vector from one model is mathematically incompatible with a 768 dimensional index expecting vectors from a different model. There is no way to resize an embedding. + +**Resolution**: Re-embed your documents using the new model and load them into a new index. + +### Storage type changes + +Hash and JSON have different data layouts. Hash stores flat key value pairs. JSON stores nested structures. Converting between them requires understanding your schema and restructuring each document. + +**Resolution**: Export your data, transform it to the new format, and reload into a new index. + +### Adding a vector field + +Adding a vector field means all existing documents need vectors for that field. The migrator cannot generate these vectors because it does not know which embedding model to use or what content to embed. + +**Resolution**: Add vectors to your documents using your application, then run the migration. + +## Downtime considerations + +With `drop_recreate`, your index is unavailable between the drop and when re-indexing completes. + +**CRITICAL**: Downtime requires both reads AND writes to be paused: + +| Requirement | Reason | +|-------------|--------| +| **Pause reads** | Index is unavailable during migration | +| **Pause writes** | Redis updates indexes synchronously. 
Writes during migration may conflict with vector re-encoding or be missed | + +Plan for: + +- Search unavailability during the migration window +- Partial results while indexing is in progress +- Resource usage from the re-indexing process +- Quantization time if changing vector datatypes + +The duration depends on document count, field count, and vector dimensions. For large indexes, consider running migrations during low traffic periods. + +## Sync vs async execution + +The migrator provides both synchronous and asynchronous execution modes. + +### What becomes async and what stays sync + +The migration workflow has distinct phases. Here is what each mode affects: + +| Phase | Sync mode | Async mode | Notes | +|-------|-----------|------------|-------| +| **Plan generation** | `MigrationPlanner.create_plan()` | `AsyncMigrationPlanner.create_plan()` | Reads index metadata from Redis | +| **Schema snapshot** | Sync Redis calls | Async Redis calls | Single `FT.INFO` command | +| **Enumeration** | FT.AGGREGATE (or SCAN fallback) | FT.AGGREGATE (or SCAN fallback) | Before drop, only if quantization needed | +| **Drop index** | `index.delete()` | `await index.delete()` | Single `FT.DROPINDEX` command | +| **Quantization** | Sequential HGET + HSET | Sequential HGET + batched HSET | Uses pre-enumerated keys | +| **Create index** | `index.create()` | `await index.create()` | Single `FT.CREATE` command | +| **Readiness polling** | `time.sleep()` loop | `asyncio.sleep()` loop | Polls `FT.INFO` until indexed | +| **Validation** | Sync Redis calls | Async Redis calls | Schema and doc count checks | +| **CLI interaction** | Always sync | Always sync | User prompts, file I/O | +| **YAML read/write** | Always sync | Always sync | Local filesystem only | + +### When to use sync (default) + +Sync execution is simpler and sufficient for most migrations: + +- Small to medium indexes (under 100K documents) +- Index-only changes (algorithm, distance metric, field options) +- 
Interactive CLI usage where blocking is acceptable + +For migrations without quantization, the Redis operations are fast single commands. Sync mode adds no meaningful overhead. + +### When to use async + +Async execution (`--async` flag) provides benefits in specific scenarios: + +**Large quantization jobs (1M+ vectors)** + +Converting float32 to float16 requires reading every vector, converting it, and writing it back. The async executor: + +- Enumerates documents using `FT.AGGREGATE WITHCURSOR` for index-specific enumeration (falls back to `SCAN` only if indexing failures exist) +- Pipelines `HSET` operations in batches (100-1000 operations per pipeline is optimal for Redis) +- Yields to the event loop between batches so other tasks can proceed + +**Large keyspaces (40M+ keys)** + +When your Redis instance has many keys and the index has indexing failures (requiring SCAN fallback), async mode yields between batches. + +**Async application integration** + +If your application uses asyncio, you can integrate migration directly: + +```python +import asyncio +from redisvl.migration import AsyncMigrationPlanner, AsyncMigrationExecutor + +async def migrate(): + planner = AsyncMigrationPlanner() + plan = await planner.create_plan("myindex", redis_url="redis://localhost:6379") + + executor = AsyncMigrationExecutor() + report = await executor.apply(plan, redis_url="redis://localhost:6379") + +asyncio.run(migrate()) +``` + +### Why async helps with quantization + +The migrator uses an optimized enumeration strategy: + +1. **Index-based enumeration**: Uses `FT.AGGREGATE WITHCURSOR` to enumerate only indexed documents (not the entire keyspace) +2. **Fallback for safety**: If the index has indexing failures (`hash_indexing_failures > 0`), falls back to `SCAN` to ensure completeness +3. 
**Enumerate before drop**: Captures the document list while the index still exists, then drops and quantizes + +This optimization provides 10-1000x speedup for sparse indexes (where only a small fraction of prefix-matching keys are indexed). + +**Sync quantization:** +``` +enumerate keys (FT.AGGREGATE or SCAN) -> store list +for each batch of 500 keys: + for each key: + HGET field (blocks) + convert array + pipeline.HSET(field, new_bytes) + pipeline.execute() (blocks) +``` + +**Async quantization:** +``` +enumerate keys (FT.AGGREGATE or SCAN) -> store list +for each batch of 500 keys: + for each key: + await HGET field (yields) + convert array + pipeline.HSET(field, new_bytes) + await pipeline.execute() (yields) +``` + +Each `await` is a yield point where other coroutines can run. For millions of vectors, this prevents your application from freezing. + +### What async does NOT improve + +Async execution does not reduce: + +- **Total migration time**: Same work, different scheduling +- **Redis server load**: Same commands execute on the server +- **Downtime window**: Index remains unavailable during rebuild +- **Network round trips**: Same number of Redis calls + +The benefit is application responsiveness, not faster migration. + +## Learn more + +- [Migration guide](../user_guide/how_to_guides/migrate-indexes.md): Step by step instructions +- [Search and indexing](search-and-indexing.md): How Redis Search indexes work diff --git a/docs/concepts/index.md b/docs/concepts/index.md index 0e522b1a2..02f4d8b01 100644 --- a/docs/concepts/index.md +++ b/docs/concepts/index.md @@ -26,6 +26,13 @@ How RedisVL components connect: schemas, indexes, queries, and extensions. Schemas, fields, documents, storage types, and query patterns. ::: +:::{grid-item-card} 🔄 Index Migrations +:link: index-migrations +:link-type: doc + +How RedisVL handles migration planning, rebuilds, and future shadow migration. 
+::: + :::{grid-item-card} 🏷️ Field Attributes :link: field-attributes :link-type: doc @@ -62,6 +69,7 @@ Pre-built patterns: caching, message history, and semantic routing. architecture search-and-indexing +index-migrations field-attributes queries utilities diff --git a/docs/concepts/search-and-indexing.md b/docs/concepts/search-and-indexing.md index b4fe69569..5312d7dfb 100644 --- a/docs/concepts/search-and-indexing.md +++ b/docs/concepts/search-and-indexing.md @@ -106,9 +106,14 @@ To change a schema, you create a new index with the updated configuration, reind Planning your schema carefully upfront reduces the need for migrations, but the capability exists when requirements evolve. ---- +RedisVL now includes a dedicated migration workflow for this lifecycle: + +- `drop_recreate` for document-preserving rebuilds, including vector quantization (`float32` → `float16`) -**Related concepts:** {doc}`field-attributes` explains how to configure field options like `sortable` and `index_missing`. {doc}`queries` covers the different query types available. +That means schema evolution is no longer only a manual operational pattern. It is also a product surface in RedisVL with a planner, CLI, and validation artifacts. + +--- -**Learn more:** {doc}`/user_guide/01_getting_started` walks through building your first index. {doc}`/user_guide/05_hash_vs_json` compares storage options in depth. {doc}`/user_guide/02_complex_filtering` covers query composition. +**Related concepts:** {doc}`field-attributes` explains how to configure field options like `sortable` and `index_missing`. {doc}`queries` covers the different query types available. {doc}`index-migrations` explains migration modes, supported changes, and architecture. +**Learn more:** {doc}`/user_guide/01_getting_started` walks through building your first index. {doc}`/user_guide/05_hash_vs_json` compares storage options in depth. {doc}`/user_guide/02_complex_filtering` covers query composition. 
{doc}`/user_guide/how_to_guides/migrate-indexes` shows how to use the migration CLI in practice. diff --git a/docs/user_guide/cli.ipynb b/docs/user_guide/cli.ipynb index ba9d645a3..02bc68b13 100644 --- a/docs/user_guide/cli.ipynb +++ b/docs/user_guide/cli.ipynb @@ -6,7 +6,7 @@ "source": [ "# The RedisVL CLI\n", "\n", - "RedisVL is a Python library with a dedicated CLI to help load and create vector search indices within Redis.\n", + "RedisVL is a Python library with a dedicated CLI to help load, inspect, migrate, and create vector search indices within Redis.\n", "\n", "This notebook will walk through how to use the Redis Vector Library CLI (``rvl``).\n", "\n", @@ -50,7 +50,16 @@ "| `rvl index` | `delete --index` or `-i ` | remove the specified index, leaving the data still in Redis|\n", "| `rvl index` | `destroy --index` or `-i `| remove the specified index, as well as the associated data|\n", "| `rvl stats` | `--index` or `-i ` | display the index statistics, including number of docs, average bytes per record, indexing time, etc|\n", - "| `rvl stats` | `--schema` or `-s ` | display the index statistics of a schema defined in . The index must have already been created within Redis|" + "| `rvl stats` | `--schema` or `-s ` | display the index statistics of a schema defined in . 
The index must have already been created within Redis|\n", + "| `rvl migrate` | `helper` or `list` | show migration guidance and list indexes available for migration|\n", + "| `rvl migrate` | `wizard` | interactively build a migration plan and schema patch|\n", + "| `rvl migrate` | `plan` | generate `migration_plan.yaml` from a patch or target schema|\n", + "| `rvl migrate` | `apply` | execute a reviewed `drop_recreate` migration|\n", + "| `rvl migrate` | `validate` | validate a completed migration and emit report artifacts|\n", + "| `rvl migrate` | `batch-plan` | create a batch migration plan for multiple indexes|\n", + "| `rvl migrate` | `batch-apply` | execute a batch migration|\n", + "| `rvl migrate` | `batch-resume` | resume an interrupted batch migration|\n", + "| `rvl migrate` | `batch-status` | check batch migration progress|" ] }, { diff --git a/docs/user_guide/how_to_guides/index.md b/docs/user_guide/how_to_guides/index.md index c03d705da..f6511d54c 100644 --- a/docs/user_guide/how_to_guides/index.md +++ b/docs/user_guide/how_to_guides/index.md @@ -34,6 +34,7 @@ How-to guides are **task-oriented** recipes that help you accomplish specific go :::{grid-item-card} 💾 Storage - [Choose a Storage Type](../05_hash_vs_json.ipynb) -- Hash vs JSON formats and nested data +- [Migrate an Index](migrate-indexes.md) -- use the migrator helper, wizard, plan, apply, and validate workflow ::: :::{grid-item-card} 💻 CLI Operations @@ -59,6 +60,7 @@ How-to guides are **task-oriented** recipes that help you accomplish specific go | Optimize index performance | [Optimize Indexes with SVS-VAMANA](../09_svs_vamana.ipynb) | | Decide on storage format | [Choose a Storage Type](../05_hash_vs_json.ipynb) | | Manage indices from terminal | [Manage Indices with the CLI](../cli.ipynb) | +| Plan and run a supported index migration | [Migrate an Index](migrate-indexes.md) | ```{toctree} :hidden: @@ -74,4 +76,5 @@ Optimize Indexes with SVS-VAMANA <../09_svs_vamana> Cache Embeddings 
<../10_embeddings_cache> Use Advanced Query Types <../11_advanced_queries> Write SQL Queries for Redis <../12_sql_to_redis_queries> +Migrate an Index ``` diff --git a/docs/user_guide/how_to_guides/migrate-indexes.md b/docs/user_guide/how_to_guides/migrate-indexes.md new file mode 100644 index 000000000..5795f6a0b --- /dev/null +++ b/docs/user_guide/how_to_guides/migrate-indexes.md @@ -0,0 +1,935 @@ +--- +myst: + html_meta: + "description lang=en": | + How to migrate a RedisVL index schema without losing data. +--- + +# Migrate an Index + +This guide shows how to safely change your index schema using the RedisVL migrator. + +## Quick Start + +Add a field to your index in 4 commands: + +```bash +# 1. See what indexes exist +rvl migrate list --url redis://localhost:6379 + +# 2. Use the wizard to build a migration plan +rvl migrate wizard --index myindex --url redis://localhost:6379 + +# 3. Apply the migration +rvl migrate apply --plan migration_plan.yaml --url redis://localhost:6379 + +# 4. Verify the result +rvl migrate validate --plan migration_plan.yaml --url redis://localhost:6379 +``` + +## Prerequisites + +- Redis with the Search module (Redis Stack, Redis Cloud, or Redis Enterprise) +- An existing index to migrate +- `redisvl` installed (`pip install redisvl`) + +```bash +# Local development with Redis 8.0+ (recommended for full feature support) +docker run -d --name redis -p 6379:6379 redis:8.0 +``` + +**Note:** Redis 8.0+ is required for INT8/UINT8 vector datatypes. SVS-VAMANA algorithm requires Redis 8.2+ and Intel AVX-512 hardware. + +## Step 1: Discover Available Indexes + +```bash +rvl migrate list --url redis://localhost:6379 +``` + +**Example output:** +``` +Available indexes: + 1. products_idx + 2. users_idx + 3. orders_idx +``` + +## Step 2: Build Your Schema Change + +Choose one of these approaches: + +### Option A: Use the Wizard (Recommended) + +The wizard guides you through building a migration interactively. 
Run: + +```bash +rvl migrate wizard --index myindex --url redis://localhost:6379 +``` + +**Example wizard session (adding a field):** + +```text +Building a migration plan for index 'myindex' +Current schema: +- Index name: myindex +- Storage type: hash + - title (text) + - embedding (vector) + +Choose an action: +1. Add field (text, tag, numeric, geo) +2. Update field (sortable, weight, separator) +3. Remove field +4. Preview patch (show pending changes as YAML) +5. Finish +Enter a number: 1 + +Field name: category +Field type options: text, tag, numeric, geo +Field type: tag + Sortable: enables sorting and aggregation on this field +Sortable [y/n]: n + Separator: character that splits multiple values (default: comma) +Separator [leave blank to keep existing/default]: | + +Choose an action: +1. Add field (text, tag, numeric, geo) +2. Update field (sortable, weight, separator) +3. Remove field +4. Preview patch (show pending changes as YAML) +5. Finish +Enter a number: 5 + +Migration plan written to /path/to/migration_plan.yaml +Mode: drop_recreate +Supported: True +Warnings: +- Index downtime is required +``` + +**Example wizard session (quantizing vectors):** + +```text +Choose an action: +1. Add field (text, tag, numeric, geo) +2. Update field (sortable, weight, separator) +3. Remove field +4. Preview patch (show pending changes as YAML) +5. Finish +Enter a number: 2 + +Updatable fields: +1. title (text) +2. embedding (vector) +Select a field to update by number or name: 2 + +Current vector config for 'embedding': + algorithm: HNSW + datatype: float32 + distance_metric: cosine + dims: 384 (cannot be changed) + m: 16 + ef_construction: 200 + +Leave blank to keep current value. 
+ Algorithm: vector search method (FLAT=brute force, HNSW=graph, SVS-VAMANA=compressed graph) +Algorithm [current: HNSW]: + Datatype: float16, float32, bfloat16, float64, int8, uint8 + (float16 reduces memory ~50%, int8/uint8 reduce ~75%) +Datatype [current: float32]: float16 + Distance metric: how similarity is measured (cosine, l2, ip) +Distance metric [current: cosine]: + M: number of connections per node (higher=better recall, more memory) +M [current: 16]: + EF_CONSTRUCTION: build-time search depth (higher=better recall, slower build) +EF_CONSTRUCTION [current: 200]: + +Choose an action: +... +5. Finish +Enter a number: 5 + +Migration plan written to /path/to/migration_plan.yaml +Mode: drop_recreate +Supported: True +``` + +### Option B: Write a Schema Patch (YAML) + +Create `schema_patch.yaml` manually: + +```yaml +version: 1 +changes: + add_fields: + - name: category + type: tag + path: $.category + attrs: + separator: "|" + remove_fields: + - legacy_field + update_fields: + - name: title + attrs: + sortable: true + - name: embedding + attrs: + datatype: float16 # quantize vectors + algorithm: HNSW + distance_metric: cosine +``` + +Then generate the plan: + +```bash +rvl migrate plan \ + --index myindex \ + --schema-patch schema_patch.yaml \ + --url redis://localhost:6379 \ + --plan-out migration_plan.yaml +``` + +### Option C: Provide a Target Schema + +If you have the complete target schema, use it directly: + +```bash +rvl migrate plan \ + --index myindex \ + --target-schema target_schema.yaml \ + --url redis://localhost:6379 \ + --plan-out migration_plan.yaml +``` + +## Step 3: Review the Migration Plan + +Before applying, review `migration_plan.yaml`: + +```yaml +# migration_plan.yaml (example) +version: 1 +mode: drop_recreate + +source: + schema_snapshot: + index: + name: myindex + prefix: "doc:" + storage_type: json + fields: + - name: title + type: text + - name: embedding + type: vector + attrs: + dims: 384 + algorithm: hnsw + datatype: float32 + 
stats_snapshot: + num_docs: 10000 + keyspace: + prefixes: ["doc:"] + key_sample: ["doc:1", "doc:2", "doc:3"] + +requested_changes: + add_fields: + - name: category + type: tag + +diff_classification: + supported: true + mode: drop_recreate + warnings: + - "Index will be unavailable during migration" + blocked_reasons: [] + +rename_operations: + rename_index: null + change_prefix: null + rename_fields: [] + +merged_target_schema: + index: + name: myindex + prefix: "doc:" + storage_type: json + fields: + - name: title + type: text + - name: category + type: tag + - name: embedding + type: vector + attrs: + dims: 384 + algorithm: hnsw + datatype: float32 + +warnings: [] +``` + +**Key fields to check:** +- `diff_classification.supported` - Must be `true` to proceed +- `diff_classification.blocked_reasons` - Must be empty +- `merged_target_schema` - The final schema after migration + +## Understanding Downtime Requirements + +**CRITICAL**: During a `drop_recreate` migration, your application must: + +| Requirement | Description | +|-------------|-------------| +| **Pause reads** | Index is unavailable during migration | +| **Pause writes** | Writes during migration may be missed or cause conflicts | + +### Why Both Reads AND Writes Must Be Paused + +- **Reads**: The index definition is dropped and recreated. Any queries during this window will fail. +- **Writes**: Redis updates indexes synchronously on every write. If your app writes documents while the index is dropped, those writes are not indexed. Additionally, if you're quantizing vectors (float32 → float16), concurrent writes may conflict with the migration's re-encoding process. + +### What "Downtime" Means + +| Downtime Type | Reads | Writes | Safe? 
| +|---------------|-------|--------|-------| +| Full quiesce (recommended) | Stopped | Stopped | **YES** | +| Read-only pause | Stopped | Continuing | **NO** | +| Active | Active | Active | **NO** | + +### Recovery from Interrupted Migration + +| Interruption Point | Documents | Index | Recovery | +|--------------------|-----------|-------|----------| +| After drop, before quantize | Unchanged | **None** | Re-run apply (or `--resume` if checkpoint exists) | +| During quantization | Partially quantized | **None** | Re-run with `--resume` to continue from checkpoint | +| After quantization, before create | Quantized | **None** | Re-run apply (will recreate index) | +| After create | Correct | Rebuilding | Wait for index ready | + +The underlying documents are **never deleted** by `drop_recreate` mode. For large quantization jobs, use `--resume` to enable checkpoint-based recovery. See [Crash-safe resume for quantization](#crash-safe-resume-for-quantization) below. + +## Step 4: Apply the Migration + +The `apply` command executes the migration. The index will be temporarily unavailable during the drop-recreate process. 
+ +```bash +rvl migrate apply \ + --plan migration_plan.yaml \ + --url redis://localhost:6379 \ + --report-out migration_report.yaml \ + --benchmark-out benchmark_report.yaml +``` + +### What `apply` does + +The migration executor follows this sequence: + +**STEP 1: Enumerate keys** (before any modifications) +- Discovers all document keys belonging to the source index +- Uses `FT.AGGREGATE WITHCURSOR` for efficient enumeration +- Falls back to `SCAN` if the index has indexing failures +- Keys are stored in memory for quantization or rename operations + +**STEP 2: Drop source index** +- Issues `FT.DROPINDEX` to remove the index structure +- **The underlying documents remain in Redis** - only the index metadata is deleted +- After this point, the index is unavailable until step 6 completes + +**STEP 3: Quantize vectors** (if changing vector datatype) +- For each document in the enumerated key list: + - Reads the document (including the old vector) + - Converts the vector to the new datatype (e.g., float32 → float16) + - Writes back the converted vector to the same document +- Processes documents in batches of 500 using Redis pipelines +- Skipped for JSON storage (vectors are re-indexed automatically on recreate) +- **Checkpoint support**: For large datasets, use `--resume` to enable crash-safe recovery + +**STEP 4: Key renames** (if changing key prefix) +- If the migration changes the key prefix, renames each key from old prefix to new prefix +- Skipped if no prefix change + +**STEP 5: Create target index** +- Issues `FT.CREATE` with the merged target schema +- Redis begins background indexing of existing documents + +**STEP 6: Wait for re-indexing** +- Polls `FT.INFO` until indexing completes +- The index becomes available for queries when this completes + +**Summary**: The migration preserves all documents, drops only the index structure, performs any document-level transformations (quantization, renames), then recreates the index with the new schema. 
+ +### Async execution for large migrations + +For large migrations (especially those involving vector quantization), use the `--async` flag: + +```bash +rvl migrate apply \ + --plan migration_plan.yaml \ + --async \ + --url redis://localhost:6379 +``` + +**What becomes async:** + +- Document enumeration during quantization (uses `FT.AGGREGATE WITHCURSOR` for index-specific enumeration, falling back to SCAN only if indexing failures exist) +- Vector read/write operations (sequential async HGET, batched HSET via pipeline) +- Index readiness polling (uses `asyncio.sleep()` instead of blocking) +- Validation checks + +**What stays sync:** + +- CLI prompts and user interaction +- YAML file reading/writing +- Progress display + +**When to use async:** + +- Quantizing millions of vectors (float32 to float16) +- Integrating into an async application + +For most migrations (index-only changes, small datasets), sync mode is sufficient and simpler. + +See {doc}`/concepts/index-migrations` for detailed async vs sync guidance. + +### Crash-safe resume for quantization + +When migrating large datasets with vector quantization (e.g. float32 to float16), the re-encoding step can take minutes or hours. If the process is interrupted (crash, network drop, OOM kill), you don't want to start over. The `--resume` flag enables checkpoint-based recovery. + +#### How it works + +1. **Pre-flight estimate**: before any mutations, `apply` prints a disk space estimate showing RDB snapshot cost, AOF growth (if enabled), and post-migration memory savings. +2. **BGSAVE safety snapshot**: the migrator triggers a Redis `BGSAVE` and waits for it to complete before modifying any data. This gives you a point-in-time snapshot to fall back on. +3. **Checkpoint file**: when `--resume` is provided, the migrator writes a YAML checkpoint after every batch of 500 documents. The checkpoint records how many keys have been processed and the last batch of keys written. +4. 
**Batch undo buffer**: if a single batch fails mid-write, original vector values are rolled back via pipeline before the error propagates. Only the current batch is held in memory. +5. **Idempotent skip**: on resume, vectors that were already converted are detected by byte-width inspection and skipped automatically. + +#### Step-by-step: using crash-safe resume + +**1. Estimate disk space (dry-run, no mutations):** + +```bash +rvl migrate estimate --plan migration_plan.yaml +``` + +Example output: + +```text +Pre-migration disk space estimate: + Index: products_idx (1,000,000 documents) + Vector field 'embedding': 768 dims, float32 -> float16 + + RDB snapshot (BGSAVE): ~2.87 GB + AOF growth: not estimated (pass aof_enabled=True if AOF is on) + Total new disk required: ~2.87 GB + + Post-migration memory savings: ~1.43 GB (50% reduction) +``` + +If AOF is enabled: + +```bash +rvl migrate estimate --plan migration_plan.yaml --aof-enabled +``` + +**2. Apply with checkpoint enabled:** + +```bash +rvl migrate apply \ + --plan migration_plan.yaml \ + --resume quantize_checkpoint.yaml \ + --url redis://localhost:6379 \ + --report-out migration_report.yaml +``` + +The `--resume` flag takes a path to a checkpoint file. If the file does not exist, a new checkpoint is created. If it already exists (from a previous interrupted run), the migrator resumes from where it left off. + +**3. If the process crashes or is interrupted:** + +The checkpoint file (`quantize_checkpoint.yaml`) will contain the progress: + +```yaml +index_name: products_idx +total_keys: 1000000 +completed_keys: 450000 +completed_batches: 900 +last_batch_keys: + - 'products:449501' + - 'products:449502' + # ... +status: in_progress +checkpoint_path: quantize_checkpoint.yaml +``` + +**4. 
Resume the migration:** + +Re-run the exact same command: + +```bash +rvl migrate apply \ + --plan migration_plan.yaml \ + --resume quantize_checkpoint.yaml \ + --url redis://localhost:6379 \ + --report-out migration_report.yaml +``` + +The migrator will: +- Detect the existing checkpoint and skip already-processed keys +- Re-enumerate documents via SCAN (the index was already dropped before the crash) +- Continue quantizing from where it left off +- Print progress like `[4/6] Quantize vectors: 450,000/1,000,000 docs` + +**5. On successful completion:** + +The checkpoint status is set to `completed`. You can safely delete the checkpoint file. + +#### What gets rolled back on batch failure + +If a batch of 500 documents fails mid-write (e.g. Redis returns an error), the migrator: +1. Restores original vector bytes for all documents in that batch using the undo buffer +2. Saves the checkpoint (so progress up to the last successful batch is preserved) +3. Raises the error + +This means you never end up with partially-written vectors in a single batch. + +#### Limitations + +- **Same-width conversions** (float16 to bfloat16, or int8 to uint8) are **not supported** with `--resume`. These conversions cannot be detected by byte-width inspection, so idempotent skip is impossible. The migrator will refuse to proceed and suggest running without `--resume`. +- **JSON storage** does not need vector re-encoding (Redis re-indexes JSON vectors on `FT.CREATE`). The checkpoint is still created for consistency but no batched writes occur. +- The checkpoint file must match the migration plan. If you change the plan, delete the old checkpoint and start fresh. 
+ +#### Python API with checkpoints + +```python +from redisvl.migration import MigrationExecutor + +executor = MigrationExecutor() +report = executor.apply( + plan, + redis_url="redis://localhost:6379", + checkpoint_path="quantize_checkpoint.yaml", +) +``` + +For async: + +```python +from redisvl.migration import AsyncMigrationExecutor + +executor = AsyncMigrationExecutor() +report = await executor.apply( + plan, + redis_url="redis://localhost:6379", + checkpoint_path="quantize_checkpoint.yaml", +) +``` + +## Step 5: Validate the Result + +Validation happens automatically during `apply`, but you can run it separately: + +```bash +rvl migrate validate \ + --plan migration_plan.yaml \ + --url redis://localhost:6379 \ + --report-out migration_report.yaml +``` + +**Validation checks:** +- Live schema matches `merged_target_schema` +- Document count matches the source snapshot +- Sampled keys still exist +- No increase in indexing failures + +## What's Supported + +| Change | Supported | Notes | +|--------|-----------|-------| +| Add text/tag/numeric/geo field | ✅ | | +| Remove a field | ✅ | | +| Rename a field | ✅ | Renames field in all documents | +| Change key prefix | ✅ | Renames keys via RENAME command | +| Rename the index | ✅ | Index-only | +| Make a field sortable | ✅ | | +| Change field options (separator, stemming) | ✅ | | +| Change vector algorithm (FLAT ↔ HNSW ↔ SVS-VAMANA) | ✅ | Index-only | +| Change distance metric (COSINE ↔ L2 ↔ IP) | ✅ | Index-only | +| Tune HNSW parameters (M, EF_CONSTRUCTION) | ✅ | Index-only | +| Quantize vectors (float32 → float16/bfloat16/int8/uint8) | ✅ | Auto re-encode | + +## What's Blocked + +| Change | Why | Workaround | +|--------|-----|------------| +| Change vector dimensions | Requires re-embedding | Re-embed with new model, reload data | +| Change storage type (hash ↔ JSON) | Different data format | Export, transform, reload | +| Add a new vector field | Requires vectors for all docs | Add vectors first, then migrate | + 
+## CLI Reference + +### Single-Index Commands + +| Command | Description | +|---------|-------------| +| `rvl migrate list` | List all indexes | +| `rvl migrate wizard` | Build a migration interactively | +| `rvl migrate plan` | Generate a migration plan | +| `rvl migrate apply` | Execute a migration | +| `rvl migrate estimate` | Estimate disk space for a migration (dry-run) | +| `rvl migrate validate` | Verify a migration result | + +### Batch Commands + +| Command | Description | +|---------|-------------| +| `rvl migrate batch-plan` | Create a batch migration plan | +| `rvl migrate batch-apply` | Execute a batch migration | +| `rvl migrate batch-resume` | Resume an interrupted batch | +| `rvl migrate batch-status` | Check batch progress | + +**Common flags:** +- `--url` : Redis connection URL +- `--index` : Index name to migrate +- `--plan` / `--plan-out` : Path to migration plan +- `--async` : Use async executor for large migrations (apply only) +- `--resume` : Path to checkpoint file for crash-safe quantization resume (apply only) +- `--report-out` : Path for validation report +- `--benchmark-out` : Path for performance metrics + +**Batch-specific flags:** +- `--pattern` : Glob pattern to match index names (e.g., `*_idx`) +- `--indexes` : Explicit list of index names +- `--indexes-file` : File containing index names (one per line) +- `--schema-patch` : Path to shared schema patch YAML +- `--state` : Path to checkpoint state file +- `--failure-policy` : `fail_fast` or `continue_on_error` +- `--accept-data-loss` : Required for quantization (lossy changes) +- `--retry-failed` : Retry previously failed indexes on resume + +## Troubleshooting + +### Migration blocked: "unsupported change" + +The planner detected a change that requires data transformation. Check `diff_classification.blocked_reasons` in the plan for details. + +### Apply failed: "source schema mismatch" + +The live index schema changed since the plan was generated. 
Re-run `rvl migrate plan` to create a fresh plan. + +### Apply failed: "timeout waiting for index ready" + +The index is taking longer to rebuild than expected. This can happen with large datasets. Check Redis logs and consider increasing the timeout or running during lower traffic periods. + +### Validation failed: "document count mismatch" + +Documents were added or removed between plan and apply. This is expected if your application is actively writing. Re-run `plan` and `apply` during a quieter period when the document count is stable, or verify the mismatch is due only to normal application traffic. + +### How to recover from a failed migration + +If `apply` fails mid-migration: + +1. **Check if the index exists:** `rvl index info --index myindex` +2. **If the index exists but is wrong:** Re-run `apply` with the same plan +3. **If the index was dropped:** Recreate it from the plan's `merged_target_schema` + +The underlying documents are never deleted by `drop_recreate`. + +## Python API + +For programmatic migrations, use the migration classes directly: + +### Sync API + +```python +from redisvl.migration import MigrationPlanner, MigrationExecutor + +planner = MigrationPlanner() +plan = planner.create_plan( + "myindex", + redis_url="redis://localhost:6379", + schema_patch_path="schema_patch.yaml", +) + +executor = MigrationExecutor() +report = executor.apply(plan, redis_url="redis://localhost:6379") +print(f"Migration result: {report.result}") +``` + +### Async API + +```python +import asyncio +from redisvl.migration import AsyncMigrationPlanner, AsyncMigrationExecutor + +async def migrate(): + planner = AsyncMigrationPlanner() + plan = await planner.create_plan( + "myindex", + redis_url="redis://localhost:6379", + schema_patch_path="schema_patch.yaml", + ) + + executor = AsyncMigrationExecutor() + report = await executor.apply(plan, redis_url="redis://localhost:6379") + print(f"Migration result: {report.result}") + +asyncio.run(migrate()) +``` + +## Batch 
Migration + +When you need to apply the same schema change to multiple indexes, use batch migration. This is common for: + +- Quantizing all indexes from float32 → float16 +- Standardizing vector algorithms across indexes +- Coordinated migrations during maintenance windows + +### Quick Start: Batch Migration + +```bash +# 1. Create a shared patch (applies to any index with an 'embedding' field) +cat > quantize_patch.yaml << 'EOF' +version: 1 +changes: + update_fields: + - name: embedding + attrs: + datatype: float16 +EOF + +# 2. Create a batch plan for all indexes matching a pattern +rvl migrate batch-plan \ + --pattern "*_idx" \ + --schema-patch quantize_patch.yaml \ + --plan-out batch_plan.yaml \ + --url redis://localhost:6379 + +# 3. Apply the batch plan +rvl migrate batch-apply \ + --plan batch_plan.yaml \ + --accept-data-loss \ + --url redis://localhost:6379 + +# 4. Check status +rvl migrate batch-status --state batch_state.yaml +``` + +### Batch Plan Options + +**Select indexes by pattern:** +```bash +rvl migrate batch-plan \ + --pattern "*_idx" \ + --schema-patch quantize_patch.yaml \ + --plan-out batch_plan.yaml \ + --url redis://localhost:6379 +``` + +**Select indexes by explicit list:** +```bash +rvl migrate batch-plan \ + --indexes "products_idx,users_idx,orders_idx" \ + --schema-patch quantize_patch.yaml \ + --plan-out batch_plan.yaml \ + --url redis://localhost:6379 +``` + +**Select indexes from a file (for 100+ indexes):** +```bash +# Create index list file +echo -e "products_idx\nusers_idx\norders_idx" > indexes.txt + +rvl migrate batch-plan \ + --indexes-file indexes.txt \ + --schema-patch quantize_patch.yaml \ + --plan-out batch_plan.yaml \ + --url redis://localhost:6379 +``` + +### Batch Plan Review + +The generated `batch_plan.yaml` shows which indexes will be migrated: + +```yaml +version: 1 +batch_id: "batch_20260320_100000" +mode: drop_recreate +failure_policy: fail_fast +requires_quantization: true + +shared_patch: + version: 1 + changes: + 
update_fields: + - name: embedding + attrs: + datatype: float16 + +indexes: + - name: products_idx + applicable: true + skip_reason: null + - name: users_idx + applicable: true + skip_reason: null + - name: legacy_idx + applicable: false + skip_reason: "Field 'embedding' not found" + +created_at: "2026-03-20T10:00:00Z" +``` + +**Key fields:** +- `applicable: true` means the patch applies to this index +- `skip_reason` explains why an index will be skipped + +### Applying a Batch Plan + +```bash +# Apply with fail-fast (default: stop on first error) +rvl migrate batch-apply \ + --plan batch_plan.yaml \ + --accept-data-loss \ + --url redis://localhost:6379 + +# Apply with continue-on-error (set at batch-plan time) +# Note: failure_policy is set during batch-plan, not batch-apply +rvl migrate batch-plan \ + --pattern "*_idx" \ + --schema-patch quantize_patch.yaml \ + --failure-policy continue_on_error \ + --plan-out batch_plan.yaml \ + --url redis://localhost:6379 + +rvl migrate batch-apply \ + --plan batch_plan.yaml \ + --accept-data-loss \ + --url redis://localhost:6379 +``` + +**Flags for batch-apply:** +- `--accept-data-loss` : Required when quantizing vectors (float32 → float16 is lossy) +- `--state` : Path to checkpoint file (default: `batch_state.yaml`) +- `--report-dir` : Directory for per-index reports (default: `./reports/`) + +**Note:** `--failure-policy` is set during `batch-plan`, not `batch-apply`. The policy is stored in the batch plan file. + +### Resume After Failure + +Batch migration automatically checkpoints progress. 
If interrupted: + +```bash +# Resume from where it left off +rvl migrate batch-resume \ + --state batch_state.yaml \ + --url redis://localhost:6379 + +# Retry previously failed indexes +rvl migrate batch-resume \ + --state batch_state.yaml \ + --retry-failed \ + --url redis://localhost:6379 +``` + +**Note:** If the batch plan involves quantization (e.g., `float32` → `float16`), you must pass `--accept-data-loss` to `batch-resume`, just as with `batch-apply`. + +### Checking Batch Status + +```bash +rvl migrate batch-status --state batch_state.yaml +``` + +**Example output:** +``` +Batch Migration Status +====================== +Batch ID: batch_20260320_100000 +Started: 2026-03-20T10:00:00Z +Updated: 2026-03-20T10:25:00Z + +Completed: 2 + - products_idx: success (10:02:30) + - users_idx: failed - Redis connection timeout (10:05:45) + +In Progress: inventory_idx +Remaining: 1 (analytics_idx) +``` + +### Batch Report + +After completion, a `batch_report.yaml` is generated: + +```yaml +version: 1 +batch_id: "batch_20260320_100000" +status: completed # or partial_failure, failed +summary: + total_indexes: 3 + successful: 3 + failed: 0 + skipped: 0 + total_duration_seconds: 127.5 +indexes: + - name: products_idx + status: success + duration_seconds: 45.2 + docs_migrated: 15000 + report_path: ./reports/products_idx_report.yaml + - name: users_idx + status: success + duration_seconds: 38.1 + docs_migrated: 8500 + - name: orders_idx + status: success + duration_seconds: 44.2 + docs_migrated: 22000 +completed_at: "2026-03-20T10:02:07Z" +``` + +### Python API for Batch Migration + +```python +from redisvl.migration import BatchMigrationPlanner, BatchMigrationExecutor + +# Create batch plan +planner = BatchMigrationPlanner() +batch_plan = planner.create_batch_plan( + redis_url="redis://localhost:6379", + pattern="*_idx", + schema_patch_path="quantize_patch.yaml", +) + +# Review applicability +for idx in batch_plan.indexes: + if idx.applicable: + print(f"Will migrate: 
{idx.name}") + else: + print(f"Skipping {idx.name}: {idx.skip_reason}") + +# Execute batch +executor = BatchMigrationExecutor() +report = executor.apply( + batch_plan, + redis_url="redis://localhost:6379", + state_path="batch_state.yaml", + report_dir="./reports/", + progress_callback=lambda name, pos, total, status: print(f"[{pos}/{total}] {name}: {status}"), +) + +print(f"Batch status: {report.status}") +print(f"Successful: {report.summary.successful}/{report.summary.total_indexes}") +``` + +### Batch Migration Tips + +1. **Test on a single index first**: Run a single-index migration to verify the patch works before applying to a batch. + +2. **Use `continue_on_error` for large batches**: This ensures one failure doesn't block all remaining indexes. + +3. **Schedule during low-traffic periods**: Each index has downtime during migration. + +4. **Review skipped indexes**: The `skip_reason` often indicates schema differences that need attention. + +5. **Keep checkpoint files**: The `batch_state.yaml` is essential for resume. Don't delete it until the batch completes successfully. + +## Learn more + +- {doc}`/concepts/index-migrations`: How migrations work and which changes are supported diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md index 5d2cf6dfd..d85177e73 100644 --- a/docs/user_guide/index.md +++ b/docs/user_guide/index.md @@ -39,7 +39,7 @@ Schema → Index → Load → Query **Solve specific problems.** Task-oriented recipes for LLM extensions, querying, embeddings, optimization, and storage. +++ -LLM Caching • Filtering • Vectorizers • Reranking +LLM Caching • Filtering • Vectorizers • Reranking • Migrations ::: :::{grid-item-card} 💻 CLI Reference @@ -49,7 +49,7 @@ LLM Caching • Filtering • Vectorizers • Reranking **Command-line tools.** Manage indices, inspect stats, and work with schemas using the `rvl` CLI. 
+++ -rvl index • rvl stats • Schema YAML +rvl index • rvl stats • rvl migrate • Schema YAML ::: :::{grid-item-card} 💡 Use Cases diff --git a/redisvl/cli/migrate.py b/redisvl/cli/migrate.py index 9b91f549c..95598eb04 100644 --- a/redisvl/cli/migrate.py +++ b/redisvl/cli/migrate.py @@ -637,6 +637,11 @@ def batch_resume(self): help="Retry previously failed indexes", action="store_true", ) + parser.add_argument( + "--accept-data-loss", + help="Acknowledge vector quantization data loss", + action="store_true", + ) parser.add_argument( "--report-dir", help="Directory for per-index migration reports", @@ -645,8 +650,24 @@ def batch_resume(self): parser = add_redis_connection_options(parser) args = parser.parse_args(sys.argv[3:]) - redis_url = create_redis_url(args) + # Load the batch plan to check for quantization safety gate executor = BatchMigrationExecutor() + state = executor._load_state(args.state) + plan_path = args.plan or state.plan_path or None + if plan_path: + batch_plan = executor._load_batch_plan(plan_path) + if batch_plan.requires_quantization and not args.accept_data_loss: + print( + """WARNING: This batch migration includes quantization (e.g., float32 -> float16). + Vector data will be modified. Original precision cannot be recovered. + To proceed, add --accept-data-loss flag. 
+ + If you need to preserve original vectors, backup your data first: + redis-cli BGSAVE""" + ) + sys.exit(1) + + redis_url = create_redis_url(args) def progress_callback( index_name: str, position: int, total: int, status: str diff --git a/redisvl/migration/async_executor.py b/redisvl/migration/async_executor.py index 22fd3a832..8de6b9401 100644 --- a/redisvl/migration/async_executor.py +++ b/redisvl/migration/async_executor.py @@ -986,7 +986,7 @@ async def _async_quantize_vectors( for i in range(0, remaining_keys, batch_size): batch = keys[i : i + batch_size] - pipe = client.pipeline() + pipe = client.pipeline(transaction=False) undo = BatchUndoBuffer() keys_updated_in_batch: set[str] = set() @@ -1073,12 +1073,19 @@ async def _async_wait_for_index_ready( percent_indexed = latest_info.get("percent_indexed") if percent_indexed is not None or indexing is not None: - ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing) + pct = float(percent_indexed) if percent_indexed is not None else None + is_indexing = bool(indexing) + if pct is not None: + ready = pct >= 1.0 and not is_indexing + else: + # percent_indexed missing but indexing flag present: + # treat as ready when indexing flag is falsy (0 / False). 
+ ready = not is_indexing if progress_callback: total_docs = int(latest_info.get("num_docs", 0)) - pct = float(percent_indexed or 0) - indexed_docs = int(total_docs * pct) - progress_callback(indexed_docs, total_docs, pct * 100) + display_pct = pct if pct is not None else (1.0 if ready else 0.0) + indexed_docs = int(total_docs * display_pct) + progress_callback(indexed_docs, total_docs, display_pct * 100) else: current_docs = latest_info.get("num_docs") if current_docs is None: diff --git a/redisvl/migration/async_validation.py b/redisvl/migration/async_validation.py index bd0dcf876..a76d9e0ea 100644 --- a/redisvl/migration/async_validation.py +++ b/redisvl/migration/async_validation.py @@ -167,14 +167,23 @@ async def _run_functional_checks( try: search_result = await target_index.search(Query("*").paging(0, 1)) total_found = search_result.total - passed = total_found == expected_doc_count + # When expected_doc_count is 0 (empty index), a successful + # search returning 0 docs is correct behaviour, not a failure. 
+ if expected_doc_count == 0: + passed = total_found == 0 + else: + passed = total_found > 0 + if expected_doc_count == 0: + detail_expectation = "expected 0" + else: + detail_expectation = f"expected >0, source had {expected_doc_count}" results.append( QueryCheckResult( name="functional:wildcard_search", passed=passed, details=( f"Wildcard search returned {total_found} docs " - f"(expected {expected_doc_count})" + f"({detail_expectation})" ), ) ) diff --git a/redisvl/migration/batch_executor.py b/redisvl/migration/batch_executor.py index 62024d38e..038e0a2a3 100644 --- a/redisvl/migration/batch_executor.py +++ b/redisvl/migration/batch_executor.py @@ -190,6 +190,7 @@ def resume( # Re-run apply with the updated state return self.apply( batch_plan, + batch_plan_path=batch_plan_path, state_path=state_path, report_dir=report_dir, redis_url=redis_url, diff --git a/redisvl/migration/validation.py b/redisvl/migration/validation.py index 192216434..fcea94aac 100644 --- a/redisvl/migration/validation.py +++ b/redisvl/migration/validation.py @@ -155,13 +155,17 @@ def _run_functional_checks( passed = total_found == 0 else: passed = total_found > 0 + if expected_doc_count == 0: + detail_expectation = "expected 0" + else: + detail_expectation = f"expected >0, source had {expected_doc_count}" results.append( QueryCheckResult( name="functional:wildcard_search", passed=passed, details=( f"Wildcard search returned {total_found} docs " - f"(expected {expected_doc_count})" + f"({detail_expectation})" ), ) ) diff --git a/redisvl/migration/wizard.py b/redisvl/migration/wizard.py index 24ec15cde..ae646027f 100644 --- a/redisvl/migration/wizard.py +++ b/redisvl/migration/wizard.py @@ -564,8 +564,13 @@ def _prompt_common_attrs( # No index - only meaningful with sortable. # When updating (allow_blank), also check the existing field's sortable # state so we offer dependent prompts even if the user left sortable blank. + # But if sortable was explicitly set to False, skip dependent prompts. 
_existing_sortable = self._existing_sortable - if sortable or (allow_blank and (_existing_sortable or attrs.get("sortable"))): + if sortable or ( + sortable is None + and allow_blank + and (_existing_sortable or attrs.get("sortable")) + ): print(" No index: store field for sorting only, not searchable") no_index = self._prompt_bool("No index", allow_blank=allow_blank) if no_index is not None: @@ -604,8 +609,10 @@ def _prompt_text_attrs(self, attrs: Dict[str, Any], allow_blank: bool) -> None: if phonetic: attrs["phonetic_matcher"] = phonetic - # UNF (only if sortable) - if attrs.get("sortable") or self._existing_sortable: + # UNF (only if sortable – skip if sortable was explicitly set to False) + if attrs.get("sortable") or ( + attrs.get("sortable") is not False and self._existing_sortable + ): print(" UNF: preserve original form (no lowercasing) for sorting") unf = self._prompt_bool("UNF (un-normalized form)", allow_blank=allow_blank) if unf is not None: @@ -629,8 +636,10 @@ def _prompt_numeric_attrs( self, attrs: Dict[str, Any], allow_blank: bool, sortable: Optional[bool] ) -> None: """Prompt for numeric field specific attributes.""" - # UNF (only if sortable) - if sortable or attrs.get("sortable") or self._existing_sortable: + # UNF (only if sortable – skip if sortable was explicitly set to False) + if sortable or ( + sortable is not False and (attrs.get("sortable") or self._existing_sortable) + ): print(" UNF: preserve exact numeric representation for sorting") unf = self._prompt_bool("UNF (un-normalized form)", allow_blank=allow_blank) if unf is not None: diff --git a/tests/benchmarks/index_migrator_real_benchmark.py b/tests/benchmarks/index_migrator_real_benchmark.py new file mode 100644 index 000000000..c2a28bd1a --- /dev/null +++ b/tests/benchmarks/index_migrator_real_benchmark.py @@ -0,0 +1,647 @@ +from __future__ import annotations + +import argparse +import csv +import json +import statistics +import tempfile +import time +from pathlib import Path +from 
typing import Any, Dict, Iterable, List, Sequence + +import numpy as np +import yaml +from datasets import load_dataset +from redis import Redis +from sentence_transformers import SentenceTransformer + +from redisvl.index import SearchIndex +from redisvl.migration import MigrationPlanner +from redisvl.query import VectorQuery +from redisvl.redis.utils import array_to_buffer + +AG_NEWS_LABELS = { + 0: "world", + 1: "sports", + 2: "business", + 3: "sci_tech", +} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Run a real local benchmark for migrating from HNSW/FP32 to FLAT/FP16 " + "with a real internet dataset and sentence-transformers embeddings." + ) + ) + parser.add_argument( + "--redis-url", + default="redis://localhost:6379", + help="Redis URL for the local benchmark target.", + ) + parser.add_argument( + "--sizes", + nargs="+", + type=int, + default=[1000, 10000, 100000], + help="Dataset sizes to benchmark.", + ) + parser.add_argument( + "--query-count", + type=int, + default=25, + help="Number of held-out query documents to benchmark search latency.", + ) + parser.add_argument( + "--top-k", + type=int, + default=10, + help="Number of nearest neighbors to fetch for overlap checks.", + ) + parser.add_argument( + "--embedding-batch-size", + type=int, + default=256, + help="Batch size for sentence-transformers encoding.", + ) + parser.add_argument( + "--load-batch-size", + type=int, + default=500, + help="Batch size for SearchIndex.load calls.", + ) + parser.add_argument( + "--model", + default="sentence-transformers/all-MiniLM-L6-v2", + help="Sentence-transformers model name.", + ) + parser.add_argument( + "--dataset-csv", + default="", + help=( + "Optional path to a local AG News CSV file with label,title,description columns. " + "If provided, the benchmark skips Hugging Face dataset downloads." 
+ ), + ) + parser.add_argument( + "--output", + default="index_migrator_benchmark_results.json", + help="Where to write the benchmark report.", + ) + return parser.parse_args() + + +def build_schema( + *, + index_name: str, + prefix: str, + dims: int, + algorithm: str, + datatype: str, +) -> Dict[str, Any]: + return { + "index": { + "name": index_name, + "prefix": prefix, + "storage_type": "hash", + }, + "fields": [ + {"name": "doc_id", "type": "tag"}, + {"name": "label", "type": "tag"}, + {"name": "text", "type": "text"}, + { + "name": "embedding", + "type": "vector", + "attrs": { + "dims": dims, + "distance_metric": "cosine", + "algorithm": algorithm, + "datatype": datatype, + }, + }, + ], + } + + +def load_ag_news_records(num_docs: int, query_count: int) -> List[Dict[str, Any]]: + dataset = load_dataset("ag_news", split=f"train[:{num_docs + query_count}]") + records: List[Dict[str, Any]] = [] + for idx, row in enumerate(dataset): + records.append( + { + "doc_id": f"ag-news-{idx}", + "text": row["text"], + "label": AG_NEWS_LABELS[int(row["label"])], + } + ) + return records + + +def load_ag_news_records_from_csv( + csv_path: str, + *, + required_docs: int, +) -> List[Dict[str, Any]]: + records: List[Dict[str, Any]] = [] + with open(csv_path, "r", newline="", encoding="utf-8") as f: + reader = csv.reader(f) + for idx, row in enumerate(reader): + if len(row) < 3: + continue + # Skip header row if present (label column should be a digit) + if idx == 0 and not row[0].strip().isdigit(): + continue + if len(records) >= required_docs: + break + label, title, description = row + text = f"{title}. 
{description}".strip() + records.append( + { + "doc_id": f"ag-news-{len(records)}", + "text": text, + "label": AG_NEWS_LABELS[int(label) - 1], + } + ) + + if len(records) < required_docs: + raise ValueError( + f"Expected at least {required_docs} records in {csv_path}, found {len(records)}" + ) + return records + + +def encode_texts( + model_name: str, + texts: Sequence[str], + batch_size: int, +) -> tuple[np.ndarray, float]: + try: + encoder = SentenceTransformer(model_name, local_files_only=True) + except OSError: + # Model not cached locally yet; download it + print(f"Model '{model_name}' not found locally, downloading...") + encoder = SentenceTransformer(model_name) + start = time.perf_counter() + embeddings = encoder.encode( + list(texts), + batch_size=batch_size, + show_progress_bar=True, + convert_to_numpy=True, + normalize_embeddings=True, + ) + duration = time.perf_counter() - start + return np.asarray(embeddings, dtype=np.float32), duration + + +def iter_documents( + records: Sequence[Dict[str, Any]], + embeddings: np.ndarray, + *, + dtype: str, +) -> Iterable[Dict[str, Any]]: + for record, embedding in zip(records, embeddings): + yield { + "doc_id": record["doc_id"], + "label": record["label"], + "text": record["text"], + "embedding": array_to_buffer(embedding, dtype), + } + + +def wait_for_index_ready( + index: SearchIndex, + *, + timeout_seconds: int = 1800, + poll_interval_seconds: float = 0.5, +) -> Dict[str, Any]: + deadline = time.perf_counter() + timeout_seconds + latest_info = index.info() + while time.perf_counter() < deadline: + latest_info = index.info() + percent_indexed = float(latest_info.get("percent_indexed", 1)) + indexing = latest_info.get("indexing", 0) + if percent_indexed >= 1.0 and not indexing: + return latest_info + time.sleep(poll_interval_seconds) + raise TimeoutError( + f"Index {index.schema.index.name} did not finish indexing within {timeout_seconds} seconds" + ) + + +def get_memory_snapshot(client: Redis) -> Dict[str, Any]: + 
info = client.info("memory") + used_memory_bytes = int(info.get("used_memory", 0)) + return { + "used_memory_bytes": used_memory_bytes, + "used_memory_mb": round(used_memory_bytes / (1024 * 1024), 3), + "used_memory_human": info.get("used_memory_human"), + } + + +def summarize_index_info(index_info: Dict[str, Any]) -> Dict[str, Any]: + return { + "num_docs": int(index_info.get("num_docs", 0) or 0), + "percent_indexed": float(index_info.get("percent_indexed", 0) or 0), + "hash_indexing_failures": int(index_info.get("hash_indexing_failures", 0) or 0), + "vector_index_sz_mb": float(index_info.get("vector_index_sz_mb", 0) or 0), + "total_indexing_time": float(index_info.get("total_indexing_time", 0) or 0), + } + + +def percentile(values: Sequence[float], pct: float) -> float: + if not values: + return 0.0 + return round(float(np.percentile(np.asarray(values), pct)), 3) + + +def run_query_benchmark( + index: SearchIndex, + query_embeddings: np.ndarray, + *, + dtype: str, + top_k: int, +) -> Dict[str, Any]: + latencies_ms: List[float] = [] + result_sets: List[List[str]] = [] + + for query_embedding in query_embeddings: + query = VectorQuery( + vector=query_embedding.tolist(), + vector_field_name="embedding", + return_fields=["doc_id", "label"], + num_results=top_k, + dtype=dtype, + ) + start = time.perf_counter() + results = index.query(query) + latencies_ms.append((time.perf_counter() - start) * 1000) + result_sets.append( + [result.get("doc_id") or result.get("id") for result in results if result] + ) + + return { + "count": len(latencies_ms), + "p50_ms": percentile(latencies_ms, 50), + "p95_ms": percentile(latencies_ms, 95), + "p99_ms": percentile(latencies_ms, 99), + "mean_ms": round(statistics.mean(latencies_ms), 3), + "result_sets": result_sets, + } + + +def compute_overlap( + source_result_sets: Sequence[Sequence[str]], + target_result_sets: Sequence[Sequence[str]], + *, + top_k: int, +) -> Dict[str, Any]: + overlap_ratios: List[float] = [] + for source_results, 
target_results in zip(source_result_sets, target_result_sets): + source_set = set(source_results[:top_k]) + target_set = set(target_results[:top_k]) + overlap_ratios.append(len(source_set.intersection(target_set)) / max(top_k, 1)) + return { + "mean_overlap_at_k": round(statistics.mean(overlap_ratios), 4), + "min_overlap_at_k": round(min(overlap_ratios), 4), + "max_overlap_at_k": round(max(overlap_ratios), 4), + } + + +def run_quantization_migration( + planner: MigrationPlanner, + client: Redis, + source_index_name: str, + source_schema: Dict[str, Any], + dims: int, +) -> Dict[str, Any]: + """Run full HNSW/FP32 -> FLAT/FP16 migration with quantization.""" + from redisvl.migration import MigrationExecutor + + target_schema = build_schema( + index_name=source_schema["index"]["name"], + prefix=source_schema["index"]["prefix"], + dims=dims, + algorithm="flat", # Change algorithm + datatype="float16", # Change datatype (quantization) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + target_schema_path = Path(tmpdir) / "target_schema.yaml" + plan_path = Path(tmpdir) / "migration_plan.yaml" + with open(target_schema_path, "w") as f: + yaml.safe_dump(target_schema, f, sort_keys=False) + + plan_start = time.perf_counter() + plan = planner.create_plan( + source_index_name, + redis_client=client, + target_schema_path=str(target_schema_path), + ) + planner.write_plan(plan, str(plan_path)) + plan_duration = time.perf_counter() - plan_start + + if not plan.diff_classification.supported: + raise AssertionError( + f"Expected planner to ALLOW quantization migration, " + f"but it blocked with: {plan.diff_classification.blocked_reasons}" + ) + + # Check datatype changes detected + datatype_changes = MigrationPlanner.get_vector_datatype_changes( + plan.source.schema_snapshot, plan.merged_target_schema + ) + + # Execute migration + executor = MigrationExecutor() + migrate_start = time.perf_counter() + report = executor.apply(plan, redis_client=client) + migrate_duration = 
time.perf_counter() - migrate_start + + if report.result != "succeeded": + raise AssertionError(f"Migration failed: {report.validation.errors}") + + return { + "test": "quantization_migration", + "plan_duration_seconds": round(plan_duration, 3), + "migration_duration_seconds": round(migrate_duration, 3), + "quantize_duration_seconds": report.timings.quantize_duration_seconds, + "supported": plan.diff_classification.supported, + "datatype_changes": datatype_changes, + "result": report.result, + } + + +def assert_planner_allows_algorithm_change( + planner: MigrationPlanner, + client: Redis, + source_index_name: str, + source_schema: Dict[str, Any], + dims: int, +) -> Dict[str, Any]: + """Test that algorithm-only changes (HNSW -> FLAT) are allowed.""" + target_schema = build_schema( + index_name=source_schema["index"]["name"], + prefix=source_schema["index"]["prefix"], + dims=dims, + algorithm="flat", # Different algorithm - should be allowed + datatype="float32", # Same datatype + ) + + with tempfile.TemporaryDirectory() as tmpdir: + target_schema_path = Path(tmpdir) / "target_schema.yaml" + plan_path = Path(tmpdir) / "migration_plan.yaml" + with open(target_schema_path, "w") as f: + yaml.safe_dump(target_schema, f, sort_keys=False) + + start = time.perf_counter() + plan = planner.create_plan( + source_index_name, + redis_client=client, + target_schema_path=str(target_schema_path), + ) + planner.write_plan(plan, str(plan_path)) + duration = time.perf_counter() - start + + if not plan.diff_classification.supported: + raise AssertionError( + f"Expected planner to ALLOW algorithm change (HNSW -> FLAT), " + f"but it blocked with: {plan.diff_classification.blocked_reasons}" + ) + + return { + "test": "algorithm_change_allowed", + "plan_duration_seconds": round(duration, 3), + "supported": plan.diff_classification.supported, + "blocked_reasons": plan.diff_classification.blocked_reasons, + } + + +def benchmark_scale( + *, + client: Redis, + all_records: Sequence[Dict[str, 
Any]], + all_embeddings: np.ndarray, + size: int, + query_count: int, + top_k: int, + load_batch_size: int, +) -> Dict[str, Any]: + records = list(all_records[:size]) + query_records = list(all_records[size : size + query_count]) + doc_embeddings = all_embeddings[:size] + query_embeddings = all_embeddings[size : size + query_count] + dims = int(all_embeddings.shape[1]) + + client.flushdb() + + baseline_memory = get_memory_snapshot(client) + planner = MigrationPlanner(key_sample_limit=5) + source_schema = build_schema( + index_name=f"benchmark_source_{size}", + prefix=f"benchmark:source:{size}", + dims=dims, + algorithm="hnsw", + datatype="float32", + ) + + source_index = SearchIndex.from_dict(source_schema, redis_client=client) + migrated_index = None # Will be set after migration + + try: + source_index.create(overwrite=True) + source_load_start = time.perf_counter() + source_index.load( + iter_documents(records, doc_embeddings, dtype="float32"), + id_field="doc_id", + batch_size=load_batch_size, + ) + source_info = wait_for_index_ready(source_index) + source_setup_duration = time.perf_counter() - source_load_start + source_memory = get_memory_snapshot(client) + + # Query source index before migration + source_query_metrics = run_query_benchmark( + source_index, + query_embeddings, + dtype="float32", + top_k=top_k, + ) + + # Run full quantization migration: HNSW/FP32 -> FLAT/FP16 + quantization_result = run_quantization_migration( + planner=planner, + client=client, + source_index_name=source_schema["index"]["name"], + source_schema=source_schema, + dims=dims, + ) + + # Get migrated index info and memory + migrated_index = SearchIndex.from_existing( + source_schema["index"]["name"], redis_client=client + ) + target_info = wait_for_index_ready(migrated_index) + overlap_memory = get_memory_snapshot(client) + + # Query migrated index + target_query_metrics = run_query_benchmark( + migrated_index, + query_embeddings.astype(np.float16), + dtype="float16", + 
top_k=top_k, + ) + + overlap_metrics = compute_overlap( + source_query_metrics["result_sets"], + target_query_metrics["result_sets"], + top_k=top_k, + ) + + post_cutover_memory = get_memory_snapshot(client) + + return { + "size": size, + "query_count": len(query_records), + "vector_dims": dims, + "source": { + "algorithm": "hnsw", + "datatype": "float32", + "setup_duration_seconds": round(source_setup_duration, 3), + "index_info": summarize_index_info(source_info), + "query_metrics": { + k: v for k, v in source_query_metrics.items() if k != "result_sets" + }, + }, + "migration": { + "quantization": quantization_result, + }, + "target": { + "algorithm": "flat", + "datatype": "float16", + "migration_duration_seconds": quantization_result[ + "migration_duration_seconds" + ], + "quantize_duration_seconds": quantization_result[ + "quantize_duration_seconds" + ], + "index_info": summarize_index_info(target_info), + "query_metrics": { + k: v for k, v in target_query_metrics.items() if k != "result_sets" + }, + }, + "memory": { + "baseline": baseline_memory, + "after_source": source_memory, + "during_overlap": overlap_memory, + "after_cutover": post_cutover_memory, + "overlap_increase_mb": round( + overlap_memory["used_memory_mb"] - source_memory["used_memory_mb"], + 3, + ), + "net_change_after_cutover_mb": round( + post_cutover_memory["used_memory_mb"] + - source_memory["used_memory_mb"], + 3, + ), + }, + "correctness": { + "source_num_docs": int(source_info.get("num_docs", 0) or 0), + "target_num_docs": int(target_info.get("num_docs", 0) or 0), + "doc_count_match": int(source_info.get("num_docs", 0) or 0) + == int(target_info.get("num_docs", 0) or 0), + "migration_succeeded": quantization_result["result"] == "succeeded", + **overlap_metrics, + }, + } + finally: + for idx in (source_index, migrated_index): + try: + if idx is not None: + idx.delete(drop=True) + except Exception: + pass + + +def main() -> None: + args = parse_args() + sizes = sorted(args.sizes) + max_size = 
max(sizes) + required_docs = max_size + args.query_count + + if args.dataset_csv: + print( + f"Loading AG News CSV from {args.dataset_csv} with {required_docs} records" + ) + records = load_ag_news_records_from_csv( + args.dataset_csv, + required_docs=required_docs, + ) + else: + print(f"Loading AG News dataset with {required_docs} records") + records = load_ag_news_records( + required_docs - args.query_count, + args.query_count, + ) + print(f"Encoding {len(records)} texts with {args.model}") + embeddings, embedding_duration = encode_texts( + args.model, + [record["text"] for record in records], + args.embedding_batch_size, + ) + + client = Redis.from_url(args.redis_url, decode_responses=False) + client.ping() + + report = { + "dataset": "ag_news", + "model": args.model, + "sizes": sizes, + "query_count": args.query_count, + "top_k": args.top_k, + "embedding_duration_seconds": round(embedding_duration, 3), + "results": [], + } + + for size in sizes: + print(f"\nRunning benchmark for {size} documents") + result = benchmark_scale( + client=client, + all_records=records, + all_embeddings=embeddings, + size=size, + query_count=args.query_count, + top_k=args.top_k, + load_batch_size=args.load_batch_size, + ) + report["results"].append(result) + print( + json.dumps( + { + "size": size, + "source_setup_duration_seconds": result["source"][ + "setup_duration_seconds" + ], + "migration_duration_seconds": result["target"][ + "migration_duration_seconds" + ], + "quantize_duration_seconds": result["target"][ + "quantize_duration_seconds" + ], + "migration_succeeded": result["correctness"]["migration_succeeded"], + "mean_overlap_at_k": result["correctness"]["mean_overlap_at_k"], + "memory_change_mb": result["memory"]["net_change_after_cutover_mb"], + }, + indent=2, + ) + ) + + output_path = Path(args.output).resolve() + with open(output_path, "w") as f: + json.dump(report, f, indent=2) + + print(f"\nBenchmark report written to {output_path}") + + +if __name__ == "__main__": + 
main() diff --git a/tests/benchmarks/migration_benchmark.py b/tests/benchmarks/migration_benchmark.py new file mode 100644 index 000000000..d2ef0a085 --- /dev/null +++ b/tests/benchmarks/migration_benchmark.py @@ -0,0 +1,642 @@ +"""Migration Benchmark: Measure end-to-end migration time at scale. + +Populates a realistic 16-field index (matching the KM production schema) +at 1K, 10K, 100K, and 1M vectors, then migrates: + - Sub-1M: HNSW FP32 -> FLAT FP16 + - 1M: HNSW FP32 -> HNSW FP16 + +Collects full MigrationTimings from MigrationExecutor.apply(). + +Usage: + python tests/benchmarks/migration_benchmark.py \\ + --redis-url redis://localhost:6379 \\ + --sizes 1000 10000 100000 \\ + --trials 3 \\ + --output tests/benchmarks/results_migration.json +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import random +import time +from pathlib import Path +from typing import Any, Dict, List, Optional + +import numpy as np +from redis import Redis + +from redisvl.index import SearchIndex +from redisvl.migration import ( + AsyncMigrationExecutor, + AsyncMigrationPlanner, + MigrationExecutor, + MigrationPlanner, +) +from redisvl.migration.models import FieldUpdate, SchemaPatch, SchemaPatchChanges +from redisvl.migration.utils import wait_for_index_ready +from redisvl.redis.utils import array_to_buffer + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +VECTOR_DIMS = 3072 +INDEX_PREFIX = "KM:benchmark:" +HNSW_M = 16 +HNSW_EF_CONSTRUCTION = 200 +BATCH_SIZE = 500 + +# Vocabularies for synthetic data +TAG_VOCABS = { + "doc_base_id": [f"base_{i}" for i in range(50)], + "file_id": [f"file_{i:06d}" for i in range(200)], + "created_by": ["alice", "bob", "carol", "dave", "eve"], + "CUSIP": [f"{random.randint(100000000, 999999999)}" for _ in range(100)], +} + +TEXT_WORDS = [ + "financial", + "report", + "quarterly", + 
"analysis", + "revenue", + "growth", + "market", + "portfolio", + "investment", + "dividend", + "equity", + "bond", + "asset", + "liability", + "balance", + "income", + "statement", + "forecast", + "risk", + "compliance", +] + + +# --------------------------------------------------------------------------- +# Schema helpers +# --------------------------------------------------------------------------- + + +def make_source_schema(index_name: str) -> Dict[str, Any]: + """Build the 16-field HNSW FP32 source schema dict.""" + return { + "index": { + "name": index_name, + "prefix": INDEX_PREFIX, + "storage_type": "hash", + }, + "fields": [ + {"name": "doc_base_id", "type": "tag", "attrs": {"separator": ","}}, + {"name": "file_id", "type": "tag", "attrs": {"separator": ","}}, + {"name": "page_text", "type": "text", "attrs": {"weight": 1}}, + {"name": "chunk_number", "type": "numeric"}, + {"name": "start_page", "type": "numeric"}, + {"name": "end_page", "type": "numeric"}, + {"name": "created_by", "type": "tag", "attrs": {"separator": ","}}, + {"name": "file_name", "type": "text", "attrs": {"weight": 1}}, + {"name": "created_time", "type": "numeric"}, + {"name": "last_updated_by", "type": "text", "attrs": {"weight": 1}}, + {"name": "last_updated_time", "type": "numeric"}, + { + "name": "embedding", + "type": "vector", + "attrs": { + "algorithm": "hnsw", + "datatype": "float32", + "dims": VECTOR_DIMS, + "distance_metric": "COSINE", + "m": HNSW_M, + "ef_construction": HNSW_EF_CONSTRUCTION, + }, + }, + { + "name": "CUSIP", + "type": "tag", + "attrs": {"separator": ",", "index_missing": True}, + }, + { + "name": "description", + "type": "text", + "attrs": {"weight": 1, "index_missing": True}, + }, + { + "name": "name", + "type": "text", + "attrs": {"weight": 1, "index_missing": True}, + }, + {"name": "price", "type": "numeric", "attrs": {"index_missing": True}}, + ], + } + + +def make_migration_patch(target_algo: str) -> SchemaPatch: + """Build a SchemaPatch to change 
embedding from FP32 to FP16 (and optionally HNSW to FLAT).""" + attrs = {"datatype": "float16"} + if target_algo == "FLAT": + attrs["algorithm"] = "flat" + return SchemaPatch( + version=1, + changes=SchemaPatchChanges( + update_fields=[ + FieldUpdate(name="embedding", attrs=attrs), + ] + ), + ) + + +# --------------------------------------------------------------------------- +# Data generation +# --------------------------------------------------------------------------- + + +def generate_random_text(min_words: int = 10, max_words: int = 50) -> str: + """Generate a random sentence from the vocabulary.""" + n = random.randint(min_words, max_words) + return " ".join(random.choice(TEXT_WORDS) for _ in range(n)) + + +def generate_document(doc_id: int, vector: np.ndarray) -> Dict[str, Any]: + """Generate a single document with all 16 fields.""" + doc: Dict[str, Any] = { + "doc_base_id": random.choice(TAG_VOCABS["doc_base_id"]), + "file_id": random.choice(TAG_VOCABS["file_id"]), + "page_text": generate_random_text(), + "chunk_number": random.randint(0, 100), + "start_page": random.randint(1, 500), + "end_page": random.randint(1, 500), + "created_by": random.choice(TAG_VOCABS["created_by"]), + "file_name": f"document_{doc_id}.pdf", + "created_time": int(time.time()) - random.randint(0, 86400 * 365), + "last_updated_by": random.choice(TAG_VOCABS["created_by"]), + "last_updated_time": int(time.time()) - random.randint(0, 86400 * 30), + "embedding": array_to_buffer(vector, dtype="float32"), + } + # INDEXMISSING fields: populate ~80% of docs + if random.random() < 0.8: + doc["CUSIP"] = random.choice(TAG_VOCABS["CUSIP"]) + if random.random() < 0.8: + doc["description"] = generate_random_text(5, 20) + if random.random() < 0.8: + doc["name"] = f"Entity {doc_id}" + if random.random() < 0.8: + doc["price"] = round(random.uniform(1.0, 10000.0), 2) + return doc + + +# --------------------------------------------------------------------------- +# Population +# 
--------------------------------------------------------------------------- + + +def populate_index( + redis_url: str, + index_name: str, + num_docs: int, +) -> float: + """Create the source index and populate it with synthetic data. + + Returns the time taken in seconds. + """ + schema_dict = make_source_schema(index_name) + index = SearchIndex.from_dict(schema_dict, redis_url=redis_url) + + # Drop existing index if any + try: + index.delete(drop=True) + except Exception: + pass + + # Clean up any leftover keys from previous runs + client = Redis.from_url(redis_url) + cursor = 0 + while True: + cursor, keys = client.scan(cursor, match=f"{INDEX_PREFIX}*", count=5000) + if keys: + client.delete(*keys) + if cursor == 0: + break + client.close() + + index.create(overwrite=True) + + print(f" Populating {num_docs:,} documents...") + start = time.perf_counter() + + # Generate vectors in batches to manage memory + rng = np.random.default_rng(seed=42) + client = Redis.from_url(redis_url) + + for batch_start in range(0, num_docs, BATCH_SIZE): + batch_end = min(batch_start + BATCH_SIZE, num_docs) + batch_count = batch_end - batch_start + + # Generate batch of random unit-normalized vectors + vectors = rng.standard_normal((batch_count, VECTOR_DIMS)).astype(np.float32) + norms = np.linalg.norm(vectors, axis=1, keepdims=True) + vectors = vectors / norms + + pipe = client.pipeline(transaction=False) + for i in range(batch_count): + doc_id = batch_start + i + key = f"{INDEX_PREFIX}{doc_id}" + doc = generate_document(doc_id, vectors[i]) + pipe.hset(key, mapping=doc) + + pipe.execute() + + if (batch_end % 10000 == 0) or batch_end == num_docs: + elapsed = time.perf_counter() - start + rate = batch_end / elapsed if elapsed > 0 else 0 + print(f" {batch_end:,}/{num_docs:,} docs ({rate:,.0f} docs/sec)") + + populate_duration = time.perf_counter() - start + client.close() + + # Wait for indexing to complete + print(" Waiting for index to be ready...") + idx = 
SearchIndex.from_existing(index_name, redis_url=redis_url) + _, indexing_wait = wait_for_index_ready(idx) + print( + f" Index ready (waited {indexing_wait:.1f}s after {populate_duration:.1f}s populate)" + ) + + return populate_duration + indexing_wait + + +# --------------------------------------------------------------------------- +# Migration execution +# --------------------------------------------------------------------------- + + +def run_migration( + redis_url: str, + index_name: str, + target_algo: str, +) -> Dict[str, Any]: + """Run a single migration and return the full report as a dict. + + Returns a dict with 'report' (model_dump) and 'enumerate_method' + indicating whether FT.AGGREGATE or SCAN was used for key discovery. + """ + import logging + + patch = make_migration_patch(target_algo) + planner = MigrationPlanner() + plan = planner.create_plan_from_patch( + index_name, + schema_patch=patch, + redis_url=redis_url, + ) + + if not plan.diff_classification.supported: + raise RuntimeError( + f"Migration not supported: {plan.diff_classification.blocked_reasons}" + ) + + executor = MigrationExecutor() + + # Capture enumerate method by intercepting executor logger warnings + enumerate_method = "FT.AGGREGATE" # default (happy path) + _orig_logger = logging.getLogger("redisvl.migration.executor") + _orig_level = _orig_logger.level + + class _EnumMethodHandler(logging.Handler): + def emit(self, record): + nonlocal enumerate_method + msg = record.getMessage() + if "Using SCAN" in msg or "Falling back to SCAN" in msg: + enumerate_method = "SCAN" + + handler = _EnumMethodHandler() + _orig_logger.addHandler(handler) + _orig_logger.setLevel(logging.WARNING) + + def progress(step: str, detail: Optional[str] = None) -> None: + if detail: + print(f" [{step}] {detail}") + + try: + report = executor.apply( + plan, + redis_url=redis_url, + progress_callback=progress, + ) + finally: + _orig_logger.removeHandler(handler) + _orig_logger.setLevel(_orig_level) + + return 
async def async_run_migration(
    redis_url: str,
    index_name: str,
    target_algo: str,
) -> Dict[str, Any]:
    """Run a single migration using AsyncMigrationExecutor.

    Async twin of ``run_migration``: plans via ``AsyncMigrationPlanner`` and
    applies via ``AsyncMigrationExecutor``, awaiting both calls.

    Args:
        redis_url: Redis connection URL.
        index_name: Source index to migrate.
        target_algo: Target vector algorithm for the migration patch.

    Returns:
        Dict with 'report' (model_dump) and 'enumerate_method'
        indicating whether FT.AGGREGATE or SCAN was used for key discovery.

    Raises:
        RuntimeError: If the planner classifies the migration as unsupported.
    """
    # Local import: only needed for the handler-interception trick below.
    import logging

    patch = make_migration_patch(target_algo)
    planner = AsyncMigrationPlanner()
    plan = await planner.create_plan_from_patch(
        index_name,
        schema_patch=patch,
        redis_url=redis_url,
    )

    if not plan.diff_classification.supported:
        raise RuntimeError(
            f"Migration not supported: {plan.diff_classification.blocked_reasons}"
        )

    executor = AsyncMigrationExecutor()

    # Capture enumerate method by intercepting executor logger warnings.
    # NOTE(review): assumes the async executor logs these phrases only when
    # falling back to SCAN — confirm against async_executor logging.
    enumerate_method = "FT.AGGREGATE"  # default (happy path)
    _orig_logger = logging.getLogger("redisvl.migration.async_executor")
    _orig_level = _orig_logger.level

    class _EnumMethodHandler(logging.Handler):
        # Flip the captured method to "SCAN" when a fallback warning appears.
        def emit(self, record: logging.LogRecord) -> None:
            nonlocal enumerate_method
            msg = record.getMessage()
            if "Using SCAN" in msg or "Falling back to SCAN" in msg:
                enumerate_method = "SCAN"

    handler = _EnumMethodHandler()
    _orig_logger.addHandler(handler)
    # WARNING level so the fallback messages are not filtered out.
    _orig_logger.setLevel(logging.WARNING)

    def progress(step: str, detail: Optional[str] = None) -> None:
        # Plain (sync) callback; the executor invokes it between steps.
        if detail:
            print(f"    [{step}] {detail}")

    try:
        report = await executor.apply(
            plan,
            redis_url=redis_url,
            progress_callback=progress,
        )
    finally:
        # Always restore the logger's handlers and level, even on failure.
        _orig_logger.removeHandler(handler)
        _orig_logger.setLevel(_orig_level)

    return {"report": report.model_dump(), "enumerate_method": enumerate_method}
def _read_ft_info(client: Redis, index_name: str) -> Dict[str, Any]:
    """Run FT.INFO and fold its flat [key, value, ...] reply into a dict.

    Raises whatever the underlying command raises (e.g. unknown index);
    callers decide how to degrade.
    """
    info_raw = client.execute_command("FT.INFO", index_name)
    info_dict: Dict[str, Any] = {}
    for i in range(0, len(info_raw), 2):
        key = info_raw[i]
        if isinstance(key, bytes):
            key = key.decode()
        info_dict[key] = info_raw[i + 1]
    return info_dict


def run_benchmark(
    redis_url: str,
    sizes: List[int],
    trials: int,
    output_path: Optional[str],
    use_async: bool = False,
) -> Dict[str, Any]:
    """Run the full migration benchmark across all sizes and trials.

    For each size: populate a source index, run the HNSW/FP32 ->
    FLAT-or-HNSW/FP16 migration (sync or async executor), capture FT.INFO
    memory stats before/after, and record per-trial timings.

    Args:
        redis_url: Redis connection URL.
        sizes: Corpus sizes to benchmark.
        trials: Number of trials per size.
        output_path: Optional JSON file to write results to.
        use_async: Use AsyncMigrationExecutor instead of the sync executor.

    Returns:
        The aggregate results dict (also written to ``output_path`` if set).
    """
    executor_label = "async" if use_async else "sync"
    results: Dict[str, Any] = {
        "benchmark": "migration_timing",
        "executor": executor_label,
        "schema_field_count": 16,
        "vector_dims": VECTOR_DIMS,
        "trials_per_size": trials,
        "results": [],
    }

    for size in sizes:
        # Large corpora migrate to HNSW; small ones to FLAT.
        target_algo = "HNSW" if size >= 1_000_000 else "FLAT"
        index_name = f"bench_migration_{size}"
        print(f"\n{'='*60}")
        print(
            f"Size: {size:,} | Migration: HNSW FP32 -> {target_algo} FP16 | Executor: {executor_label}"
        )
        print(f"{'='*60}")

        size_result = {
            "size": size,
            "source_algo": "HNSW",
            "source_dtype": "FLOAT32",
            "target_algo": target_algo,
            "target_dtype": "FLOAT16",
            "trials": [],
        }

        for trial_num in range(1, trials + 1):
            print(f"\n  Trial {trial_num}/{trials}")

            # Step 1: Populate
            populate_time = populate_index(redis_url, index_name, size)

            # Capture source memory (shared FT.INFO parsing via _read_ft_info).
            client = Redis.from_url(redis_url)
            try:
                info_dict = _read_ft_info(client, index_name)
                source_mem_mb = float(info_dict.get("vector_index_sz_mb", 0))
                source_total_mb = float(info_dict.get("total_index_memory_sz_mb", 0))
                source_num_docs = int(info_dict.get("num_docs", 0))
            except Exception as e:
                # Degrade gracefully: memory stats are informative, not required.
                print(f"  Warning: could not read source FT.INFO: {e}")
                source_mem_mb = 0.0
                source_total_mb = 0.0
                source_num_docs = 0
            finally:
                client.close()

            print(
                f"  Source: {source_num_docs:,} docs, "
                f"vector_idx={source_mem_mb:.1f}MB, "
                f"total_idx={source_total_mb:.1f}MB"
            )

            # Step 2: Migrate
            print(f"  Running migration ({executor_label})...")
            if use_async:
                migration_result = asyncio.run(
                    async_run_migration(redis_url, index_name, target_algo)
                )
            else:
                migration_result = run_migration(redis_url, index_name, target_algo)
            report_dict = migration_result["report"]
            enumerate_method = migration_result["enumerate_method"]

            # Capture target memory
            target_index_name = report_dict.get("target_index", index_name)
            client = Redis.from_url(redis_url)
            try:
                info_dict = _read_ft_info(client, target_index_name)
                target_mem_mb = float(info_dict.get("vector_index_sz_mb", 0))
                target_total_mb = float(info_dict.get("total_index_memory_sz_mb", 0))
            except Exception as e:
                print(f"  Warning: could not read target FT.INFO: {e}")
                target_mem_mb = 0.0
                target_total_mb = 0.0
            finally:
                client.close()

            timings = report_dict.get("timings", {})
            # "or 0" guards against an explicit None in the report.
            migrate_s = timings.get("total_migration_duration_seconds", 0) or 0
            total_s = round(populate_time + migrate_s, 3)

            # Vector memory savings (the real savings from FP32 -> FP16)
            vec_savings_pct = (
                round((1 - target_mem_mb / source_mem_mb) * 100, 1)
                if source_mem_mb > 0
                else 0
            )

            trial_result = {
                "trial": trial_num,
                "load_time_seconds": round(populate_time, 3),
                "migrate_time_seconds": round(migrate_s, 3),
                "total_time_seconds": total_s,
                "enumerate_method": enumerate_method,
                "timings": timings,
                "benchmark_summary": report_dict.get("benchmark_summary", {}),
                "source_vector_index_mb": round(source_mem_mb, 3),
                "source_total_index_mb": round(source_total_mb, 3),
                "target_vector_index_mb": round(target_mem_mb, 3),
                "target_total_index_mb": round(target_total_mb, 3),
                "vector_memory_savings_pct": vec_savings_pct,
                "validation_passed": report_dict.get("result") == "succeeded",
                "num_docs": source_num_docs,
            }

            # Print isolated timings (dead "_enum_s" drop-duration read removed).
            quant_s = timings.get("quantize_duration_seconds") or 0
            index_s = timings.get("initial_indexing_duration_seconds") or 0
            down_s = timings.get("downtime_duration_seconds") or 0
            print(
                f"""  Results
    load = {populate_time:.1f}s
    migrate = {migrate_s:.1f}s (enumerate + drop + quantize + create + reindex + validate)
    total = {total_s:.1f}s
    enumerate = {enumerate_method}
    quantize = {quant_s:.1f}s
    reindex = {index_s:.1f}s
    downtime = {down_s:.1f}s
    vec memory = {source_mem_mb:.1f}MB -> {target_mem_mb:.1f}MB ({vec_savings_pct:.1f}% saved)
    passed = {trial_result['validation_passed']}"""
            )

            size_result["trials"].append(trial_result)

            # Clean up for next trial (drop index + keys)
            client = Redis.from_url(redis_url)
            try:
                try:
                    client.execute_command("FT.DROPINDEX", target_index_name)
                except Exception:
                    pass
                # Delete document keys
                cursor = 0
                while True:
                    cursor, keys = client.scan(
                        cursor, match=f"{INDEX_PREFIX}*", count=5000
                    )
                    if keys:
                        client.delete(*keys)
                    if cursor == 0:
                        break
            finally:
                client.close()

        results["results"].append(size_result)

    # Save results
    if output_path:
        output = Path(output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        with open(output, "w") as f:
            # default=str: timings may contain non-JSON-native types.
            json.dump(results, f, indent=2, default=str)
        print(f"\nResults saved to {output}")

    return results
def main():
    """CLI entry point: parse arguments and run the migration benchmark."""
    parser = argparse.ArgumentParser(description="Migration timing benchmark")
    parser.add_argument(
        "--redis-url", default="redis://localhost:6379", help="Redis connection URL"
    )
    parser.add_argument(
        "--sizes",
        nargs="+",
        type=int,
        default=[1000, 10000, 100000],
        help="Corpus sizes to benchmark",
    )
    parser.add_argument(
        "--trials", type=int, default=3, help="Number of trials per size"
    )
    parser.add_argument(
        "--output",
        default="tests/benchmarks/results_migration.json",
        help="Output JSON file",
    )
    # dest= is required because "async" is a Python keyword.
    parser.add_argument(
        "--async",
        dest="use_async",
        action="store_true",
        default=False,
        help="Use AsyncMigrationExecutor instead of sync MigrationExecutor",
    )
    args = parser.parse_args()

    executor_label = "AsyncMigrationExecutor" if args.use_async else "MigrationExecutor"
    # Echo the run configuration before starting.
    print(
        f"""Migration Benchmark
    Redis: {args.redis_url}
    Sizes: {args.sizes}
    Trials: {args.trials}
    Vector dims: {VECTOR_DIMS}
    Fields: 16
    Executor: {executor_label}"""
    )

    run_benchmark(
        redis_url=args.redis_url,
        sizes=args.sizes,
        trials=args.trials,
        output_path=args.output,
        use_async=args.use_async,
    )


if __name__ == "__main__":
    main()
+ +Comparison matrix (4 configurations): + - HNSW / FLOAT32 (approximate, full precision) + - HNSW / FLOAT16 (approximate, quantized) + - FLAT / FLOAT32 (exact, full precision -- ground truth) + - FLAT / FLOAT16 (exact, quantized) + +Datasets: + - dbpedia: 1536-dim OpenAI embeddings (KShivendu/dbpedia-entities-openai-1M) + - cohere: 768-dim Cohere embeddings (Cohere/wikipedia-22-12-en-embeddings) + +Metrics: + - Overlap@K (precision vs FLAT/FP32 ground truth) + - Query latency: p50, p95, p99, mean + - QPS (queries per second) + - Memory footprint per configuration + - Index build / load time + +Usage: + python tests/benchmarks/retrieval_benchmark.py \\ + --redis-url redis://localhost:6379 \\ + --dataset dbpedia \\ + --sizes 1000 10000 \\ + --top-k 10 \\ + --query-count 100 \\ + --output retrieval_benchmark_results.json +""" + +from __future__ import annotations + +import argparse +import json +import statistics +import time +from pathlib import Path +from typing import Any, Dict, Iterable, List, Sequence, Tuple + +import numpy as np +from redis import Redis + +from redisvl.index import SearchIndex +from redisvl.query import VectorQuery +from redisvl.redis.utils import array_to_buffer + +# --------------------------------------------------------------------------- +# Dataset registry +# --------------------------------------------------------------------------- + +DATASETS = { + "dbpedia": { + "hf_name": "KShivendu/dbpedia-entities-openai-1M", + "embedding_column": "openai", + "dims": 1536, + "distance_metric": "cosine", + "description": "DBpedia entities, OpenAI text-embedding-ada-002, 1536d", + }, + "cohere": { + "hf_name": "Cohere/wikipedia-22-12-en-embeddings", + "embedding_column": "emb", + "dims": 768, + "distance_metric": "cosine", + "description": "Wikipedia EN, Cohere multilingual encoder, 768d", + }, + "random768": { + "hf_name": None, + "embedding_column": None, + "dims": 768, + "distance_metric": "cosine", + "description": "Synthetic random unit vectors, 
def parse_args() -> argparse.Namespace:
    """Build the retrieval-benchmark argument parser and parse sys.argv."""
    p = argparse.ArgumentParser(
        description="Retrieval benchmark: FP32 vs FP16 x HNSW vs FLAT."
    )
    p.add_argument("--redis-url", default="redis://localhost:6379")
    p.add_argument(
        "--dataset",
        choices=list(DATASETS.keys()),
        default="dbpedia",
    )
    p.add_argument(
        "--sizes",
        nargs="+",
        type=int,
        default=[1000, 10000],
    )
    p.add_argument("--query-count", type=int, default=100)
    p.add_argument("--top-k", type=int, default=10)
    p.add_argument("--ef-runtime", type=int, default=10)
    p.add_argument("--load-batch-size", type=int, default=500)
    p.add_argument(
        "--recall-k-max",
        type=int,
        default=100,
        help="Max K for recall curve (queries will fetch this many results).",
    )
    p.add_argument(
        "--output",
        default="retrieval_benchmark_results.json",
    )
    return p.parse_args()
def build_schema(
    *,
    index_name: str,
    prefix: str,
    dims: int,
    algorithm: str,
    datatype: str,
    distance_metric: str,
    ef_runtime: int = HNSW_EF_RUNTIME,
) -> Dict[str, Any]:
    """Assemble an index schema dict for one benchmark configuration.

    Hash storage with a tag ``doc_id`` field plus one ``embedding`` vector
    field; HNSW configurations additionally carry m / ef_construction /
    ef_runtime tuning attributes.
    """
    attrs: Dict[str, Any] = {
        "dims": dims,
        "distance_metric": distance_metric,
        "algorithm": algorithm,
        "datatype": datatype,
    }
    if algorithm == "hnsw":
        attrs.update(
            m=HNSW_M,
            ef_construction=HNSW_EF_CONSTRUCTION,
            ef_runtime=ef_runtime,
        )

    fields = [
        {"name": "doc_id", "type": "tag"},
        {"name": "embedding", "type": "vector", "attrs": attrs},
    ]
    return {
        "index": {"name": index_name, "prefix": prefix, "storage_type": "hash"},
        "fields": fields,
    }
def wait_for_index_ready(
    index: SearchIndex,
    *,
    timeout_seconds: int = 3600,
    poll_interval: float = 0.5,
) -> Dict[str, Any]:
    """Block until the index reports 100% indexed.

    Polls ``index.info()`` every ``poll_interval`` seconds until
    ``percent_indexed`` reaches 1.0 and the ``indexing`` flag is falsy.
    (The previous pre-loop ``index.info()`` call was dead — its result was
    always overwritten or unused — and has been removed.)

    Args:
        index: The search index to poll.
        timeout_seconds: Maximum seconds to wait before giving up.
        poll_interval: Seconds to sleep between polls.

    Returns:
        The final info payload from ``index.info()``.

    Raises:
        TimeoutError: If the index is not fully indexed within the timeout.
    """
    deadline = time.perf_counter() + timeout_seconds
    while time.perf_counter() < deadline:
        info = index.info()
        # Missing percent_indexed is treated as fully indexed (defaults to 1).
        pct = float(info.get("percent_indexed", 1))
        indexing = info.get("indexing", 0)
        if pct >= 1.0 and not indexing:
            return info
        time.sleep(poll_interval)
    raise TimeoutError(
        f"Index {index.schema.index.name} not ready within {timeout_seconds}s"
    )
def compute_overlap(
    ground_truth: List[List[str]],
    candidate: List[List[str]],
    *,
    top_k: int,
) -> Dict[str, Any]:
    """Compute Overlap@K (precision) of candidate vs ground truth.

    For each query, overlap is the fraction of the ground-truth top-K doc
    ids that also appear in the candidate top-K. Returns mean/min/max/std
    across queries, rounded to 4 decimals.
    """
    denom = max(top_k, 1)  # guard against top_k == 0
    ratios = [
        len(set(gt[:top_k]) & set(cand[:top_k])) / denom
        for gt, cand in zip(ground_truth, candidate)
    ]
    return {
        "mean_overlap_at_k": round(statistics.mean(ratios), 4),
        "min_overlap_at_k": round(min(ratios), 4),
        "max_overlap_at_k": round(max(ratios), 4),
        "std_overlap_at_k": (
            round(statistics.stdev(ratios), 4) if len(ratios) > 1 else 0.0
        ),
    }
def compute_recall(
    ground_truth: List[List[str]],
    candidate: List[List[str]],
    *,
    k_values: Sequence[int],
    ground_truth_depth: int,
) -> Dict[str, Any]:
    """Compute Recall@K at multiple K values.

    For each K, recall is:
        |candidate_top_K intersection ground_truth_top_GT_DEPTH| / GT_DEPTH

    The ground-truth set is fixed at ``ground_truth_depth`` entries (e.g.
    top-100 from FLAT FP32). As K grows toward that depth, recall climbs to
    1.0 for exact search (near 1.0 for approximate). This is the standard
    ANN-benchmark recall: "what fraction of the true nearest neighbors did
    we find?"
    """

    def _ratios_for(k: int) -> List[float]:
        # Per-query recall of the fixed ground-truth set within candidate[:k].
        out: List[float] = []
        for gt, cand in zip(ground_truth, candidate):
            truth = set(gt[:ground_truth_depth])
            denom = min(ground_truth_depth, len(truth))
            if denom == 0:
                # Empty ground truth means nothing to recall; use 0.0.
                out.append(0.0)
            else:
                out.append(len(truth & set(cand[:k])) / denom)
        return out

    recall_at_k: Dict[str, float] = {}
    recall_detail: Dict[str, Dict[str, float]] = {}
    for k in k_values:
        ratios = _ratios_for(k)
        mean_recall = round(statistics.mean(ratios), 4)
        recall_at_k[f"recall@{k}"] = mean_recall
        recall_detail[f"recall@{k}"] = {
            "mean": mean_recall,
            "min": round(min(ratios), 4),
            "max": round(max(ratios), 4),
            "std": round(statistics.stdev(ratios), 4) if len(ratios) > 1 else 0.0,
        }
    return {
        "recall_at_k": recall_at_k,
        "recall_detail": recall_detail,
        "ground_truth_depth": ground_truth_depth,
    }
def benchmark_single_config(
    *,
    client: Redis,
    doc_vectors: np.ndarray,
    query_vectors: np.ndarray,
    config: Dict[str, str],
    dims: int,
    distance_metric: str,
    size: int,
    top_k: int,
    ef_runtime: int,
    load_batch_size: int,
) -> Dict[str, Any]:
    """Build one index config, load data, query, and return metrics.

    Creates the index described by ``config`` (algorithm + datatype), loads
    ``doc_vectors``, waits until fully indexed, runs ``query_vectors``
    against it, and always drops the index afterwards.

    Returns:
        Dict with load timing, num_docs, memory figures, latency stats, and
        the raw per-query ``result_sets`` (consumed later for overlap/recall).
    """
    label = config["label"]
    algo = config["algorithm"]
    dtype = config["datatype"]

    # Size- and config-specific names keep runs from colliding with each other.
    index_name = f"bench_{label}_{size}"
    prefix = f"bench:{label}:{size}"

    schema = build_schema(
        index_name=index_name,
        prefix=prefix,
        dims=dims,
        algorithm=algo,
        datatype=dtype,
        distance_metric=distance_metric,
        ef_runtime=ef_runtime,
    )

    idx = SearchIndex.from_dict(schema, redis_client=client)
    try:
        idx.create(overwrite=True)

        # Load data; the measured duration includes the wait for indexing.
        load_start = time.perf_counter()
        idx.load(
            iter_documents(doc_vectors, dtype=dtype),
            id_field="doc_id",
            batch_size=load_batch_size,
        )
        info = wait_for_index_ready(idx)
        load_duration = time.perf_counter() - load_start

        # NOTE(review): get_memory_mb reads server-wide used_memory, not a
        # per-index figure — confirm callers isolate configs (e.g. flushdb).
        memory_mb = get_memory_mb(client)

        # Query
        query_metrics = run_queries(
            idx,
            query_vectors,
            dtype=dtype,
            top_k=top_k,
        )

        return {
            "label": label,
            "algorithm": algo,
            "datatype": dtype,
            "load_duration_seconds": round(load_duration, 3),
            # "or 0" handles info values that are None rather than missing.
            "num_docs": int(info.get("num_docs", 0) or 0),
            "vector_index_sz_mb": float(info.get("vector_index_sz_mb", 0) or 0),
            "memory_mb": memory_mb,
            "latency": {
                "queried_top_k": top_k,
                # Copy every latency stat except the bulky raw result sets.
                **{k: v for k, v in query_metrics.items() if k != "result_sets"},
            },
            "result_sets": query_metrics["result_sets"],
        }
    finally:
        # Best-effort teardown: never let cleanup mask the real error.
        try:
            idx.delete(drop=True)
        except Exception:
            pass
def benchmark_scale(
    *,
    client: Redis,
    all_vectors: np.ndarray,
    size: int,
    query_count: int,
    dims: int,
    distance_metric: str,
    top_k: int,
    ef_runtime: int,
    load_batch_size: int,
    recall_k_max: int = 100,
) -> Dict[str, Any]:
    """Run all 4 index configs for a given dataset size.

    Documents come from the first ``size`` vectors; the next ``query_count``
    vectors (disjoint from the corpus) serve as queries. FLAT/FP32 is run
    first and its results become the ground truth for overlap and recall.
    """
    doc_vectors = all_vectors[:size]
    # Queries are held out after the corpus slice; .copy() detaches the view.
    query_vectors = all_vectors[size : size + query_count].copy()

    # Use the larger of top_k and recall_k_max for querying
    # so we have enough results for recall curve computation.
    effective_top_k = max(top_k, recall_k_max)

    baseline_memory = get_memory_mb(client)

    config_results: Dict[str, Any] = {}
    ground_truth_results: List[List[str]] = []

    # Run FLAT_FP32 first to establish ground truth — the guard fails loudly
    # if the INDEX_CONFIGS ordering ever changes.
    gt_config = INDEX_CONFIGS[0]  # FLAT_FP32
    assert gt_config["label"] == "FLAT_FP32"

    for config in INDEX_CONFIGS:
        label = config["label"]
        print(f"  [{label}] Building and querying ...")

        result = benchmark_single_config(
            client=client,
            doc_vectors=doc_vectors,
            query_vectors=query_vectors,
            config=config,
            dims=dims,
            distance_metric=distance_metric,
            size=size,
            top_k=effective_top_k,
            ef_runtime=ef_runtime,
            load_batch_size=load_batch_size,
        )

        if label == "FLAT_FP32":
            ground_truth_results = result["result_sets"]

        config_results[label] = result

    # Compute overlap vs ground truth for every config (at original top_k).
    overlap_results: Dict[str, Any] = {}
    for label, result in config_results.items():
        overlap = compute_overlap(
            ground_truth_results,
            result["result_sets"],
            top_k=top_k,
        )
        overlap_results[label] = overlap

    # Compute recall at multiple K values.
    # Ground truth depth is fixed at top_k (e.g., 10). We measure what
    # fraction of those top_k true results appear in candidate top-K as
    # K varies from 1 up to effective_top_k.
    valid_k_values = [k for k in RECALL_K_VALUES if k <= effective_top_k]
    recall_results: Dict[str, Any] = {}
    for label, result in config_results.items():
        recall = compute_recall(
            ground_truth_results,
            result["result_sets"],
            k_values=valid_k_values,
            ground_truth_depth=top_k,
        )
        recall_results[label] = recall

    # Strip raw result_sets from output (too large for JSON).
    for label in config_results:
        del config_results[label]["result_sets"]

    return {
        "size": size,
        "query_count": query_count,
        "dims": dims,
        "distance_metric": distance_metric,
        "top_k": top_k,
        "recall_k_max": recall_k_max,
        "ef_runtime": ef_runtime,
        "hnsw_m": HNSW_M,
        "hnsw_ef_construction": HNSW_EF_CONSTRUCTION,
        "baseline_memory_mb": baseline_memory,
        "configs": config_results,
        "overlap_vs_ground_truth": overlap_results,
        "recall_vs_ground_truth": recall_results,
    }
def main() -> None:
    """CLI entry point: load vectors, run every size, print summaries, write JSON."""
    args = parse_args()
    sizes = sorted(args.sizes)
    # Corpus for the largest size plus a held-out query slice.
    max_needed = max(sizes) + args.query_count

    ds_info = DATASETS[args.dataset]

    # Echo the run configuration before starting.
    print(
        f"""Retrieval Benchmark
    Dataset: {args.dataset} ({ds_info['description']})
    Dims: {ds_info['dims']}
    Sizes: {sizes}
    Query count: {args.query_count}
    Top-K: {args.top_k}
    Recall K max: {args.recall_k_max}
    EF runtime: {args.ef_runtime}
    HNSW M: {HNSW_M}
    EF construct: {HNSW_EF_CONSTRUCTION}
    Redis URL: {args.redis_url}
    Configs: {[c['label'] for c in INDEX_CONFIGS]}"""
    )

    # Load vectors once; every size reuses slices of the same array.
    all_vectors, dims = load_dataset_vectors(args.dataset, max_needed)
    if all_vectors.shape[0] < max_needed:
        raise ValueError(
            f"Dataset has {all_vectors.shape[0]} vectors but need {max_needed} "
            f"(max_size={max(sizes)} + query_count={args.query_count})"
        )

    # Raw bytes mode: vector buffers must not be decoded.
    client = Redis.from_url(args.redis_url, decode_responses=False)
    client.ping()
    print("Connected to Redis")

    report = {
        "benchmark": "retrieval_fp32_vs_fp16",
        "dataset": args.dataset,
        "dataset_description": ds_info["description"],
        "dims": dims,
        "distance_metric": ds_info["distance_metric"],
        "hnsw_m": HNSW_M,
        "hnsw_ef_construction": HNSW_EF_CONSTRUCTION,
        "ef_runtime": args.ef_runtime,
        "top_k": args.top_k,
        "recall_k_max": args.recall_k_max,
        "recall_k_values": [
            k for k in RECALL_K_VALUES if k <= max(args.top_k, args.recall_k_max)
        ],
        "query_count": args.query_count,
        "configs": [c["label"] for c in INDEX_CONFIGS],
        "results": [],
    }

    for size in sizes:
        print(f"\n{'='*60}")
        print(f"  Size: {size:,} documents")
        print(f"{'='*60}")

        # Fresh DB per size so memory readings are attributable.
        client.flushdb()

        result = benchmark_scale(
            client=client,
            all_vectors=all_vectors,
            size=size,
            query_count=args.query_count,
            dims=dims,
            distance_metric=ds_info["distance_metric"],
            top_k=args.top_k,
            ef_runtime=args.ef_runtime,
            load_batch_size=args.load_batch_size,
            recall_k_max=args.recall_k_max,
        )
        report["results"].append(result)

        # Print summary table for this size.
        print(
            f"\n  {'Config':<12} {'Load(s)':>8} {'Memory(MB)':>11} "
            f"{'p50(ms)':>8} {'p95(ms)':>8} {'QPS':>7} {'Overlap@K':>10}"
        )
        print(f"  {'-'*12} {'-'*8} {'-'*11} {'-'*8} {'-'*8} {'-'*7} {'-'*10}")
        for label, cfg in result["configs"].items():
            overlap = result["overlap_vs_ground_truth"][label]
            print(
                f"  {label:<12} "
                f"{cfg['load_duration_seconds']:>8.1f} "
                f"{cfg['memory_mb']:>11.1f} "
                f"{cfg['latency']['p50_ms']:>8.2f} "
                f"{cfg['latency']['p95_ms']:>8.2f} "
                f"{cfg['latency']['qps']:>7.1f} "
                f"{overlap['mean_overlap_at_k']:>10.4f}"
            )

        # Print recall curve summary (columns sorted by numeric K).
        recall_data = result.get("recall_vs_ground_truth", {})
        if recall_data:
            first_label = next(iter(recall_data))
            k_keys = sorted(
                recall_data[first_label].get("recall_at_k", {}).keys(),
                key=lambda x: int(x.split("@")[1]),
            )
            header = f"  {'Config':<12} " + " ".join(f"{k:>10}" for k in k_keys)
            print(f"\n  Recall Curve:")
            print(header)
            print(f"  {'-'*12} " + " ".join(f"{'-'*10}" for _ in k_keys))
            for label, rdata in recall_data.items():
                vals = " ".join(
                    f"{rdata['recall_at_k'].get(k, 0):>10.4f}" for k in k_keys
                )
                print(f"  {label:<12} {vals}")

    # Write report
    output_path = Path(args.output).resolve()
    with open(output_path, "w") as f:
        json.dump(report, f, indent=2)
    print(f"\nReport written to {output_path}")


if __name__ == "__main__":
    main()
def load_results(path: str) -> Dict[str, Any]:
    """Read a benchmark results JSON file and return the parsed payload."""
    with open(path) as handle:
        return json.loads(handle.read())
def chart_overlap(results: List[Dict], dataset: str, output_dir: str):
    """Chart 2: Overlap@K (precision) comparison per size.

    Grouped bar chart: one group per corpus size, one bar per index
    configuration, each bar labeled with its precision value. Saved as
    ``{dataset}_overlap.png`` in ``output_dir``.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    configs = ["FLAT_FP32", "FLAT_FP16", "HNSW_FP32", "HNSW_FP16"]
    sizes = [r["size"] for r in results]
    x = np.arange(len(sizes))
    width = 0.18  # bar width; 4 bars per group

    for i, cfg in enumerate(configs):
        overlap = [
            r["overlap_vs_ground_truth"][cfg]["mean_overlap_at_k"] for r in results
        ]
        bars = ax.bar(
            x + i * width,
            overlap,
            width,
            label=LABELS[cfg],
            color=COLORS[cfg],
            edgecolor="white",
            linewidth=0.5,
        )
        # Value labels floated just above each bar.
        for bar, val in zip(bars, overlap):
            ax.text(
                bar.get_x() + bar.get_width() / 2,
                bar.get_height() + 0.005,
                f"{val:.3f}",
                ha="center",
                va="bottom",
                fontsize=8,
            )

    ax.set_xlabel("Corpus Size")
    ax.set_ylabel("Overlap@K (Precision vs FLAT FP32)")
    ax.set_title(f"Search Precision: FP32 vs FP16 -- {dataset}")
    # Center the tick under the middle of each 4-bar group.
    ax.set_xticks(x + width * 1.5)
    ax.set_xticklabels([f"{s:,}" for s in sizes])
    ax.legend(loc="lower left")
    # Headroom above 1.0 so the value labels are not clipped.
    ax.set_ylim(0, 1.1)
    fig.tight_layout()
    fig.savefig(os.path.join(output_dir, f"{dataset}_overlap.png"), dpi=150)
    plt.close(fig)
    print(f"  Saved {dataset}_overlap.png")
np.arange(len(sizes)) + width = 0.18 + + for i, cfg in enumerate(configs): + qps = [r["configs"][cfg]["latency"]["qps"] for r in results] + bars = ax.bar( + x + i * width, + qps, + width, + label=LABELS[cfg], + color=COLORS[cfg], + edgecolor="white", + linewidth=0.5, + ) + for bar, val in zip(bars, qps): + ax.text( + bar.get_x() + bar.get_width() / 2, + bar.get_height() + 10, + f"{val:.0f}", + ha="center", + va="bottom", + fontsize=7, + rotation=45, + ) + + ax.set_xlabel("Corpus Size") + ax.set_ylabel("Queries Per Second (QPS)") + ax.set_title(f"Query Throughput: FP32 vs FP16 -- {dataset}") + ax.set_xticks(x + width * 1.5) + ax.set_xticklabels([f"{s:,}" for s in sizes]) + ax.legend(loc="upper right") + ax.set_ylim(bottom=0) + fig.tight_layout() + fig.savefig(os.path.join(output_dir, f"{dataset}_qps.png"), dpi=150) + plt.close(fig) + print(f" Saved {dataset}_qps.png") + + +def chart_latency(results: List[Dict], dataset: str, output_dir: str): + """Chart 4: p50 and p95 latency comparison per size.""" + fig, axes = plt.subplots(1, 2, figsize=(14, 6), sharey=True) + configs = ["FLAT_FP32", "FLAT_FP16", "HNSW_FP32", "HNSW_FP16"] + sizes = [r["size"] for r in results] + x = np.arange(len(sizes)) + width = 0.18 + + for ax, metric, title in zip( + axes, ["p50_ms", "p95_ms"], ["p50 Latency", "p95 Latency"] + ): + for i, cfg in enumerate(configs): + vals = [r["configs"][cfg]["latency"][metric] for r in results] + bars = ax.bar( + x + i * width, + vals, + width, + label=LABELS[cfg], + color=COLORS[cfg], + edgecolor="white", + linewidth=0.5, + ) + for bar, val in zip(bars, vals): + ax.text( + bar.get_x() + bar.get_width() / 2, + bar.get_height() + 0.02, + f"{val:.2f}", + ha="center", + va="bottom", + fontsize=7, + ) + ax.set_xlabel("Corpus Size") + ax.set_ylabel("Latency (ms)") + ax.set_title(f"{title} -- {dataset}") + ax.set_xticks(x + width * 1.5) + ax.set_xticklabels([f"{s:,}" for s in sizes]) + ax.legend(loc="upper left", fontsize=9) + ax.set_ylim(bottom=0) + + 
fig.tight_layout() + fig.savefig(os.path.join(output_dir, f"{dataset}_latency.png"), dpi=150) + plt.close(fig) + print(f" Saved {dataset}_latency.png") + + +def chart_qps_vs_overlap(results: List[Dict], dataset: str, output_dir: str): + """Chart 5: QPS vs Overlap@K curve (Redis blog Chart 2 style).""" + fig, ax = plt.subplots(figsize=(10, 6)) + configs = ["FLAT_FP32", "FLAT_FP16", "HNSW_FP32", "HNSW_FP16"] + markers = {"FLAT_FP32": "s", "FLAT_FP16": "D", "HNSW_FP32": "o", "HNSW_FP16": "^"} + + for cfg in configs: + overlaps = [] + qps_vals = [] + for r in results: + overlaps.append(r["overlap_vs_ground_truth"][cfg]["mean_overlap_at_k"]) + qps_vals.append(r["configs"][cfg]["latency"]["qps"]) + + ax.plot( + overlaps, + qps_vals, + marker=markers[cfg], + markersize=8, + linewidth=2, + label=LABELS[cfg], + color=COLORS[cfg], + ) + # Annotate points with size + for ov, qps, r in zip(overlaps, qps_vals, results): + ax.annotate( + f'{r["size"]//1000}K', + (ov, qps), + textcoords="offset points", + xytext=(5, 5), + fontsize=7, + color=COLORS[cfg], + ) + + ax.set_xlabel("Overlap@K (Precision)") + ax.set_ylabel("Queries Per Second (QPS)") + ax.set_title(f"Precision vs Throughput -- {dataset}") + ax.legend(loc="best") + ax.set_xlim(0, 1.05) + ax.set_ylim(bottom=0) + fig.tight_layout() + fig.savefig(os.path.join(output_dir, f"{dataset}_qps_vs_overlap.png"), dpi=150) + plt.close(fig) + print(f" Saved {dataset}_qps_vs_overlap.png") + + +def chart_memory_savings(results: List[Dict], dataset: str, output_dir: str): + """Chart 6: Memory savings percentage (Redis blog Chart 1 style).""" + fig, ax = plt.subplots(figsize=(10, 6)) + sizes = [r["size"] for r in results] + + # Calculate savings: FP16 vs FP32 for both FLAT and HNSW + pairs = [ + ("FLAT", "FLAT_FP32", "FLAT_FP16", "#3B82F6"), + ("HNSW", "HNSW_FP32", "HNSW_FP16", "#F97316"), + ] + + x = np.arange(len(sizes)) + width = 0.3 + + for i, (label, fp32, fp16, color) in enumerate(pairs): + savings = [] + for r in results: + m32 = 
r["configs"][fp32]["memory_mb"] + m16 = r["configs"][fp16]["memory_mb"] + pct = (1 - m16 / m32) * 100 if m32 > 0 else 0.0 + savings.append(pct) + + bars = ax.bar( + x + i * width, + savings, + width, + label=f"{label} FP16 savings", + color=color, + edgecolor="white", + linewidth=0.5, + ) + for bar, val in zip(bars, savings): + ax.text( + bar.get_x() + bar.get_width() / 2, + bar.get_height() + 0.5, + f"{val:.1f}%", + ha="center", + va="bottom", + fontsize=9, + fontweight="bold", + ) + + ax.set_xlabel("Corpus Size") + ax.set_ylabel("Memory Savings (%)") + ax.set_title(f"FP16 Memory Savings vs FP32 -- {dataset}") + ax.set_xticks(x + width * 0.5) + ax.set_xticklabels([f"{s:,}" for s in sizes]) + ax.legend(loc="lower right") + ax.set_ylim(0, 60) + ax.yaxis.set_major_formatter(mticker.PercentFormatter()) + fig.tight_layout() + fig.savefig(os.path.join(output_dir, f"{dataset}_memory_savings.png"), dpi=150) + plt.close(fig) + print(f" Saved {dataset}_memory_savings.png") + + +def chart_build_time(results: List[Dict], dataset: str, output_dir: str): + """Chart 7: Index build/load time comparison.""" + fig, ax = plt.subplots(figsize=(10, 6)) + configs = ["FLAT_FP32", "FLAT_FP16", "HNSW_FP32", "HNSW_FP16"] + sizes = [r["size"] for r in results] + x = np.arange(len(sizes)) + width = 0.18 + + for i, cfg in enumerate(configs): + times = [r["configs"][cfg]["load_duration_seconds"] for r in results] + bars = ax.bar( + x + i * width, + times, + width, + label=LABELS[cfg], + color=COLORS[cfg], + edgecolor="white", + linewidth=0.5, + ) + for bar, val in zip(bars, times): + if val > 0.1: + ax.text( + bar.get_x() + bar.get_width() / 2, + bar.get_height() + 0.2, + f"{val:.1f}s", + ha="center", + va="bottom", + fontsize=7, + ) + + ax.set_xlabel("Corpus Size") + ax.set_ylabel("Build Time (seconds)") + ax.set_title(f"Index Build Time -- {dataset}") + ax.set_xticks(x + width * 1.5) + ax.set_xticklabels([f"{s:,}" for s in sizes]) + ax.legend(loc="upper left") + ax.set_ylim(bottom=0) + 
fig.tight_layout() + fig.savefig(os.path.join(output_dir, f"{dataset}_build_time.png"), dpi=150) + plt.close(fig) + print(f" Saved {dataset}_build_time.png") + + +def chart_recall_curve(results: List[Dict], dataset: str, output_dir: str): + """Chart 8: Recall@K curve -- recall at multiple K values for the largest size.""" + # Use the largest corpus size for the recall curve + r = results[-1] + recall_data = r.get("recall_vs_ground_truth") + if not recall_data: + print(f" Skipping recall curve (no recall data in results)") + return + + fig, ax = plt.subplots(figsize=(10, 6)) + configs = ["FLAT_FP32", "FLAT_FP16", "HNSW_FP32", "HNSW_FP16"] + markers = {"FLAT_FP32": "s", "FLAT_FP16": "D", "HNSW_FP32": "o", "HNSW_FP16": "^"} + linestyles = { + "FLAT_FP32": "-", + "FLAT_FP16": "--", + "HNSW_FP32": "-", + "HNSW_FP16": "--", + } + + for cfg in configs: + if cfg not in recall_data: + continue + recall_at_k = recall_data[cfg].get("recall_at_k", {}) + if not recall_at_k: + continue + k_vals = sorted([int(k.split("@")[1]) for k in recall_at_k.keys()]) + recalls = [recall_at_k[f"recall@{k}"] for k in k_vals] + + ax.plot( + k_vals, + recalls, + marker=markers[cfg], + markersize=7, + linewidth=2, + linestyle=linestyles[cfg], + label=LABELS[cfg], + color=COLORS[cfg], + ) + + ax.set_xlabel("K (number of results)") + ax.set_ylabel("Recall@K") + ax.set_title(f"Recall@K Curve at {r['size']:,} documents -- {dataset}") + ax.legend(loc="lower right") + ax.set_ylim(0, 1.05) + ax.set_xlim(left=0) + ax.grid(True, alpha=0.3) + fig.tight_layout() + fig.savefig(os.path.join(output_dir, f"{dataset}_recall_curve.png"), dpi=150) + plt.close(fig) + print(f" Saved {dataset}_recall_curve.png") + + +def chart_recall_by_size(results: List[Dict], dataset: str, output_dir: str): + """Chart 9: Recall@10 comparison across corpus sizes (grouped bar chart).""" + # Check if recall data exists + if not results[0].get("recall_vs_ground_truth"): + print(f" Skipping recall by size (no recall data)") + return + 
+ fig, ax = plt.subplots(figsize=(10, 6)) + configs = ["FLAT_FP32", "FLAT_FP16", "HNSW_FP32", "HNSW_FP16"] + sizes = [r["size"] for r in results] + x = np.arange(len(sizes)) + width = 0.18 + + for i, cfg in enumerate(configs): + recalls = [] + for r in results: + recall_data = r.get("recall_vs_ground_truth", {}).get(cfg, {}) + recall_at_k = recall_data.get("recall_at_k", {}) + recalls.append(recall_at_k.get("recall@10", 0)) + bars = ax.bar( + x + i * width, + recalls, + width, + label=LABELS[cfg], + color=COLORS[cfg], + edgecolor="white", + linewidth=0.5, + ) + for bar, val in zip(bars, recalls): + ax.text( + bar.get_x() + bar.get_width() / 2, + bar.get_height() + 0.005, + f"{val:.3f}", + ha="center", + va="bottom", + fontsize=8, + ) + + ax.set_xlabel("Corpus Size") + ax.set_ylabel("Recall@10") + ax.set_title(f"Recall@10: FP32 vs FP16 -- {dataset}") + ax.set_xticks(x + width * 1.5) + ax.set_xticklabels([f"{s:,}" for s in sizes]) + ax.legend(loc="lower left") + ax.set_ylim(0, 1.1) + fig.tight_layout() + fig.savefig(os.path.join(output_dir, f"{dataset}_recall.png"), dpi=150) + plt.close(fig) + print(f" Saved {dataset}_recall.png") + + +def main(): + parser = argparse.ArgumentParser(description="Visualize benchmark results.") + parser.add_argument( + "--input", nargs="+", required=True, help="One or more result JSON files." 
+ ) + parser.add_argument( + "--output-dir", + default="tests/benchmarks/charts/", + help="Directory to save chart images.", + ) + args = parser.parse_args() + + os.makedirs(args.output_dir, exist_ok=True) + setup_style() + + for path in args.input: + data = load_results(path) + dataset = data["dataset"] + results = data["results"] + print(f"\nGenerating charts for {dataset} ({len(results)} sizes) ...") + + chart_memory(results, dataset, args.output_dir) + chart_overlap(results, dataset, args.output_dir) + chart_qps(results, dataset, args.output_dir) + chart_latency(results, dataset, args.output_dir) + chart_qps_vs_overlap(results, dataset, args.output_dir) + chart_memory_savings(results, dataset, args.output_dir) + chart_build_time(results, dataset, args.output_dir) + chart_recall_curve(results, dataset, args.output_dir) + chart_recall_by_size(results, dataset, args.output_dir) + + print(f"\nAll charts saved to {args.output_dir}") + + +if __name__ == "__main__": + main() diff --git a/tests/integration/test_batch_migration_integration.py b/tests/integration/test_batch_migration_integration.py index 976db0528..92ea7b94d 100644 --- a/tests/integration/test_batch_migration_integration.py +++ b/tests/integration/test_batch_migration_integration.py @@ -381,7 +381,7 @@ def test_resume_from_checkpoint(self, redis_url, worker_id, tmp_path): "completed": [ { "name": index_names[0], - "status": "succeeded", + "status": "success", "completed_at": "2026-03-20T10:00:30Z", } ], diff --git a/tests/integration/test_migration_comprehensive.py b/tests/integration/test_migration_comprehensive.py index 370546c68..3fef6acbe 100644 --- a/tests/integration/test_migration_comprehensive.py +++ b/tests/integration/test_migration_comprehensive.py @@ -24,7 +24,7 @@ - prefix change (keys renamed), HNSW params, initial_cap, phonetic_matcher, numeric unf Run: pytest tests/integration/test_migration_comprehensive.py -v -Spec: nitin_docs/index_migrator/32_integration_test_spec.md +Spec: 
local_docs/index_migrator/32_integration_test_spec.md """ import uuid @@ -93,7 +93,7 @@ def sample_docs(): "description": "First product description", "category": "electronics", "price": 99.99, - "location": "37.7749,-122.4194", # SF coordinates (lat,lon) + "location": "-122.4194,37.7749", # SF coordinates (lon,lat) "embedding": array_to_buffer([0.1, 0.2, 0.3, 0.4], "float32"), }, { @@ -102,7 +102,7 @@ def sample_docs(): "description": "Second service description", "category": "software", "price": 149.99, - "location": "40.7484,-73.9857", # NYC coordinates (lat,lon) + "location": "-73.9857,40.7484", # NYC coordinates (lon,lat) "embedding": array_to_buffer([0.2, 0.3, 0.4, 0.5], "float32"), }, { @@ -111,7 +111,7 @@ def sample_docs(): "description": "", # Empty for index_empty tests "category": "", # Empty for index_empty tests "price": 0, - "location": "34.0522,-118.2437", # LA coordinates (lat,lon) + "location": "-118.2437,34.0522", # LA coordinates (lon,lat) "embedding": array_to_buffer([0.3, 0.4, 0.5, 0.6], "float32"), }, ]