Skip to content

Commit b1fee15

Browse files
committed
Handle async grpc errors
1 parent b59d85c commit b1fee15

File tree

3 files changed

+9
-3
lines changed

3 files changed

+9
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed to handle deadline on topic stream in async driver
2+
13
## 3.8.0 ##
24
* Added clients for draft.BaseDynamicConfig service
35

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ async def receive(self) -> Any:
203203
# todo handle grpc exceptions and convert it to internal exceptions
204204
try:
205205
grpc_message = await self.from_server_grpc.__anext__()
206-
except grpc.RpcError as e:
206+
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
207207
raise connection._rpc_error_handler(self._connection_state, e)
208208

209209
issues._process_response(grpc_message)

ydb/connection.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import logging
33
import copy
4+
import typing
45
from concurrent import futures
56
import uuid
67
import threading
@@ -61,15 +62,18 @@ def _log_request(rpc_state, request):
6162
logger.debug("%s: request = { %s }", rpc_state, _message_to_string(request))
6263

6364

64-
def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None):
65+
def _rpc_error_handler(
66+
rpc_state,
67+
rpc_error: typing.Union[grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call],
68+
on_disconnected: typing.Callable[[], None] = None):
6569
"""
6670
RPC call error handler, that translates gRPC error into YDB issue
6771
:param rpc_state: A state of rpc
6872
:param rpc_error: an underlying rpc error to handle
6973
:param on_disconnected: a handler to call on disconnected connection
7074
"""
7175
logger.info("%s: received error, %s", rpc_state, rpc_error)
72-
if isinstance(rpc_error, grpc.Call):
76+
if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)):
7377
if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
7478
return issues.Unauthenticated(rpc_error.details())
7579
elif rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:

0 commit comments

Comments
 (0)