Skip to content

Commit 5fd63e8

Browse files
committed
define GroupIndexContext.
1 parent 13c9489 commit 5fd63e8

File tree

2 files changed

+41
-9
lines changed

2 files changed

+41
-9
lines changed

datafusion/physical-plan/src/aggregates/group_values/column.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,42 @@ use datafusion_physical_expr::binary_map::OutputType;
4545
use datafusion_physical_expr_common::datum::compare_with_eq;
4646
use hashbrown::raw::RawTable;
4747

48+
/// Group index context for performing `vectorized compare` and `vectorized append`
49+
struct GroupIndexContext {
50+
/// It is possible that hash value collision exists,
51+
/// and we will chain the `group indices` with same hash value
52+
///
53+
/// The chained indices is like:
54+
/// `latest group index -> older group index -> even older group index -> ...`
55+
prev_group_index: usize,
56+
57+
/// It is possible that rows with same hash values exist in `input cols`.
58+
/// And if we `vectorized compare` and `vectorized append` them
59+
/// in the same round, some fault cases will occur especially when
60+
/// they are totally the repeated rows...
61+
///
62+
/// For example:
63+
/// - Two repeated rows exist in `input cols`.
64+
///
65+
/// - We found their hash values equal to one exist group
66+
///
67+
/// - We then perform `vectorized compare` for them to the exist group,
68+
/// and found their values not equal to the exist one
69+
///
70+
/// - Finally when perform `vectorized append`, we decide to build two
71+
/// respective new groups for them, even we actually just need one
72+
/// new group...
73+
///
74+
/// So for solving such cases simply, if some rows with same hash value
75+
/// in `input cols`, just allow to process one of them in a round,
76+
/// and this flag is used to represent that one of them is processing
77+
/// in current round.
78+
///
79+
checking: bool,
80+
}
81+
4882
/// A [`GroupValues`] that stores multiple columns of group values.
4983
///
50-
///
5184
pub struct GroupValuesColumn {
5285
/// The output schema
5386
schema: SchemaRef,
@@ -62,6 +95,11 @@ pub struct GroupValuesColumn {
6295
/// values: (hash, group_index)
6396
map: RawTable<(u64, usize)>,
6497

98+
group_index_ctxs: Vec<GroupIndexContext>,
99+
100+
/// Some
101+
remaining_indices: Vec<usize>,
102+
65103
/// The size of `map` in bytes
66104
map_size: usize,
67105

@@ -94,6 +132,7 @@ impl GroupValuesColumn {
94132
Ok(Self {
95133
schema,
96134
map,
135+
group_index_ctxs: Vec::new(),
97136
map_size: 0,
98137
group_values: vec![],
99138
hashes_buffer: Default::default(),
@@ -160,13 +199,6 @@ macro_rules! instantiate_primitive {
160199
};
161200
}
162201

163-
fn append_col_value<C>(mut core: C, array: &ArrayRef, row: usize)
164-
where
165-
C: FnMut(&ArrayRef, usize),
166-
{
167-
core(array, row);
168-
}
169-
170202
impl GroupValues for GroupValuesColumn {
171203
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
172204
let n_rows = cols[0].len();

datafusion/physical-plan/src/aggregates/group_values/group_column.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
554554
all_non_null: bool,
555555
) {
556556
let arr = array.as_byte_view::<B>();
557-
557+
558558
if all_non_null {
559559
self.nulls.append_n(rows.len(), false);
560560
for &row in rows {

0 commit comments

Comments
 (0)