Skip to content

compute: factor out PeekResultIterator #32514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 59 additions & 155 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::rc::Rc;
use std::sync::{Arc, mpsc};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -74,6 +73,8 @@ use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;

/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
Expand Down Expand Up @@ -1390,12 +1391,12 @@ impl IndexPeek {
cursor.step_key(&storage);
}

Self::collect_ok_finished_data(&mut self.peek, self.trace_bundle.oks_mut(), max_result_size)
Self::collect_ok_finished_data(&self.peek, self.trace_bundle.oks_mut(), max_result_size)
}

/// Collects data for a known-complete peek from the ok stream.
fn collect_ok_finished_data<Tr>(
peek: &mut Peek<Timestamp>,
peek: &Peek<Timestamp>,
oks_handle: &mut Tr,
max_result_size: u64,
) -> Result<Vec<(Row, NonZeroUsize)>, String>
Expand All @@ -1408,8 +1409,14 @@ impl IndexPeek {
let max_result_size = usize::cast_from(max_result_size);
let count_byte_size = std::mem::size_of::<NonZeroUsize>();

// Cursor and bound lifetime for `Row` data in the backing trace.
let (mut cursor, storage) = oks_handle.cursor();
let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
peek.target.id().clone(),
peek.map_filter_project.clone(),
peek.timestamp,
peek.literal_constraints.clone(),
oks_handle,
);

