Skip to content

Commit e386f1f

Browse files
authored
feat: impl INNER/LEFT/RIGHT ANY JOIN (#18779)
* feat: impl INNER/LEFT/RIGHT ANY JOIN
* chore: codefmt
* fix: subquery.test
* chore: disable optimize of any join
* chore: codefmt
* chore: fix e2e test
* chore: fix e2e test
* chore: fix e2e test
* chore: fix hash conflicts that may cause mutual coverage, and add more e2e tests for Any Join
* chore: codefmt
* chore: add `early_filtering` on skip_duplicates
* chore: generic templated skip_duplicates parameter
* chore: fix hashmap SKIP_DUPLICATES
1 parent d7a9713 commit e386f1f

File tree

32 files changed

+717
-89
lines changed

32 files changed

+717
-89
lines changed

src/common/hashtable/src/hashjoin_hashtable.rs

Lines changed: 32 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
use std::alloc::Allocator;
1616
use std::marker::PhantomData;
1717
use std::sync::atomic::AtomicU64;
18+
use std::sync::atomic::AtomicUsize;
1819
use std::sync::atomic::Ordering;
1920

2021
use databend_common_base::hints::assume;
@@ -102,18 +103,31 @@ pub fn hash_bits() -> u32 {
102103
}
103104
}
104105

105-
pub struct HashJoinHashTable<K: Keyable, A: Allocator + Clone = DefaultAllocator> {
106+
pub struct HashJoinHashTable<
107+
K: Keyable,
108+
const SKIP_DUPLICATES: bool = false,
109+
A: Allocator + Clone = DefaultAllocator,
110+
> {
106111
pub(crate) pointers: Box<[u64], A>,
107112
pub(crate) atomic_pointers: *mut AtomicU64,
108113
pub(crate) hash_shift: usize,
109114
pub(crate) phantom: PhantomData<K>,
115+
pub(crate) count: AtomicUsize,
110116
}
111117

112-
unsafe impl<K: Keyable + Send, A: Allocator + Clone + Send> Send for HashJoinHashTable<K, A> {}
118+
unsafe impl<K: Keyable + Send, A: Allocator + Clone + Send, const SKIP_DUPLICATES: bool> Send
119+
for HashJoinHashTable<K, SKIP_DUPLICATES, A>
120+
{
121+
}
113122

114-
unsafe impl<K: Keyable + Sync, A: Allocator + Clone + Sync> Sync for HashJoinHashTable<K, A> {}
123+
unsafe impl<K: Keyable + Sync, A: Allocator + Clone + Sync, const SKIP_DUPLICATES: bool> Sync
124+
for HashJoinHashTable<K, SKIP_DUPLICATES, A>
125+
{
126+
}
115127

116-
impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
128+
impl<K: Keyable, A: Allocator + Clone + Default + 'static, const SKIP_DUPLICATES: bool>
129+
HashJoinHashTable<K, SKIP_DUPLICATES, A>
130+
{
117131
pub fn with_build_row_num(row_num: usize) -> Self {
118132
let capacity = std::cmp::max((row_num * 2).next_power_of_two(), 1 << 10);
119133
let mut hashtable = Self {
@@ -123,6 +137,7 @@ impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
123137
atomic_pointers: std::ptr::null_mut(),
124138
hash_shift: (hash_bits() - capacity.trailing_zeros()) as usize,
125139
phantom: PhantomData,
140+
count: Default::default(),
126141
};
127142
hashtable.atomic_pointers = unsafe {
128143
std::mem::transmute::<*mut u64, *mut AtomicU64>(hashtable.pointers.as_mut_ptr())
@@ -138,6 +153,12 @@ impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
138153
// `index` is less than the capacity of hash table.
139154
let mut old_header = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) };
140155
loop {
156+
if SKIP_DUPLICATES
157+
&& early_filtering(old_header, hash)
158+
&& self.next_contains(&key, remove_header_tag(old_header))
159+
{
160+
return;
161+
}
141162
let res = unsafe {
142163
(*self.atomic_pointers.add(index)).compare_exchange_weak(
143164
old_header,
@@ -151,11 +172,13 @@ impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
151172
Err(x) => old_header = x,
152173
};
153174
}
175+
self.count.fetch_add(1, Ordering::Relaxed);
154176
unsafe { (*entry_ptr).next = remove_header_tag(old_header) };
155177
}
156178
}
157179

158-
impl<K, A> HashJoinHashtableLike for HashJoinHashTable<K, A>
180+
impl<K, A, const SKIP_DUPLICATES: bool> HashJoinHashtableLike
181+
for HashJoinHashTable<K, SKIP_DUPLICATES, A>
159182
where
160183
K: Keyable,
161184
A: Allocator + Clone + 'static,
@@ -373,4 +396,8 @@ where
373396
}
374397
0
375398
}
399+
400+
fn len(&self) -> usize {
401+
self.count.load(Ordering::Relaxed)
402+
}
376403
}

