Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 100 additions & 25 deletions ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import time
import uuid
from datetime import datetime
from typing import Any, Optional, Sequence, Union
Expand All @@ -33,6 +35,7 @@
TOutput,
WorkflowIdReusePolicy,
WorkflowState,
_TransientTimeout,
new_orchestration_state,
)
from google.protobuf import wrappers_pb2
Expand Down Expand Up @@ -120,34 +123,33 @@ async def get_orchestration_state(
return new_orchestration_state(req.instanceId, res)

async def wait_for_orchestration_start(
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
) -> Optional[WorkflowState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
grpc_timeout = None if timeout == 0 else timeout
self._logger.info(
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
)
self._logger.info(
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
)

async def _call(grpc_timeout):
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(
req, timeout=grpc_timeout
)
return new_orchestration_state(req.instanceId, res)
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
# Replace gRPC error with the built-in TimeoutError
raise TimeoutError('Timed-out waiting for the orchestration to start')
else:
raise

try:
return await self._call_with_transient_retry(instance_id, timeout, _call)
except _TransientTimeout:
raise TimeoutError('Timed-out waiting for the orchestration to start')

async def wait_for_orchestration_completion(
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
) -> Optional[WorkflowState]:
Comment thread
javier-aliaga marked this conversation as resolved.
Comment thread
javier-aliaga marked this conversation as resolved.
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
grpc_timeout = None if timeout == 0 else timeout
self._logger.info(
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
)
self._logger.info(
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
)

async def _call(grpc_timeout):
res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(
req, timeout=grpc_timeout
)
Expand All @@ -167,14 +169,87 @@ async def wait_for_orchestration_completion(
self._logger.info(f"Instance '{instance_id}' was terminated.")
elif state.runtime_status == OrchestrationStatus.COMPLETED:
self._logger.info(f"Instance '{instance_id}' completed.")

return state
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
# Replace gRPC error with the built-in TimeoutError
raise TimeoutError('Timed-out waiting for the orchestration to complete')
else:
raise

try:
return await self._call_with_transient_retry(instance_id, timeout, _call)
except _TransientTimeout:
raise TimeoutError('Timed-out waiting for the orchestration to complete')

# Transient gRPC codes that indicate the workflow runtime is temporarily
# unable to locate the workflow actor — typically immediately after a Dapr
# sidecar restart (e.g. recovery from chaos). The placement service has the
# actor registration, but local daprd hasn't received the dissemination yet.
# Without retry, every poll fails permanently with FAILED_PRECONDITION even
# though the workflow runtime state is intact.
_TRANSIENT_RPC_CODES = (
grpc.StatusCode.FAILED_PRECONDITION,
grpc.StatusCode.UNAVAILABLE,
)

# See TaskHubGrpcClient._MAX_TRANSIENT_RETRY_SECONDS — same grace window for
# unbounded (timeout=0 or None) callers so a down sidecar surfaces the original
# error instead of retrying forever.
_MAX_TRANSIENT_RETRY_SECONDS = 30.0

async def _call_with_transient_retry(self, instance_id, timeout, call_fn):
    """Await a gRPC wait call, retrying transient runtime errors with backoff.

    Async counterpart of the synchronous client's helper of the same name.
    ``FAILED_PRECONDITION`` / ``UNAVAILABLE`` (``_TRANSIENT_RPC_CODES``) are
    retried with a doubling backoff capped at 5 seconds. The first attempt
    receives the caller's timeout unchanged (``None`` when unbounded); on
    retries both the sleep and the per-attempt gRPC timeout are clamped to
    the time left in the caller's budget.

    Args:
        instance_id: Workflow instance ID; used only in the retry log message.
        timeout: Overall budget in seconds. ``0`` or ``None`` means unbounded.
        call_fn: Coroutine function taking the per-attempt gRPC timeout and
            returning the result of the wait call.

    Returns:
        Whatever ``call_fn`` returns on the first successful attempt.

    Raises:
        _TransientTimeout: An attempt ended with ``DEADLINE_EXCEEDED`` or the
            caller's budget ran out between attempts.
        grpc.RpcError: A non-transient error occurred, or (unbounded mode)
            transient errors kept occurring for
            ``_MAX_TRANSIENT_RETRY_SECONDS`` without the runtime recovering.
    """
    initial_backoff = 0.5  # seconds to wait before the first retry
    max_backoff = 5.0  # ceiling for the doubling backoff

    unbounded = timeout in (0, None)
    deadline = None if unbounded else time.monotonic() + timeout
    grpc_timeout = None if unbounded else timeout
    backoff = initial_backoff
    transient_deadline = None  # unbounded mode only; end of the current grace window
    attempt_started = None  # unbounded mode only; start time of the latest retry attempt
    while True:
        try:
            return await call_fn(grpc_timeout)
        except grpc.RpcError as rpc_error:
            code = rpc_error.code()  # type: ignore
            if code == grpc.StatusCode.DEADLINE_EXCEEDED:
                raise _TransientTimeout() from rpc_error
            if code not in self._TRANSIENT_RPC_CODES:
                raise

            now = time.monotonic()

            # In unbounded mode the user budget can't end the loop, so cap a
            # run of back-to-back transient failures and re-raise the original
            # error once the grace window elapses.
            if unbounded:
                grace = self._MAX_TRANSIENT_RETRY_SECONDS
                # A retry that stayed in flight at least as long as the whole
                # grace window was long-polling rather than failing fast, so
                # this error is treated as the start of a new outage. Without
                # re-arming, a second sidecar restart during a long wait would
                # be raised immediately with no retry at all.
                recovered_since_last_error = (
                    attempt_started is not None and now - attempt_started >= grace
                )
                if transient_deadline is None or recovered_since_last_error:
                    transient_deadline = now + grace
                    backoff = initial_backoff
                elif now >= transient_deadline:
                    raise

            # Compute the remaining budget once so the sleep and the next
            # per-attempt gRPC timeout agree on how much time is left.
            if deadline is None:
                remaining = None
            else:
                remaining = deadline - now
                if remaining <= 0:
                    raise _TransientTimeout() from rpc_error

            # Clamp-then-stop is intentional: transient codes return
            # immediately instead of long-polling, so skipping the backoff
            # near a deadline would busy-loop against the sidecar.
            sleep_for = min(backoff, max_backoff)
            if remaining is not None:
                sleep_for = min(sleep_for, remaining)
            if transient_deadline is not None:
                sleep_for = min(sleep_for, transient_deadline - now)

            self._logger.warning(
                f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
                f'retrying in {sleep_for:.2f}s'
            )
            await asyncio.sleep(sleep_for)
            backoff = min(backoff * 2, max_backoff)

            if deadline is None:
                grpc_timeout = None
                attempt_started = time.monotonic()
            else:
                grpc_timeout = deadline - time.monotonic()
                if grpc_timeout <= 0:
                    raise _TransientTimeout() from rpc_error

async def raise_orchestration_event(
self, instance_id: str, event_name: str, *, data: Optional[Any] = None
Expand Down
142 changes: 117 additions & 25 deletions ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# limitations under the License.

import logging
import time
import uuid
from dataclasses import dataclass
from datetime import datetime
Expand All @@ -25,6 +26,12 @@
from dapr.ext.workflow._durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
from google.protobuf import wrappers_pb2


class _TransientTimeout(Exception):
    """Internal sentinel: the retry loop exhausted the user-provided timeout budget.

    Raised by ``_call_with_transient_retry`` when an attempt ends with gRPC
    ``DEADLINE_EXCEEDED`` or when no time is left in the caller's budget for
    another attempt. The ``wait_for_orchestration_start`` and
    ``wait_for_orchestration_completion`` methods catch it and raise the
    built-in ``TimeoutError`` in its place.
    """


TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')

Expand Down Expand Up @@ -217,32 +224,31 @@ def get_orchestration_state(
return new_orchestration_state(req.instanceId, res)

def wait_for_orchestration_start(
self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
self, instance_id: str, *, fetch_payloads: bool = False, timeout: Optional[int] = 0
) -> Optional[WorkflowState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
grpc_timeout = None if timeout == 0 else timeout
self._logger.info(
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
)
self._logger.info(
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to start."
)

def _call(grpc_timeout):
res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
return new_orchestration_state(req.instanceId, res)
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
# Replace gRPC error with the built-in TimeoutError
raise TimeoutError('Timed-out waiting for the orchestration to start')
else:
raise

try:
return self._call_with_transient_retry(instance_id, timeout, _call)
except _TransientTimeout:
raise TimeoutError('Timed-out waiting for the orchestration to start')

def wait_for_orchestration_completion(
self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
self, instance_id: str, *, fetch_payloads: bool = True, timeout: Optional[int] = 0
) -> Optional[WorkflowState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
grpc_timeout = None if timeout == 0 else timeout
self._logger.info(
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
)
self._logger.info(
Comment thread
javier-aliaga marked this conversation as resolved.
Comment thread
javier-aliaga marked this conversation as resolved.
f"Waiting {'indefinitely' if timeout in (0, None) else f'up to {timeout}s'} for instance '{instance_id}' to complete."
)

def _call(grpc_timeout):
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
req, timeout=grpc_timeout
)
Expand All @@ -262,14 +268,100 @@ def wait_for_orchestration_completion(
self._logger.info(f"Instance '{instance_id}' was terminated.")
elif state.runtime_status == OrchestrationStatus.COMPLETED:
self._logger.info(f"Instance '{instance_id}' completed.")

return state
except grpc.RpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
# Replace gRPC error with the built-in TimeoutError
raise TimeoutError('Timed-out waiting for the orchestration to complete')
else:
raise

try:
return self._call_with_transient_retry(instance_id, timeout, _call)
except _TransientTimeout:
raise TimeoutError('Timed-out waiting for the orchestration to complete')
Comment thread
javier-aliaga marked this conversation as resolved.
Comment thread
javier-aliaga marked this conversation as resolved.

# Transient gRPC codes that indicate the workflow runtime is temporarily
# unable to locate the workflow actor — typically immediately after a Dapr
# sidecar restart (e.g. recovery from chaos). The placement service has the
# actor registration, but local daprd hasn't received the dissemination yet.
# Without retry, every poll fails permanently with FAILED_PRECONDITION even
# though the workflow runtime state is intact.
_TRANSIENT_RPC_CODES = (
grpc.StatusCode.FAILED_PRECONDITION,
grpc.StatusCode.UNAVAILABLE,
)
Comment thread
javier-aliaga marked this conversation as resolved.

# When the caller sets no timeout (timeout=0 or None), bound how long we keep
# retrying *consecutive* transient errors so a permanently-unavailable
# sidecar surfaces the original error instead of hanging forever. This
# window comfortably covers placement re-dissemination after a restart;
# a slow-but-healthy workflow never enters this path (it just blocks in
# the long-poll), so its indefinite wait is preserved.
_MAX_TRANSIENT_RETRY_SECONDS = 30.0

def _call_with_transient_retry(self, instance_id, timeout, call_fn):
    """Run a gRPC wait call, retrying transient runtime errors with backoff.

    ``FAILED_PRECONDITION`` / ``UNAVAILABLE`` (``_TRANSIENT_RPC_CODES``) are
    retried with a doubling backoff capped at 5 seconds; any other gRPC error
    is re-raised immediately. The first attempt receives the caller's timeout
    unchanged (``None`` when unbounded), so a call that hits no transient
    error behaves like a non-retrying client. On retries both the sleep and
    the per-attempt gRPC timeout are clamped to the time left in the caller's
    budget, so the helper never sleeps past ``timeout`` or starts an attempt
    with no time left.

    Args:
        instance_id: Workflow instance ID; used only in the retry log message.
        timeout: Overall budget in seconds. ``0`` or ``None`` means unbounded.
        call_fn: Callable taking the per-attempt gRPC timeout and returning
            the result of the wait call.

    Returns:
        Whatever ``call_fn`` returns on the first successful attempt.

    Raises:
        _TransientTimeout: An attempt ended with ``DEADLINE_EXCEEDED`` or the
            caller's budget ran out between attempts.
        grpc.RpcError: A non-transient error occurred, or (unbounded mode)
            transient errors kept occurring for
            ``_MAX_TRANSIENT_RETRY_SECONDS`` without the runtime recovering.
    """
    initial_backoff = 0.5  # seconds to wait before the first retry
    max_backoff = 5.0  # ceiling for the doubling backoff

    unbounded = timeout in (0, None)
    deadline = None if unbounded else time.monotonic() + timeout
    grpc_timeout = None if unbounded else timeout
    backoff = initial_backoff
    transient_deadline = None  # unbounded mode only; end of the current grace window
    attempt_started = None  # unbounded mode only; start time of the latest retry attempt
    while True:
        try:
            return call_fn(grpc_timeout)
        except grpc.RpcError as rpc_error:
            code = rpc_error.code()  # type: ignore
            if code == grpc.StatusCode.DEADLINE_EXCEEDED:
                raise _TransientTimeout() from rpc_error
            if code not in self._TRANSIENT_RPC_CODES:
                raise

            now = time.monotonic()

            # In unbounded mode the user budget can't end the loop, so cap a
            # run of back-to-back transient failures and re-raise the original
            # error (matching pre-retry behavior) once the grace window elapses.
            if unbounded:
                grace = self._MAX_TRANSIENT_RETRY_SECONDS
                # A retry that stayed in flight at least as long as the whole
                # grace window was long-polling rather than failing fast, so
                # this error is treated as the start of a new outage. Without
                # re-arming, a second sidecar restart during a long wait would
                # be raised immediately with no retry at all.
                recovered_since_last_error = (
                    attempt_started is not None and now - attempt_started >= grace
                )
                if transient_deadline is None or recovered_since_last_error:
                    transient_deadline = now + grace
                    backoff = initial_backoff
                elif now >= transient_deadline:
                    raise

            # Compute the remaining budget once and reuse it so the sleep and
            # the next per-attempt grpc_timeout agree on how much time is left.
            if deadline is None:
                remaining = None
            else:
                remaining = deadline - now
                if remaining <= 0:
                    raise _TransientTimeout() from rpc_error

            # Clamp-then-stop is intentional: transient codes return
            # immediately instead of long-polling, so skipping the backoff
            # near a deadline would turn the final window into a tight retry
            # loop against a struggling sidecar. The cost is at most one
            # skipped final attempt right at the deadline.
            sleep_for = min(backoff, max_backoff)
            if remaining is not None:
                sleep_for = min(sleep_for, remaining)
            if transient_deadline is not None:
                sleep_for = min(sleep_for, transient_deadline - now)

            self._logger.warning(
                f"Transient gRPC error {code.name} waiting on instance '{instance_id}'; "
                f'retrying in {sleep_for:.2f}s'
            )
            time.sleep(sleep_for)
            backoff = min(backoff * 2, max_backoff)

            if deadline is None:
                grpc_timeout = None
                attempt_started = time.monotonic()
            else:
                grpc_timeout = deadline - time.monotonic()
                if grpc_timeout <= 0:
                    raise _TransientTimeout() from rpc_error

def raise_orchestration_event(
self, instance_id: str, event_name: str, *, data: Optional[Any] = None
Expand Down
Loading
Loading