
fully exhaust query map #144


Merged
4 commits merged on Jun 30, 2025

async_substrate_interface/async_substrate.py (65 additions, 16 deletions)
@@ -456,7 +456,6 @@ async def retrieve_next_page(self, start_key) -> list:
)
if len(result.records) < self.page_size:
self.loading_complete = True

# Update last key from new result set to use as offset for next page
self.last_key = result.last_key
return result.records
@@ -3373,6 +3372,7 @@ async def query_map(
page_size: int = 100,
ignore_decoding_errors: bool = False,
reuse_block_hash: bool = False,
fully_exhaust: bool = False,
) -> AsyncQueryMapResult:
"""
Iterates over all key-pairs located at the given module and storage_function. The storage
@@ -3403,6 +3403,8 @@ async def query_map(
decoding
reuse_block_hash: use True if you wish to make the query using the last-used block hash. Do not mark True
if supplying a block_hash
fully_exhaust: Pull the entire result at once, rather than paginating. Only use if you need the entire query
map result.

Returns:
AsyncQueryMapResult object
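
A minimal usage sketch of the new flag might look like the following; the endpoint URL and the storage map queried here are illustrative assumptions, not part of this PR:

```python
import asyncio

from async_substrate_interface.async_substrate import AsyncSubstrateInterface


async def main():
    # NOTE: the endpoint URL and the storage map queried here are placeholder assumptions.
    async with AsyncSubstrateInterface("wss://example-node:9944") as substrate:
        # Default behaviour: storage keys are fetched one page at a time
        # (state_getKeysPaged) and further pages are loaded lazily as you iterate.
        paged = await substrate.query_map("System", "Account")

        # New behaviour: fetch every key up front (state_getKeys), then pull the
        # values in concurrent batches. Only use this when you intend to consume
        # the whole map.
        full = await substrate.query_map("System", "Account", fully_exhaust=True)

        async for record in full:
            ...  # every record is already loaded; no further paging occurs


asyncio.run(main())
```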
@@ -3453,11 +3455,16 @@ async def query_map(
page_size = max_results

# Retrieve storage keys
response = await self.rpc_request(
method="state_getKeysPaged",
params=[prefix, page_size, start_key, block_hash],
runtime=runtime,
)
if not fully_exhaust:
response = await self.rpc_request(
method="state_getKeysPaged",
params=[prefix, page_size, start_key, block_hash],
runtime=runtime,
)
else:
response = await self.rpc_request(
method="state_getKeys", params=[prefix, block_hash], runtime=runtime
)

if "error" in response:
raise SubstrateRequestException(response["error"]["message"])
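
For reference, the two key-retrieval RPCs differ only in paging: state_getKeysPaged takes a page size and an optional start key and returns at most one page of keys, while state_getKeys returns every key under the prefix in a single response. A rough sketch of the raw JSON-RPC payloads, with made-up prefix and block-hash values:

```python
# Illustrative JSON-RPC payloads only; the prefix and block hash are made up.
paged_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "state_getKeysPaged",
    # [prefix, page_size, start_key, block_hash]; start_key is None for the first page
    "params": ["0x26aa394e...", 100, None, "0xdeadbeef..."],
}

full_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "state_getKeys",
    # [prefix, block_hash]; every matching key comes back in a single response
    "params": ["0x26aa394e...", "0xdeadbeef..."],
}
```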
@@ -3470,18 +3477,60 @@ async def query_map(
if len(result_keys) > 0:
last_key = result_keys[-1]

# Retrieve corresponding value
response = await self.rpc_request(
method="state_queryStorageAt",
params=[result_keys, block_hash],
runtime=runtime,
)
# Retrieve corresponding value(s)
if not fully_exhaust:
response = await self.rpc_request(
method="state_queryStorageAt",
params=[result_keys, block_hash],
runtime=runtime,
)
if "error" in response:
raise SubstrateRequestException(response["error"]["message"])
for result_group in response["result"]:
result = decode_query_map(
result_group["changes"],
prefix,
runtime,
param_types,
params,
value_type,
key_hashers,
ignore_decoding_errors,
self.decode_ss58,
)
else:
all_responses = []
page_batches = [
result_keys[i : i + page_size]
for i in range(0, len(result_keys), page_size)
]
changes = []
for batch_group in [
                # run five concurrent batch pulls; could go higher, but it's good to be good citizens
                # of the ecosystem
page_batches[i : i + 5]
for i in range(0, len(page_batches), 5)
]:
all_responses.extend(
await asyncio.gather(
*[
self.rpc_request(
method="state_queryStorageAt",
params=[batch_keys, block_hash],
runtime=runtime,
)
Comment on lines +3517 to +3521
Contributor:

What happens if one of the self.rpc_request calls raises an error? For example, due to a short, temporary bad connection, or a broken data structure?
Wouldn't you want to use return_exceptions=True for the gather and then filter the result, or use try/except with logging in case of an error?

Collaborator (author):

No, I'd rather not. The problem with returning exceptions is that we would then have one of three options:

  1. Log the exception, and return partial data.
  2. Return no data, and re-raise the exception.
  3. Retry the failed calls.

By not returning the exception, we're implicitly choosing option 2. Option 1 would be fine if we trusted people to examine logs correctly before moving on. However, most of this will not be watched before being used, so we cannot rely on someone to say "okay, this is incomplete data, what must I now do with it?" Also, this path only runs with fully_exhaust=True, which is documented for use only when you want all (read: not partial) data. Therefore, I think this is the best option.

Option 3 doesn't make sense, because those calls are already retried if the exception is due to a websocket timeout, and if not, it can be caught by something else (such as with RetryAsyncSubstrate).

Contributor:

In other words, we only consider the case where ALL results are successful or we get an error. Right?

for batch_keys in batch_group
]
)
)
for response in all_responses:
if "error" in response:
raise SubstrateRequestException(response["error"]["message"])
for result_group in response["result"]:
changes.extend(result_group["changes"])

if "error" in response:
raise SubstrateRequestException(response["error"]["message"])
for result_group in response["result"]:
result = decode_query_map(
result_group["changes"],
changes,
prefix,
runtime,
param_types,
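The fully_exhaust branch above splits the full key list into pages and pulls them five batches at a time with asyncio.gather; because return_exceptions is left at its default, the first failed request propagates and no partial map is returned (option 2 in the review thread). Below is a standalone sketch of that pattern, with a hypothetical fetch_batch coroutine standing in for self.rpc_request:

```python
import asyncio

CONCURRENCY = 5  # mirrors the "five concurrent batch pulls" choice in the PR


async def fetch_batch(batch_keys: list[str]) -> dict:
    """Hypothetical stand-in for self.rpc_request(method="state_queryStorageAt", ...)."""
    await asyncio.sleep(0.01)  # pretend network latency
    return {"result": [{"changes": [(k, "0x00") for k in batch_keys]}]}


async def pull_all(result_keys: list[str], page_size: int = 100) -> list:
    # Split all keys into page-sized batches.
    batches = [
        result_keys[i : i + page_size] for i in range(0, len(result_keys), page_size)
    ]
    responses = []
    # Process the batches in groups of CONCURRENCY concurrent requests.
    for group in [batches[i : i + CONCURRENCY] for i in range(0, len(batches), CONCURRENCY)]:
        # return_exceptions defaults to False, so a single failing batch raises here
        # and the caller gets no partial data -- the behaviour chosen in the review thread.
        responses.extend(await asyncio.gather(*[fetch_batch(b) for b in group]))

    # Flatten the per-response change lists, as the PR does before decoding.
    changes = []
    for response in responses:
        for result_group in response["result"]:
            changes.extend(result_group["changes"])
    return changes


print(len(asyncio.run(pull_all([f"0x{i:04x}" for i in range(1234)]))))  # -> 1234
```

The real implementation additionally raises SubstrateRequestException whenever a response body contains an "error" key, as shown in the diff above.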
async_substrate_interface/utils/decoding.py (5 additions, 5 deletions)
@@ -73,7 +73,7 @@ def _decode_scale_list_with_runtime(


def decode_query_map(
result_group_changes,
result_group_changes: list,
prefix,
runtime: "Runtime",
param_types,
@@ -123,10 +123,10 @@ def concat_hash_len(key_hasher: str) -> int:
decoded_keys = all_decoded[:middl_index]
decoded_values = all_decoded[middl_index:]
for kts, vts, dk, dv in zip(
pre_decoded_key_types,
pre_decoded_value_types,
decoded_keys,
decoded_values,
pre_decoded_key_types,
pre_decoded_value_types,
decoded_keys,
decoded_values,
):
try:
# strip key_hashers to use as item key
tests/integration_tests/test_async_substrate_interface.py (44 additions, 0 deletions)
@@ -1,3 +1,5 @@
import time

import pytest
from scalecodec import ss58_encode

@@ -71,3 +73,45 @@ async def test_ss58_conversion():
if len(value.value) > 0:
for decoded_key in value.value:
assert isinstance(decoded_key, str)


@pytest.mark.asyncio
async def test_fully_exhaust_query_map():
async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
block_hash = await substrate.get_chain_finalised_head()
non_fully_exhauster_start = time.time()
non_fully_exhausted_qm = await substrate.query_map(
"SubtensorModule",
"CRV3WeightCommits",
block_hash=block_hash,
)
initial_records_count = len(non_fully_exhausted_qm.records)
assert initial_records_count <= 100 # default page size
exhausted_records_count = 0
async for _ in non_fully_exhausted_qm:
exhausted_records_count += 1
non_fully_exhausted_time = time.time() - non_fully_exhauster_start

assert len(non_fully_exhausted_qm.records) >= initial_records_count
fully_exhausted_start = time.time()
fully_exhausted_qm = await substrate.query_map(
"SubtensorModule",
"CRV3WeightCommits",
block_hash=block_hash,
fully_exhaust=True,
)

fully_exhausted_time = time.time() - fully_exhausted_start
initial_records_count_fully_exhaust = len(fully_exhausted_qm.records)
assert fully_exhausted_time <= non_fully_exhausted_time, (
f"Fully exhausted took longer than non-fully exhausted with "
f"{len(non_fully_exhausted_qm.records)} records in non-fully exhausted "
f"in {non_fully_exhausted_time} seconds, and {initial_records_count_fully_exhaust} in fully exhausted"
f" in {fully_exhausted_time} seconds. This could be caused by the fact that on this specific block, "
f"there are fewer records than take up a single page. This difference should still be small."
)
fully_exhausted_records_count = 0
async for _ in fully_exhausted_qm:
fully_exhausted_records_count += 1
assert fully_exhausted_records_count == initial_records_count_fully_exhaust
assert initial_records_count_fully_exhaust == exhausted_records_count