-
Notifications
You must be signed in to change notification settings - Fork 471
compute: factor out PeekResultIterator #32514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
compute: factor out PeekResultIterator #32514
Conversation
570a68c
to
c3a400a
Compare
e75dd05
to
214ebd8
Compare
src/compute/src/compute_state.rs
Outdated
PendingPeek::Index(peek) => 'response: { | ||
let is_ready = peek.is_ready(upper); | ||
|
||
match is_ready { | ||
Ok(false) => break 'response None, | ||
Err(err) => break 'response Some(err), | ||
Ok(true) => (), // Falling through..., | ||
} | ||
|
||
if let Some(err) = peek.extract_errs(upper) { | ||
break 'response Some(err); | ||
} | ||
|
||
Some(peek.read_result(upper, self.compute_state.max_result_size)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like early returns usually, but in this case the nested-if version ends up more readable, imo:
PendingPeek::Index(peek) => match peek.is_ready(upper) {
Ok(true) => {
let resp = peek.extract_errs(upper).unwrap_or_else(|| {
peek.read_result(upper, self.compute_state.max_result_size)
});
Some(resp)
}
Ok(false) => None,
Err(err) => Some(err),
},
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, this was from an outdated branch, I updated and backed out all of the extra refactorings, it's not only about the peek iterator
peek_timestamp: mz_repr::Timestamp, | ||
has_literal_constraints: bool, | ||
literals: L, | ||
oks_handle: &mut Tr, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It's a bit confusing to have an oks_handle
if we don't also have an errs_handle
. We could call it trace
/trace_reader
/trace_handle
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will change to trace_reader
literals: L, | ||
oks_handle: &mut Tr, | ||
) -> Self { | ||
let (cursor, storage): (<Tr as TraceReader>::Cursor, <Tr as TraceReader>::Storage) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These type annotations are not needed, are they?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think no, these where from some work in progress where types didn't line up
tracing::trace!( | ||
?self.literals_exhausted, | ||
key_valid = self.cursor.key_valid(&self.storage), | ||
val_valid = self.cursor.val_valid(&self.storage), "next"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How useful are these traces? I think they will mostly print a lot of true
/false
that are hard to interpret? Should they also print the actual keys and values?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading the key/val wouldn't always succeed because it fails when they're not valid. So I should rather remove all these trace logs, yeah?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If they are not useful I'd remove them, yes. There is some small readability/maintainability cost associated with them that we can avoid.
if self.cursor.val_valid(&self.storage) { | ||
return false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is surprising I think. The doc string says that this method will step the key forward, but it only does so if the current value is not valid, which seems like an important detail!
A thing that would make sense to check here is key_valid
. Maybe you meant to do that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intended behavior was actually maybe_step_key
, and it would only step when the val is not valid. Which is what the code did. In practice the method is only called when the val is not valid, so I changed that check into an assert and left the docstring as is.
if self.cursor.val_valid(&self.storage) { | ||
break; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this break
to be invoked we would need to have a key with zero values. Is this possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you meant it the other way round, right? That you expect val_valid
to always be true
so we always break?
I think it can happen that we go around the loop multiple times but I don't remember why. I pushed a commit that panics when that happens, so let's see what ci has to say.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry, yeah that's what I meant, I just got the condition flipped.
Pretty sure this will conflict with #32593, sorry 🙈 |
4640550
to
a74366e
Compare
@teskje thanks for the review! I hope I addressed all comments, could you please take a look again? 🙇♂️ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I had only gotten to the PeekResultIterator
implementation before. Not I've got everything. Looks good overall, some comments about tightening the invariants.
src/compute/src/compute_state.rs
Outdated
// We have to sort the literal constraints because cursor.seek_key can | ||
// seek only forward. | ||
peek.literal_constraints | ||
.iter_mut() | ||
.for_each(|vec| vec.sort()); | ||
let has_literal_constraints = peek.literal_constraints.is_some(); | ||
let literals = Option::take(&mut peek.literal_constraints) | ||
.into_iter() | ||
.flatten(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of moving the sorting into PeekResultIterator::new
? It's an invariant the PeekResultIterator
requires but doesn't document and cannot check.
Doing this would require changing the type of the literals
argument to something containing Vec
, not sure if that causes problems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do! I'll change it to take a Option<Vec<>>
and tighten up that unspoken invariant. 👌
// if copies > 0 ... otherwise skip | ||
if let Some(copies) = NonZeroI64::new(copies) { | ||
Ok(Some((result, copies))) | ||
} else { | ||
Ok(None) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is somewhat misleading: We only test if copies != 0
in the line below. We already tested that copies
is not negative above, so it's technically not wrong, but... still confusing!
I would:
- Remove the comment, it doesn't add anything imo.
- Change the diff output type of the iterator to
NonZeroUsize
.
The second thing also safes an unwrap in the caller, which is nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original code has NonZeroUsize, but I changed to this because the large SELECT changes that stash these updates in a persist batch need to write i64
, and the original data coming out of the trace is i64
, so it felt better to preserve the type here and to the cast/expect in the current in-memory peek. Otherwise I'd have to cast from NonZeroUsize
to i64
in the persist stash work, which felt worse.
I think a more correct solution here could be do change it to NonZeroI64
throughout the whole code paths of sending back the result, but I didn't want to go down that rabbit hole. What do you think now?
assert_eq!( | ||
false, | ||
self.cursor.val_valid(&self.storage), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert_eq!( | |
false, | |
self.cursor.val_valid(&self.storage), | |
assert!( | |
!self.cursor.val_valid(&self.storage), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it like this on purpose because I don't like the assert!(!
past, with the two exclamation marks, but happy to change to that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine. Left some comments inline.
On a higher level, this code could use some modernization. We don't need the separate key/val_valid
APIs anymore, we can just use the _get
variants in all places---they return options, which avoids one of the bounds checks, and might make the code more obvious in what it's doing. Feel free to pick this up as part of this PR, but don't block on it.
{ | ||
// The cursor found a record whose key matches the current literal. | ||
// We return and calls to `next()` will start | ||
// returning it's vals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// returning it's vals. | |
// returning its vals. |
if !self.cursor.key_valid(&self.storage) { | ||
return; | ||
} | ||
if self.cursor.get_key(&self.storage).unwrap() | ||
== IntoOwned::borrow_as(current_literal) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if !self.cursor.key_valid(&self.storage) { | |
return; | |
} | |
if self.cursor.get_key(&self.storage).unwrap() | |
== IntoOwned::borrow_as(current_literal) | |
if self.cursor.get_key(&self.storage).map_or(true, |key| key == IntoOwned::borrow_as(current_literal)) |
We can simplify this a bit.
// NOTE(vmarcos): We expect the extra allocations below to be manageable | ||
// since we only perform as many of them as there are literals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// NOTE(vmarcos): We expect the extra allocations below to be manageable | |
// since we only perform as many of them as there are literals. |
if self.cursor.get_key(&self.storage).unwrap() | ||
== IntoOwned::borrow_as(current_literal) | ||
{ | ||
// The cursor found a record whose key matches the current literal. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// The cursor found a record whose key matches the current literal. | |
// The cursor found a record whose key matches the current literal, or we're exhausted. |
/// Extracts and returns the row currently pointed at by our cursor. Returns | ||
/// `Ok(None)` if our MapFilterProject evaluates to `None`. Also returns any | ||
/// errors that arise from evaluating the MapFilterProject. | ||
fn extract_current_row(&mut self) -> Result<Option<(Row, NonZeroI64)>, String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd try to pass Tr::Key
and Tr::Val
to this function, if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried but then stopped because this uses the cursor for more things, like map_times
. And I didn't want to spend too much time on changing this much beyond the original code.
The original motivation for this is so that the code that extracts peek results can be re-used in MaterializeInc/database-issues#9180, where we want to use a different transport for sending back peek responses but still need to read them out of arrangements the same way. The nice side effect is that we separate extracting the result from the logic that accumulates it in a response for sending it back. Which leads to clearer separation.
a74366e
to
7eb5470
Compare
The original motivation for this is so that the code that extracts peek results can be re-used in
https://github.com/MaterializeInc/database-issues/issues/9180, where we want to use a different transport for sending back peek responses but still need to read them out of arrangements the same way.
The nice side effect is that we separate extracting the result from the logic that accumulates it in a response for sending it back. Which leads to clearer separation.
Work towards https://github.com/MaterializeInc/database-issues/issues/9180