Skip to content

Commit 56399f7

Browse files
committed
fix some cases in streaming_aggregate_test but still some faileds...
1 parent 06e7342 commit 56399f7

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ impl GroupedHashAggregateStream {
815815
} else {
816816
self.schema()
817817
};
818+
818819
if self.group_values.is_empty() {
819820
return Ok(VecDeque::from([RecordBatch::new_empty(schema)]));
820821
}
@@ -832,14 +833,14 @@ impl GroupedHashAggregateStream {
832833
let mut rows_count_before_cur_block = 0;
833834
for output_block in output.iter_mut() {
834835
let block_start = rows_count_before_cur_block;
835-
let block_end =
836-
rows_count_before_cur_block + output_block[0].len();
836+
let block_len= output_block[0].len();
837+
837838
output_block.reserve(states.len());
838839
for state in states.iter() {
839-
output_block.push(state.slice(block_start, block_end))
840+
output_block.push(state.slice(block_start, block_len))
840841
}
841842

842-
rows_count_before_cur_block = output_block[0].len();
843+
rows_count_before_cur_block += block_len;
843844
}
844845
}
845846
_ if spilling => {
@@ -849,14 +850,14 @@ impl GroupedHashAggregateStream {
849850
let mut rows_count_before_cur_block = 0;
850851
for output_block in output.iter_mut() {
851852
let block_start = rows_count_before_cur_block;
852-
let block_end =
853-
rows_count_before_cur_block + output_block[0].len();
853+
let block_len = output_block[0].len();
854+
854855
output_block.reserve(states.len());
855856
for state in states.iter() {
856-
output_block.push(state.slice(block_start, block_end))
857+
output_block.push(state.slice(block_start, block_len))
857858
}
858859

859-
rows_count_before_cur_block = output_block[0].len();
860+
rows_count_before_cur_block += block_len;
860861
}
861862
}
862863
AggregateMode::Final
@@ -867,10 +868,11 @@ impl GroupedHashAggregateStream {
867868
let mut rows_count_before_cur_block = 0;
868869
for output_block in output.iter_mut() {
869870
let block_start = rows_count_before_cur_block;
870-
let block_end =
871-
rows_count_before_cur_block + output_block[0].len();
872-
output_block.push(state.slice(block_start, block_end));
873-
rows_count_before_cur_block = output_block[0].len();
871+
let block_len = output_block[0].len();
872+
873+
output_block.push(state.slice(block_start, block_len));
874+
875+
rows_count_before_cur_block += block_len;
874876
}
875877
}
876878
}

0 commit comments

Comments
 (0)