Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 57 additions & 15 deletions pulpcore/app/tasks/migrate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from gettext import gettext as _

from django.conf import settings
from django.utils.timezone import now
from rest_framework.serializers import ValidationError

Expand All @@ -10,6 +12,35 @@

_logger = logging.getLogger(__name__)

# Number of threads used to copy artifacts concurrently; passed as ``max_workers``
# to the ThreadPoolExecutor in ``migrate_backend``. Read from the Django setting
# of the same name, falling back to 8 when the setting is absent.
MIGRATION_WORKERS = getattr(settings, "MIGRATION_WORKERS", 8)
# Number of submitted copy futures that ``migrate_backend`` accumulates before it
# waits for them and reports progress. Read from the Django setting of the same
# name, falling back to 500 when the setting is absent.
MIGRATION_BATCH_SIZE = getattr(settings, "MIGRATION_BATCH_SIZE", 500)


def _copy_artifact(old_storage, new_storage, filename):
    """
    Copy one artifact file from ``old_storage`` to ``new_storage``.

    Nothing is copied when ``new_storage`` already reports the file as existing.
    A source file that cannot be opened is reported through the return value
    rather than raised, so the caller can collect such failures. Any other
    exception (for example from ``new_storage.save``) propagates.

    Args:
        old_storage: Storage backend to read from; must provide ``open(name)``.
        new_storage: Storage backend to write to; must provide ``exists(name)``
            and ``save(name, file)``.
        filename: Storage path of the artifact to copy.

    Returns:
        A ``(filename, error)`` tuple. ``error`` is ``None`` when the file was
        copied or was already present, and a ``FileNotFoundError`` instance when
        the source file is missing from ``old_storage``.
    """
    if not new_storage.exists(filename):
        try:
            source = old_storage.open(filename)
        except FileNotFoundError:
            # Hand the problem back as data instead of raising.
            return filename, FileNotFoundError(filename)
        try:
            new_storage.save(filename, source)
        finally:
            # Release the source handle even if saving failed.
            source.close()
    return filename, None


def _process_batch(batch, pb):
    """
    Drain a batch of copy futures and report which files failed.

    Each future is expected to resolve to a ``(filename, error)`` tuple. The
    progress report is incremented once per finished future, whether or not it
    carried an error. An exception raised by a future propagates out of this
    function via ``future.result()``.

    Args:
        batch: Iterable of futures to wait for.
        pb: Progress reporter exposing ``increment()``.

    Returns:
        List of filenames whose result carried a non-``None`` error, in the
        order the futures completed.
    """
    failed = []
    for finished in as_completed(batch):
        name, problem = finished.result()
        pb.increment()
        if problem is None:
            continue
        failed.append(name)
    return failed


def migrate_backend(data):
"""
Expand All @@ -25,25 +56,28 @@ def migrate_backend(data):
artifacts = Artifact.objects.filter(pulp_domain=domain)
date = now()

missing = []
with ProgressReport(
message=_("Migrating Artifacts"), code="migrate", total=artifacts.count()
) as pb:
while True:
for digest in pb.iter(artifacts.values_list("sha256", flat=True)):
filename = storage.get_artifact_path(digest)
if not new_storage.exists(filename):
try:
file = old_storage.open(filename)
except FileNotFoundError:
raise ValidationError(
_(
"Found missing file for artifact(sha256={}). Please run the repair "
"task or delete the offending artifact."
).format(digest)
)
new_storage.save(filename, file)
file.close()
# Handle new artifacts saved by the content app
batch = []
with ThreadPoolExecutor(max_workers=MIGRATION_WORKERS) as executor:
for digest in artifacts.values_list("sha256", flat=True):
filename = storage.get_artifact_path(digest)
future = executor.submit(
_copy_artifact, old_storage, new_storage, filename,
)
batch.append(future)

if len(batch) >= MIGRATION_BATCH_SIZE:
missing.extend(_process_batch(batch, pb))
batch = []

if batch:
missing.extend(_process_batch(batch, pb))

# Handle new artifacts saved by the content app during migration
artifacts = Artifact.objects.filter(pulp_domain=domain, pulp_created__gte=date)
if count := artifacts.count():
pb.total += count
Expand All @@ -52,6 +86,14 @@ def migrate_backend(data):
continue
break

if missing:
raise ValidationError(
_(
"Found missing file(s) for {} artifact(s). Please run the repair "
"task or delete the offending artifacts. First missing: {}"
).format(len(missing), missing[0])
)

# Update the current domain to the new storage backend settings
msg = _("Update Domain({domain})'s Backend Settings").format(domain=domain.name)
with ProgressReport(message=msg, code="update", total=1) as pb:
Expand Down
Loading