src/common/hashtable/src/hashjoin_string_hashtable.rs

Lines changed: 31 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414

1515
use std::alloc::Allocator;
1616
use std::sync::atomic::AtomicU64;
17+
use std::sync::atomic::AtomicUsize;
1718
use std::sync::atomic::Ordering;
1819

1920
use databend_common_base::hints::assume;
@@ -38,17 +39,29 @@ pub struct StringRawEntry {
3839
pub next: u64,
3940
}
4041

41-
pub struct HashJoinStringHashTable<A: Allocator + Clone = DefaultAllocator> {
42+
pub struct HashJoinStringHashTable<
43+
const SKIP_DUPLICATES: bool = false,
44+
A: Allocator + Clone = DefaultAllocator,
45+
> {
4246
pub(crate) pointers: Box<[u64], A>,
4347
pub(crate) atomic_pointers: *mut AtomicU64,
4448
pub(crate) hash_shift: usize,
49+
pub(crate) count: AtomicUsize,
4550
}
4651

47-
unsafe impl<A: Allocator + Clone + Send> Send for HashJoinStringHashTable<A> {}
52+
unsafe impl<A: Allocator + Clone + Send, const SKIP_DUPLICATES: bool> Send
53+
for HashJoinStringHashTable<SKIP_DUPLICATES, A>
54+
{
55+
}
4856

49-
unsafe impl<A: Allocator + Clone + Sync> Sync for HashJoinStringHashTable<A> {}
57+
unsafe impl<A: Allocator + Clone + Sync, const SKIP_DUPLICATES: bool> Sync
58+
for HashJoinStringHashTable<SKIP_DUPLICATES, A>
59+
{
60+
}
5061

51-
impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
62+
impl<A: Allocator + Clone + Default + 'static, const SKIP_DUPLICATES: bool>
63+
HashJoinStringHashTable<SKIP_DUPLICATES, A>
64+
{
5265
pub fn with_build_row_num(row_num: usize) -> Self {
5366
let capacity = std::cmp::max((row_num * 2).next_power_of_two(), 1 << 10);
5467
let mut hashtable = Self {
@@ -57,6 +70,7 @@ impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
5770
},
5871
atomic_pointers: std::ptr::null_mut(),
5972
hash_shift: (hash_bits() - capacity.trailing_zeros()) as usize,
73+
count: Default::default(),
6074
};
6175
hashtable.atomic_pointers = unsafe {
6276
std::mem::transmute::<*mut u64, *mut AtomicU64>(hashtable.pointers.as_mut_ptr())
@@ -72,6 +86,12 @@ impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
7286
// `index` is less than the capacity of hash table.
7387
let mut old_header = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) };
7488
loop {
89+
if SKIP_DUPLICATES
90+
&& early_filtering(old_header, hash)
91+
&& self.next_contains(key, remove_header_tag(old_header))
92+
{
93+
return;
94+
}
7595
let res = unsafe {
7696
(*self.atomic_pointers.add(index)).compare_exchange_weak(
7797
old_header,
@@ -85,11 +105,13 @@ impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
85105
Err(x) => old_header = x,
86106
};
87107
}
108+
self.count.fetch_add(1, Ordering::Relaxed);
88109
unsafe { (*entry_ptr).next = remove_header_tag(old_header) };
89110
}
90111
}
91112

