Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions redisvl/cli/migrate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import asyncio
import sys
from typing import Optional

from redisvl.cli.utils import add_redis_connection_options, create_redis_url
from redisvl.migration import (
AsyncMigrationExecutor,
MigrationExecutor,
MigrationPlanner,
MigrationValidator,
Expand Down Expand Up @@ -32,7 +34,7 @@ class Migrate:
"\tlist List all available indexes",
"\twizard Interactively build a migration plan and schema patch",
"\tplan Generate a migration plan for a document-preserving drop/recreate migration",
"\tapply Execute a reviewed drop/recreate migration plan",
"\tapply Execute a reviewed drop/recreate migration plan (use --async for large migrations)",
"\testimate Estimate disk space required for a migration plan (dry-run, no mutations)",
"\tvalidate Validate a completed migration plan against the live index",
"\n",
Expand Down Expand Up @@ -199,11 +201,17 @@ def apply(self):
parser = argparse.ArgumentParser(
usage=(
"rvl migrate apply --plan <migration_plan.yaml> "
"[--resume <checkpoint.yaml>] "
"[--async] [--resume <checkpoint.yaml>] "
"[--report-out <migration_report.yaml>]"
)
)
parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
parser.add_argument(
"--async",
dest="use_async",
help="Use async executor (recommended for large migrations with quantization)",
action="store_true",
)
parser.add_argument(
"--resume",
dest="checkpoint_path",
Expand Down Expand Up @@ -246,9 +254,16 @@ def apply(self):
if disk_estimate.has_quantization:
print(f"\n{disk_estimate.summary()}\n")

report = self._apply_sync(
plan, redis_url, args.query_check_file, args.checkpoint_path
)
if args.use_async:
report = asyncio.run(
self._apply_async(
plan, redis_url, args.query_check_file, args.checkpoint_path
)
)
else:
report = self._apply_sync(
plan, redis_url, args.query_check_file, args.checkpoint_path
)

write_migration_report(report, args.report_out)
if args.benchmark_out:
Expand Down Expand Up @@ -319,6 +334,29 @@ def _apply_sync(
self._print_apply_result(report)
return report

async def _apply_async(
self,
plan,
redis_url: str,
query_check_file: Optional[str],
checkpoint_path: Optional[str] = None,
):
"""Execute migration asynchronously (non-blocking for large quantization jobs)."""
executor = AsyncMigrationExecutor()

print(f"\nApplying migration to '{plan.source.index_name}' (async mode)...")

report = await executor.apply(
plan,
redis_url=redis_url,
query_check_file=query_check_file,
progress_callback=self._make_progress_callback(),
checkpoint_path=checkpoint_path,
)

self._print_apply_result(report)
return report

def _print_apply_result(self, report) -> None:
"""Print the result summary after migration apply."""
if report.result == "succeeded":
Expand Down
6 changes: 6 additions & 0 deletions redisvl/migration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from redisvl.migration.async_executor import AsyncMigrationExecutor
from redisvl.migration.async_planner import AsyncMigrationPlanner
from redisvl.migration.async_validation import AsyncMigrationValidator
from redisvl.migration.executor import MigrationExecutor
from redisvl.migration.planner import MigrationPlanner
from redisvl.migration.validation import MigrationValidator
from redisvl.migration.wizard import MigrationWizard

__all__ = [
"AsyncMigrationExecutor",
"AsyncMigrationPlanner",
"AsyncMigrationValidator",
"MigrationExecutor",
"MigrationPlanner",
"MigrationValidator",
Expand Down
Loading
Loading