Skip to content

Commit 3da64a0

Browse files
committed
add autosplit example
1 parent f5099d3 commit 3da64a0

File tree

1 file changed

+89
-0
lines changed

1 file changed

+89
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import argparse
2+
import asyncio
3+
import datetime
4+
import logging
5+
6+
import ydb
7+
8+
logger = logging.getLogger(__name__)
9+
logger.setLevel(logging.DEBUG)
10+
11+
12+
async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
13+
config = ydb.DriverConfig(endpoint=endpoint, database=database)
14+
config.credentials = ydb.credentials_from_env_variables()
15+
driver = ydb.aio.Driver(config)
16+
await driver.wait(5, fail_fast=True)
17+
return driver
18+
19+
20+
async def recreate_topic(driver: ydb.aio.Driver, topic: str, consumer: str):
21+
try:
22+
await driver.topic_client.drop_topic(topic)
23+
except ydb.SchemeError:
24+
pass
25+
26+
await driver.topic_client.create_topic(
27+
topic,
28+
consumers=[consumer],
29+
max_active_partitions=100,
30+
auto_partitioning_settings=ydb.TopicAutoPartitioningSettings(
31+
strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
32+
up_utilization_percent=1,
33+
down_utilization_percent=1,
34+
stabilization_window=datetime.timedelta(seconds=1),
35+
),
36+
)
37+
38+
39+
async def write_messages(driver: ydb.aio.Driver, topic: str, id: int = 0):
40+
async with driver.topic_client.writer(topic) as writer:
41+
for i in range(100):
42+
mess = ydb.TopicWriterMessage(data=f"[{id}] mess-{i}", metadata_items={"index": f"{i}"})
43+
await writer.write(mess)
44+
await asyncio.sleep(0.01)
45+
46+
47+
async def read_messages(driver: ydb.aio.Driver, topic: str, consumer: str):
48+
async with driver.topic_client.reader(topic, consumer, auto_partitioning_support=True) as reader:
49+
count = 0
50+
while True:
51+
try:
52+
mess = await asyncio.wait_for(reader.receive_message(), 5)
53+
count += 1
54+
print(mess.data.decode())
55+
reader.commit(mess)
56+
except asyncio.TimeoutError:
57+
assert count == 200
58+
return
59+
60+
61+
async def main():
62+
parser = argparse.ArgumentParser(
63+
formatter_class=argparse.RawDescriptionHelpFormatter,
64+
description="""YDB topic basic example.\n""",
65+
)
66+
parser.add_argument("-d", "--database", default="/local", help="Name of the database to use")
67+
parser.add_argument("-e", "--endpoint", default="grpc://localhost:2136", help="Endpoint url to use")
68+
parser.add_argument("-p", "--path", default="test-topic", help="Topic name")
69+
parser.add_argument("-c", "--consumer", default="consumer", help="Consumer name")
70+
parser.add_argument("-v", "--verbose", default=True, action="store_true")
71+
72+
args = parser.parse_args()
73+
74+
if args.verbose:
75+
logger.addHandler(logging.StreamHandler())
76+
77+
driver = await connect(args.endpoint, args.database)
78+
79+
await recreate_topic(driver, args.path, args.consumer)
80+
81+
await asyncio.gather(
82+
write_messages(driver, args.path, 0),
83+
write_messages(driver, args.path, 1),
84+
read_messages(driver, args.path, args.consumer),
85+
)
86+
87+
88+
if __name__ == "__main__":
89+
asyncio.run(main())

0 commit comments

Comments
 (0)