92-
impl<A> HashJoinHashtableLike for HashJoinStringHashTable<A>
113+
impl<A, const SKIP_DUPLICATES: bool> HashJoinHashtableLike
114+
for HashJoinStringHashTable<SKIP_DUPLICATES, A>
93115
where A: Allocator + Clone + 'static
94116
{
95117
type Key = [u8];
@@ -341,4 +363,8 @@ where A: Allocator + Clone + 'static
341363
}
342364
0
343365
}
366+
367+
fn len(&self) -> usize {
368+
self.count.load(Ordering::Relaxed)
369+
}
344370
}

src/common/hashtable/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -111,8 +111,10 @@ pub use hashjoin_string_hashtable::StringRawEntry;
111111
pub use hashjoin_string_hashtable::STRING_EARLY_SIZE;
112112
pub use keys_ref::KeysRef;
113113
pub use partitioned_hashtable::hash2bucket;
114-
pub type HashJoinHashMap<K> = hashjoin_hashtable::HashJoinHashTable<K>;
115-
pub type BinaryHashJoinHashMap = hashjoin_string_hashtable::HashJoinStringHashTable;
114+
pub type HashJoinHashMap<K, const SKIP_DUPLICATES: bool = false> =
115+
hashjoin_hashtable::HashJoinHashTable<K, SKIP_DUPLICATES>;
116+
pub type BinaryHashJoinHashMap<const SKIP_DUPLICATES: bool = false> =
117+
hashjoin_string_hashtable::HashJoinStringHashTable<SKIP_DUPLICATES>;
116118
pub use traits::HashJoinHashtableLike;
117119
pub use utils::fast_memcmp;
118120
pub use utils::Interval;

src/common/hashtable/src/traits.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -569,4 +569,10 @@ pub trait HashJoinHashtableLike {
569569

570570
// Find the next matched ptr.
571571
fn next_matched_ptr(&self, key: &Self::Key, ptr: u64) -> u64;
572+
573+
fn len(&self) -> usize;
574+
575+
fn is_empty(&self) -> bool {
576+
self.len() == 0
577+
}
572578
}

src/query/ast/src/ast/query.rs

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1058,6 +1058,15 @@ impl Display for TableReference {
10581058
JoinOperator::RightAsof => {
10591059
write!(f, " ASOF RIGHT JOIN")?;
10601060
}
1061+
JoinOperator::InnerAny => {
1062+
write!(f, " INNER ANY JOIN")?;
1063+
}
1064+
JoinOperator::LeftAny => {
1065+
write!(f, " LEFT ANY JOIN")?;
1066+
}
1067+
JoinOperator::RightAny => {
1068+
write!(f, " RIGHT ANY JOIN")?;
1069+
}
10611070
}
10621071
write!(f, " {}", join.right)?;
10631072
match &join.condition {
@@ -1134,6 +1143,10 @@ pub enum JoinOperator {
11341143
Asof,
11351144
LeftAsof,
11361145
RightAsof,
1146+
// Any
1147+
InnerAny,
1148+
LeftAny,
1149+
RightAny,
11371150
}
11381151

11391152
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]

