Implement asynchronous iterators for reading data in the storage. #4975
base: main
linera-views/src/store.rs
Outdated
| /// Returns `Ok(None)` when iteration is complete. | ||
| /// Returns `Ok(Some(None))` when a key doesn't exist. | ||
| /// Returns `Ok(Some(Some(value)))` when a key exists with a value. | ||
| async fn next(&mut self) -> Result<Option<Option<Vec<u8>>>, E>; |
Why not use Stream?
I.e. should ReadMultiIterator<E: KeyValueStoreError> just have no methods, and instead have impls and bounds so that it is equivalent to Stream<Item = Result<Option<Vec<u8>>, E>> + MaybeSend, where MaybeSend is Send iff not(web)?
If you write -> impl Stream<…> then trait_variant::make will do the MaybeSend part for you.
linera-views/src/store.rs
Outdated
| /// Returns `Ok(None)` when iteration is complete. | ||
| /// Returns `Ok(Some(None))` when a key doesn't exist. |
Why not the reverse? I.e.:
- return Ok(None) when the key doesn't exist. This is clearer because the key doesn't exist from the first element, while Ok(Some(None)) could be returned for subsequent calls to the iterator.
- return Ok(Some(None)) when there are no more elements in the stream.
No.
The function fn next returns an Option<Item>, with Item being what is produced. Here, what is produced is Option<Vec<u8>>.
If so, then why isn't this signature Option<Item>, where Item is an associated type defined by the implementation?
Here Item = Option<Vec<u8>> which gets you Option<Option<Vec<u8>>>.
I understand. I mean that from Option<Option<Vec<u8>>> you can't tell why you chose the semantics described in the comment, whereas with Option<Item> you can.
Actually, even with Option<Item> and type Item = Option<Vec<u8>>, it still makes more sense for None to mean the key doesn't exist, rather than the Some(None) that you're proposing.
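For readers following this thread, the nesting can be reproduced with a plain synchronous Iterator from std. This is only a sketch: read_multi and describe are hypothetical helpers, the store is an in-memory BTreeMap, and the Result layer and async from the PR's signature are left out. With Item = Option<Vec<u8>>, calling next() yields Option<Option<Vec<u8>>>, where the outer None signals the end of iteration and the inner None a missing key.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory lookup: yields one `Option<Vec<u8>>` per requested key.
/// (Synchronous stand-in; the PR's version is async and fallible.)
fn read_multi<'a>(
    store: &'a BTreeMap<Vec<u8>, Vec<u8>>,
    keys: Vec<Vec<u8>>,
) -> impl Iterator<Item = Option<Vec<u8>>> + 'a {
    keys.into_iter().map(move |key| store.get(&key).cloned())
}

/// Spells out the three states documented on `next` in the reviewed code.
fn describe(step: Option<Option<Vec<u8>>>) -> &'static str {
    match step {
        // Outer `None`: the iterator itself is exhausted.
        None => "iteration complete",
        // Inner `None`: an item was produced, and it says the key is absent.
        Some(None) => "key does not exist",
        // Both layers present: the key exists and carries a value.
        Some(Some(_)) => "key exists with a value",
    }
}
```

The outer Option belongs to the iteration protocol and the inner one to the item, which is why swapping their meanings would diverge from how Iterator::next (and Stream) behave.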
| if result { | ||
| return true; | ||
| } | ||
| if self.has_exclusive_access { |
Why do we claim that the key is not present when self.has_exclusive_access == false?
If !self.has_exclusive_access then that array is empty so there is no point in checking in it.
| let mut values_from_iter = Vec::new(); | ||
| loop { | ||
| match iter.next().await { | ||
| None => break, | ||
| Some(Ok(value)) => values_from_iter.push(value), | ||
| Some(Err(e)) => panic!("Error reading from iterator: {:?}", e), | ||
| } | ||
| } |
Maybe more concise:
| - let mut values_from_iter = Vec::new(); | |
| - loop { | |
| - match iter.next().await { | |
| - None => break, | |
| - Some(Ok(value)) => values_from_iter.push(value), | |
| - Some(Err(e)) => panic!("Error reading from iterator: {:?}", e), | |
| - } | |
| - } | |
| + let values_from_iter = iter | |
| + .collect::<Result<Vec<_>>>() | |
| + .await | |
| + .expect("reading from iterator"); |
Yes, we can replace it with
let values_from_iter = store
.read_multi_values_bytes_iter(keys)
.try_collect::<Vec<_>>()
.await
.expect("Error reading from iterator");
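The try_collect call in the reply above comes from the futures crate's TryStreamExt. As a rough illustration of what it does, here is the synchronous counterpart using only std: collecting an iterator of Result items into a Result<Vec<_>, E> stops at the first error. collect_values is a hypothetical helper, not part of the PR.

```rust
/// Collects fallible reads into a vector, returning the first error encountered.
/// (Synchronous stand-in for `try_collect` on a stream of `Result` items.)
fn collect_values<I, E>(reads: I) -> Result<Vec<Option<Vec<u8>>>, E>
where
    I: Iterator<Item = Result<Option<Vec<u8>>, E>>,
{
    // `Result<Vec<T>, E>` implements `FromIterator<Result<T, E>>`,
    // so `collect` short-circuits on the first `Err`.
    reads.collect()
}
```

This replaces the explicit loop-and-match with a single expression while keeping the same stop-on-error behaviour; the caller decides whether to panic with expect or propagate the error.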
| fn read_multi_values_bytes_iter( | ||
| &self, | ||
| keys: Vec<Vec<u8>>, |
I wonder if it would make sense (maybe not in this PR) to also take a stream as an input. (Or just a regular iterator? Needs discussion…) That way, e.g. we wouldn't even put together the big_key for the 10000th element if the caller cancels the stream after the 5000th.
We could potentially have those everywhere, but I want to keep this change limited.
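To make the suggestion about lazy key input concrete, here is a small synchronous sketch using only std. Everything in it is hypothetical: big_key stands in for whatever builds the full key, read_lazily for the read method, and the Cell counter exists only to observe how many keys were actually built.

```rust
use std::cell::Cell;

/// Hypothetical key builder: prefix followed by a big-endian index.
/// Bumps `built` so callers can observe how many keys were constructed.
fn big_key(prefix: &[u8], index: u32, built: &Cell<usize>) -> Vec<u8> {
    built.set(built.get() + 1);
    let mut key = prefix.to_vec();
    key.extend_from_slice(&index.to_be_bytes());
    key
}

/// Hypothetical read that accepts keys lazily instead of as `Vec<Vec<u8>>`.
/// The stand-in "value" for each key is just its length as a single byte.
fn read_lazily<K>(keys: K) -> impl Iterator<Item = Option<Vec<u8>>>
where
    K: Iterator<Item = Vec<u8>>,
{
    keys.map(|key| Some(vec![key.len() as u8]))
}
```

If the caller passes (0..10_000).map(|i| big_key(b"p", i, &built)) and stops after 5,000 results, only 5,000 keys are ever constructed, whereas a Vec<Vec<u8>> argument forces all 10,000 to be built up front.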
| let value_result = first_segments_stream.next().await | ||
| .expect("keys and first_segments should have same length"); | ||
| let value = match value_result { |
We should use try_stream! instead of stream!. Then this line could probably be let value = value_result?;, or let value = value_result.map_err(ValueSplittingError::InnerStoreError)? or something like that.
Yes, we can do that.
But no, we do not need the try_stream.
| .map(|chunk| chunk.to_vec()) | ||
| .collect::<Vec<_>>(); |
| - .map(|chunk| chunk.to_vec()) | |
| - .collect::<Vec<_>>(); | |
| + .map(|chunk| chunk.to_vec()); |
We need to collect the entries in order to iterate over them. Removing the collect makes the code fail to compile.
That's just a borrow checker issue. This compiles, because it makes the stream own keys, and it avoids the allocation:
- let batches = keys
- .chunks(MAX_MULTI_KEYS)
- .map(|chunk| chunk.to_vec())
- .collect::<Vec<_>>();
let store = self.clone();
async_stream::stream! {
- for batch_keys in batches {
+ for chunk in keys.chunks(MAX_MULTI_KEYS) {
let vals = {
+ let batch_keys = chunk.to_vec();
| let end_position = std::cmp::min(current_position + BATCH_SIZE, keys.len()); | ||
| // Extract the next batch of keys | ||
| let batch_keys = keys[current_position..end_position].to_vec(); |
Why not use chunks here, too, like in the ScyllaDB case?
Yes, we can use the terminology chunk systematically.
I meant the chunks method, so that you don't have to keep track of the positions manually here. (Or maybe there's a reason for it?)
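A side-by-side sketch of the two batching styles, using only std. The function names are hypothetical, and the BATCH_SIZE value is chosen only for illustration; both functions produce the same batches, but slice::chunks removes the position bookkeeping.

```rust
/// Illustrative batch size; the real constant lives in the store implementation.
const BATCH_SIZE: usize = 3;

/// Manual position tracking, in the style of the code under review.
fn batches_manual(keys: &[Vec<u8>]) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut current_position = 0;
    while current_position < keys.len() {
        let end_position = std::cmp::min(current_position + BATCH_SIZE, keys.len());
        // Extract the next batch of keys.
        batches.push(keys[current_position..end_position].to_vec());
        current_position = end_position;
    }
    batches
}

/// The same batching with `slice::chunks`; the last chunk may be shorter.
fn batches_chunks(keys: &[Vec<u8>]) -> Vec<Vec<Vec<u8>>> {
    keys.chunks(BATCH_SIZE).map(|chunk| chunk.to_vec()).collect()
}
```

For seven keys, both versions yield batches of sizes 3, 3 and 1, and both yield no batches for an empty key list.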
| value | ||
| } else { | ||
| // The key has been evicted. Should be rare. | ||
| store.read_value_bytes(key).await? |
(Ah, that works? Then I'm not sure I understand the difference between stream and try_stream. Maybe just that in the latter case you write yield a instead of yield Ok(a)?)
Yes, that works. I do not need try_stream.
Motivation
Reading all the requested data at once can be suboptimal, since every value is fetched even when the caller only needs the first few.
The solution is to implement asynchronous iterators.
Fixes #4831
Fixes #2742
Proposal
The implementation takes the keys by value (that is,
Vec<Vec<u8>>). We cannot use references because when we create specific keys in
lru-caching and value-splitting, we would need a self-reference,
which is not allowed.
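The ownership argument can be sketched with synchronous iterators from std. Both functions are hypothetical stand-ins: inner_read_iter for the wrapped store's iterator method, and wrapper_read_iter for a wrapper in the spirit of value-splitting. The assumption that the wrapper appends a big-endian u32 segment index is for illustration only.

```rust
/// Hypothetical inner store API that takes its keys by value.
/// Stand-in behaviour: every key "exists" and its value is the key itself.
fn inner_read_iter(keys: Vec<Vec<u8>>) -> impl Iterator<Item = Option<Vec<u8>>> {
    keys.into_iter().map(Some)
}

/// Hypothetical wrapper that derives new keys before delegating.
fn wrapper_read_iter(keys: Vec<Vec<u8>>) -> impl Iterator<Item = Option<Vec<u8>>> {
    // Derive one "first segment" key per input key (assumed layout: key + index 0).
    let derived: Vec<Vec<u8>> = keys
        .into_iter()
        .map(|mut key| {
            key.extend_from_slice(&0u32.to_be_bytes());
            key
        })
        .collect();
    // `derived` is moved into the inner iterator. If the inner API took
    // `&[Vec<u8>]` instead, the returned iterator would have to hold both
    // `derived` and a borrow of it, i.e. a self-referential value.
    inner_read_iter(derived)
}
```

Passing the derived keys by value lets the returned iterator own everything it needs, so it can outlive the function that created it.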
The implementation by container is then as follows:
The implementation in the views is done for two cases where we have a use case in the code (the two issues in question):
MapView.try_load_all_entries.
For those two use cases, the loop can end prematurely, justifying the use of iterators.
It is unclear whether this really reduces memory usage.
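The benefit of ending the loop early can be illustrated with a synchronous stand-in using only std. CountingStore is hypothetical: it mimics the eager and iterator-based read methods over an in-memory map and counts individual reads. It says nothing about memory usage or about how the real stores batch their requests.

```rust
use std::cell::Cell;
use std::collections::BTreeMap;

/// Hypothetical in-memory store that counts how many keys it has read.
struct CountingStore {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
    reads: Cell<usize>,
}

impl CountingStore {
    fn read_value_bytes(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.reads.set(self.reads.get() + 1);
        self.data.get(key).cloned()
    }

    /// Eager variant: reads every key before returning.
    fn read_multi_values_bytes(&self, keys: Vec<Vec<u8>>) -> Vec<Option<Vec<u8>>> {
        keys.iter().map(|key| self.read_value_bytes(key)).collect()
    }

    /// Lazy variant: reads a key only when the caller asks for the next item.
    fn read_multi_values_bytes_iter(
        &self,
        keys: Vec<Vec<u8>>,
    ) -> impl Iterator<Item = Option<Vec<u8>>> + '_ {
        keys.into_iter().map(move |key| self.read_value_bytes(&key))
    }
}
```

With keys a, b, c where b is missing, checking `.all(|v| v.is_some())` on the lazy variant stops after two reads, while the eager variant always performs three.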
Test Plan
The CI.
Release Plan
Those changes could be backported to TestNet Conway.
Links
None.