Skip to content

Commit 7043e01

Browse files
Merge remote-tracking branch 'upstream/main' into main
2 parents ae32c32 + 8aad508 commit 7043e01

File tree

6 files changed

+26
-8
lines changed

6 files changed

+26
-8
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added comments about calls encoders/decoders in parallel from multiply threads
2+
13
## 3.5.1 ##
24
* Fixed access to connection if connection cannot be found by node id
35

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
setuptools.setup(
1515
name="ydb",
16-
version="3.5.0", # AUTOVERSION
16+
version="3.5.1", # AUTOVERSION
1717
description="YDB Python SDK",
1818
author="Yandex LLC",
1919
author_email="ydb@yandex-team.ru",

ydb/aio/pool.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import asyncio
22
import logging
33
import random
4+
import typing
45

56
from ydb import issues
67
from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool
78

8-
from .connection import Connection
9+
from .connection import Connection, EndpointKey
910

1011
from . import resolver
1112

@@ -21,7 +22,7 @@ def __init__(self, use_all_nodes: bool = False):
2122

2223
self._fast_fail_error = None
2324

24-
async def get(self, preferred_endpoint=None, fast_fail=False, wait_timeout=10):
25+
async def get(self, preferred_endpoint: typing.Optional[EndpointKey] = None, fast_fail=False, wait_timeout=10):
2526

2627
if fast_fail:
2728
await asyncio.wait_for(self._fast_fail_event.wait(), timeout=wait_timeout)

ydb/pool.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
from concurrent import futures
66
import collections
77
import random
8+
import typing
89

910
from . import connection as connection_impl, issues, resolver, _utilities, tracing
1011
from abc import abstractmethod
1112

12-
from .connection import Connection
13+
from .connection import Connection, EndpointKey
1314

1415
logger = logging.getLogger(__name__)
1516

@@ -123,7 +124,7 @@ def subscribe(self):
123124
return subscription
124125

125126
@tracing.with_trace()
126-
def get(self, preferred_endpoint=None) -> Connection:
127+
def get(self, preferred_endpoint: typing.Optional[EndpointKey] = None) -> Connection:
127128
with self.lock:
128129
if preferred_endpoint is not None and preferred_endpoint.node_id in self.connections_by_node_id:
129130
return self.connections_by_node_id[preferred_endpoint.node_id]

ydb/topic.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,11 @@ def reader(
170170
consumer: str,
171171
buffer_size_bytes: int = 50 * 1024 * 1024,
172172
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
173+
# the func will be called from multiply threads in parallel
173174
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
174-
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
175+
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
176+
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
177+
decoder_executor: Optional[concurrent.futures.Executor] = None,
175178
) -> TopicReaderAsyncIO:
176179

177180
if not decoder_executor:
@@ -194,8 +197,12 @@ def writer(
194197
auto_seqno: bool = True,
195198
auto_created_at: bool = True,
196199
codec: Optional[TopicCodec] = None, # default mean auto-select
200+
# encoders: map[codec_code] func(encoded_bytes)->decoded_bytes
201+
# the func will be called from multiply threads in parallel.
197202
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
198-
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
203+
# custom encoder executor for call builtin and custom decoders. If None - use shared executor pool.
204+
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
205+
encoder_executor: Optional[concurrent.futures.Executor] = None,
199206
) -> TopicWriterAsyncIO:
200207
args = locals().copy()
201208
del args["self"]
@@ -319,7 +326,10 @@ def reader(
319326
consumer: str,
320327
buffer_size_bytes: int = 50 * 1024 * 1024,
321328
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
329+
# the func will be called from multiply threads in parallel
322330
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
331+
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
332+
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
323333
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
324334
) -> TopicReader:
325335
if not decoder_executor:
@@ -343,7 +353,11 @@ def writer(
343353
auto_seqno: bool = True,
344354
auto_created_at: bool = True,
345355
codec: Optional[TopicCodec] = None, # default mean auto-select
356+
# encoders: map[codec_code] func(encoded_bytes)->decoded_bytes
357+
# the func will be called from multiply threads in parallel.
346358
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
359+
# custom encoder executor for call builtin and custom decoders. If None - use shared executor pool.
360+
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
347361
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
348362
) -> TopicWriter:
349363
args = locals().copy()

ydb/ydb_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = "3.5.0"
1+
VERSION = "3.5.1"

0 commit comments

Comments
 (0)