|
10 | 10 | The client to scan vertex and edge from storage, |
11 | 11 | the return data is from thr graph database |
12 | 12 | """ |
| 13 | + |
13 | 14 | import sys |
| 15 | +import concurrent.futures |
14 | 16 |
|
15 | 17 | from nebula3.sclient.ScanResult import ScanResult |
16 | 18 | from nebula3.sclient.net import GraphStorageConnection |
@@ -192,6 +194,69 @@ def scan_vertex_with_part( |
192 | 194 | partial_success, |
193 | 195 | ) |
194 | 196 |
|
| 197 | + # TODO: 1.Native async or PyO3 |
| 198 | + # 2.Error Handling |
| 199 | + # 3.Statistical indicators |
| 200 | + def scan_vertex_async( |
| 201 | + self, |
| 202 | + space_name, |
| 203 | + tag_name, |
| 204 | + prop_names=[], |
| 205 | + start_time=DEFAULT_START_TIME, |
| 206 | + end_time=DEFAULT_END_TIME, |
| 207 | + where=None, |
| 208 | + only_latest_version=False, |
| 209 | + enable_read_from_follower=True, |
| 210 | + partial_success=False, |
| 211 | + batch_size=1000, |
| 212 | + max_workers=8, |
| 213 | + ): |
| 214 | + """ |
| 215 | + scan_vertex_async:Multi-partition concurrency and streaming batch yield |
| 216 | +
|
| 217 | + :param space_name: the space name |
| 218 | + :param tag_name: the tag name |
| 219 | + :param prop_names: if given empty, return all property |
| 220 | + :param start_time: the min version of vertex |
| 221 | + :param end_time: the max version of vertex |
| 222 | + :param where: now is unsupported |
| 223 | + :param only_latest_version: when storage enable multi versions and only_latest_version is true, |
| 224 | + only return latest version. |
| 225 | + when storage disable multi versions, just use the default value. |
| 226 | + :param enable_read_from_follower: if set to false, forbid follower read |
| 227 | + :param partial_success: if set true, when partial success, it will continue until finish |
| 228 | + :param batch_size: The number of points in each batch (passed to scan_vertex_with_part) |
| 229 | + :param max_workers: Number of concurrent threads |
| 230 | + :yield: part_id, VertexResult # Each batch of data |
| 231 | +
|
| 232 | + """ |
| 233 | + part_leaders = self._meta_cache.get_part_leaders(space_name) |
| 234 | + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: |
| 235 | + future_to_part = {} |
| 236 | + for part, leader in part_leaders.items(): |
| 237 | + future = executor.submit( |
| 238 | + self.scan_vertex_with_part, |
| 239 | + space_name, |
| 240 | + part, |
| 241 | + tag_name, |
| 242 | + prop_names, |
| 243 | + batch_size, # The limit passed to scan_vertex_with_part |
| 244 | + start_time, |
| 245 | + end_time, |
| 246 | + where, |
| 247 | + only_latest_version, |
| 248 | + enable_read_from_follower, |
| 249 | + partial_success, |
| 250 | + ) |
| 251 | + future_to_part[future] = part |
| 252 | + |
| 253 | + for future in concurrent.futures.as_completed(future_to_part): |
| 254 | + part = future_to_part[future] |
| 255 | + scan_result = future.result() # ScanResult |
| 256 | + while scan_result is not None and scan_result.has_next(): |
| 257 | + batch = scan_result.next() |
| 258 | + yield part, batch |
| 259 | + |
195 | 260 | def _scan_vertex( |
196 | 261 | self, |
197 | 262 | space_name, |
@@ -337,6 +402,65 @@ def scan_edge_with_part( |
337 | 402 | partial_success, |
338 | 403 | ) |
339 | 404 |
|
| 405 | + def scan_edge_async( |
| 406 | + self, |
| 407 | + space_name, |
| 408 | + edge_name, |
| 409 | + prop_names=[], |
| 410 | + start_time=DEFAULT_START_TIME, |
| 411 | + end_time=DEFAULT_END_TIME, |
| 412 | + where=None, |
| 413 | + only_latest_version=False, |
| 414 | + enable_read_from_follower=True, |
| 415 | + partial_success=False, |
| 416 | + batch_size=1000, |
| 417 | + max_workers=8, |
| 418 | + ): |
| 419 | + """ |
| 420 | + scan_edge_async:Multi-partition concurrency and streaming batch yield |
| 421 | +
|
| 422 | + :param space_name: the space name |
| 423 | + :param prop_names: if given empty, return all property |
| 424 | + :param edge_name: the edge name |
| 425 | + :param start_time: the min version of edge |
| 426 | + :param end_time: the max version of edge |
| 427 | + :param where: now is unsupported |
| 428 | + :param only_latest_version: when storage enable multi versions and only_latest_version is true, |
| 429 | + only return latest version. |
| 430 | + when storage disable multi versions, just use the default value. |
| 431 | + :param enable_read_from_follower: if set to false, forbid follower read |
| 432 | + :param partial_success: if set true, when partial success, it will continue until finish |
| 433 | + :param batch_size: The number of edges per batch (passed to scan_edge_with_part) |
| 434 | + :param max_workers: Number of concurrent threads |
| 435 | + :yield: part_id, EdgeResult # Each batch of data |
| 436 | + """ |
| 437 | + part_leaders = self._meta_cache.get_part_leaders(space_name) |
| 438 | + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: |
| 439 | + future_to_part = {} |
| 440 | + for part, leader in part_leaders.items(): |
| 441 | + future = executor.submit( |
| 442 | + self.scan_edge_with_part, |
| 443 | + space_name, |
| 444 | + part, |
| 445 | + edge_name, |
| 446 | + prop_names, |
| 447 | + batch_size, |
| 448 | + start_time, |
| 449 | + end_time, |
| 450 | + where, |
| 451 | + only_latest_version, |
| 452 | + enable_read_from_follower, |
| 453 | + partial_success, |
| 454 | + ) |
| 455 | + future_to_part[future] = part |
| 456 | + |
| 457 | + for future in concurrent.futures.as_completed(future_to_part): |
| 458 | + part = future_to_part[future] |
| 459 | + scan_result = future.result() |
| 460 | + while scan_result is not None and scan_result.has_next(): |
| 461 | + batch = scan_result.next() |
| 462 | + yield part, batch |
| 463 | + |
340 | 464 | def _scan_edge( |
341 | 465 | self, |
342 | 466 | space_name, |
|
0 commit comments