Skip to content

Commit 570a68c

Browse files
committed
compute: factor out PeekResultIterator, refactor peek fulfillment
The original motivation for this is that the code that extracts peek results can be re-used in MaterializeInc/database-issues#9180, where we want to use a different transport for sending back peek responses but still need to read them out of arrangements the same way. The nice side effect is that we separate extracting the result from the logic that accumulates it in a response for sending it back, which leads to a clearer separation. Work towards MaterializeInc/database-issues#9180
1 parent 66ce3f4 commit 570a68c

File tree

2 files changed

+413
-169
lines changed

2 files changed

+413
-169
lines changed

src/compute/src/compute_state.rs

Lines changed: 130 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use std::cell::RefCell;
1010
use std::cmp::Ordering;
1111
use std::collections::{BTreeMap, BTreeSet};
1212
use std::num::NonZeroUsize;
13-
use std::ops::DerefMut;
1413
use std::rc::Rc;
1514
use std::sync::{Arc, mpsc};
1615
use std::time::{Duration, Instant};
@@ -74,6 +73,8 @@ use crate::metrics::{CollectionMetrics, WorkerMetrics};
7473
use crate::render::{LinearJoinSpec, StartSignal};
7574
use crate::server::{ComputeInstanceContext, ResponseSender};
7675

