Skip to content

Commit 522a2a4

Browse files
xudong963alamb
andauthored
Optimize filter executor in pull-based executor (#4421)
* optimzie filter executor * Update datafusion/core/src/physical_plan/filter.rs Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent fdc83e8 commit 522a2a4

File tree

1 file changed

+25
-8
lines changed

1 file changed

+25
-8
lines changed

datafusion/core/src/physical_plan/filter.rs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -238,15 +238,32 @@ impl Stream for FilterExecStream {
238238
mut self: Pin<&mut Self>,
239239
cx: &mut Context<'_>,
240240
) -> Poll<Option<Self::Item>> {
241-
let poll = self.input.poll_next_unpin(cx).map(|x| match x {
242-
Some(Ok(batch)) => {
243-
let timer = self.baseline_metrics.elapsed_compute().timer();
244-
let filtered_batch = batch_filter(&batch, &self.predicate);
245-
timer.done();
246-
Some(filtered_batch)
241+
let poll;
242+
loop {
243+
match self.input.poll_next_unpin(cx) {
244+
Poll::Ready(value) => match value {
245+
Some(Ok(batch)) => {
246+
let timer = self.baseline_metrics.elapsed_compute().timer();
247+
let filtered_batch = batch_filter(&batch, &self.predicate)?;
248+
// skip entirely filtered batches
249+
if filtered_batch.num_rows() == 0 {
250+
continue;
251+
}
252+
timer.done();
253+
poll = Poll::Ready(Some(Ok(filtered_batch)));
254+
break;
255+
}
256+
_ => {
257+
poll = Poll::Ready(value);
258+
break;
259+
}
260+
},
261+
Poll::Pending => {
262+
poll = Poll::Pending;
263+
break;
264+
}
247265
}
248-
other => other,
249-
});
266+
}
250267
self.baseline_metrics.record_poll(poll)
251268
}
252269

0 commit comments

Comments
 (0)