// Accumulated `Vec<(row, count)>` results that we are likely to return.
let mut results = Vec::new();
let mut total_size: usize = 0;
Expand All @@ -1424,164 +1431,61 @@ impl IndexPeek {
.limit
.map(|l| usize::cast_from(u64::from(l)) + peek.finishing.offset);

use mz_ore::result::ResultExt;

let mut row_builder = Row::default();
let mut datum_vec = DatumVec::new();
let mut l_datum_vec = DatumVec::new();
let mut r_datum_vec = DatumVec::new();

// We have to sort the literal constraints because cursor.seek_key can seek only forward.
peek.literal_constraints
.iter_mut()
.for_each(|vec| vec.sort());
let has_literal_constraints = peek.literal_constraints.is_some();
let mut literals = peek.literal_constraints.iter().flatten();
let mut current_literal = None;
while let Some(row) = peek_iterator.next() {
let (row, copies) = row?;
let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

while cursor.key_valid(&storage) {
if has_literal_constraints {
loop {
// Go to the next literal constraint.
// (i.e., to the next OR argument in something like `c=3 OR c=7 OR c=9`)
current_literal = literals.next();
match current_literal {
None => return Ok(results),
Some(current_literal) => {
// NOTE(vmarcos): We expect the extra allocations below to be manageable
// since we only perform as many of them as there are literals.
cursor.seek_key(&storage, IntoOwned::borrow_as(current_literal));
if !cursor.key_valid(&storage) {
return Ok(results);
}
if cursor.get_key(&storage).unwrap()
== IntoOwned::borrow_as(current_literal)
{
// The cursor found a record whose key matches the current literal.
// We break from the inner loop, and process this key.
break;
}
// The cursor landed on a record that has a different key, meaning that there is
// no record whose key would match the current literal.
}
}
}
total_size = total_size
.saturating_add(row.byte_len())
.saturating_add(count_byte_size);
if total_size > max_result_size {
return Err(format!(
"result exceeds max size of {}",
ByteSize::b(u64::cast_from(max_result_size))
));
}

while cursor.val_valid(&storage) {
// TODO: This arena could be maintained and reused for longer,
// but it wasn't clear at what interval we should flush
// it to ensure we don't accidentally spike our memory use.
// This choice is conservative, and not the end of the world
// from a performance perspective.
let arena = RowArena::new();

let key_item = cursor.key(&storage);
let key = key_item.to_datum_iter();
let row_item = cursor.val(&storage);
let row = row_item.to_datum_iter();

let mut borrow = datum_vec.borrow();
borrow.extend(key);
borrow.extend(row);

if has_literal_constraints {
// The peek was created from an IndexedFilter join. We have to add those columns
// here that the join would add in a dataflow.
let datum_vec = borrow.deref_mut();
// unwrap is ok, because it could be None only if !has_literal_constraints or if
// the iteration is finished. In the latter case we already exited the while
// loop.
datum_vec.extend(current_literal.unwrap().iter());
}
if let Some(result) = peek
.map_filter_project
.evaluate_into(&mut borrow, &arena, &mut row_builder)
.map(|row| row.cloned())
.map_err_to_string_with_causes()?
{
let mut copies = Diff::ZERO;
cursor.map_times(&storage, |time, diff| {
if time.less_equal(&peek.timestamp) {
copies += diff;
}
});
let copies: usize = if copies.is_negative() {
let row = &*borrow;
tracing::error!(
target = %peek.target.id(), diff = %copies, ?row,
"index peek encountered negative multiplicities in ok trace",
);
return Err(format!(
"Invalid data in source, \
saw retractions ({}) for row that does not exist: {:?}",
-copies, row,
));
results.push((row, copies));

// If we hold many more than `max_results` records, we can thin down
// `results` using `self.finishing.ordering`.
if let Some(max_results) = max_results {
// We use a threshold twice what we intend, to amortize the work
// across all of the insertions. We could tighten this, but it
// works for the moment.
if results.len() >= 2 * max_results {
if peek.finishing.order_by.is_empty() {
results.truncate(max_results);
return Ok(results);
} else {
copies.into_inner().try_into().unwrap()
};
// if copies > 0 ... otherwise skip
if let Some(copies) = NonZeroUsize::new(copies) {
total_size = total_size
.saturating_add(result.byte_len())
.saturating_add(count_byte_size);
if total_size > max_result_size {
return Err(format!(
"result exceeds max size of {}",
ByteSize::b(u64::cast_from(max_result_size))
));
}
results.push((result, copies));
}

// If we hold many more than `max_results` records, we can thin down
// `results` using `self.finishing.ordering`.
if let Some(max_results) = max_results {
// We use a threshold twice what we intend, to amortize the work
// across all of the insertions. We could tighten this, but it
// works for the moment.
if results.len() >= 2 * max_results {
if peek.finishing.order_by.is_empty() {
results.truncate(max_results);
return Ok(results);
} else {
// We can sort `results` and then truncate to `max_results`.
// This has an effect similar to a priority queue, without
// its interactive dequeueing properties.
// TODO: Had we left these as `Vec<Datum>` we would avoid
// the unpacking; we should consider doing that, although
// it will require a re-pivot of the code to branch on this
// inner test (as we prefer not to maintain `Vec<Datum>`
// in the other case).
results.sort_by(|left, right| {
let left_datums = l_datum_vec.borrow_with(&left.0);
let right_datums = r_datum_vec.borrow_with(&right.0);
mz_expr::compare_columns(
&peek.finishing.order_by,
&left_datums,
&right_datums,
|| left.0.cmp(&right.0),
)
});
let dropped = results.drain(max_results..);
let dropped_size =
dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
acc.saturating_add(
row.byte_len().saturating_add(count_byte_size),
)
});
total_size = total_size.saturating_sub(dropped_size);
}
}
// We can sort `results` and then truncate to `max_results`.
// This has an effect similar to a priority queue, without
// its interactive dequeueing properties.
// TODO: Had we left these as `Vec<Datum>` we would avoid
// the unpacking; we should consider doing that, although
// it will require a re-pivot of the code to branch on this
// inner test (as we prefer not to maintain `Vec<Datum>`
// in the other case).
results.sort_by(|left, right| {
let left_datums = l_datum_vec.borrow_with(&left.0);
let right_datums = r_datum_vec.borrow_with(&right.0);
mz_expr::compare_columns(
&peek.finishing.order_by,
&left_datums,
&right_datums,
|| left.0.cmp(&right.0),
)
});
let dropped = results.drain(max_results..);
let dropped_size =
dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
});
total_size = total_size.saturating_sub(dropped_size);
}
}
cursor.step_val(&storage);
}
// The cursor doesn't have anything more to say for the current key.

if !has_literal_constraints {
// We are simply stepping through all the keys that the index has.
cursor.step_key(&storage);
}
}

Expand Down
Loading