chia/data_layer/data_store.py: 63 changes (33 additions, 30 deletions)
@@ -199,17 +199,34 @@ async def insert_into_data_store_from_file(
         nodes = latest_blob.get_nodes_with_indexes()
         known_hashes = {node.hash: index for index, node in nodes}

-        merkle_blob_queries = await self.build_merkle_blob_queries_for_missing_hashes(
-            known_hashes, missing_hashes, root, store_id
-        )
+        merkle_blob_queries = defaultdict[bytes32, list[TreeIndex]](list)
+        new_missing_hashes: list[bytes32] = []
+        for hash in missing_hashes:
+            if hash in known_hashes:
+                assert root.node_hash is not None, "if root.node_hash were None then known_hashes would be empty"
+                merkle_blob_queries[root.node_hash].append(known_hashes[hash])
+            else:
+                new_missing_hashes.append(hash)
+
+        rewrite_delta_file = bool(new_missing_hashes)
+        if new_missing_hashes:
+            await self.extend_merkle_blob_queries_for_missing_hashes(
+                merkle_blob_queries, known_hashes, new_missing_hashes, root, store_id
+            )

         more_internal_nodes, more_terminal_nodes = await self.process_merkle_blob_queries(merkle_blob_queries)
         internal_nodes.update(more_internal_nodes)
         terminal_nodes.update(more_terminal_nodes)

         merkle_blob = MerkleBlob.from_node_list(internal_nodes, terminal_nodes, root_hash)
         # Don't store these blob objects into cache, since their data structures are not calculated yet.
-        await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED, update_cache=False)
+        root = await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED, update_cache=False)
+
+        # Rewrite the delta to the format that doesn't miss hashes
+        if rewrite_delta_file:
+            assert root.node_hash is not None, "if root.node_hash were None then there should be no missing hashes"
+            with open(filename, "wb") as writer:
+                await self.write_tree_to_file(root, root.node_hash, store_id, True, writer)

     async def process_merkle_blob_queries(
         self,
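A note on the new caller logic in the hunk above: missing_hashes gets partitioned into hashes the latest merkle blob already knows (queued as TreeIndex lookups under root.node_hash) and genuinely new hashes that still need database queries, and the presence of any new ones also decides whether the delta file is rewritten afterwards. A minimal standalone sketch of that partitioning step, using invented byte strings and integer indexes in place of real bytes32 hashes and TreeIndex values:

    from collections import defaultdict

    # Invented stand-ins: hash -> index within the latest merkle blob.
    known_hashes = {b"aa": 1, b"bb": 2}
    missing_hashes = [b"aa", b"cc"]
    root_hash = b"root"

    queries: defaultdict[bytes, list[int]] = defaultdict(list)
    new_missing_hashes: list[bytes] = []
    for h in missing_hashes:
        if h in known_hashes:
            # Resolvable from the latest blob: queue its index under the root hash.
            queries[root_hash].append(known_hashes[h])
        else:
            # Unknown locally: must be fetched from older generations in the database.
            new_missing_hashes.append(h)

    rewrite_delta_file = bool(new_missing_hashes)
    assert queries == {b"root": [1]}
    assert new_missing_hashes == [b"cc"] and rewrite_delta_file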
@@ -232,36 +249,24 @@ async def process_merkle_blob_queries(

         return internal_nodes, terminal_nodes

-    async def build_merkle_blob_queries_for_missing_hashes(
+    async def extend_merkle_blob_queries_for_missing_hashes(
         self,
+        queries: defaultdict[bytes32, list[TreeIndex]],
         known_hashes: Mapping[bytes32, TreeIndex],
         missing_hashes: Sequence[bytes32],
         root: Root,
         store_id: bytes32,
-    ) -> defaultdict[bytes32, list[TreeIndex]]:
-        queries = defaultdict[bytes32, list[TreeIndex]](list)
-
-        new_missing_hashes: list[bytes32] = []
-        for hash in missing_hashes:
-            if hash in known_hashes:
-                assert root.node_hash is not None, "if root.node_hash were None then known_hashes would be empty"
-                queries[root.node_hash].append(known_hashes[hash])
+    ) -> None:
+        async with self.db_wrapper.reader() as reader:
+            cursor = await reader.execute(
+                "SELECT MAX(generation) FROM nodes WHERE store_id = ?",
+                (store_id,),
+            )
+            row = await cursor.fetchone()
+            if row is None or row[0] is None:
+                current_generation = 0
             else:
-                new_missing_hashes.append(hash)
-
-        missing_hashes = new_missing_hashes
-
-        if missing_hashes:
-            async with self.db_wrapper.reader() as reader:
-                cursor = await reader.execute(
-                    "SELECT MAX(generation) FROM nodes WHERE store_id = ?",
-                    (store_id,),
-                )
-                row = await cursor.fetchone()
-                if row is None or row[0] is None:
-                    current_generation = 0
-                else:
-                    current_generation = row[0]
+                current_generation = row[0]

         batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)

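For context on the row check kept in the new body above: SQLite's MAX() aggregate always returns exactly one row, so when no nodes exist for the store the row holds NULL (Python None) rather than being absent, and the code falls back to generation 0. A small self-contained sketch of that behaviour, using aiosqlite directly against an in-memory database and a simplified stand-in for the real nodes table (schema reduced to the two columns the query touches):

    import asyncio

    import aiosqlite

    async def max_generation_demo() -> None:
        async with aiosqlite.connect(":memory:") as db:
            await db.execute("CREATE TABLE nodes (store_id BLOB, generation INTEGER)")
            cursor = await db.execute("SELECT MAX(generation) FROM nodes WHERE store_id = ?", (b"store",))
            row = await cursor.fetchone()
            # MAX() over zero matching rows still yields one row, containing NULL -> None.
            assert row is not None and row[0] is None
            current_generation = 0 if row is None or row[0] is None else row[0]
            print(current_generation)  # prints 0

    asyncio.run(max_generation_demo())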
@@ -298,8 +303,6 @@ async def build_merkle_blob_queries_for_missing_hashes(
             await self.add_node_hashes(store_id, current_generation)
             log.info(f"Missing hashes: added old hashes from generation {current_generation}")

-        return queries
-
     async def read_from_file(
         self, filename: Path, store_id: bytes32
     ) -> tuple[dict[bytes32, tuple[bytes32, bytes32]], dict[bytes32, tuple[KeyId, ValueId]]]:
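The removed return at the end ties back to the signature change earlier in the diff: the renamed helper now extends the caller-supplied defaultdict in place and returns None, so there is nothing left to hand back. A tiny sketch of that extend-in-place pattern, with plain bytes and int standing in for bytes32 and TreeIndex:

    from collections import defaultdict

    def extend_queries(queries: defaultdict[bytes, list[int]], found: dict[bytes, int]) -> None:
        # Mutate the mapping owned by the caller; nothing is returned.
        for node_hash, index in found.items():
            queries[node_hash].append(index)

    queries: defaultdict[bytes, list[int]] = defaultdict(list)
    extend_queries(queries, {b"node": 7})
    assert queries == {b"node": [7]}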