Skip to content

Commit 0187a4e

Browse files
author
Valeriya Popova
committed
slo: add create/cleanup
1 parent 692937f commit 0187a4e

File tree

6 files changed

+229
-1
lines changed

6 files changed

+229
-1
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version: "3.9"
1+
version: "3.3"
22
services:
33
ydb:
44
image: cr.yandex/yc/yandex-docker-local-ydb:latest

tests/slo/__init__.py

Whitespace-only changes.

tests/slo/__main__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import gc
2+
3+
from options import parse_options
4+
from runner import run_from_args
5+
6+
7+
if __name__ == "__main__":
8+
args = parse_options()
9+
gc.disable()
10+
run_from_args(args)

tests/slo/generator.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# -*- coding: utf-8 -*-
2+
import dataclasses
3+
import logging
4+
import random
5+
import string
6+
from datetime import datetime
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
MaxUi32 = 2**32 - 1
12+
13+
14+
def hash_ui32(value):
15+
return abs(hash(str(value))) % MaxUi32
16+
17+
18+
def generate_random_string(min_len, max_len):
19+
strlen = random.randint(min_len, max_len)
20+
return "".join(random.choices(string.ascii_lowercase, k=strlen))
21+
22+
23+
@dataclasses.dataclass(slots=True)
24+
class Row:
25+
object_id_key: int
26+
object_id: int
27+
payload_str: str
28+
payload_double: float
29+
payload_timestamp: datetime
30+
31+
# First id in current shard
32+
def get_shard_id(self, partitions_count):
33+
shard_size = int((MaxUi32 + 1) / partitions_count)
34+
return self.object_id_key / shard_size
35+
36+
37+
@dataclasses.dataclass
38+
class RowGenerator:
39+
id_counter: int = 0
40+
41+
def get(self):
42+
self.id_counter += 1
43+
if self.id_counter >= MaxUi32:
44+
self.id_counter = 0
45+
logger.warning("RowGenerator: maxint reached")
46+
47+
return Row(
48+
object_id_key=hash_ui32(self.id_counter),
49+
object_id=self.id_counter,
50+
payload_str=generate_random_string(20, 40),
51+
payload_double=random.random(),
52+
payload_timestamp=datetime.now(),
53+
)
54+
55+
56+
class PackGenerator:
57+
def __init__(self, args, start_id=0):
58+
self._row_generator = RowGenerator(start_id)
59+
60+
self._remain = args.initial_data_count
61+
self._pack_size = args.pack_size
62+
self._partitions_count = args.partitions_count
63+
64+
self._packs = {}
65+
66+
def get_next_pack(self):
67+
while self._remain:
68+
new_record = self._row_generator.get()
69+
shard_id = new_record.get_shard_id(self._partitions_count)
70+
71+
self._remain -= 1
72+
73+
if shard_id in self._packs:
74+
existing_pack = self._packs[shard_id]
75+
existing_pack.append(new_record)
76+
if len(existing_pack) >= self._pack_size:
77+
return self._packs.pop(shard_id)
78+
else:
79+
self._packs[shard_id] = [new_record]
80+
81+
for shard_id, pack in self._packs.items():
82+
if pack:
83+
return self._packs.pop(shard_id)

