Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/workflow/multi-app1.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def app1_workflow(ctx: wf.DaprWorkflowContext):

try:
retry_policy = wf.RetryPolicy(
max_number_of_attempts=2,
max_attempts=2,
first_retry_interval=timedelta(milliseconds=100),
max_retry_interval=timedelta(seconds=3),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/multi-app2.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def app2_workflow(ctx: wf.DaprWorkflowContext):
print('app2 - triggering app3 activity', flush=True)
try:
retry_policy = wf.RetryPolicy(
max_number_of_attempts=2,
max_attempts=2,
first_retry_interval=timedelta(milliseconds=100),
max_retry_interval=timedelta(seconds=3),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

retry_policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
max_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100),
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow/simple_aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

retry_policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
max_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100),
Expand Down
139 changes: 118 additions & 21 deletions ext/dapr-ext-workflow/dapr/ext/workflow/retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,95 @@


class RetryPolicy:
"""Represents the retry policy for a workflow or activity function."""
"""Retry policy for workflow activities and child workflows.

A ``RetryPolicy`` is passed to ``ctx.call_activity()`` or
``ctx.call_child_workflow()`` to automatically retry the task when it
fails. The first attempt always runs; on failure, the engine waits
``first_retry_interval`` before the second attempt and multiplies the
wait by ``backoff_coefficient`` after each subsequent failure,
optionally capped per-attempt by ``max_retry_interval``.

Example::

from datetime import timedelta
from dapr.ext.workflow import RetryPolicy

policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=5,
backoff_coefficient=2.0,
max_retry_interval=timedelta(seconds=30),
)

result = yield ctx.call_activity(my_activity, input=data, retry_policy=policy)

Notes:
* ``max_attempts`` (or its deprecated alias ``max_number_of_attempts``)
is the **total** number of attempts, not the number of retries.
A value of ``5`` means up to 4 retries after the first attempt.
* If ``retry_timeout`` elapses before the next scheduled attempt,
the task fails with the last error. The surrounding workflow
will then fail unless the call is wrapped in ``try`` / ``except``
in the workflow function.
* ``max_retry_interval`` only caps the per-attempt delay; retries
still proceed when it is left as ``None``.
"""

def __init__(
self,
*,
first_retry_interval: timedelta,
max_number_of_attempts: int,
max_number_of_attempts: Optional[int] = None,
backoff_coefficient: Optional[float] = 1.0,
max_retry_interval: Optional[timedelta] = None,
retry_timeout: Optional[timedelta] = None,
max_attempts: Optional[int] = None,
):
"""Creates a new RetryPolicy instance.
"""Create a new RetryPolicy.

Args:
first_retry_interval(timedelta): The retry interval to use for the first retry attempt.
max_number_of_attempts(int): The maximum number of retry attempts.
backoff_coefficient(Optional[float]): The backoff coefficient to use for calculating
the next retry interval.
max_retry_interval(Optional[timedelta]): The maximum retry interval to use for any
retry attempt.
retry_timeout(Optional[timedelta]): The maximum amount of time to spend retrying the
operation.
first_retry_interval: Delay between the first attempt and the
first retry (attempt #2). Must be ``>= 0``.
max_number_of_attempts: **Deprecated** alias for ``max_attempts``,
kept for backward compatibility. Exactly one of
``max_attempts`` or ``max_number_of_attempts`` must be
provided.
backoff_coefficient: Exponential backoff multiplier applied to
successive retry intervals. Must be ``>= 1``. Defaults to
``1.0`` (constant delay between retries).
Comment on lines +81 to +83
max_retry_interval: Upper bound on the delay between any two
consecutive attempts. When ``None`` (the default) the
delay grows unbounded according to the backoff. Retries
still occur when this is ``None``; it only caps the
per-attempt delay.
Comment on lines +84 to +88
retry_timeout: Total budget for the retry sequence, measured
from when the task first started. If the next attempt
would start after this deadline, the task fails
immediately with the last error. ``None`` (the default)
means no timeout. When the timeout fires, the surrounding
workflow handles the failure like any other task failure:
the workflow fails unless the call is wrapped in
``try`` / ``except``.
max_attempts: Total number of attempts the task may run,
including the first one. Must be ``>= 1``. Exactly one of
``max_attempts`` or ``max_number_of_attempts`` must be
provided.

Raises:
ValueError: If neither or both of ``max_attempts`` and
``max_number_of_attempts`` are provided, or if any other
field fails its range check.
"""
# validate inputs
attempts_resolved = _resolve_max_attempts(
max_attempts=max_attempts,
max_number_of_attempts=max_number_of_attempts,
)

if first_retry_interval < timedelta(seconds=0):
raise ValueError('first_retry_interval must be >= 0')
if max_number_of_attempts < 1:
raise ValueError('max_number_of_attempts must be >= 1')
if attempts_resolved < 1:
raise ValueError('max_attempts must be >= 1')
if backoff_coefficient is not None and backoff_coefficient < 1:
Comment on lines 113 to 116
raise ValueError('backoff_coefficient must be >= 1')
if max_retry_interval is not None and max_retry_interval < timedelta(seconds=0):
Expand All @@ -61,38 +122,74 @@ def __init__(

self._obj = task.RetryPolicy(
first_retry_interval=first_retry_interval,
max_number_of_attempts=max_number_of_attempts,
max_number_of_attempts=attempts_resolved,
backoff_coefficient=backoff_coefficient,
max_retry_interval=max_retry_interval,
retry_timeout=retry_timeout,
)

@property
def obj(self) -> task.RetryPolicy:
"""Returns the underlying RetryPolicy object."""
"""The underlying durabletask ``RetryPolicy`` instance."""
return self._obj

@property
def first_retry_interval(self) -> timedelta:
"""The retry interval to use for the first retry attempt."""
"""Delay between the first attempt and the first retry."""
return self._obj._first_retry_interval

@property
def max_number_of_attempts(self) -> int:
"""The maximum number of retry attempts."""
"""Total number of attempts (alias of :attr:`max_attempts`).

Kept for backward compatibility. New code should prefer
:attr:`max_attempts`, which has the same value but a clearer name.
"""
return self._obj._max_number_of_attempts

@property
def max_attempts(self) -> int:
"""Total number of attempts (including the first one)."""
return self._obj._max_number_of_attempts

@property
def backoff_coefficient(self) -> Optional[float]:
"""The backoff coefficient to use for calculating the next retry interval."""
"""Multiplier applied to the retry interval after each failure."""
return self._obj._backoff_coefficient

@property
def max_retry_interval(self) -> Optional[timedelta]:
"""The maximum retry interval to use for any retry attempt."""
"""Upper bound on the per-attempt retry delay (``None`` for no cap)."""
return self._obj._max_retry_interval

@property
def retry_timeout(self) -> Optional[timedelta]:
"""The maximum amount of time to spend retrying the operation."""
"""Total time budget for retries (``None`` for no timeout).

When the timeout fires, the task fails with the last error and the
surrounding workflow fails unless the call is wrapped in
``try`` / ``except``.
"""
return self._obj._retry_timeout


def _resolve_max_attempts(
*,
max_attempts: Optional[int],
max_number_of_attempts: Optional[int],
) -> int:
"""Pick between ``max_attempts`` and its deprecated alias.

Exactly one of the two names must be supplied. Supplying both is
rejected to avoid silent inconsistencies between the values.
"""
if max_attempts is not None and max_number_of_attempts is not None:
raise ValueError(
'Specify only one of max_attempts or max_number_of_attempts; '
'max_number_of_attempts is a deprecated alias.'
)
if max_attempts is not None:
return max_attempts
if max_number_of_attempts is not None:
return max_number_of_attempts
raise ValueError('max_attempts is required (max_number_of_attempts is a deprecated alias).')
131 changes: 131 additions & 0 deletions ext/dapr-ext-workflow/tests/test_retry_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-

"""
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import unittest
from datetime import timedelta

from dapr.ext.workflow import RetryPolicy


class RetryPolicyConstructionTests(unittest.TestCase):
def test_constructs_with_legacy_max_number_of_attempts(self):
policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=5,
)

