Skip to content

Commit 1f71404

Browse files
FengzdadiCopilotPsiACE
authored
feat: add async scan (#377)
* feat: add scan_vertex_async and scan_edge_async * doc: add TODO for scan_vertex_async * doc: Formatting and annotation adjustments * doc: Apply doc suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: Consider whether scan_result is empty (from code review) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Chojan Shang <chojan.shang@vesoft.com>
1 parent 45e33c4 commit 1f71404

File tree

1 file changed

+124
-0
lines changed

1 file changed

+124
-0
lines changed

nebula3/sclient/GraphStorageClient.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
The client to scan vertex and edge from storage,
1111
the return data is from the graph database
1212
"""
13+
1314
import sys
15+
import concurrent.futures
1416

1517
from nebula3.sclient.ScanResult import ScanResult
1618
from nebula3.sclient.net import GraphStorageConnection
@@ -192,6 +194,69 @@ def scan_vertex_with_part(
192194
partial_success,
193195
)
194196

197+
# TODO: 1.Native async or PyO3
198+
# 2.Error Handling
199+
# 3.Statistical indicators
200+
def scan_vertex_async(
    self,
    space_name,
    tag_name,
    prop_names=None,
    start_time=DEFAULT_START_TIME,
    end_time=DEFAULT_END_TIME,
    where=None,
    only_latest_version=False,
    enable_read_from_follower=True,
    partial_success=False,
    batch_size=1000,
    max_workers=8,
):
    """
    scan_vertex_async: multi-partition concurrency and streaming batch yield

    Submits one ``scan_vertex_with_part`` call per partition reported by the
    meta cache to a thread pool, then yields the batches of each partition's
    scan result as the partition tasks complete. Despite the name, this is
    a regular (synchronous) generator backed by threads, not an asyncio
    coroutine. Batches are pulled from each scan result in the caller's
    thread, one partition at a time.

    :param space_name: the space name
    :param tag_name: the tag name
    :param prop_names: if given empty (or None), return all property
    :param start_time: the min version of vertex
    :param end_time: the max version of vertex
    :param where: now is unsupported
    :param only_latest_version: when storage enable multi versions and only_latest_version is true,
        only return latest version.
        when storage disable multi versions, just use the default value.
    :param enable_read_from_follower: if set to false, forbid follower read
    :param partial_success: if set true, when partial success, it will continue until finish
    :param batch_size: The number of points in each batch (passed to scan_vertex_with_part)
    :param max_workers: Number of concurrent threads
    :yield: part_id, VertexResult # Each batch of data
    :raise: any exception raised by a partition's scan_vertex_with_part call
        is re-raised here when that partition's result is collected

    """
    # Use a fresh list per call instead of a shared mutable default argument.
    if prop_names is None:
        prop_names = []

    part_leaders = self._meta_cache.get_part_leaders(space_name)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_part = {}
        try:
            # Only the partition ids are needed; the leader addresses are unused here.
            for part in part_leaders:
                future = executor.submit(
                    self.scan_vertex_with_part,
                    space_name,
                    part,
                    tag_name,
                    prop_names,
                    batch_size,  # The limit passed to scan_vertex_with_part
                    start_time,
                    end_time,
                    where,
                    only_latest_version,
                    enable_read_from_follower,
                    partial_success,
                )
                future_to_part[future] = part

            for future in concurrent.futures.as_completed(future_to_part):
                part = future_to_part[future]
                scan_result = future.result()  # ScanResult, or None
                while scan_result is not None and scan_result.has_next():
                    yield part, scan_result.next()
        finally:
            # If the consumer stops early or a partition fails, drop the tasks
            # that have not started yet so the executor shutdown does not wait
            # for scans nobody will read. Cancelling a finished or running
            # future is a harmless no-op.
            for pending in future_to_part:
                pending.cancel()
259+
195260
def _scan_vertex(
196261
self,
197262
space_name,
@@ -337,6 +402,65 @@ def scan_edge_with_part(
337402
partial_success,
338403
)
339404

405+
def scan_edge_async(
    self,
    space_name,
    edge_name,
    prop_names=None,
    start_time=DEFAULT_START_TIME,
    end_time=DEFAULT_END_TIME,
    where=None,
    only_latest_version=False,
    enable_read_from_follower=True,
    partial_success=False,
    batch_size=1000,
    max_workers=8,
):
    """
    scan_edge_async: multi-partition concurrency and streaming batch yield

    Submits one ``scan_edge_with_part`` call per partition reported by the
    meta cache to a thread pool, then yields the batches of each partition's
    scan result as the partition tasks complete. Despite the name, this is
    a regular (synchronous) generator backed by threads, not an asyncio
    coroutine. Batches are pulled from each scan result in the caller's
    thread, one partition at a time.

    :param space_name: the space name
    :param edge_name: the edge name
    :param prop_names: if given empty (or None), return all property
    :param start_time: the min version of edge
    :param end_time: the max version of edge
    :param where: now is unsupported
    :param only_latest_version: when storage enable multi versions and only_latest_version is true,
        only return latest version.
        when storage disable multi versions, just use the default value.
    :param enable_read_from_follower: if set to false, forbid follower read
    :param partial_success: if set true, when partial success, it will continue until finish
    :param batch_size: The number of edges per batch (passed to scan_edge_with_part)
    :param max_workers: Number of concurrent threads
    :yield: part_id, EdgeResult # Each batch of data
    :raise: any exception raised by a partition's scan_edge_with_part call
        is re-raised here when that partition's result is collected
    """
    # Use a fresh list per call instead of a shared mutable default argument.
    if prop_names is None:
        prop_names = []

    part_leaders = self._meta_cache.get_part_leaders(space_name)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_part = {}
        try:
            # Only the partition ids are needed; the leader addresses are unused here.
            for part in part_leaders:
                future = executor.submit(
                    self.scan_edge_with_part,
                    space_name,
                    part,
                    edge_name,
                    prop_names,
                    batch_size,  # The limit passed to scan_edge_with_part
                    start_time,
                    end_time,
                    where,
                    only_latest_version,
                    enable_read_from_follower,
                    partial_success,
                )
                future_to_part[future] = part

            for future in concurrent.futures.as_completed(future_to_part):
                part = future_to_part[future]
                scan_result = future.result()  # ScanResult, or None
                while scan_result is not None and scan_result.has_next():
                    yield part, scan_result.next()
        finally:
            # If the consumer stops early or a partition fails, drop the tasks
            # that have not started yet so the executor shutdown does not wait
            # for scans nobody will read. Cancelling a finished or running
            # future is a harmless no-op.
            for pending in future_to_part:
                pending.cancel()
463+
340464
def _scan_edge(
341465
self,
342466
space_name,

0 commit comments

Comments
 (0)