From 40fd49f7762bb47ab1d72f772c4753d19e665c9b Mon Sep 17 00:00:00 2001 From: Aliaksei Klimau Date: Mon, 30 Mar 2026 17:31:58 +0200 Subject: [PATCH] Added new PulpException replacing existing errors --- pulp_file/app/tasks/synchronizing.py | 2 +- pulpcore/app/tasks/repository.py | 9 +--- pulpcore/download/http.py | 16 ++++-- pulpcore/exceptions/__init__.py | 2 + pulpcore/exceptions/base.py | 49 +++++++++++++++++++ pulpcore/exceptions/validation.py | 10 ++-- pulpcore/tasking/tasks.py | 30 +++++++----- .../functional/api/using_plugin/test_proxy.py | 16 ++++-- 8 files changed, 97 insertions(+), 37 deletions(-) diff --git a/pulp_file/app/tasks/synchronizing.py b/pulp_file/app/tasks/synchronizing.py index 295f7a8c32..22eb7558bc 100644 --- a/pulp_file/app/tasks/synchronizing.py +++ b/pulp_file/app/tasks/synchronizing.py @@ -117,7 +117,7 @@ def synchronize(remote_pk, repository_pk, mirror, optimize=False, url=None, **kw optimize = optimize or settings.FILE_SYNC_OPTIMIZATION if not remote.url: - raise ValueError(_("A remote must have a url specified to synchronize.")) + raise SyncError(_("A remote must have a url specified to synchronize.")) if isinstance(remote, FileGitRemote): first_stage = GitFirstStage(remote) diff --git a/pulpcore/app/tasks/repository.py b/pulpcore/app/tasks/repository.py index c989f17f06..919c134a3a 100644 --- a/pulpcore/app/tasks/repository.py +++ b/pulpcore/app/tasks/repository.py @@ -7,11 +7,11 @@ from asgiref.sync import sync_to_async from django.db import transaction -from rest_framework.serializers import ValidationError from pulpcore.app import models from pulpcore.app.models import ProgressReport from pulpcore.app.util import get_domain +from pulpcore.exceptions.base import RepositoryVersionDeleteError log = getLogger(__name__) @@ -44,12 +44,7 @@ def delete_version(pk): return if version.repository.versions.complete().count() <= 1: - raise ValidationError( - _( - "Cannot delete repository version. Repositories must have at least one " - "repository version." - ) - ) + raise RepositoryVersionDeleteError() log.info( "Deleting and squashing version {num} of repository '{repo}'".format( diff --git a/pulpcore/download/http.py b/pulpcore/download/http.py index 0a714e67ea..eeed8310ca 100644 --- a/pulpcore/download/http.py +++ b/pulpcore/download/http.py @@ -6,6 +6,8 @@ from pulpcore.exceptions import ( DigestValidationError, + DnsDomainNameException, + ProxyAuthenticationError, SizeValidationError, TimeoutException, ) @@ -236,6 +238,7 @@ async def run(self, extra_data=None): aiohttp.ClientPayloadError, aiohttp.ClientResponseError, aiohttp.ServerDisconnectedError, + DnsDomainNameException, TimeoutError, TimeoutException, DigestValidationError, @@ -269,7 +272,7 @@ async def download_wrapper(): e.message, ) ) - raise e + raise ProxyAuthenticationError(self.proxy) return await download_wrapper() @@ -296,10 +299,13 @@ async def _run(self, extra_data=None): } if extra_data and extra_data.get("request_kwargs"): request_kwargs.update(extra_data["request_kwargs"]) - async with self.session.get(self.url, **request_kwargs) as response: - self.raise_for_status(response) - to_return = await self._handle_response(response) - await response.release() + try: + async with self.session.get(self.url, **request_kwargs) as response: + self.raise_for_status(response) + to_return = await self._handle_response(response) + await response.release() + except aiohttp.ClientConnectorDNSError: + raise DnsDomainNameException(self.url) if self._close_session_on_finalize: await self.session.close() return to_return diff --git a/pulpcore/exceptions/__init__.py b/pulpcore/exceptions/__init__.py index e6b935de6e..9224ea1f2c 100644 --- a/pulpcore/exceptions/__init__.py +++ b/pulpcore/exceptions/__init__.py @@ -18,6 +18,8 @@ ReplicateError, SyncError, PublishError, + TaskConfigurationError, + TaskTimeoutException, ) from .validation import ( DigestValidationError, diff --git a/pulpcore/exceptions/base.py b/pulpcore/exceptions/base.py index 63e805981f..af6b9ee770 100644 --- a/pulpcore/exceptions/base.py +++ b/pulpcore/exceptions/base.py @@ -326,3 +326,52 @@ class ReplicateError(PulpException): def __str__(self): return f"[{self.error_code}] " + _("Replication failed") + + +class TaskConfigurationError(PulpException): + """ + Raised when a task is incorrectly configured. + """ + + error_code = "PLP0023" + + def __init__(self, task_name, message): + """ + :param task_name: the fully qualified name of the task function + :type task_name: str + :param message: description of the configuration error + :type message: str + """ + self.task_name = task_name + self.message = message + + def __str__(self): + return f"[{self.error_code}] " + _( + "Task type '{task_name}' is misconfigured: {message}" + ).format(task_name=self.task_name, message=self.message) + + +class TaskTimeoutException(PulpException): + """ + Raised when an immediate task exceeds its execution timeout. + """ + + error_code = "PLP0024" + + def __init__(self, task_name, task_pk, timeout_seconds): + """ + :param task_name: the fully qualified name of the task function + :type task_name: str + :param task_pk: the unique task identifier + :type task_pk: str + :param timeout_seconds: the timeout value that was exceeded + :type timeout_seconds: int + """ + self.task_name = task_name + self.task_pk = task_pk + self.timeout_seconds = timeout_seconds + + def __str__(self): + return f"[{self.error_code}] " + _( + "Immediate task {task_pk} (type: {task_name}) timed out after {timeout} seconds." + ).format(task_pk=self.task_pk, task_name=self.task_name, timeout=self.timeout_seconds) diff --git a/pulpcore/exceptions/validation.py b/pulpcore/exceptions/validation.py index 985aad65de..9c3a74b66d 100644 --- a/pulpcore/exceptions/validation.py +++ b/pulpcore/exceptions/validation.py @@ -96,14 +96,12 @@ class UnsupportedDigestValidationError(ValidationError): error_code = "PLP0020" - def __init__(self, digest_name=None): - self.digest_name = digest_name + def __init__(self, message=None): + self.message = message def __str__(self): - if self.digest_name: - return f"[{self.error_code}] " + _( - "Checksum type '{digest}' is not supported or enabled." - ).format(digest=self.digest_name) + if self.message: + return f"[{self.error_code}] {self.message}" return f"[{self.error_code}] " + _("Unsupported checksum type.") diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index b7ca64c27b..984cfb3955 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -32,7 +32,14 @@ TASK_WAKEUP_HANDLE, TASK_WAKEUP_UNBLOCK, ) -from pulpcore.exceptions import InternalErrorException, PulpException +from rest_framework.exceptions import APIException + +from pulpcore.exceptions import ( + InternalErrorException, + PulpException, + TaskConfigurationError, + TaskTimeoutException, +) from pulpcore.tasking.kafka import send_task_notification _logger = logging.getLogger(__name__) @@ -75,7 +82,7 @@ def _execute_task(task): except Exception as e: exc_type, exc, tb = sys.exc_info() task_exc = exc - if not isinstance(e, PulpException): + if not isinstance(e, (PulpException, APIException)): if settings.REDACT_UNSAFE_EXCEPTIONS: # Replace exception with generic error task_exc = InternalErrorException() @@ -112,7 +119,7 @@ async def _aexecute_task(task): except Exception as e: exc_type, exc, tb = sys.exc_info() task_exc = exc - if not isinstance(e, PulpException): + if not isinstance(e, (PulpException, APIException)): if settings.REDACT_UNSAFE_EXCEPTIONS: # Replace exception with generic error task_exc = InternalErrorException() @@ -187,11 +194,11 @@ async def aget_task_function(task): func, is_coroutine_fn = _load_function(task) if task.immediate and not is_coroutine_fn: - raise ValueError("Immediate tasks must be async functions.") + raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.") elif not task.immediate: - raise ValueError("Non-immediate tasks can't run in async context.") + raise TaskConfigurationError(task.name, "Non-immediate tasks can't run in async context.") - return _add_timeout_to(func, task.pk) + return _add_timeout_to(func, task.name, task.pk) def get_task_function(task): @@ -205,7 +212,7 @@ def get_task_function(task): func, is_coroutine_fn = _load_function(task) if task.immediate and not is_coroutine_fn: - raise ValueError("Immediate tasks must be async functions.") + raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.") # no sync wrapper required if not is_coroutine_fn: @@ -213,7 +220,7 @@ def get_task_function(task): # async function in sync context requires wrapper if task.immediate: - coro_fn_with_timeout = _add_timeout_to(func, task.pk) + coro_fn_with_timeout = _add_timeout_to(func, task.name, task.pk) return async_to_sync(coro_fn_with_timeout) return async_to_sync(func) @@ -230,16 +237,13 @@ def _load_function(task): return func_with_args, is_coroutine_fn -def _add_timeout_to(coro_fn, task_pk): +def _add_timeout_to(coro_fn, task_name, task_pk): async def _wrapper(): try: return await asyncio.wait_for(coro_fn(), timeout=IMMEDIATE_TIMEOUT) except asyncio.TimeoutError: - msg_template = "Immediate task %s timed out after %s seconds." - error_msg = msg_template % (task_pk, IMMEDIATE_TIMEOUT) - _logger.info(error_msg) - raise RuntimeError(error_msg) + raise TaskTimeoutException(task_name, task_pk, IMMEDIATE_TIMEOUT) return _wrapper diff --git a/pulpcore/tests/functional/api/using_plugin/test_proxy.py b/pulpcore/tests/functional/api/using_plugin/test_proxy.py index a2c4085c4a..fe84b29e51 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_proxy.py +++ b/pulpcore/tests/functional/api/using_plugin/test_proxy.py @@ -31,7 +31,9 @@ def test_sync_http_through_http_proxy( Test syncing http through a http proxy. """ remote_on_demand = file_remote_factory( - manifest_path=basic_manifest_path, policy="on_demand", proxy_url=http_proxy.proxy_url + manifest_path=basic_manifest_path, + policy="on_demand", + proxy_url=http_proxy.proxy_url, ) _run_basic_sync_and_assert(file_bindings, monitor_task, remote_on_demand, file_repo) @@ -50,7 +52,9 @@ def test_sync_https_through_http_proxy( Test syncing https through a http proxy. """ remote_on_demand = file_remote_ssl_factory( - manifest_path=basic_manifest_path, policy="on_demand", proxy_url=http_proxy.proxy_url + manifest_path=basic_manifest_path, + policy="on_demand", + proxy_url=http_proxy.proxy_url, ) _run_basic_sync_and_assert(file_bindings, monitor_task, remote_on_demand, file_repo) @@ -99,10 +103,12 @@ def test_sync_https_through_http_proxy_with_auth_but_auth_not_configured( proxy_url=http_proxy_with_auth.proxy_url, ) - try: + with pytest.raises(PulpTaskError) as excinfo: _run_basic_sync_and_assert(file_bindings, monitor_task, remote_on_demand, file_repo) - except PulpTaskError as exc: - assert "407, message='Proxy Authentication Required'" in exc.task.error["description"] + + error_desc = excinfo.value.task.error.get("description", "") + + assert "[PLP0010]" in error_desc @pytest.mark.parallel