76+
mod peek_result_iterator;
77+
7778
/// Worker-local state that is maintained across dataflows.
7879
///
7980
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
@@ -868,8 +869,20 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
868869
/// Either complete the peek (and send the response) or put it in the pending set.
869870
fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
870871
let response = match &mut peek {
871-
PendingPeek::Index(peek) => {
872-
peek.seek_fulfillment(upper, self.compute_state.max_result_size)
872+
PendingPeek::Index(peek) => 'response: {
873+
let is_ready = peek.is_ready(upper);
874+
875+
match is_ready {
876+
Ok(false) => break 'response None,
877+
Err(err) => break 'response Some(err),
878+
Ok(true) => (), // Falling through.
879+
}
880+
881+
if let Some(err) = peek.extract_errs(upper) {
882+
break 'response Some(err);
883+
}
884+
885+
Some(peek.read_result(upper, self.compute_state.max_result_size))
873886
}
874887
PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
875888
self.compute_state
@@ -1310,7 +1323,7 @@ pub struct IndexPeek {
13101323
}
13111324

13121325
impl IndexPeek {
1313-
/// Attempts to fulfill the peek and reports success.
1326+
/// Reports whether the data this peek requires is ready.
13141327
///
13151328
/// To produce output at `peek.timestamp`, we must be certain that
13161329
/// it is no longer changing. A trace guarantees that all future
@@ -1322,18 +1335,14 @@ impl IndexPeek {
13221335
/// then for any time `t` less or equal to `peek.timestamp` it is
13231336
/// not the case that `upper` is less or equal to that timestamp,
13241337
/// and so the result cannot further evolve.
1325-
fn seek_fulfillment(
1326-
&mut self,
1327-
upper: &mut Antichain<Timestamp>,
1328-
max_result_size: u64,
1329-
) -> Option<PeekResponse> {
1338+
fn is_ready(&mut self, upper: &mut Antichain<Timestamp>) -> Result<bool, PeekResponse> {
13301339
self.trace_bundle.oks_mut().read_upper(upper);
13311340
if upper.less_equal(&self.peek.timestamp) {
1332-
return None;
1341+
return Ok(false);
13331342
}
13341343
self.trace_bundle.errs_mut().read_upper(upper);
13351344
if upper.less_equal(&self.peek.timestamp) {
1336-
return None;
1345+
return Ok(false);
13371346
}
13381347

13391348
let read_frontier = self.trace_bundle.compaction_frontier();
@@ -1343,21 +1352,55 @@ impl IndexPeek {
13431352
read_frontier.elements(),
13441353
self.peek.timestamp,
13451354
);
1346-
return Some(PeekResponse::Error(error));
1355+
return Err(PeekResponse::Error(error));
13471356
}
13481357

1349-
let response = match self.collect_finished_data(max_result_size) {
1358+
Ok(true)
1359+
}
1360+
1361+
/// Reads the results for a ready peek and returns the [`PeekResponse`]. Must
1362+
/// only be called when the results for the peek are ready.
1363+
fn read_result(
1364+
&mut self,
1365+
upper: &mut Antichain<Timestamp>,
1366+
max_result_size: u64,
1367+
) -> PeekResponse {
1368+
self.trace_bundle.oks_mut().read_upper(upper);
1369+
assert!(!upper.less_equal(&self.peek.timestamp));
1370+
1371+
self.trace_bundle.errs_mut().read_upper(upper);
1372+
assert!(!upper.less_equal(&self.peek.timestamp));
1373+
1374+
let read_frontier = self.trace_bundle.compaction_frontier();
1375+
assert!(read_frontier.less_equal(&self.peek.timestamp));
1376+
1377+
let result = Self::collect_ok_finished_data(
1378+
&mut self.peek,
1379+
self.trace_bundle.oks_mut(),
1380+
max_result_size,
1381+
);
1382+
1383+
let response = match result {
13501384
Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &self.peek.finishing.order_by)),
13511385
Err(text) => PeekResponse::Error(text),
13521386
};
1353-
Some(response)
1387+
response
13541388
}
13551389

1356-
/// Collects data for a known-complete peek from the ok stream.
1357-
fn collect_finished_data(
1358-
&mut self,
1359-
max_result_size: u64,
1360-
) -> Result<Vec<(Row, NonZeroUsize)>, String> {
1390+
/// Returns errors from this peek's result, if any. Must only be called when
1391+
/// this peek is ready.
1392+
///
1393+
/// Errors are returned as a `PeekResponse::Error`.
1394+
fn extract_errs(&mut self, upper: &mut Antichain<Timestamp>) -> Option<PeekResponse> {
1395+
self.trace_bundle.oks_mut().read_upper(upper);
1396+
assert!(!upper.less_equal(&self.peek.timestamp));
1397+
1398+
self.trace_bundle.errs_mut().read_upper(upper);
1399+
assert!(!upper.less_equal(&self.peek.timestamp));
1400+
1401+
let read_frontier = self.trace_bundle.compaction_frontier();
1402+
assert!(read_frontier.less_equal(&self.peek.timestamp));
1403+
13611404
// Check if there exist any errors and, if so, return whatever one we
13621405
// find first.
13631406
let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
@@ -1369,19 +1412,19 @@ impl IndexPeek {
13691412
}
13701413
});
13711414
if copies.is_negative() {
1372-
return Err(format!(
1415+
return Some(PeekResponse::Error(format!(
13731416
"Invalid data in source errors, saw retractions ({}) for row that does not exist: {}",
13741417
-copies,
13751418
cursor.key(&storage),
1376-
));
1419+
)));
13771420
}
13781421
if copies.is_positive() {
1379-
return Err(cursor.key(&storage).to_string());
1422+
return Some(PeekResponse::Error(cursor.key(&storage).to_string()));
13801423
}
13811424
cursor.step_key(&storage);
13821425
}
13831426

1384-
Self::collect_ok_finished_data(&mut self.peek, self.trace_bundle.oks_mut(), max_result_size)
1427+
None
13851428
}
13861429

