Skip to content

Commit 1dbc0f4

Browse files
authored
Merge pull request #395 Handle async grpc errors
2 parents b59d85c + 037136a commit 1dbc0f4

File tree

3 files changed

+10
-3
lines changed

3 files changed

+10
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed to handle deadline on topic stream in async driver
2+
13
## 3.8.0 ##
24
* Added clients for draft.BaseDynamicConfig service
35

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ async def receive(self) -> Any:
203203
# todo handle grpc exceptions and convert it to internal exceptions
204204
try:
205205
grpc_message = await self.from_server_grpc.__anext__()
206-
except grpc.RpcError as e:
206+
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
207207
raise connection._rpc_error_handler(self._connection_state, e)
208208

209209
issues._process_response(grpc_message)

ydb/connection.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import logging
33
import copy
4+
import typing
45
from concurrent import futures
56
import uuid
67
import threading
@@ -61,15 +62,19 @@ def _log_request(rpc_state, request):
6162
logger.debug("%s: request = { %s }", rpc_state, _message_to_string(request))
6263

6364

64-
def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None):
65+
def _rpc_error_handler(
66+
rpc_state,
67+
rpc_error: typing.Union[grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call],
68+
on_disconnected: typing.Callable[[], None] = None,
69+
):
6570
"""
6671
RPC call error handler, that translates gRPC error into YDB issue
6772
:param rpc_state: A state of rpc
6873
:param rpc_error: an underlying rpc error to handle
6974
:param on_disconnected: a handler to call on disconnected connection
7075
"""
7176
logger.info("%s: received error, %s", rpc_state, rpc_error)
72-
if isinstance(rpc_error, grpc.Call):
77+
if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)):
7378
if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
7479
return issues.Unauthenticated(rpc_error.details())
7580
elif rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:

0 commit comments

Comments
 (0)