feat: add async scan #377
base: master
In the `scan_vertex_async` function, `scan_result = future.result()` may raise an exception, but the exception is not handled.

In the `scan_edge_async` function:

    while scan_result and scan_result.has_next():
        batch = scan_result.next()

If `batch = scan_result.next()` raises an exception, `scan_result` may never be closed, leading to improper resource cleanup.

The docstring may also be incorrect:

    """
    :type prop_names: if given empty, return all property  # Should be :param
    """

Perhaps it should be:

    """
    :param prop_names: if given empty, return all property
    """
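
The cleanup concern raised in this comment can be illustrated with a `try`/`finally` around the batch loop. This is only a sketch: `FakeScanResult` is a hypothetical stand-in written for this example, and its `close()` method is an assumption, since the conversation does not show how the real `ScanResult` releases its resources.

```python
class FakeScanResult:
    """Hypothetical stand-in for ScanResult; not the real client class."""

    def __init__(self, batches, fail_at=None):
        self._batches = list(batches)
        self._pos = 0
        self._fail_at = fail_at  # index at which next() raises, if any
        self.closed = False

    def has_next(self):
        return self._pos < len(self._batches)

    def next(self):
        if self._pos == self._fail_at:
            raise RuntimeError("simulated storage error")
        batch = self._batches[self._pos]
        self._pos += 1
        return batch

    def close(self):  # assumed cleanup hook, for illustration only
        self.closed = True


def iter_batches(part, scan_result):
    """Yield (part, batch) pairs and always release scan_result."""
    try:
        while scan_result and scan_result.has_next():
            yield part, scan_result.next()
    finally:
        # Runs when the scan is exhausted, when next() raises,
        # and when the generator is closed before it is exhausted.
        if scan_result is not None:
            scan_result.close()
```

With this shape, an exception from `next()` still propagates to the caller, but the cleanup step is no longer skipped.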
Pull Request Overview
This PR adds asynchronous scanning capabilities to the `GraphStorageClient` by introducing two new methods: `scan_vertex_async()` and `scan_edge_async()`. These methods enable concurrent, multi-partition scanning of vertices and edges using thread pools, addressing the limitation of only having blocking, single-threaded scan operations.
- Added `concurrent.futures` import for thread pool execution
- Implemented `scan_vertex_async()` method for parallel vertex scanning across partitions
- Implemented `scan_edge_async()` method for parallel edge scanning across partitions

    for future in concurrent.futures.as_completed(future_to_part):
        part = future_to_part[future]
        scan_result = future.result()  # ScanResult

The `future.result()` call can raise exceptions from the background thread, but there's no exception handling. Consider wrapping this in a try-except block to handle potential errors gracefully, especially since the TODO comment mentions error handling as a known issue.
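
One possible shape for that try-except is sketched below. It is not the PR's code: `scan_part` is a hypothetical per-partition job that fails on purpose for partition 2, and collecting errors into a dictionary is just one policy among several (logging, retrying, or re-raising after the loop are others).

```python
import concurrent.futures


def scan_part(part):
    """Hypothetical per-partition job; partition 2 fails on purpose."""
    if part == 2:
        raise ConnectionError(f"partition {part} unreachable")
    return [f"row-{part}-a", f"row-{part}-b"]


def collect(parts, max_workers=4):
    """Run one job per partition and separate results from failures."""
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        future_to_part = {pool.submit(scan_part, p): p for p in parts}
        for future in concurrent.futures.as_completed(future_to_part):
            part = future_to_part[future]
            try:
                # result() re-raises whatever the worker thread raised.
                results[part] = future.result()
            except Exception as exc:
                # Record the failure and keep draining the other partitions.
                errors[part] = exc
    return results, errors
```

Here a single failing partition no longer aborts the whole loop; the caller decides afterwards what to do with `errors`.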

    for future in concurrent.futures.as_completed(future_to_part):
        part = future_to_part[future]
        scan_result = future.result()

Same as in `scan_vertex_async`: the `future.result()` call can raise exceptions from the background thread, but there's no exception handling. Consider wrapping this in a try-except block to handle potential errors gracefully.
Thanks to @Malone-AI @wey-gu, I will fix these errors in the next few days.
Co-authored-by: Copilot <[email protected]>
What type of PR is this?
What problem(s) does this PR solve?
Issue(s) number:
N/A
Description:
Adds `scan_vertex_async()` and `scan_edge_async()` so that scans can be run in an `asyncio`-friendly way, filling the gap where only blocking, single-threaded scans existed.
How do you solve it?
- Use `concurrent.futures.ThreadPoolExecutor` to submit one `scan_*_with_part` job per partition (via `meta_cache.get_part_leaders`), dispatching them in parallel.
- Iterate each `ScanResult` in a `while scan_result.has_next(): yield part, batch` loop to stream `(partition_id, batch)` pairs.
Special notes for your reviewer, ex. impact of this fix, design document, etc:
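
As a rough illustration of the approach described under "How do you solve it?", the fan-out and streaming steps might look like the sketch below. The names `scan_with_part` and `scan_all_parts` are hypothetical and exist only for this example; the real `scan_*_with_part` methods return a `ScanResult`, which is replaced here by a plain list of batches, and the partition list is passed in directly rather than read from `meta_cache.get_part_leaders`.

```python
import concurrent.futures


def scan_with_part(part):
    """Hypothetical stand-in for a scan_*_with_part call.

    Returns two batches of two rows each for the given partition.
    """
    return [[f"p{part}-b{b}-r{r}" for r in range(2)] for b in range(2)]


def scan_all_parts(parts, max_workers=4):
    """Yield (partition_id, batch) pairs as each partition's job finishes."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One job per partition, all submitted up front.
        future_to_part = {pool.submit(scan_with_part, p): p for p in parts}
        for future in concurrent.futures.as_completed(future_to_part):
            part = future_to_part[future]
            for batch in future.result():
                yield part, batch
```

Partitions arrive in completion order, so callers should not rely on any ordering across partitions; batches within one partition keep their original order.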