
Commit aecca67

modified plan to use the same workers, but a fraction of the time those workers will perform inserts
1 parent 1650dc9 · commit aecca67


docs/MIXED_WORKLOAD_IMPLEMENTATION_PLAN.md

Lines changed: 46 additions & 95 deletions
@@ -1,7 +1,6 @@

 ## TODO
-Instead of having a separate pool of search and insert workers, have a single pool as before.
-However, some fraction of search requests will be converted to inserts. Reuse the test_set for vector inserts, and do not use random vectors.
+Implement a single pool of workers. For each task, a worker randomly chooses to perform a search or an insert, based on a configurable `insert_fraction` parameter. For inserts, reuse vectors from the test set (not random vectors).


 # **Mixed Workload Implementation Plan**
@@ -33,11 +32,13 @@ Add mixed workload capabilities to vector-db-benchmark to measure search perform
 - **Unified Worker Pool**: Extend existing worker pattern to handle both search and insert tasks
 - **Existing CLI Patterns**: Just add `--mixed-workload` flag

+
 ### **Technical Approach**
 ```python
-# Extend existing BaseSearcher.search_all() to support mixed workloads
-# Some workers get search chunks, others get insert tasks
-# Same worker_function pattern, different task types
+# In BaseSearcher.search_all() or equivalent:
+# - Each worker processes a chunk of tasks (queries).
+# - For each task, with probability insert_fraction, do an insert (using test_set vector), else do a search.
+# - Use a parameter like insert_fraction to control the ratio.
 ```

 ---
@@ -66,29 +67,13 @@ Flow: Search + Concurrent Inserts (reuses existing data)

 ### **Code Changes Required**

-**File 1: `engine/base_client/search.py`** (~25 lines)
-```python
-# Add insert_one method to BaseSearcher for consistency
-@classmethod
-def insert_one(cls, vector_id: int, vector: List[float], metadata: Optional[dict] = None):
-    """Insert a single vector - raw database operation (like search_one)"""
-    # Delegate to uploader's upload_batch with single item
-    # This will be overridden by engines that have direct insert methods
-    raise NotImplementedError("insert_one must be implemented by each engine")

-@classmethod
-def _insert_one(cls, insert_record):
-    """Timed insert operation (like _search_one)"""
-    vector_id, vector, metadata = insert_record
-    start = time.perf_counter()
-    cls.insert_one(vector_id, vector, metadata)
-    end = time.perf_counter()
-
-    # Return consistent metrics (no precision for inserts)
-    return 1.0, end - start  # Always "successful", return latency
+**File 1: `engine/base_client/search.py`**
+```python
+# Add insert_one method to BaseSearcher for consistency (engine-specific)
+# Add insert_fraction parameter to control insert/search ratio

-# Extend worker_function to support both task types
-def worker_function(self, distance, task_func, chunk_or_task, result_queue):
+def worker_function(self, distance, chunk, result_queue, insert_fraction=0.1, test_set=None):
     self.init_client(
         self.host,
         distance,
@@ -98,86 +83,51 @@ def worker_function(self, distance, task_func, chunk_or_task, result_queue):
     self.setup_search()

     start_time = time.perf_counter()
-    results = task_func(chunk_or_task)  # process_chunk OR process_insert_chunk
+    results = []
+    for i, query in enumerate(chunk):
+        if random.random() < insert_fraction:
+            # Do insert using test_set[i % len(test_set)]
+            vector_id, vector, metadata = test_set[i % len(test_set)]
+            result = self._insert_one((vector_id, vector, metadata))
+        else:
+            result = self._search_one(query)
+        results.append(result)
     result_queue.put((start_time, results))
-
-# Mirror process_chunk for inserts
-def process_insert_chunk(insert_chunk):
-    """Process insert operations (mirrors process_chunk)"""
-    # insert_chunk contains insert records instead of queries
-    # Same pattern as search: [task_func(item) for item in chunk]
-    return [BaseSearcher._insert_one(record) for record in insert_chunk]
 ```

-**File 2: `engine/base_client/client.py`** (~15 lines)
+
+**File 2: `engine/base_client/client.py`**
 ```python
 def run_experiment(
     # ... existing parameters ...
-    mixed_workload_params: Optional[dict] = None,  # NEW
+    mixed_workload_params: Optional[dict] = None,
 ):
     if mixed_workload_params:
-        # Generate insert chunks and modify searcher for mixed workload
-        return self._run_mixed_workload(dataset, mixed_workload_params, num_queries)
+        insert_fraction = mixed_workload_params.get("insert_fraction", 0.1)
+        test_set = ...  # load or pass test set vectors
+        # Pass insert_fraction and test_set to searchers
+        return self._run_mixed_workload(dataset, insert_fraction, test_set, num_queries)
     # ... existing code unchanged ...

-def _run_mixed_workload(self, dataset, mixed_params, num_queries):
-    """Generate insert records and configure searcher for unified workers"""
-    insert_workers = mixed_params.get("insert_workers", 2)
-
-    # Generate insert chunks (same pattern as query chunks)
-    insert_chunk_size = 100  # Inserts per chunk
-    insert_chunks = []
-    for worker_id in range(insert_workers):
-        vector_id_start = 1000000 + worker_id * 100000
-        chunk = []
-        for i in range(insert_chunk_size):
-            vector = np.random.random(dataset.config.vector_size).astype(np.float32).tolist()
-            chunk.append((vector_id_start + i, vector, {}))
-        insert_chunks.append(chunk)
-
-    # Configure searcher for mixed workload
+def _run_mixed_workload(self, dataset, insert_fraction, test_set, num_queries):
     self.searchers[0].search_params["mixed_workload"] = {
-        "insert_chunks": insert_chunks,
+        "insert_fraction": insert_fraction,
+        "test_set": test_set,
         "dataset": dataset
     }
-
-    # Run normal search_all (now processes both search and insert chunks)
-    results = self.searchers[0].search_all(dataset.config.distance, dataset.get_queries(), num_queries)
-
-    # Cleanup
+    results = self.searchers[0].search_all(
+        dataset.config.distance, dataset.get_queries(), num_queries,
+        insert_fraction=insert_fraction, test_set=test_set
+    )
     self.searchers[0].search_params.pop("mixed_workload", None)
-
     return {"search": results}
 ```

 **File 3: Extend `BaseSearcher.search_all()`** (~15 lines)
 ```python
-# In BaseSearcher.search_all(), around line 160 where processes are created:
-
-# Check if mixed workload is enabled
-mixed_config = self.search_params.get("mixed_workload", None)
-if mixed_config:
-    insert_chunks = mixed_config["insert_chunks"]
-
-    # Create search worker processes (unchanged)
-    search_processes = []
-    for chunk in query_chunks:
-        process = Process(target=worker_function, args=(self, distance, process_chunk, chunk, result_queue))
-        search_processes.append(process)
-
-    # Create insert worker processes (same pattern!)
-    insert_processes = []
-    for chunk in insert_chunks:
-        process = Process(target=worker_function, args=(self, distance, process_insert_chunk, chunk, result_queue))
-        insert_processes.append(process)
-
-    processes = search_processes + insert_processes
-else:
-    # Original search-only processes
-    processes = []
-    for chunk in query_chunks:
-        process = Process(target=worker_function, args=(self, distance, process_chunk, chunk, result_queue))
-        processes.append(process)
+**File 3: `BaseSearcher.search_all()`**
+- No need to create separate insert/search processes.
+- All workers use the same function and decide per-task.
 ```

 **File 4: Engine-specific `insert_one` implementations** (~5 lines each)
@@ -279,12 +229,13 @@ def run(

 ### **Configuration**

+
 #### **Mixed Workload Parameters**
 ```json
 {
-  "mixed_workload_params": {
-    "insert_workers": 2
-  }
+  "mixed_workload_params": {
+    "insert_fraction": 0.1
+  }
 }
 ```

@@ -326,10 +277,10 @@ def run(
     "mean_precisions": 0.945,
     "p95_time": 0.103,
     "mixed_workload": {
-      "insert_count": 360,
-      "insert_rate": 6.8,
-      "insert_workers": 2,
-      "search_workers": 6
+      "insert_count": 360,
+      "insert_rate": 6.8,
+      "insert_fraction": 0.1,
+      "search_workers": 8
     }
   }
 }
@@ -455,4 +406,4 @@ vector_bytes = np.array(vector).astype(np.float32).tobytes()
 **Goal**: Measure search performance under concurrent insert load
 **Approach**: Perfect consistency using insert_one mirroring search_one pattern
 **Changes**: ~60 lines across 4+ files
-**Benefits**: Architectural consistency with single-operation semantics
+**Benefits**: Architectural consistency with single-operation semantics
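For reference, below is a minimal, self-contained sketch of the per-task decision this revision describes: with probability `insert_fraction` a worker performs an insert that reuses a test-set vector, otherwise it performs a search, and each operation is timed. The names `run_mixed_chunk`, `fake_search`, and `fake_insert` are illustrative and not part of vector-db-benchmark; only `insert_fraction` and the `(vector_id, vector, metadata)` record shape come from the plan.

```python
import random
import time
from typing import Callable, List, Sequence, Tuple

# Hypothetical insert record shape: (vector_id, vector, metadata),
# mirroring the plan's reuse of test-set vectors for inserts.
InsertRecord = Tuple[int, List[float], dict]


def run_mixed_chunk(
    chunk: Sequence[List[float]],
    test_set: Sequence[InsertRecord],
    search_one: Callable[[List[float]], object],
    insert_one: Callable[[InsertRecord], object],
    insert_fraction: float = 0.1,
    seed: int = 42,
) -> List[Tuple[str, float]]:
    """For each query in the chunk, do an insert with probability
    insert_fraction (reusing a test_set vector), otherwise a search.
    Returns an (operation, latency_seconds) pair per task."""
    rng = random.Random(seed)
    results = []
    for i, query in enumerate(chunk):
        start = time.perf_counter()
        if test_set and rng.random() < insert_fraction:
            insert_one(test_set[i % len(test_set)])
            op = "insert"
        else:
            search_one(query)
            op = "search"
        results.append((op, time.perf_counter() - start))
    return results


if __name__ == "__main__":
    # Stand-ins for engine-specific search_one / insert_one operations.
    def fake_search(query):
        time.sleep(0.001)

    def fake_insert(record):
        time.sleep(0.002)

    queries = [[0.0, 1.0]] * 20
    test_vectors = [(1_000_000 + i, [float(i), 0.0], {}) for i in range(5)]
    stats = run_mixed_chunk(queries, test_vectors, fake_search, fake_insert, insert_fraction=0.25)
    inserts = sum(1 for op, _ in stats if op == "insert")
    print(f"{inserts}/{len(stats)} tasks were inserts")
```

Because every worker runs the same loop and draws per task, the insert/search mix is governed by the single `insert_fraction` parameter rather than by how many workers are dedicated to each operation, which is the point of this commit.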
