Skip to content

Commit 48df96a

Browse files
committed
chore: generic templated skip_duplicates parameter
1 parent fbd34de commit 48df96a

File tree

9 files changed

+225
-66
lines changed

9 files changed

+225
-66
lines changed

src/common/hashtable/src/hashjoin_hashtable.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,15 @@ impl<K: Keyable, A: Allocator + Clone + Default + 'static> HashJoinHashTable<K,
133133
hashtable
134134
}
135135

136-
pub fn insert(&self, key: K, entry_ptr: *mut RawEntry<K>, skip_duplicates: bool) {
136+
pub fn insert<const SKIP_DUPLICATES: bool>(&self, key: K, entry_ptr: *mut RawEntry<K>) {
137137
let hash = key.hash();
138138
let index = (hash >> self.hash_shift) as usize;
139139
let new_header = new_header(entry_ptr as u64, hash);
140140
// # Safety
141141
// `index` is less than the capacity of hash table.
142142
let mut old_header = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) };
143143
loop {
144-
if skip_duplicates
144+
if SKIP_DUPLICATES
145145
&& early_filtering(old_header, hash)
146146
&& self.next_contains(&key, remove_header_tag(old_header))
147147
{

src/common/hashtable/src/hashjoin_string_hashtable.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,15 @@ impl<A: Allocator + Clone + Default + 'static> HashJoinStringHashTable<A> {
6767
hashtable
6868
}
6969

70-
pub fn insert(&self, key: &[u8], entry_ptr: *mut StringRawEntry, skip_duplicates: bool) {
70+
pub fn insert<const SKIP_DUPLICATES: bool>(&self, key: &[u8], entry_ptr: *mut StringRawEntry) {
7171
let hash = hash_join_fast_string_hash(key);
7272
let index = (hash >> self.hash_shift) as usize;
7373
let new_header = new_header(entry_ptr as u64, hash);
7474
// # Safety
7575
// `index` is less than the capacity of hash table.
7676
let mut old_header = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) };
7777
loop {
78-
if skip_duplicates
78+
if SKIP_DUPLICATES
7979
&& early_filtering(old_header, hash)
8080
&& self.next_contains(key, remove_header_tag(old_header))
8181
{

src/query/expression/src/kernels/group_by_hash/method.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ macro_rules! with_join_hash_method {
112112
( | $t:tt | $($tail:tt)* ) => {
113113
match_template::match_template! {
114114
$t = [Serializer, SingleBinary, KeysU8, KeysU16,
115-
KeysU32, KeysU64, KeysU128, KeysU256],
115+
KeysU32, KeysU64, KeysU128, KeysU256, SkipDuplicatesSerializer,
116+
SkipDuplicatesSingleBinary, SkipDuplicatesKeysU8, SkipDuplicatesKeysU16,
117+
SkipDuplicatesKeysU32, SkipDuplicatesKeysU64, SkipDuplicatesKeysU128,
118+
SkipDuplicatesKeysU256],
116119
$($tail)*
117120
}
118121
}

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

Lines changed: 107 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -250,68 +250,130 @@ impl HashJoinBuildState {
250250
if self.hash_join_state.hash_join_desc.join_type == JoinType::Cross {
251251
return Ok(());
252252
}
253+
let skip_duplicates = matches!(
254+
self.hash_join_state.hash_join_desc.join_type,
255+
JoinType::InnerAny | JoinType::LeftAny
256+
);
253257

254258
// Divide the finalize phase into multiple tasks.
255259
self.generate_finalize_task()?;
256260

257261
// Create a fixed size hash table.
258-
let (hash_join_hash_table, entry_size) = match self.method.clone() {
259-
HashMethodKind::Serializer(_) => (
262+
let (hash_join_hash_table, entry_size) = match (self.method.clone(), skip_duplicates) {
263+
(HashMethodKind::Serializer(_), false) => (
260264
HashJoinHashTable::Serializer(SerializerHashJoinHashTable::new(
261265
BinaryHashJoinHashMap::with_build_row_num(build_num_rows),
262266
HashMethodSerializer::default(),
263267
)),
264268
std::mem::size_of::<StringRawEntry>(),
265269
),
266-
HashMethodKind::SingleBinary(_) => (
270+
(HashMethodKind::Serializer(_), true) => (
271+
HashJoinHashTable::SkipDuplicatesSerializer(SerializerHashJoinHashTable::new(
272+
BinaryHashJoinHashMap::with_build_row_num(build_num_rows),
273+
HashMethodSerializer::default(),
274+
)),
275+
std::mem::size_of::<StringRawEntry>(),
276+
),
277+
(HashMethodKind::SingleBinary(_), false) => (
267278
HashJoinHashTable::SingleBinary(SingleBinaryHashJoinHashTable::new(
268279
BinaryHashJoinHashMap::with_build_row_num(build_num_rows),
269280
HashMethodSingleBinary::default(),
270281
)),
271282
std::mem::size_of::<StringRawEntry>(),
272283
),
273-
HashMethodKind::KeysU8(hash_method) => (
284+
(HashMethodKind::SingleBinary(_), true) => (
285+
HashJoinHashTable::SkipDuplicatesSingleBinary(
286+
SingleBinaryHashJoinHashTable::new(
287+
BinaryHashJoinHashMap::with_build_row_num(build_num_rows),
288+
HashMethodSingleBinary::default(),
289+
),
290+
),
291+
std::mem::size_of::<StringRawEntry>(),
292+
),
293+
(HashMethodKind::KeysU8(hash_method), false) => (
274294
HashJoinHashTable::KeysU8(FixedKeyHashJoinHashTable::new(
275295
HashJoinHashMap::<u8>::with_build_row_num(build_num_rows),
276296
hash_method,
277297
)),
278298
std::mem::size_of::<RawEntry<u8>>(),
279299
),
280-
HashMethodKind::KeysU16(hash_method) => (
300+
(HashMethodKind::KeysU8(hash_method), true) => (
301+
HashJoinHashTable::SkipDuplicatesKeysU8(FixedKeyHashJoinHashTable::new(
302+
HashJoinHashMap::<u8>::with_build_row_num(build_num_rows),
303+
hash_method,
304+
)),
305+
std::mem::size_of::<RawEntry<u8>>(),
306+
),
307+
(HashMethodKind::KeysU16(hash_method), false) => (
281308
HashJoinHashTable::KeysU16(FixedKeyHashJoinHashTable::new(
282309
HashJoinHashMap::<u16>::with_build_row_num(build_num_rows),
283310
hash_method,
284311
)),
285312
std::mem::size_of::<RawEntry<u16>>(),
286313
),
287-
HashMethodKind::KeysU32(hash_method) => (
314+
(HashMethodKind::KeysU16(hash_method), true) => (
315+
HashJoinHashTable::SkipDuplicatesKeysU16(FixedKeyHashJoinHashTable::new(
316+
HashJoinHashMap::<u16>::with_build_row_num(build_num_rows),
317+
hash_method,
318+
)),
319+
std::mem::size_of::<RawEntry<u16>>(),
320+
),
321+
(HashMethodKind::KeysU32(hash_method), false) => (
288322
HashJoinHashTable::KeysU32(FixedKeyHashJoinHashTable::new(
289323
HashJoinHashMap::<u32>::with_build_row_num(build_num_rows),
290324
hash_method,
291325
)),
292326
std::mem::size_of::<RawEntry<u32>>(),
293327
),
294-
HashMethodKind::KeysU64(hash_method) => (
328+
(HashMethodKind::KeysU32(hash_method), true) => (
329+
HashJoinHashTable::SkipDuplicatesKeysU32(FixedKeyHashJoinHashTable::new(
330+
HashJoinHashMap::<u32>::with_build_row_num(build_num_rows),
331+
hash_method,
332+
)),
333+
std::mem::size_of::<RawEntry<u32>>(),
334+
),
335+
(HashMethodKind::KeysU64(hash_method), false) => (
295336
HashJoinHashTable::KeysU64(FixedKeyHashJoinHashTable::new(
296337
HashJoinHashMap::<u64>::with_build_row_num(build_num_rows),
297338
hash_method,
298339
)),
299340
std::mem::size_of::<RawEntry<u64>>(),
300341
),
301-
HashMethodKind::KeysU128(hash_method) => (
342+
(HashMethodKind::KeysU64(hash_method), true) => (
343+
HashJoinHashTable::SkipDuplicatesKeysU64(FixedKeyHashJoinHashTable::new(
344+
HashJoinHashMap::<u64>::with_build_row_num(build_num_rows),
345+
hash_method,
346+
)),
347+
std::mem::size_of::<RawEntry<u64>>(),
348+
),
349+
(HashMethodKind::KeysU128(hash_method), false) => (
302350
HashJoinHashTable::KeysU128(FixedKeyHashJoinHashTable::new(
303351
HashJoinHashMap::<u128>::with_build_row_num(build_num_rows),
304352
hash_method,
305353
)),
306354
std::mem::size_of::<RawEntry<u128>>(),
307355
),
308-
HashMethodKind::KeysU256(hash_method) => (
356+
(HashMethodKind::KeysU128(hash_method), true) => (
357+
HashJoinHashTable::SkipDuplicatesKeysU128(FixedKeyHashJoinHashTable::new(
358+
HashJoinHashMap::<u128>::with_build_row_num(build_num_rows),
359+
hash_method,
360+
)),
361+
std::mem::size_of::<RawEntry<u128>>(),
362+
),
363+
(HashMethodKind::KeysU256(hash_method), false) => (
309364
HashJoinHashTable::KeysU256(FixedKeyHashJoinHashTable::new(
310365
HashJoinHashMap::<U256>::with_build_row_num(build_num_rows),
311366
hash_method,
312367
)),
313368
std::mem::size_of::<RawEntry<U256>>(),
314369
),
370+
(HashMethodKind::KeysU256(hash_method), true) => (
371+
HashJoinHashTable::SkipDuplicatesKeysU256(FixedKeyHashJoinHashTable::new(
372+
HashJoinHashMap::<U256>::with_build_row_num(build_num_rows),
373+
hash_method,
374+
)),
375+
std::mem::size_of::<RawEntry<U256>>(),
376+
),
315377
};
316378
self.entry_size.store(entry_size, Ordering::Release);
317379
let hash_table = unsafe { &mut *self.hash_join_state.hash_table.get() };
@@ -378,7 +440,7 @@ impl HashJoinBuildState {
378440
next: 0,
379441
}
380442
}
381-
$table.insert(*key, raw_entry_ptr, $skip_duplicates);
443+
$table.insert::<$skip_duplicates>(*key, raw_entry_ptr);
382444
raw_entry_ptr = unsafe { raw_entry_ptr.add(1) };
383445
}
384446
}
@@ -398,7 +460,7 @@ impl HashJoinBuildState {
398460
next: 0,
399461
}
400462
}
401-
$table.insert(*key, raw_entry_ptr, $skip_duplicates);
463+
$table.insert::<$skip_duplicates>(*key, raw_entry_ptr);
402464
raw_entry_ptr = unsafe { raw_entry_ptr.add(1) };
403465
}
404466
}
@@ -469,7 +531,7 @@ impl HashJoinBuildState {
469531
string_local_space_ptr = string_local_space_ptr.add(key.len());
470532
}
471533

472-
$table.insert(key, raw_entry_ptr, $skip_duplicates);
534+
$table.insert::<$skip_duplicates>(key, raw_entry_ptr);
473535
raw_entry_ptr = unsafe { raw_entry_ptr.add(1) };
474536
}
475537
}
@@ -502,7 +564,7 @@ impl HashJoinBuildState {
502564
string_local_space_ptr = string_local_space_ptr.add(key.len());
503565
}
504566

505-
$table.insert(key, raw_entry_ptr, $skip_duplicates);
567+
$table.insert::<$skip_duplicates>(key, raw_entry_ptr);
506568
raw_entry_ptr = unsafe { raw_entry_ptr.add(1) };
507569
}
508570
}
@@ -627,10 +689,6 @@ impl HashJoinBuildState {
627689
}
628690
_ => {}
629691
};
630-
let skip_duplicates = matches!(
631-
self.hash_join_state.hash_join_desc.join_type,
632-
JoinType::InnerAny | JoinType::LeftAny
633-
);
634692

635693
keys_entries
636694
.iter_mut()
@@ -641,34 +699,58 @@ impl HashJoinBuildState {
641699

642700
match hashtable {
643701
HashJoinHashTable::Serializer(table) => insert_binary_key! {
644-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, skip_duplicates,
702+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, false,
645703
},
646704
HashJoinHashTable::SingleBinary(table) => insert_binary_key! {
647-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, skip_duplicates,
705+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, false,
648706
},
649707
HashJoinHashTable::KeysU8(table) => insert_key! {
650-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u8, skip_duplicates,
708+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u8, false,
651709
},
652710
HashJoinHashTable::KeysU16(table) => insert_key! {
653-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u16, skip_duplicates,
711+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u16, false,
654712
},
655713
HashJoinHashTable::KeysU32(table) => insert_key! {
656-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u32, skip_duplicates,
714+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u32, false,
657715
},
658716
HashJoinHashTable::KeysU64(table) => insert_key! {
659-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u64, skip_duplicates,
717+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u64, false,
660718
},
661719
HashJoinHashTable::KeysU128(table) => insert_key! {
662-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u128, skip_duplicates,
720+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u128, false,
663721
},
664722
HashJoinHashTable::KeysU256(table) => insert_key! {
665-
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, U256, skip_duplicates,
723+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, U256, false,
666724
},
667725
HashJoinHashTable::Null => {
668726
return Err(ErrorCode::AbortedQuery(
669727
"Aborted query, because the hash table is uninitialized.",
670728
));
671729
}
730+
HashJoinHashTable::SkipDuplicatesSerializer(table) => insert_binary_key! {
731+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, true,
732+
},
733+
HashJoinHashTable::SkipDuplicatesSingleBinary(table) => insert_binary_key! {
734+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, true,
735+
},
736+
HashJoinHashTable::SkipDuplicatesKeysU8(table) => insert_key! {
737+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u8, true,
738+
},
739+
HashJoinHashTable::SkipDuplicatesKeysU16(table) => insert_key! {
740+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u16, true,
741+
},
742+
HashJoinHashTable::SkipDuplicatesKeysU32(table) => insert_key! {
743+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u32, true,
744+
},
745+
HashJoinHashTable::SkipDuplicatesKeysU64(table) => insert_key! {
746+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u64, true,
747+
},
748+
HashJoinHashTable::SkipDuplicatesKeysU128(table) => insert_key! {
749+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, u128, true,
750+
},
751+
HashJoinHashTable::SkipDuplicatesKeysU256(table) => insert_key! {
752+
&mut table.hash_table, &table.hash_method, chunk, build_keys, valids, chunk_index as u32, entry_size, &mut local_raw_entry_spaces, U256, true,
753+
},
672754
}
673755

674756
{

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,21 @@ pub struct FixedKeyHashJoinHashTable<T: HashtableKeyable + FixedKey> {
7272
pub enum HashJoinHashTable {
7373
Null,
7474
Serializer(SerializerHashJoinHashTable),
75+
SkipDuplicatesSerializer(SerializerHashJoinHashTable),
7576
SingleBinary(SingleBinaryHashJoinHashTable),
77+
SkipDuplicatesSingleBinary(SingleBinaryHashJoinHashTable),
7678
KeysU8(FixedKeyHashJoinHashTable<u8>),
79+
SkipDuplicatesKeysU8(FixedKeyHashJoinHashTable<u8>),
7780
KeysU16(FixedKeyHashJoinHashTable<u16>),
81+
SkipDuplicatesKeysU16(FixedKeyHashJoinHashTable<u16>),
7882
KeysU32(FixedKeyHashJoinHashTable<u32>),
83+
SkipDuplicatesKeysU32(FixedKeyHashJoinHashTable<u32>),
7984
KeysU64(FixedKeyHashJoinHashTable<u64>),
85+
SkipDuplicatesKeysU64(FixedKeyHashJoinHashTable<u64>),
8086
KeysU128(FixedKeyHashJoinHashTable<u128>),
87+
SkipDuplicatesKeysU128(FixedKeyHashJoinHashTable<u128>),
8188
KeysU256(FixedKeyHashJoinHashTable<U256>),
89+
SkipDuplicatesKeysU256(FixedKeyHashJoinHashTable<U256>),
8290
}
8391

8492
/// Define some shared states for hash join build and probe.
@@ -344,6 +352,14 @@ impl HashJoinHashTable {
344352
HashJoinHashTable::KeysU64(table) => table.hash_table.len(),
345353
HashJoinHashTable::KeysU128(table) => table.hash_table.len(),
346354
HashJoinHashTable::KeysU256(table) => table.hash_table.len(),
355+
HashJoinHashTable::SkipDuplicatesSerializer(table) => table.hash_table.len(),
356+
HashJoinHashTable::SkipDuplicatesSingleBinary(table) => table.hash_table.len(),
357+
HashJoinHashTable::SkipDuplicatesKeysU8(table) => table.hash_table.len(),
358+
HashJoinHashTable::SkipDuplicatesKeysU16(table) => table.hash_table.len(),
359+
HashJoinHashTable::SkipDuplicatesKeysU32(table) => table.hash_table.len(),
360+
HashJoinHashTable::SkipDuplicatesKeysU64(table) => table.hash_table.len(),
361+
HashJoinHashTable::SkipDuplicatesKeysU128(table) => table.hash_table.len(),
362+
HashJoinHashTable::SkipDuplicatesKeysU256(table) => table.hash_table.len(),
347363
}
348364
}
349365

0 commit comments

Comments
 (0)