diff --git a/pulp_file/app/tasks/synchronizing.py b/pulp_file/app/tasks/synchronizing.py index 295f7a8c32..22eb7558bc 100644 --- a/pulp_file/app/tasks/synchronizing.py +++ b/pulp_file/app/tasks/synchronizing.py @@ -117,7 +117,7 @@ def synchronize(remote_pk, repository_pk, mirror, optimize=False, url=None, **kw optimize = optimize or settings.FILE_SYNC_OPTIMIZATION if not remote.url: - raise ValueError(_("A remote must have a url specified to synchronize.")) + raise SyncError(_("A remote must have a url specified to synchronize.")) if isinstance(remote, FileGitRemote): first_stage = GitFirstStage(remote) diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 272388246c..25bfde06e7 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -7,11 +7,13 @@ from django.db.models import Min from pulp_glue.common import __version__ as pulp_glue_version from pulp_glue.common.context import PluginRequirement +from requests.exceptions import SSLError from pulpcore.app.apps import PulpAppConfig, pulp_plugin_configs from pulpcore.app.models import Distribution, Repository, Task, TaskGroup, UpstreamPulp from pulpcore.app.replica import ReplicaContext from pulpcore.constants import TASK_STATES +from pulpcore.exceptions import SslConnectionError from pulpcore.tasking.tasks import dispatch @@ -79,32 +81,35 @@ def replicate_distributions(server_pk): distro_repo_pairs = [] for replicator in supported_replicators: - distros = replicator.upstream_distributions(q=server.q_select) distro_names = [] - for distro in distros: - # Create remote - remote = replicator.create_or_update_remote(upstream_distribution=distro) - if not remote: - # The upstream distribution is not serving any content, - # let if fall through the cracks and be cleanup below. - continue - # Check if there is already a repository - repository = replicator.create_or_update_repository(remote=remote) - if not repository: - # No update occurred because server.policy==LABELED and there was - # an already existing local repository with the same name - continue - - # Dispatch a sync task if needed - if replicator.requires_syncing(distro): - replicator.sync(repository, remote) - - # Get or create a distribution - replicator.create_or_update_distribution(repository, distro) - - # Add name to the list of known distribution names - distro_names.append(distro["name"]) - distro_repo_pairs.append((distro["name"], str(repository.pk))) + try: + distros = replicator.upstream_distributions(q=server.q_select) + for distro in distros: + # Create remote + remote = replicator.create_or_update_remote(upstream_distribution=distro) + if not remote: + # The upstream distribution is not serving any content, + # let if fall through the cracks and be cleanup below. + continue + # Check if there is already a repository + repository = replicator.create_or_update_repository(remote=remote) + if not repository: + # No update occurred because server.policy==LABELED and there was + # an already existing local repository with the same name + continue + + # Dispatch a sync task if needed + if replicator.requires_syncing(distro): + replicator.sync(repository, remote) + + # Get or create a distribution + replicator.create_or_update_distribution(repository, distro) + + # Add name to the list of known distribution names + distro_names.append(distro["name"]) + distro_repo_pairs.append((distro["name"], str(repository.pk))) + except SSLError as e: + raise SslConnectionError(url=server.base_url, details=str(e)) replicator.remove_missing(distro_names) diff --git a/pulpcore/app/tasks/repository.py b/pulpcore/app/tasks/repository.py index c989f17f06..919c134a3a 100644 --- a/pulpcore/app/tasks/repository.py +++ b/pulpcore/app/tasks/repository.py @@ -7,11 +7,11 @@ from asgiref.sync import sync_to_async from django.db import transaction -from rest_framework.serializers import ValidationError from pulpcore.app import models from pulpcore.app.models import ProgressReport from pulpcore.app.util import get_domain +from pulpcore.exceptions.base import RepositoryVersionDeleteError log = getLogger(__name__) @@ -44,12 +44,7 @@ def delete_version(pk): return if version.repository.versions.complete().count() <= 1: - raise ValidationError( - _( - "Cannot delete repository version. Repositories must have at least one " - "repository version." - ) - ) + raise RepositoryVersionDeleteError() log.info( "Deleting and squashing version {num} of repository '{repo}'".format( diff --git a/pulpcore/download/http.py b/pulpcore/download/http.py index 0a714e67ea..3cc698b16e 100644 --- a/pulpcore/download/http.py +++ b/pulpcore/download/http.py @@ -6,7 +6,12 @@ from pulpcore.exceptions import ( DigestValidationError, + DnsDomainNameException, + HttpResponseError, + ProxyAuthenticationError, + RemoteConnectionError, SizeValidationError, + SslConnectionError, TimeoutException, ) @@ -236,6 +241,7 @@ async def run(self, extra_data=None): aiohttp.ClientPayloadError, aiohttp.ClientResponseError, aiohttp.ServerDisconnectedError, + DnsDomainNameException, TimeoutError, TimeoutException, DigestValidationError, @@ -269,9 +275,21 @@ async def download_wrapper(): e.message, ) ) - raise e + raise ProxyAuthenticationError(self.proxy) - return await download_wrapper() + try: + return await download_wrapper() + except aiohttp.ClientResponseError as e: + raise HttpResponseError(url=self.url, status=e.status, message=e.message) + except aiohttp.ClientConnectorSSLError as e: + raise SslConnectionError(url=self.url, details=str(e)) + except ( + aiohttp.ClientConnectorError, + aiohttp.ClientOSError, + aiohttp.ClientPayloadError, + aiohttp.ServerDisconnectedError, + ) as e: + raise RemoteConnectionError(url=self.url, details=str(e)) async def _run(self, extra_data=None): """ @@ -296,10 +314,13 @@ async def _run(self, extra_data=None): } if extra_data and extra_data.get("request_kwargs"): request_kwargs.update(extra_data["request_kwargs"]) - async with self.session.get(self.url, **request_kwargs) as response: - self.raise_for_status(response) - to_return = await self._handle_response(response) - await response.release() + try: + async with self.session.get(self.url, **request_kwargs) as response: + self.raise_for_status(response) + to_return = await self._handle_response(response) + await response.release() + except aiohttp.ClientConnectorDNSError: + raise DnsDomainNameException(self.url) if self._close_session_on_finalize: await self.session.close() return to_return diff --git a/pulpcore/exceptions/__init__.py b/pulpcore/exceptions/__init__.py index e6b935de6e..b5f0ed7d0a 100644 --- a/pulpcore/exceptions/__init__.py +++ b/pulpcore/exceptions/__init__.py @@ -18,6 +18,11 @@ ReplicateError, SyncError, PublishError, + TaskConfigurationError, + TaskTimeoutException, + HttpResponseError, + SslConnectionError, + RemoteConnectionError, ) from .validation import ( DigestValidationError, diff --git a/pulpcore/exceptions/base.py b/pulpcore/exceptions/base.py index 63e805981f..8873f529a2 100644 --- a/pulpcore/exceptions/base.py +++ b/pulpcore/exceptions/base.py @@ -326,3 +326,107 @@ class ReplicateError(PulpException): def __str__(self): return f"[{self.error_code}] " + _("Replication failed") + + +class TaskConfigurationError(PulpException): + """ + Raised when a task is incorrectly configured. + """ + + error_code = "PLP0023" + + def __init__(self, task_name, message): + """ + :param task_name: the fully qualified name of the task function + :type task_name: str + :param message: description of the configuration error + :type message: str + """ + self.task_name = task_name + self.message = message + + def __str__(self): + return f"[{self.error_code}] " + _( + "Task type '{task_name}' is misconfigured: {message}" + ).format(task_name=self.task_name, message=self.message) + + +class TaskTimeoutException(PulpException): + """ + Raised when an immediate task exceeds its execution timeout. + """ + + error_code = "PLP0024" + + def __init__(self, task_name, task_pk, timeout_seconds): + """ + :param task_name: the fully qualified name of the task function + :type task_name: str + :param task_pk: the unique task identifier + :type task_pk: str + :param timeout_seconds: the timeout value that was exceeded + :type timeout_seconds: int + """ + self.task_name = task_name + self.task_pk = task_pk + self.timeout_seconds = timeout_seconds + + def __str__(self): + return f"[{self.error_code}] " + _( + "Immediate task {task_pk} (type: {task_name}) timed out after {timeout} seconds." + ).format(task_pk=self.task_pk, task_name=self.task_name, timeout=self.timeout_seconds) + + +class HttpResponseError(PulpException): + """ + Raised when a remote server returns an HTTP error response after retries are exhausted. + """ + + error_code = "PLP0025" + + def __init__(self, url, status, message): + super().__init__() + self.url = url + self.status = status + self.message = message + + def __str__(self): + return f"[{self.error_code}] " + _( + "HTTP error {status} when downloading {url}: {message}" + ).format(url=self.url, status=self.status, message=self.message) + + +class SslConnectionError(PulpException): + """ + Raised when an SSL/TLS connection fails after retries are exhausted. + """ + + error_code = "PLP0026" + + def __init__(self, url, details): + super().__init__() + self.url = url + self.details = details + + def __str__(self): + return f"[{self.error_code}] " + _("SSL connection failed for {url}: {details}").format( + url=self.url, details=self.details + ) + + +class RemoteConnectionError(PulpException): + """ + Raised when a connection to a remote server fails after retries are exhausted. + """ + + error_code = "PLP0027" + + def __init__(self, url, details): + super().__init__() + self.url = url + self.details = details + + def __str__(self): + return f"[{self.error_code}] " + _("Connection failed for {url}: {details}").format( + url=self.url, details=self.details + ) diff --git a/pulpcore/exceptions/validation.py b/pulpcore/exceptions/validation.py index 985aad65de..9c3a74b66d 100644 --- a/pulpcore/exceptions/validation.py +++ b/pulpcore/exceptions/validation.py @@ -96,14 +96,12 @@ class UnsupportedDigestValidationError(ValidationError): error_code = "PLP0020" - def __init__(self, digest_name=None): - self.digest_name = digest_name + def __init__(self, message=None): + self.message = message def __str__(self): - if self.digest_name: - return f"[{self.error_code}] " + _( - "Checksum type '{digest}' is not supported or enabled." - ).format(digest=self.digest_name) + if self.message: + return f"[{self.error_code}] {self.message}" return f"[{self.error_code}] " + _("Unsupported checksum type.") diff --git a/pulpcore/plugin/exceptions.py b/pulpcore/plugin/exceptions.py index 3e7e33852f..03441311b7 100644 --- a/pulpcore/plugin/exceptions.py +++ b/pulpcore/plugin/exceptions.py @@ -1,12 +1,15 @@ from pulpcore.exceptions import ( DigestValidationError, ExternalServiceError, + HttpResponseError, InvalidSignatureError, MissingDigestValidationError, PublishError, PulpException, + RemoteConnectionError, ReplicateError, SizeValidationError, + SslConnectionError, SyncError, SystemStateError, TimeoutException, @@ -28,4 +31,7 @@ "ExternalServiceError", "SystemStateError", "ReplicateError", + "HttpResponseError", + "SslConnectionError", + "RemoteConnectionError", ] diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index b7ca64c27b..58dd317275 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -15,6 +15,7 @@ from django.db import connection from django.db.models import Model from django_guid import get_guid +from rest_framework.exceptions import APIException from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS from pulpcore.app.contexts import awith_task_context, with_task_context, x_task_diagnostics_var @@ -32,7 +33,12 @@ TASK_WAKEUP_HANDLE, TASK_WAKEUP_UNBLOCK, ) -from pulpcore.exceptions import InternalErrorException, PulpException +from pulpcore.exceptions import ( + InternalErrorException, + PulpException, + TaskConfigurationError, + TaskTimeoutException, +) from pulpcore.tasking.kafka import send_task_notification _logger = logging.getLogger(__name__) @@ -75,7 +81,7 @@ def _execute_task(task): except Exception as e: exc_type, exc, tb = sys.exc_info() task_exc = exc - if not isinstance(e, PulpException): + if not isinstance(e, (PulpException, APIException)): if settings.REDACT_UNSAFE_EXCEPTIONS: # Replace exception with generic error task_exc = InternalErrorException() @@ -112,7 +118,7 @@ async def _aexecute_task(task): except Exception as e: exc_type, exc, tb = sys.exc_info() task_exc = exc - if not isinstance(e, PulpException): + if not isinstance(e, (PulpException, APIException)): if settings.REDACT_UNSAFE_EXCEPTIONS: # Replace exception with generic error task_exc = InternalErrorException() @@ -187,11 +193,11 @@ async def aget_task_function(task): func, is_coroutine_fn = _load_function(task) if task.immediate and not is_coroutine_fn: - raise ValueError("Immediate tasks must be async functions.") + raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.") elif not task.immediate: - raise ValueError("Non-immediate tasks can't run in async context.") + raise TaskConfigurationError(task.name, "Non-immediate tasks can't run in async context.") - return _add_timeout_to(func, task.pk) + return _add_timeout_to(func, task.name, task.pk) def get_task_function(task): @@ -205,7 +211,7 @@ def get_task_function(task): func, is_coroutine_fn = _load_function(task) if task.immediate and not is_coroutine_fn: - raise ValueError("Immediate tasks must be async functions.") + raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.") # no sync wrapper required if not is_coroutine_fn: @@ -213,7 +219,7 @@ def get_task_function(task): # async function in sync context requires wrapper if task.immediate: - coro_fn_with_timeout = _add_timeout_to(func, task.pk) + coro_fn_with_timeout = _add_timeout_to(func, task.name, task.pk) return async_to_sync(coro_fn_with_timeout) return async_to_sync(func) @@ -230,16 +236,13 @@ def _load_function(task): return func_with_args, is_coroutine_fn -def _add_timeout_to(coro_fn, task_pk): +def _add_timeout_to(coro_fn, task_name, task_pk): async def _wrapper(): try: return await asyncio.wait_for(coro_fn(), timeout=IMMEDIATE_TIMEOUT) except asyncio.TimeoutError: - msg_template = "Immediate task %s timed out after %s seconds." - error_msg = msg_template % (task_pk, IMMEDIATE_TIMEOUT) - _logger.info(error_msg) - raise RuntimeError(error_msg) + raise TaskTimeoutException(task_name, task_pk, IMMEDIATE_TIMEOUT) return _wrapper diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index 5669598cf6..1af1bc55e0 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -421,7 +421,8 @@ def test_replication_with_wrong_ca_cert( monitor_task_group(response.task_group) task = pulpcore_bindings.TasksApi.read(e.value.task_group.tasks[0].pulp_href) - assert "SSLError" in task.error["description"] + assert "[PLP0026]" in task.error["description"] + assert "SSL" in task.error["description"] # Update Upstream Pulp with tls_validation=False pulpcore_bindings.UpstreamPulpsApi.partial_update( diff --git a/pulpcore/tests/functional/api/using_plugin/test_proxy.py b/pulpcore/tests/functional/api/using_plugin/test_proxy.py index a2c4085c4a..fe84b29e51 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_proxy.py +++ b/pulpcore/tests/functional/api/using_plugin/test_proxy.py @@ -31,7 +31,9 @@ def test_sync_http_through_http_proxy( Test syncing http through a http proxy. """ remote_on_demand = file_remote_factory( - manifest_path=basic_manifest_path, policy="on_demand", proxy_url=http_proxy.proxy_url + manifest_path=basic_manifest_path, + policy="on_demand", + proxy_url=http_proxy.proxy_url, ) _run_basic_sync_and_assert(file_bindings, monitor_task, remote_on_demand, file_repo) @@ -50,7 +52,9 @@ def test_sync_https_through_http_proxy( Test syncing https through a http proxy. """ remote_on_demand = file_remote_ssl_factory( - manifest_path=basic_manifest_path, policy="on_demand", proxy_url=http_proxy.proxy_url + manifest_path=basic_manifest_path, + policy="on_demand", + proxy_url=http_proxy.proxy_url, ) _run_basic_sync_and_assert(file_bindings, monitor_task, remote_on_demand, file_repo) @@ -99,10 +103,12 @@ def test_sync_https_through_http_proxy_with_auth_but_auth_not_configured( proxy_url=http_proxy_with_auth.proxy_url, ) - try: + with pytest.raises(PulpTaskError) as excinfo: _run_basic_sync_and_assert(file_bindings, monitor_task, remote_on_demand, file_repo) - except PulpTaskError as exc: - assert "407, message='Proxy Authentication Required'" in exc.task.error["description"] + + error_desc = excinfo.value.task.error.get("description", "") + + assert "[PLP0010]" in error_desc @pytest.mark.parallel