13871430
/// Collects data for a known-complete peek from the ok stream.
@@ -1399,8 +1442,24 @@ impl IndexPeek {
13991442
let max_result_size = usize::cast_from(max_result_size);
14001443
let count_byte_size = std::mem::size_of::<NonZeroUsize>();
14011444

1402-
// Cursor and bound lifetime for `Row` data in the backing trace.
1403-
let (mut cursor, storage) = oks_handle.cursor();
1445+
// We have to sort the literal constraints because cursor.seek_key can
1446+
// seek only forward.
1447+
peek.literal_constraints
1448+
.iter_mut()
1449+
.for_each(|vec| vec.sort());
1450+
let has_literal_constraints = peek.literal_constraints.is_some();
1451+
let literals = Option::take(&mut peek.literal_constraints)
1452+
.into_iter()
1453+
.flatten();
1454+
1455+
let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
1456+
peek.map_filter_project.clone(),
1457+
peek.timestamp,
1458+
has_literal_constraints,
1459+
literals,
1460+
oks_handle,
1461+
);
1462+
14041463
// Accumulated `Vec<(row, count)>` results that we are likely to return.
14051464
let mut results = Vec::new();
14061465
let mut total_size: usize = 0;
@@ -1415,158 +1474,60 @@ impl IndexPeek {
14151474
.limit
14161475
.map(|l| usize::cast_from(u64::from(l)) + peek.finishing.offset);
14171476

1418-
use mz_ore::result::ResultExt;
1419-
1420-
let mut row_builder = Row::default();
1421-
let mut datum_vec = DatumVec::new();
14221477
let mut l_datum_vec = DatumVec::new();
14231478
let mut r_datum_vec = DatumVec::new();
14241479

1425-
// We have to sort the literal constraints because cursor.seek_key can seek only forward.
1426-
peek.literal_constraints
1427-
.iter_mut()
1428-
.for_each(|vec| vec.sort());
1429-
let has_literal_constraints = peek.literal_constraints.is_some();
1430-
let mut literals = peek.literal_constraints.iter().flatten();
1431-
let mut current_literal = None;
1480+
while let Some(row) = peek_iterator.next() {
1481+
let (row, copies) = row?;
14321482

1433-
while cursor.key_valid(&storage) {
1434-
if has_literal_constraints {
1435-
loop {
1436-
// Go to the next literal constraint.
1437-
// (i.e., to the next OR argument in something like `c=3 OR c=7 OR c=9`)
1438-
current_literal = literals.next();
1439-
match current_literal {
1440-
None => return Ok(results),
1441-
Some(current_literal) => {
1442-
// NOTE(vmarcos): We expect the extra allocations below to be manageable
1443-
// since we only perform as many of them as there are literals.
1444-
cursor.seek_key(&storage, IntoOwned::borrow_as(current_literal));
1445-
if !cursor.key_valid(&storage) {
1446-
return Ok(results);
1447-
}
1448-
if cursor.get_key(&storage).unwrap()
1449-
== IntoOwned::borrow_as(current_literal)
1450-
{
1451-
// The cursor found a record whose key matches the current literal.
1452-
// We break from the inner loop, and process this key.
1453-
break;
1454-
}
1455-
// The cursor landed on a record that has a different key, meaning that there is
1456-
// no record whose key would match the current literal.
1457-
}
1458-
}
1459-
}
1483+
total_size = total_size
1484+
.saturating_add(row.byte_len())
1485+
.saturating_add(count_byte_size);
1486+
if total_size > max_result_size {
1487+
return Err(format!(
1488+
"result exceeds max size of {}",
1489+
ByteSize::b(u64::cast_from(max_result_size))
1490+
));
14601491
}
1461-
1462-
while cursor.val_valid(&storage) {
1463-
// TODO: This arena could be maintained and reused for longer,
1464-
// but it wasn't clear at what interval we should flush
1465-
// it to ensure we don't accidentally spike our memory use.
1466-
// This choice is conservative, and not the end of the world
1467-
// from a performance perspective.
1468-
let arena = RowArena::new();
1469-
1470-
let key_item = cursor.key(&storage);
1471-
let key = key_item.to_datum_iter();
1472-
let row_item = cursor.val(&storage);
1473-
let row = row_item.to_datum_iter();
1474-
1475-
let mut borrow = datum_vec.borrow();
1476-
borrow.extend(key);
1477-
borrow.extend(row);
1478-
1479-
if has_literal_constraints {
1480-
// The peek was created from an IndexedFilter join. We have to add those columns
1481-
// here that the join would add in a dataflow.
1482-
let datum_vec = borrow.deref_mut();
1483-
// unwrap is ok, because it could be None only if !has_literal_constraints or if
1484-
// the iteration is finished. In the latter case we already exited the while
1485-
// loop.
1486-
datum_vec.extend(current_literal.unwrap().iter());
1487-
}
1488-
if let Some(result) = peek
1489-
.map_filter_project
1490-
.evaluate_into(&mut borrow, &arena, &mut row_builder)
1491-
.map(|row| row.cloned())
1492-
.map_err_to_string_with_causes()?
1493-
{
1494-
let mut copies = Diff::ZERO;
1495-
cursor.map_times(&storage, |time, diff| {
1496-
if time.less_equal(&peek.timestamp) {
1497-
copies += diff;
1498-
}
1499-
});
1500-
let copies: usize = if copies.is_negative() {
1501-
return Err(format!(
1502-
"Invalid data in source, saw retractions ({}) for row that does not exist: {:?}",
1503-
-copies, &*borrow,
1504-
));
1492+
results.push((row, copies));
1493+
1494+
// If we hold many more than `max_results` records, we can thin down
1495+
// `results` using `self.finishing.ordering`.
1496+
if let Some(max_results) = max_results {
1497+
// We use a threshold twice what we intend, to amortize the work
1498+
// across all of the insertions. We could tighten this, but it
1499+
// works for the moment.
1500+
if results.len() >= 2 * max_results {
1501+
if peek.finishing.order_by.is_empty() {
1502+
results.truncate(max_results);
1503+
return Ok(results);
15051504
} else {
1506-
copies.into_inner().try_into().unwrap()
1507-
};
1508-
// if copies > 0 ... otherwise skip
1509-
if let Some(copies) = NonZeroUsize::new(copies) {
1510-
total_size = total_size
1511-
.saturating_add(result.byte_len())
1512-
.saturating_add(count_byte_size);
1513-
if total_size > max_result_size {
1514-
return Err(format!(
1515-
"result exceeds max size of {}",
1516-
ByteSize::b(u64::cast_from(max_result_size))
1517-
));
1518-
}
1519-
results.push((result, copies));
1520-
}
1521-
1522-
// If we hold many more than `max_results` records, we can thin down
1523-
// `results` using `self.finishing.ordering`.
1524-
if let Some(max_results) = max_results {
1525-
// We use a threshold twice what we intend, to amortize the work
1526-
// across all of the insertions. We could tighten this, but it
1527-
// works for the moment.
1528-
if results.len() >= 2 * max_results {
1529-
if peek.finishing.order_by.is_empty() {
1530-
results.truncate(max_results);
1531-
return Ok(results);
1532-
} else {
1533-
// We can sort `results` and then truncate to `max_results`.
1534-
// This has an effect similar to a priority queue, without
1535-
// its interactive dequeueing properties.
1536-
// TODO: Had we left these as `Vec<Datum>` we would avoid
1537-
// the unpacking; we should consider doing that, although
1538-
// it will require a re-pivot of the code to branch on this
1539-
// inner test (as we prefer not to maintain `Vec<Datum>`
1540-
// in the other case).
1541-
results.sort_by(|left, right| {
1542-
let left_datums = l_datum_vec.borrow_with(&left.0);
1543-
let right_datums = r_datum_vec.borrow_with(&right.0);
1544-
mz_expr::compare_columns(
1545-
&peek.finishing.order_by,
1546-
&left_datums,
1547-
&right_datums,
1548-
|| left.0.cmp(&right.0),
1549-
)
1550-
});
1551-
let dropped = results.drain(max_results..);
1552-
let dropped_size =
1553-
dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
1554-
acc.saturating_add(
1555-
row.byte_len().saturating_add(count_byte_size),
1556-
)
1557-
});
1558-
total_size = total_size.saturating_sub(dropped_size);
1559-
}
1560-
}
1505+
// We can sort `results` and then truncate to `max_results`.
1506+
// This has an effect similar to a priority queue, without
1507+
// its interactive dequeueing properties.
1508+
// TODO: Had we left these as `Vec<Datum>` we would avoid
1509+
// the unpacking; we should consider doing that, although
1510+
// it will require a re-pivot of the code to branch on this
1511+
// inner test (as we prefer not to maintain `Vec<Datum>`
1512+
// in the other case).
1513+
results.sort_by(|left, right| {
1514+
let left_datums = l_datum_vec.borrow_with(&left.0);
1515+
let right_datums = r_datum_vec.borrow_with(&right.0);
1516+
mz_expr::compare_columns(
1517+
&peek.finishing.order_by,
1518+
&left_datums,
1519+
&right_datums,
1520+
|| left.0.cmp(&right.0),
1521+
)
1522+
});
1523+
let dropped = results.drain(max_results..);
1524+
let dropped_size =
1525+
dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
1526+
acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
1527+
});
1528+
total_size = total_size.saturating_sub(dropped_size);
15611529
}
15621530
}
1563-
cursor.step_val(&storage);
1564-
}
1565-
// The cursor doesn't have anything more to say for the current key.
1566-
1567-
if !has_literal_constraints {
1568-
// We are simply stepping through all the keys that the index has.
1569-
cursor.step_key(&storage);
15701531
}
15711532
}
15721533

0 commit comments

Comments
 (0)