diff --git a/examples/workflow/multi-app1.py b/examples/workflow/multi-app1.py index e6af5f75a..eb103368a 100644 --- a/examples/workflow/multi-app1.py +++ b/examples/workflow/multi-app1.py @@ -25,7 +25,7 @@ def app1_workflow(ctx: wf.DaprWorkflowContext): try: retry_policy = wf.RetryPolicy( - max_number_of_attempts=2, + max_attempts=2, first_retry_interval=timedelta(milliseconds=100), max_retry_interval=timedelta(seconds=3), ) diff --git a/examples/workflow/multi-app2.py b/examples/workflow/multi-app2.py index 7994af2e3..dbfad7952 100644 --- a/examples/workflow/multi-app2.py +++ b/examples/workflow/multi-app2.py @@ -29,7 +29,7 @@ def app2_workflow(ctx: wf.DaprWorkflowContext): print('app2 - triggering app3 activity', flush=True) try: retry_policy = wf.RetryPolicy( - max_number_of_attempts=2, + max_attempts=2, first_retry_interval=timedelta(milliseconds=100), max_retry_interval=timedelta(seconds=3), ) diff --git a/examples/workflow/simple.py b/examples/workflow/simple.py index dc0ea0b6a..0ed09952d 100644 --- a/examples/workflow/simple.py +++ b/examples/workflow/simple.py @@ -42,7 +42,7 @@ retry_policy = RetryPolicy( first_retry_interval=timedelta(seconds=1), - max_number_of_attempts=3, + max_attempts=3, backoff_coefficient=2, max_retry_interval=timedelta(seconds=10), retry_timeout=timedelta(seconds=100), diff --git a/examples/workflow/simple_aio_client.py b/examples/workflow/simple_aio_client.py index fd93a5016..24347ad4c 100644 --- a/examples/workflow/simple_aio_client.py +++ b/examples/workflow/simple_aio_client.py @@ -42,7 +42,7 @@ retry_policy = RetryPolicy( first_retry_interval=timedelta(seconds=1), - max_number_of_attempts=3, + max_attempts=3, backoff_coefficient=2, max_retry_interval=timedelta(seconds=10), retry_timeout=timedelta(seconds=100), diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py index 255e7bc6c..f7922d11a 100644 --- 
a/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py @@ -24,34 +24,95 @@ class RetryPolicy: - """Represents the retry policy for a workflow or activity function.""" + """Retry policy for workflow activities and child workflows. + + A ``RetryPolicy`` is passed to ``ctx.call_activity()`` or + ``ctx.call_child_workflow()`` to automatically retry the task when it + fails. The first attempt always runs; on failure, the engine waits + ``first_retry_interval`` before the second attempt and multiplies the + wait by ``backoff_coefficient`` after each subsequent failure, + optionally capped per-attempt by ``max_retry_interval``. + + Example:: + + from datetime import timedelta + from dapr.ext.workflow import RetryPolicy + + policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_attempts=5, + backoff_coefficient=2.0, + max_retry_interval=timedelta(seconds=30), + ) + + result = yield ctx.call_activity(my_activity, input=data, retry_policy=policy) + + Notes: + * ``max_attempts`` (or its deprecated alias ``max_number_of_attempts``) + is the **total** number of attempts, not the number of retries. + A value of ``5`` means up to 4 retries after the first attempt. + * If ``retry_timeout`` elapses before the next scheduled attempt, + the task fails with the last error. The surrounding workflow + will then fail unless the call is wrapped in ``try`` / ``except`` + in the workflow function. + * ``max_retry_interval`` only caps the per-attempt delay; retries + still proceed when it is left as ``None``. + """ def __init__( self, *, first_retry_interval: timedelta, - max_number_of_attempts: int, + max_number_of_attempts: Optional[int] = None, backoff_coefficient: Optional[float] = 1.0, max_retry_interval: Optional[timedelta] = None, retry_timeout: Optional[timedelta] = None, + max_attempts: Optional[int] = None, ): - """Creates a new RetryPolicy instance. + """Create a new RetryPolicy. 
Args: - first_retry_interval(timedelta): The retry interval to use for the first retry attempt. - max_number_of_attempts(int): The maximum number of retry attempts. - backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating - the next retry interval. - max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any - retry attempt. - retry_timeout(Optional[timedelta]): The maximum amount of time to spend retrying the - operation. + first_retry_interval: Delay between the first attempt and the + first retry (attempt #2). Must be ``>= 0``. + max_number_of_attempts: **Deprecated** alias for ``max_attempts``, + kept for backward compatibility. Exactly one of + ``max_attempts`` or ``max_number_of_attempts`` must be + provided. + backoff_coefficient: Exponential backoff multiplier applied to + successive retry intervals. Must be ``>= 1``. Defaults to + ``1.0`` (constant delay between retries). + max_retry_interval: Upper bound on the delay between any two + consecutive attempts. When ``None`` (the default) the + delay grows unbounded according to the backoff. Retries + still occur when this is ``None``; it only caps the + per-attempt delay. + retry_timeout: Total budget for the retry sequence, measured + from when the task first started. If the next attempt + would start after this deadline, the task fails + immediately with the last error. ``None`` (the default) + means no timeout. When the timeout fires, the surrounding + workflow handles the failure like any other task failure: + the workflow fails unless the call is wrapped in + ``try`` / ``except``. + max_attempts: Total number of attempts the task may run, + including the first one. Must be ``>= 1``. Exactly one of + ``max_attempts`` or ``max_number_of_attempts`` must be + provided. + + Raises: + ValueError: If neither or both of ``max_attempts`` and + ``max_number_of_attempts`` are provided, or if any other + field fails its range check. 
""" - # validate inputs + attempts_resolved = _resolve_max_attempts( + max_attempts=max_attempts, + max_number_of_attempts=max_number_of_attempts, + ) + if first_retry_interval < timedelta(seconds=0): raise ValueError('first_retry_interval must be >= 0') - if max_number_of_attempts < 1: - raise ValueError('max_number_of_attempts must be >= 1') + if attempts_resolved < 1: + raise ValueError('max_attempts must be >= 1') if backoff_coefficient is not None and backoff_coefficient < 1: raise ValueError('backoff_coefficient must be >= 1') if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0): @@ -61,7 +122,7 @@ def __init__( self._obj = task.RetryPolicy( first_retry_interval=first_retry_interval, - max_number_of_attempts=max_number_of_attempts, + max_number_of_attempts=attempts_resolved, backoff_coefficient=backoff_coefficient, max_retry_interval=max_retry_interval, retry_timeout=retry_timeout, @@ -69,30 +130,66 @@ def __init__( @property def obj(self) -> task.RetryPolicy: - """Returns the underlying RetryPolicy object.""" + """The underlying durabletask ``RetryPolicy`` instance.""" return self._obj @property def first_retry_interval(self) -> timedelta: - """The retry interval to use for the first retry attempt.""" + """Delay between the first attempt and the first retry.""" return self._obj._first_retry_interval @property def max_number_of_attempts(self) -> int: - """The maximum number of retry attempts.""" + """Total number of attempts (alias of :attr:`max_attempts`). + + Kept for backward compatibility. New code should prefer + :attr:`max_attempts`, which has the same value but a clearer name. 
def _resolve_max_attempts(
    *,
    max_attempts: Optional[int],
    max_number_of_attempts: Optional[int],
) -> int:
    """Return the attempt count from ``max_attempts`` or its deprecated alias.

    Exactly one of the two names must be supplied. Supplying both is
    rejected, even when the values agree, to avoid silent inconsistencies
    between them.

    Args:
        max_attempts: Value given under the preferred name, or ``None``
            when the caller did not pass it.
        max_number_of_attempts: Value given under the deprecated alias, or
            ``None`` when the caller did not pass it.

    Returns:
        Whichever of the two values was supplied, unchanged. No range check
        is performed here.

    Raises:
        ValueError: If both names are supplied, or if neither is.

    Warns:
        DeprecationWarning: If the value arrives through
            ``max_number_of_attempts``.
    """
    # Local import: the module-level import block is not part of this block.
    import warnings

    if max_attempts is not None and max_number_of_attempts is not None:
        raise ValueError(
            'Specify only one of max_attempts or max_number_of_attempts; '
            'max_number_of_attempts is a deprecated alias.'
        )
    if max_attempts is not None:
        return max_attempts
    if max_number_of_attempts is not None:
        # stacklevel=3 skips this helper (1) and RetryPolicy.__init__ (2), so
        # the warning is attributed to the code that built the policy.
        warnings.warn(
            'max_number_of_attempts is deprecated; use max_attempts instead.',
            DeprecationWarning,
            stacklevel=3,
        )
        return max_number_of_attempts
    raise ValueError('max_attempts is required (max_number_of_attempts is a deprecated alias).')
import unittest
from datetime import timedelta

from dapr.ext.workflow import RetryPolicy

# Interval shared by every case that does not care about the exact delay.
_ONE_SECOND = timedelta(seconds=1)


class RetryPolicyConstructionTests(unittest.TestCase):
    """Successful construction through either spelling of the attempts argument."""

    def test_constructs_with_legacy_max_number_of_attempts(self):
        # Only the legacy keyword is passed; optional fields are left at their defaults.
        legacy = RetryPolicy(max_number_of_attempts=5, first_retry_interval=_ONE_SECOND)

        self.assertEqual(legacy.first_retry_interval, timedelta(seconds=1))
        # Both property names must report the same total.
        self.assertEqual(legacy.max_number_of_attempts, 5)
        self.assertEqual(legacy.max_attempts, 5)
        self.assertEqual(legacy.backoff_coefficient, 1.0)
        self.assertIsNone(legacy.max_retry_interval)
        self.assertIsNone(legacy.retry_timeout)

    def test_constructs_with_new_max_attempts(self):
        settings = {
            'first_retry_interval': timedelta(seconds=2),
            'max_attempts': 3,
            'backoff_coefficient': 2.0,
            'max_retry_interval': timedelta(seconds=10),
            'retry_timeout': timedelta(minutes=5),
        }
        configured = RetryPolicy(**settings)

        self.assertEqual(configured.first_retry_interval, timedelta(seconds=2))
        self.assertEqual(configured.max_attempts, 3)
        self.assertEqual(configured.max_number_of_attempts, 3)
        self.assertEqual(configured.backoff_coefficient, 2.0)
        self.assertEqual(configured.max_retry_interval, timedelta(seconds=10))
        self.assertEqual(configured.retry_timeout, timedelta(minutes=5))

    def test_exposes_underlying_durabletask_object(self):
        wrapped = RetryPolicy(max_attempts=2, first_retry_interval=_ONE_SECOND).obj

        self.assertEqual(wrapped._max_number_of_attempts, 2)
        self.assertEqual(wrapped._first_retry_interval, timedelta(seconds=1))


class RetryPolicyAttemptsResolutionTests(unittest.TestCase):
    """Exactly one of ``max_attempts`` / ``max_number_of_attempts`` is accepted."""

    def test_rejects_when_both_attempts_fields_supplied(self):
        with self.assertRaisesRegex(ValueError, 'only one of max_attempts'):
            RetryPolicy(
                max_number_of_attempts=3,
                max_attempts=3,
                first_retry_interval=_ONE_SECOND,
            )

    def test_rejects_when_neither_attempts_field_supplied(self):
        with self.assertRaisesRegex(ValueError, 'max_attempts is required'):
            RetryPolicy(first_retry_interval=_ONE_SECOND)


class RetryPolicyValidationTests(unittest.TestCase):
    """Range checks applied to the individual constructor arguments."""

    def _assert_rejected(self, field_name, **overrides):
        # Start from a valid policy, apply the overrides, and expect a
        # ValueError whose message mentions ``field_name``.
        kwargs = {'first_retry_interval': _ONE_SECOND, 'max_attempts': 2}
        kwargs.update(overrides)
        with self.assertRaisesRegex(ValueError, field_name):
            RetryPolicy(**kwargs)

    def test_rejects_negative_first_retry_interval(self):
        self._assert_rejected('first_retry_interval', first_retry_interval=timedelta(seconds=-1))

    def test_rejects_max_attempts_below_one(self):
        self._assert_rejected('max_attempts', max_attempts=0)

    def test_rejects_backoff_coefficient_below_one(self):
        self._assert_rejected('backoff_coefficient', backoff_coefficient=0.5)

    def test_rejects_negative_max_retry_interval(self):
        self._assert_rejected('max_retry_interval', max_retry_interval=timedelta(seconds=-1))

    def test_rejects_negative_retry_timeout(self):
        self._assert_rejected('retry_timeout', retry_timeout=timedelta(seconds=-1))

    def test_allows_backoff_coefficient_none(self):
        # ``None`` is accepted for the coefficient and reported back unchanged.
        unset = RetryPolicy(
            backoff_coefficient=None,
            max_attempts=2,
            first_retry_interval=_ONE_SECOND,
        )

        self.assertIsNone(unset.backoff_coefficient)


if __name__ == '__main__':
    unittest.main()