perf: parallelize sequential awaits with asyncio in async_subtensor#3275
perf: parallelize sequential awaits with asyncio in async_subtensor#3275altis0725 wants to merge 1 commit intoopentensor:stagingfrom
Conversation
Apply asyncio.gather to three methods for concurrent RPC execution: - get_all_dynamic_info: run runtime_call and get_subnet_prices concurrently - blocks_since_last_update: run get_block_number and get_hyperparameter concurrently, pre-resolve block_hash for chain state consistency - blocks_until_next_epoch: dynamically gather block and tempo lookups Fixes opentensor#3129 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4b35a56 to
209e0c3
Compare
|
Hi @basfroman, thanks for the feedback. I've rebased the branch to a single clean commit. This PR implements the two items from #3129:
The previous version had too many incremental commits that obscured the intent — apologies for the noise. |
thewhaleking
left a comment
There was a problem hiding this comment.
Overall, I think this is good. Just needs some tweaks.
| ) | ||
| subnet_prices = await self.get_subnet_prices(block_hash=block_hash) | ||
|
|
||
| if isinstance(query, BaseException): |
There was a problem hiding this comment.
In what case would this raise a BaseException and why would that be the only thing we'd want to raise?
If we just don't return exceptions [i.e. return_exceptions=False (the default)], we don't have to do this logic at all.
| if need_block: | ||
| coros.append(self.substrate.get_block_number(block_hash=block_hash)) | ||
| if need_tempo: | ||
| coros.append(self.tempo(netuid=netuid, block_hash=block_hash)) | ||
| if coros: | ||
| results = await asyncio.gather(*coros) | ||
| idx = 0 | ||
| if need_block: | ||
| block = results[idx] | ||
| idx += 1 | ||
| if need_tempo: | ||
| tempo = results[idx] |
There was a problem hiding this comment.
This works, but this logic is fairly fragile. Why don't we either hold it in a dict or assign tasks and use the task name?
Fixes #3129
Summary
Apply
asyncio.gatherto three methods to run sequential RPC calls concurrently:get_all_dynamic_info:runtime_callandget_subnet_pricesin parallelblocks_since_last_update:get_block_numberandget_hyperparameterin parallel, with pre-resolvedblock_hashfor chain state consistencyblocks_until_next_epoch: dynamic gather forget_block_numberandtempowhen both need fetchingEach parallelized pair saves ~1 RPC round-trip (50-200ms depending on node latency).
Test plan
test_blocks_since_last_update_successandtest_blocks_since_last_update_no_last_updatefor new calling convention