
Commit 50b5cc6

Fixed a bug where multiple concurrent workers were trying to insert using the same doc_id
1 parent 18e1b9f commit 50b5cc6
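
For context, the pre-fix code keeps a single class-level itertools.count, and every worker process ends up with its own copy of that counter starting from the same value, so concurrent workers draw identical doc_ids. Below is a minimal reproduction sketch of that failure mode; the Indexer class, next_doc_id, and _draw are illustrative names, not the benchmark's actual API.

import itertools
from multiprocessing import get_context


class Indexer:
    # Every worker process gets its own copy of this counter, all starting
    # at the same value -- hence the duplicate doc_ids across workers.
    _doc_id_counter = itertools.count(100000000)

    @classmethod
    def next_doc_id(cls):
        return next(cls._doc_id_counter)


def _draw(n):
    # Runs inside a worker process.
    return [Indexer.next_doc_id() for _ in range(n)]


if __name__ == "__main__":
    ctx = get_context("spawn")  # each worker re-imports this module
    with ctx.Pool(processes=2) as pool:
        print(pool.map(_draw, [3, 3]))
    # Typical output: [[100000000, 100000001, 100000002],
    #                  [100000000, 100000001, 100000002]]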

File tree

1 file changed: +13 -6 lines changed


engine/base_client/search.py

Lines changed: 13 additions & 6 deletions
@@ -20,7 +20,7 @@
 
 
 class BaseSearcher:
-    _doc_id_counter = itertools.count(100000000)
+    _doc_id_counter = None # Will be initialized per process
     MP_CONTEXT = None
 
     def __init__(self, host, connection_params, search_params):
@@ -67,15 +67,22 @@ def _search_one(cls, query, top: Optional[int] = None):
         precision = len(ids.intersection(query.expected_result[:top])) / top
         return precision, end - start
 
+    @classmethod
+    def _get_doc_id_counter(cls):
+        if cls._doc_id_counter is None:
+            # Use process ID to create unique starting point for each worker
+            process_id = os.getpid()
+            # Each process gets a unique range: 1000000000 + (pid * 1000000)
+            start_offset = 1000000000 + (process_id % 1000) * 1000000
+            cls._doc_id_counter = itertools.count(start_offset)
+        return cls._doc_id_counter
+
     @classmethod
     def _insert_one(cls, query):
         start = time.perf_counter()
 
-        # Generate unique doc_id here
-        doc_id = next(cls._doc_id_counter)
-
-        # Debug logging to verify inserts are happening
-        #print(f"DEBUG: Inserting vector with doc_id={doc_id}")
+        # Generate unique doc_id with process-safe counter
+        doc_id = next(cls._get_doc_id_counter())
 
         cls.insert_one(str(doc_id), query.vector, query.meta_conditions)
         end = time.perf_counter()
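
The added _get_doc_id_counter hunk seeds a fresh itertools.count lazily in each worker process and offsets its start by the worker's PID, so the id ranges handed out by concurrent workers do not overlap. Below is a minimal, self-contained sketch of that scheme; the Searcher class, next_doc_id, and _draw are illustrative names, not the repository's API.

import itertools
import os
from multiprocessing import get_context


class Searcher:
    # Sketch of the per-process counter scheme from the patch.
    _doc_id_counter = None  # created lazily, once per worker process

    @classmethod
    def _get_doc_id_counter(cls):
        if cls._doc_id_counter is None:
            # 1000 PID buckets, each 1,000,000 ids wide, offset by 1,000,000,000
            start_offset = 1_000_000_000 + (os.getpid() % 1000) * 1_000_000
            cls._doc_id_counter = itertools.count(start_offset)
        return cls._doc_id_counter

    @classmethod
    def next_doc_id(cls):
        return next(cls._get_doc_id_counter())


def _draw(n):
    # Runs inside a worker process; ids come from that process's PID bucket.
    return [Searcher.next_doc_id() for _ in range(n)]


if __name__ == "__main__":
    ctx = get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        print(pool.map(_draw, [3, 3]))
    # Each worker's ids start at a different PID-derived offset.

Note that uniqueness under this scheme rests on two assumptions visible in the patch: worker PIDs must not collide modulo 1000, and each worker must draw fewer than 1,000,000 ids before spilling into the next bucket.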