self.assertEqual(policy.first_retry_interval, timedelta(seconds=1))
self.assertEqual(policy.max_number_of_attempts, 5)
self.assertEqual(policy.max_attempts, 5)
self.assertEqual(policy.backoff_coefficient, 1.0)
self.assertIsNone(policy.max_retry_interval)
self.assertIsNone(policy.retry_timeout)

def test_constructs_with_new_max_attempts(self):
policy = RetryPolicy(
first_retry_interval=timedelta(seconds=2),
max_attempts=3,
backoff_coefficient=2.0,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(minutes=5),
)

self.assertEqual(policy.first_retry_interval, timedelta(seconds=2))
self.assertEqual(policy.max_attempts, 3)
self.assertEqual(policy.max_number_of_attempts, 3)
self.assertEqual(policy.backoff_coefficient, 2.0)
self.assertEqual(policy.max_retry_interval, timedelta(seconds=10))
self.assertEqual(policy.retry_timeout, timedelta(minutes=5))

def test_exposes_underlying_durabletask_object(self):
policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=2,
)

underlying = policy.obj
self.assertEqual(underlying._max_number_of_attempts, 2)
self.assertEqual(underlying._first_retry_interval, timedelta(seconds=1))


class RetryPolicyAttemptsResolutionTests(unittest.TestCase):
def test_rejects_when_both_attempts_fields_supplied(self):
with self.assertRaises(ValueError) as ctx:
RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=3,
max_number_of_attempts=3,
)

self.assertIn('only one of max_attempts', str(ctx.exception))

def test_rejects_when_neither_attempts_field_supplied(self):
with self.assertRaises(ValueError) as ctx:
RetryPolicy(first_retry_interval=timedelta(seconds=1))

self.assertIn('max_attempts is required', str(ctx.exception))


class RetryPolicyValidationTests(unittest.TestCase):
def test_rejects_negative_first_retry_interval(self):
with self.assertRaisesRegex(ValueError, 'first_retry_interval'):
RetryPolicy(
first_retry_interval=timedelta(seconds=-1),
max_attempts=2,
)

def test_rejects_max_attempts_below_one(self):
with self.assertRaisesRegex(ValueError, 'max_attempts'):
RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=0,
)

def test_rejects_backoff_coefficient_below_one(self):
with self.assertRaisesRegex(ValueError, 'backoff_coefficient'):
RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=2,
backoff_coefficient=0.5,
)

def test_rejects_negative_max_retry_interval(self):
with self.assertRaisesRegex(ValueError, 'max_retry_interval'):
RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=2,
max_retry_interval=timedelta(seconds=-1),
)

def test_rejects_negative_retry_timeout(self):
with self.assertRaisesRegex(ValueError, 'retry_timeout'):
RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=2,
retry_timeout=timedelta(seconds=-1),
)

def test_allows_backoff_coefficient_none(self):
policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_attempts=2,
backoff_coefficient=None,
)

self.assertIsNone(policy.backoff_coefficient)


if __name__ == '__main__':
unittest.main()