## TODO
Implement a single pool of workers. For each task, a worker randomly chooses to perform a search or an insert, based on a configurable `insert_fraction` parameter. For inserts, reuse vectors from the test set (not random vectors).

# **Mixed Workload Implementation Plan**
*Simple Strategy for Concurrent Search and Insert Operations*

**Date:** August 8, 2025
**Repository:** redis-scripts/vector-db-benchmark
**Branch:** mixed_workload

---

## **Overview**

Add mixed workload capabilities to vector-db-benchmark to measure search performance under concurrent insert load. This enables realistic testing of production scenarios where vector databases handle both read and write traffic simultaneously.

### **Core Approach**
- **Concurrent Operations**: Search existing vectors while inserting new ones
- **Shared Graph**: Both operations work on the same HNSW graph structure
- **Insert-Only**: Append new vectors (no updates/deletes) to avoid accuracy complications
- **Performance Focus**: Measure search QPS and latency degradation under insert load

---

## **Implementation Strategy**

### **Ultra-Simple Design**
- **Reuse Everything**: Leverage the existing search and upload infrastructure
- **Zero New Files**: No new classes, configs, or dependencies
- **Unified Worker Pool**: Extend the existing worker pattern to handle both search and insert tasks
- **Existing CLI Patterns**: Just add a `--mixed-workload` flag

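The `--mixed-workload` flag is the only CLI addition the plan calls for. As a hedged sketch (the `--insert-fraction` option and its default are assumptions for illustration, not existing flags), the wiring could look like:

```python
import argparse

# Hypothetical wiring: --mixed-workload is the flag named in this plan;
# --insert-fraction is an assumed companion option for the ratio.
parser = argparse.ArgumentParser(description="vector-db-benchmark (sketch)")
parser.add_argument("--mixed-workload", action="store_true",
                    help="interleave inserts with searches in one worker pool")
parser.add_argument("--insert-fraction", type=float, default=0.1,
                    help="fraction of operations that are inserts")

args = parser.parse_args(["--mixed-workload", "--insert-fraction", "0.2"])
```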
### **Technical Approach**
```python
# In BaseSearcher.search_all() or equivalent:
# ...
```

Flow: Search + Concurrent Inserts (reuses existing data)

## **Implementation Details**

### **Code Changes Required**

**File 1: `engine/base_client/search.py`**
```python
import random

def process_chunk(chunk, search_one, insert_one, insert_fraction=0.1, test_set=None):
    results = []
    for i, query in enumerate(chunk):
        if random.random() < insert_fraction:
            # Insert: use a vector from test_set
            vector_id, vector, metadata = test_set[i % len(test_set)]
            result = insert_one(vector_id, vector, metadata)
        else:
            # Search
            result = search_one(query)
        results.append(result)
    return results
```
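The routing logic above can be sanity-checked in isolation with stub callables and a fixed seed (the stubs and data below are illustrative only, not part of the benchmark):

```python
import random

def process_chunk(chunk, search_one, insert_one, insert_fraction=0.1, test_set=None):
    # Same per-task routing as File 1 above.
    results = []
    for i, query in enumerate(chunk):
        if random.random() < insert_fraction:
            vector_id, vector, metadata = test_set[i % len(test_set)]
            result = insert_one(vector_id, vector, metadata)
        else:
            result = search_one(query)
        results.append(result)
    return results

# Stubs that record which operation ran.
def fake_search(query):
    return ("search", query)

def fake_insert(vector_id, vector, metadata):
    return ("insert", vector_id)

random.seed(42)
test_set = [(i, [0.0, 0.0], {}) for i in range(4)]
results = process_chunk(list(range(1000)), fake_search, fake_insert,
                        insert_fraction=0.25, test_set=test_set)
insert_count = sum(1 for kind, _ in results if kind == "insert")
```

With `insert_fraction=0.25`, roughly a quarter of the 1000 tasks should come back as inserts, confirming the ratio knob behaves as intended.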

**File 2: `worker_function`**
```python
def worker_function(self, distance, search_one, insert_one, chunk, result_queue,
                    insert_fraction=0.1, test_set=None):
    self.init_client(self.host, distance, self.connection_params, self.search_params)
    self.setup_search()
    start_time = time.perf_counter()
    results = process_chunk(chunk, search_one, insert_one, insert_fraction, test_set)
    result_queue.put((start_time, results))
```

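On the parent side, the `(start_time, results)` tuples from the queue can feed a summary. A minimal sketch, assuming each result is a `(latency_seconds, kind)` pair — the benchmark's actual result schema may differ:

```python
def summarize(start_time, end_time, results):
    # Search QPS and mean search latency; assumes results are
    # (latency_seconds, kind) pairs, an assumption of this sketch.
    search_latencies = [lat for lat, kind in results if kind == "search"]
    elapsed = end_time - start_time
    qps = len(search_latencies) / elapsed if elapsed > 0 else 0.0
    mean_latency = (sum(search_latencies) / len(search_latencies)
                    if search_latencies else 0.0)
    return qps, mean_latency

qps, mean_latency = summarize(
    0.0, 1.0,
    [(0.002, "search"), (0.010, "insert"), (0.004, "search")],
)
```

Keeping insert results out of the search aggregates is what lets the benchmark report search degradation under insert load.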
**File 3: `BaseSearcher.search_all()`**
- When creating worker processes, pass `search_one`, `insert_one`, `insert_fraction`, and `test_set` as arguments to each worker.
- No separate insert/search process pools are needed: all workers run the same function and decide per task.

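File 3's change is essentially argument forwarding at worker creation. A sketch of the fan-out shape, using threads in place of the benchmark's worker processes so it stays self-contained (all names here are illustrative):

```python
import queue
import threading

def worker_function(search_one, insert_one, chunk, result_queue,
                    insert_fraction=0.1, test_set=None):
    # Stand-in for the real worker: the point is that search_one,
    # insert_one, insert_fraction, and test_set arrive as arguments.
    results = [search_one(q) for q in chunk]  # insert routing elided
    result_queue.put(results)

def square_search(q):
    return q * q

result_queue = queue.Queue()
chunks = [[1, 2], [3, 4]]
workers = [
    threading.Thread(target=worker_function,
                     args=(square_search, None, chunk, result_queue))
    for chunk in chunks
]
for w in workers:
    w.start()
for w in workers:
    w.join()
collected = sorted(sum((result_queue.get() for _ in workers), []))
```

In the real multiprocessing setup the forwarded callables must be picklable (top-level functions or bound methods of picklable objects), which is one reason to pass them explicitly rather than capture them in closures.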
**File 4: Engine-specific `insert_one` implementations** (~5 lines each)
```python
# ...
vector_bytes = np.array(vector).astype(np.float32).tobytes()
# ...
```

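The engine examples center on converting the vector into each engine's wire format, as in the `vector_bytes` line above. A hedged sketch of a Redis-style payload builder (the field names and call shape are assumptions of this sketch; only the float32 byte conversion comes from the plan):

```python
import numpy as np

def build_insert_payload(vector_id, vector, metadata=None):
    # float32 byte conversion as shown in the plan's snippet;
    # the "vector" field name is an assumption of this sketch.
    vector_bytes = np.array(vector).astype(np.float32).tobytes()
    payload = {"vector": vector_bytes}
    if metadata:
        payload.update(metadata)
    return vector_id, payload

key, payload = build_insert_payload(7, [0.1, 0.2, 0.3], {"tag": "demo"})
```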
**Total: ~7 hours**

---

## **Summary**

**Goal**: Measure search performance under concurrent insert load
**Approach**: An `insert_one` method that mirrors the existing `search_one` pattern
**Changes**: ~60 lines across 4+ files
**Benefits**: Architectural consistency with single-operation semantics