src/query/ast/src/parser/query.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -671,11 +671,14 @@ pub fn table_alias_without_as(i: Input) -> IResult<TableAlias> {
671671

672672
pub fn join_operator(i: Input) -> IResult<JoinOperator> {
673673
alt((
674+
value(JoinOperator::InnerAny, rule! { INNER ~ ANY }),
674675
value(JoinOperator::Inner, rule! { INNER }),
675676
value(JoinOperator::LeftSemi, rule! { LEFT? ~ SEMI }),
676677
value(JoinOperator::RightSemi, rule! { RIGHT ~ SEMI }),
677678
value(JoinOperator::LeftAnti, rule! { LEFT? ~ ANTI }),
678679
value(JoinOperator::RightAnti, rule! { RIGHT ~ ANTI }),
680+
value(JoinOperator::LeftAny, rule! { LEFT ~ ANY }),
681+
value(JoinOperator::RightAny, rule! { RIGHT ~ ANY }),
679682
value(JoinOperator::LeftOuter, rule! { LEFT ~ OUTER? }),
680683
value(JoinOperator::RightOuter, rule! { RIGHT ~ OUTER? }),
681684
value(JoinOperator::FullOuter, rule! { FULL ~ OUTER? }),

src/query/expression/src/kernels/group_by_hash/method.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -112,7 +112,10 @@ macro_rules! with_join_hash_method {
112112
( | $t:tt | $($tail:tt)* ) => {
113113
match_template::match_template! {
114114
$t = [Serializer, SingleBinary, KeysU8, KeysU16,
115-
KeysU32, KeysU64, KeysU128, KeysU256],
115+
KeysU32, KeysU64, KeysU128, KeysU256, SkipDuplicatesSerializer,
116+
SkipDuplicatesSingleBinary, SkipDuplicatesKeysU8, SkipDuplicatesKeysU16,
117+
SkipDuplicatesKeysU32, SkipDuplicatesKeysU64, SkipDuplicatesKeysU128,
118+
SkipDuplicatesKeysU256],
116119
$($tail)*
117120
}
118121
}

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -552,7 +552,11 @@ impl PhysicalPlanBuilder {
552552
build_side: &PhysicalPlan,
553553
) -> Result<DataSchemaRef> {
554554
match join_type {
555-
JoinType::Left | JoinType::LeftSingle | JoinType::LeftAsof | JoinType::Full => {
555+
JoinType::Left
556+
| JoinType::LeftAny
557+
| JoinType::LeftSingle
558+
| JoinType::LeftAsof
559+
| JoinType::Full => {
556560
let build_schema = build_side.output_schema()?;
557561
// Wrap nullable type for columns in build side
558562
let build_schema = DataSchemaRefExt::create(
@@ -805,7 +809,7 @@ impl PhysicalPlanBuilder {
805809
let left_expr_for_runtime_filter = self.prepare_runtime_filter_expr(left_condition)?;
806810

807811
// Handle inner join column optimization
808-
if join.join_type == JoinType::Inner {
812+
if matches!(join.join_type, JoinType::Inner | JoinType::InnerAny) {
809813
self.handle_inner_join_column_optimization(
810814
left_condition,
811815
right_condition,
@@ -1034,9 +1038,12 @@ impl PhysicalPlanBuilder {
10341038
let merged_fields = match join.join_type {
10351039
JoinType::Cross
10361040
| JoinType::Inner
1041+
| JoinType::InnerAny
10371042
| JoinType::Left
1043+
| JoinType::LeftAny
10381044
| JoinType::LeftSingle
10391045
| JoinType::Right
1046+
| JoinType::RightAny
10401047
| JoinType::RightSingle
10411048
| JoinType::Full => {
10421049
let mut result = probe_fields.clone();

src/query/service/src/physical_plans/physical_join.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_exception::ErrorCode;
1516
use databend_common_exception::Result;
1617
use databend_common_sql::binder::JoinPredicate;
1718
use databend_common_sql::optimizer::ir::RelExpr;
@@ -40,6 +41,11 @@ pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
4041
JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof
4142
);
4243

44+
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
45+
return Err(ErrorCode::SemanticError(
46+
"ANY JOIN only supports equality-based hash joins",
47+
));
48+
}
4349
if !join.equi_conditions.is_empty() && !check_asof {
4450
// Contain equi condition, use hash join
4551
return Ok(PhysicalJoinType::Hash);

0 commit comments

Comments
 (0)