tests/slo/options.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import argparse
2+
3+
4+
def add_common_options(parser):
5+
parser.add_argument("endpoint", help="YDB endpoint")
6+
parser.add_argument("db", help="YDB database name")
7+
parser.add_argument("-t", "--table_name", default="key_value", help="Table name")
8+
parser.add_argument("--write_timeout", default=20000, type=int, help="Read requests execution timeout [ms]")
9+
10+
11+
def make_create_parser(subparsers):
12+
create_parser = subparsers.add_parser("create", help="Create tables and fill with initial content")
13+
add_common_options(create_parser)
14+
create_parser.add_argument("-p", "--partitions_count", default=64, type=int, help="Number of partition in table")
15+
create_parser.add_argument(
16+
"-c", "--initial-data-count", default=1000, type=int, help="Total number of records to generate"
17+
)
18+
create_parser.add_argument(
19+
"--pack_size", default="100", type=int, help="Number of new records in each create request"
20+
)
21+
22+
23+
def make_run_parser(subparsers, name="run"):
24+
run_parser = subparsers.add_parser(name, help="Run measurable workload")
25+
add_common_options(run_parser)
26+
run_parser.add_argument("--write_rps", default=10, type=int, help="Write request rps")
27+
run_parser.add_argument("--read_rps", default=100, type=int, help="Read request rps")
28+
run_parser.add_argument("--no_write", default=False, action="store_true")
29+
run_parser.add_argument("--no_read", default=False, action="store_true")
30+
run_parser.add_argument("--time", default=10, type=int, help="Time to run (Seconds)")
31+
run_parser.add_argument("--read_timeout", default=70, type=int, help="Read requests execution timeout [ms]")
32+
run_parser.add_argument("--save_result", default=False, action="store_true", help="Save result to file")
33+
run_parser.add_argument("--result_file_name", default="slo_result.json", help="Result json file name")
34+
run_parser.add_argument("--no_prepare", default=False, action="store_true", help="Do not prepare requests")
35+
36+
37+
def make_cleanup_parser(subparsers):
38+
cleanup_parser = subparsers.add_parser("cleanup", help="Drop tables")
39+
add_common_options(cleanup_parser)
40+
41+
42+
def get_root_parser():
43+
parser = argparse.ArgumentParser(
44+
formatter_class=argparse.RawDescriptionHelpFormatter,
45+
description="YDB Python SLO application",
46+
)
47+
48+
subparsers = parser.add_subparsers(
49+
title="subcommands",
50+
dest="subcommand",
51+
help="List of subcommands",
52+
)
53+
54+
make_create_parser(subparsers)
55+
make_run_parser(subparsers)
56+
make_cleanup_parser(subparsers)
57+
58+
return parser
59+
60+
61+
def parse_options():
62+
parser = get_root_parser()
63+
return parser.parse_args()

tests/slo/runner.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import ydb
2+
3+
from os import path
4+
from generator import PackGenerator
5+
6+
from prometheus_client import Summary, Counter
7+
8+
9+
def run_create(args, driver):
10+
session = ydb.retry_operation_sync(lambda: driver.table_client.session().create())
11+
tb_name = path.join(args.db, args.table_name)
12+
session.create_table(
13+
tb_name,
14+
ydb.TableDescription()
15+
.with_column(ydb.Column("object_id_key", ydb.OptionalType(ydb.PrimitiveType.Uint32)))
16+
.with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint32)))
17+
.with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
18+
.with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double)))
19+
.with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp)))
20+
.with_primary_keys("object_id_key", "object_id")
21+
.with_profile(
22+
ydb.TableProfile().with_partitioning_policy(
23+
ydb.PartitioningPolicy().with_uniform_partitions(args.partitions_count)
24+
)
25+
),
26+
)
27+
28+
prepare_q = """
29+
DECLARE $items AS List<Struct<
30+
object_id_key: Uint32,
31+
object_id: Uint32,
32+
payload_str: Utf8,
33+
payload_double: Double,
34+
payload_timestamp: Timestamp>>;
35+
UPSERT INTO `{}` SELECT * FROM AS_TABLE($items);
36+
"""
37+
prepared = session.prepare(prepare_q.format(tb_name))
38+
39+
generator = PackGenerator(args)
40+
while data := generator.get_next_pack():
41+
tx = session.transaction()
42+
tx.execute(prepared, {"$items": data})
43+
tx.commit()
44+
45+
46+
def run_cleanup(args, driver):
47+
session = driver.table_client.session().create()
48+
session.drop_table(path.join(args.db, args.table_name))
49+
50+
51+
def run_from_args(args):
52+
driver_config = ydb.DriverConfig(
53+
args.endpoint,
54+
database=args.db,
55+
credentials=ydb.credentials_from_env_variables(),
56+
grpc_keep_alive_timeout=5000,
57+
)
58+
59+
with ydb.Driver(driver_config) as driver:
60+
driver.wait(timeout=5)
61+
62+
try:
63+
if args.subcommand == "create":
64+
run_create(args, driver)
65+
elif args.subcommand == "run":
66+
pass
67+
elif args.subcommand == "cleanup":
68+
run_cleanup(args, driver)
69+
else:
70+
raise RuntimeError(f"Unknown command {args.subcommand}")
71+
finally:
72+
driver.stop()

0 commit comments

Comments
 (0)