Skip to content

Commit b8f9544

Browse files
committed
update
1 parent beff896 commit b8f9544

26 files changed

+473
-699
lines changed

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,32 +12,54 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashMap;
16-
1715
use databend_common_base::base::tokio::sync::watch;
1816
use databend_common_base::base::tokio::sync::watch::Receiver;
1917
use databend_common_base::base::tokio::sync::watch::Sender;
2018
use databend_common_expression::Expr;
2119
use xorf::BinaryFuse16;
2220

2321
#[derive(Clone, Debug, Default)]
24-
pub struct RuntimeFiltersForScan {
25-
pub inlist: HashMap<usize, Expr<String>>,
26-
pub min_max: HashMap<usize, Expr<String>>,
27-
pub bloom: HashMap<usize, (String, BinaryFuse16)>,
22+
pub struct RuntimeFilterInfo {
23+
inlist: Vec<Expr<String>>,
24+
min_max: Vec<Expr<String>>,
25+
bloom: Vec<(String, BinaryFuse16)>,
2826
}
2927

30-
impl RuntimeFiltersForScan {
31-
pub fn add_inlist(&mut self, rf_id: usize, expr: Expr<String>) {
32-
self.inlist.insert(rf_id, expr);
28+
impl RuntimeFilterInfo {
29+
pub fn add_inlist(&mut self, expr: Expr<String>) {
30+
self.inlist.push(expr);
31+
}
32+
33+
pub fn add_bloom(&mut self, bloom: (String, BinaryFuse16)) {
34+
self.bloom.push(bloom);
35+
}
36+
37+
pub fn add_min_max(&mut self, expr: Expr<String>) {
38+
self.min_max.push(expr);
39+
}
40+
41+
pub fn get_inlist(&self) -> &Vec<Expr<String>> {
42+
&self.inlist
43+
}
44+
45+
pub fn get_bloom(&self) -> &Vec<(String, BinaryFuse16)> {
46+
&self.bloom
47+
}
48+
49+
pub fn get_min_max(&self) -> &Vec<Expr<String>> {
50+
&self.min_max
51+
}
52+
53+
pub fn blooms(self) -> Vec<(String, BinaryFuse16)> {
54+
self.bloom
3355
}
3456

35-
pub fn add_bloom(&mut self, rf_id: usize, bloom: (String, BinaryFuse16)) {
36-
self.bloom.insert(rf_id, bloom);
57+
pub fn inlists(self) -> Vec<Expr<String>> {
58+
self.inlist
3759
}
3860

39-
pub fn add_min_max(&mut self, rf_id: usize, expr: Expr<String>) {
40-
self.min_max.insert(rf_id, expr);
61+
pub fn min_maxs(self) -> Vec<Expr<String>> {
62+
self.min_max
4163
}
4264

4365
pub fn is_empty(&self) -> bool {

src/query/catalog/src/table_context.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ use crate::plan::PartInfoPtr;
7474
use crate::plan::PartStatistics;
7575
use crate::plan::Partitions;
7676
use crate::query_kind::QueryKind;
77+
use crate::runtime_filter_info::RuntimeFilterInfo;
7778
use crate::runtime_filter_info::RuntimeFilterReady;
78-
use crate::runtime_filter_info::RuntimeFiltersForScan;
7979
use crate::statistics::data_cache_statistics::DataCacheMetrics;
8080
use crate::table::Table;
8181

@@ -324,7 +324,7 @@ pub trait TableContext: Send + Sync {
324324

325325
fn get_query_profiles(&self) -> Vec<PlanProfile>;
326326

327-
fn set_runtime_filter(&self, filters: (usize, RuntimeFiltersForScan));
327+
fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));
328328

329329
fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>);
330330

@@ -416,6 +416,8 @@ pub trait TableContext: Send + Sync {
416416
fn set_pruned_partitions_stats(&self, _partitions: PartStatistics) {
417417
unimplemented!()
418418
}
419+
420+
fn get_next_broadcast_id(&self) -> u32;
419421
}
420422

421423
pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

src/query/pipeline/sources/src/async_source.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,7 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
131131
return Ok(());
132132
}
133133
match self.inner.generate().await? {
134-
None => {
135-
self.is_finish = true;
136-
}
134+
None => self.is_finish = true,
137135
Some(data_block) => {
138136
if !data_block.is_empty() {
139137
let progress_values = ProgressValues {

src/query/service/src/pipelines/builders/builder_join.rs

Lines changed: 82 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft
2727
use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
2828
use crate::pipelines::processors::transforms::HashJoinBuildState;
2929
use crate::pipelines::processors::transforms::HashJoinProbeState;
30-
use crate::pipelines::processors::transforms::RuntimeFilterChannels;
3130
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
3231
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
3332
use crate::pipelines::processors::HashJoinDesc;
@@ -36,83 +35,42 @@ use crate::pipelines::PipelineBuilder;
3635
use crate::sessions::QueryContext;
3736

3837
impl PipelineBuilder {
39-
pub(crate) fn build_range_join(&mut self, range_join: &RangeJoin) -> Result<()> {
40-
let state = Arc::new(RangeJoinState::new(self.ctx.clone(), range_join));
41-
self.expand_right_side_pipeline(range_join, state.clone())?;
42-
self.build_left_side(range_join, state)?;
43-
Ok(())
44-
}
45-
46-
fn build_left_side(
47-
&mut self,
48-
range_join: &RangeJoin,
49-
state: Arc<RangeJoinState>,
50-
) -> Result<()> {
51-
self.build_pipeline(&range_join.left)?;
52-
let max_threads = self.settings.get_max_threads()? as usize;
53-
self.main_pipeline.try_resize(max_threads)?;
54-
self.main_pipeline.add_transform(|input, output| {
55-
Ok(ProcessorPtr::create(TransformRangeJoinLeft::create(
56-
input,
57-
output,
58-
state.clone(),
59-
)))
60-
})?;
61-
Ok(())
62-
}
63-
64-
fn expand_right_side_pipeline(
65-
&mut self,
66-
range_join: &RangeJoin,
67-
state: Arc<RangeJoinState>,
68-
) -> Result<()> {
69-
let right_side_context = QueryContext::create_from(self.ctx.as_ref());
70-
let mut right_side_builder = PipelineBuilder::create(
38+
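// Create a sub-pipeline builder whose context is derived from the current builder's context (via `QueryContext::create_from`) and which receives a clone of the current `hash_join_states`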
39+
fn create_sub_pipeline_builder(&self) -> PipelineBuilder {
40+
let sub_context = QueryContext::create_from(self.ctx.as_ref());
41+
let mut sub_builder = PipelineBuilder::create(
7142
self.func_ctx.clone(),
7243
self.settings.clone(),
73-
right_side_context,
44+
sub_context,
7445
self.main_pipeline.get_scopes(),
7546
);
76-
right_side_builder.hash_join_states = self.hash_join_states.clone();
77-
78-
let mut right_res = right_side_builder.finalize(&range_join.right)?;
79-
right_res.main_pipeline.add_sink(|input| {
80-
Ok(ProcessorPtr::create(
81-
Sinker::<TransformRangeJoinRight>::create(
82-
input,
83-
TransformRangeJoinRight::create(state.clone()),
84-
),
85-
))
86-
})?;
87-
self.pipelines.push(right_res.main_pipeline.finalize());
88-
self.pipelines.extend(right_res.sources_pipelines);
89-
Ok(())
47+
sub_builder.hash_join_states = self.hash_join_states.clone();
48+
sub_builder
9049
}
9150

92-
pub(crate) fn build_join(&mut self, join: &HashJoin) -> Result<()> {
93-
// for merge into target table as build side.
94-
let (enable_merge_into_optimization, merge_into_is_distributed) =
95-
self.merge_into_get_optimization_flag(join);
51+
pub(crate) fn build_hash_join(&mut self, join: &HashJoin) -> Result<()> {
52+
// Get optimization flags for merge-into operations
53+
let (enable_optimization, is_distributed) = self.merge_into_get_optimization_flag(join);
9654

97-
let state = self.build_join_state(
98-
join,
99-
merge_into_is_distributed,
100-
enable_merge_into_optimization,
101-
)?;
55+
// Create the join state with optimization flags
56+
let state = self.build_hash_join_state(join, is_distributed, enable_optimization)?;
10257
if let Some((build_cache_index, _)) = join.build_side_cache_info {
10358
self.hash_join_states
10459
.insert(build_cache_index, state.clone());
10560
}
10661

107-
self.expand_build_side_pipeline(&join.build, join, state.clone())?;
108-
self.build_join_probe(join, state)?;
62+
// Build both phases of the Hash Join
63+
self.build_hash_join_build_side(&join.build, join, state.clone())?;
64+
self.build_hash_join_probe_side(join, state)?;
10965

110-
// In the case of spilling, we need to share state among multiple threads. Quickly fetch all data from this round to quickly start the next round.
66+
// In the case of spilling, we need to share state among multiple threads.
67+
// Fetch all data from this round promptly so that the next round can start quickly.
11168
self.main_pipeline
11269
.resize(self.main_pipeline.output_len(), true)
11370
}
11471

115-
fn build_join_state(
72+
// Create the Hash Join state
73+
fn build_hash_join_state(
11674
&mut self,
11775
join: &HashJoin,
11876
merge_into_is_distributed: bool,
@@ -130,20 +88,14 @@ impl PipelineBuilder {
13088
)
13189
}
13290

133-
fn expand_build_side_pipeline(
91+
// Build the build-side pipeline for Hash Join
92+
fn build_hash_join_build_side(
13493
&mut self,
13594
build: &PhysicalPlan,
13695
hash_join_plan: &HashJoin,
13796
join_state: Arc<HashJoinState>,
13897
) -> Result<()> {
139-
let build_side_context = QueryContext::create_from(self.ctx.as_ref());
140-
let mut build_side_builder = PipelineBuilder::create(
141-
self.func_ctx.clone(),
142-
self.settings.clone(),
143-
build_side_context,
144-
self.main_pipeline.get_scopes(),
145-
);
146-
build_side_builder.hash_join_states = self.hash_join_states.clone();
98+
let build_side_builder = self.create_sub_pipeline_builder();
14799
let mut build_res = build_side_builder.finalize(build)?;
148100

149101
assert!(build_res.main_pipeline.is_pulling_pipeline()?);
@@ -155,13 +107,6 @@ impl PipelineBuilder {
155107
&hash_join_plan.build_projections,
156108
join_state.clone(),
157109
output_len,
158-
hash_join_plan
159-
.runtime_filter_plan
160-
.as_ref()
161-
.map(|_| RuntimeFilterChannels {
162-
rf_src_send: self.ctx.rf_src_send(hash_join_plan.join_id),
163-
rf_sink_recv: self.ctx.rf_sink_recv(hash_join_plan.join_id),
164-
}),
165110
)?;
166111
build_state.add_runtime_filter_ready();
167112

@@ -171,7 +116,7 @@ impl PipelineBuilder {
171116
build_state.clone(),
172117
)?))
173118
};
174-
// for distributed merge into when source as build side.
119+
// For distributed merge-into when the source is used as the build side
175120
if hash_join_plan.need_hold_hash_table {
176121
self.join_state = Some(build_state.clone())
177122
}
@@ -182,7 +127,12 @@ impl PipelineBuilder {
182127
Ok(())
183128
}
184129

185-
fn build_join_probe(&mut self, join: &HashJoin, state: Arc<HashJoinState>) -> Result<()> {
130+
// Build the probe-side pipeline for Hash Join
131+
fn build_hash_join_probe_side(
132+
&mut self,
133+
join: &HashJoin,
134+
state: Arc<HashJoinState>,
135+
) -> Result<()> {
186136
self.build_pipeline(&join.probe)?;
187137

188138
let max_block_size = self.settings.get_max_block_size()? as usize;
@@ -212,16 +162,66 @@ impl PipelineBuilder {
212162
)?))
213163
})?;
214164

165+
// For merge-into operations that need to hold the hash table
215166
if join.need_hold_hash_table {
216-
let mut projected_probe_fields = vec![];
167+
// Extract projected fields from probe schema
168+
let mut projected_fields = vec![];
217169
for (i, field) in probe_state.probe_schema.fields().iter().enumerate() {
218170
if probe_state.probe_projections.contains(&i) {
219-
projected_probe_fields.push(field.clone());
171+
projected_fields.push(field.clone());
220172
}
221173
}
222-
self.merge_into_probe_data_fields = Some(projected_probe_fields);
174+
self.merge_into_probe_data_fields = Some(projected_fields);
223175
}
224176

225177
Ok(())
226178
}
179+
180+
pub(crate) fn build_range_join(&mut self, range_join: &RangeJoin) -> Result<()> {
181+
let state = Arc::new(RangeJoinState::new(self.ctx.clone(), range_join));
182+
self.build_range_join_right_side(range_join, state.clone())?;
183+
self.build_range_join_left_side(range_join, state)?;
184+
Ok(())
185+
}
186+
187+
// Build the left-side pipeline for Range Join
188+
fn build_range_join_left_side(
189+
&mut self,
190+
range_join: &RangeJoin,
191+
state: Arc<RangeJoinState>,
192+
) -> Result<()> {
193+
self.build_pipeline(&range_join.left)?;
194+
let max_threads = self.settings.get_max_threads()? as usize;
195+
self.main_pipeline.try_resize(max_threads)?;
196+
self.main_pipeline.add_transform(|input, output| {
197+
Ok(ProcessorPtr::create(TransformRangeJoinLeft::create(
198+
input,
199+
output,
200+
state.clone(),
201+
)))
202+
})?;
203+
Ok(())
204+
}
205+
206+
// Build the right-side pipeline for Range Join
207+
fn build_range_join_right_side(
208+
&mut self,
209+
range_join: &RangeJoin,
210+
state: Arc<RangeJoinState>,
211+
) -> Result<()> {
212+
let right_side_builder = self.create_sub_pipeline_builder();
213+
214+
let mut right_res = right_side_builder.finalize(&range_join.right)?;
215+
right_res.main_pipeline.add_sink(|input| {
216+
Ok(ProcessorPtr::create(
217+
Sinker::<TransformRangeJoinRight>::create(
218+
input,
219+
TransformRangeJoinRight::create(state.clone()),
220+
),
221+
))
222+
})?;
223+
self.pipelines.push(right_res.main_pipeline.finalize());
224+
self.pipelines.extend(right_res.sources_pipelines);
225+
Ok(())
226+
}
227227
}

src/query/service/src/pipelines/builders/builder_runtime_filter.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,33 @@
1313
// limitations under the License.
1414

1515
use databend_common_exception::Result;
16-
use databend_common_sql::executor::physical_plans::RuntimeFilterSink;
17-
use databend_common_sql::executor::physical_plans::RuntimeFilterSource;
16+
use databend_common_sql::executor::physical_plans::BroadcastSink;
17+
use databend_common_sql::executor::physical_plans::BroadcastSource;
1818
use databend_common_storages_fuse::TableContext;
1919

20-
use crate::pipelines::processors::transforms::RuntimeFilterSinkProcessor;
21-
use crate::pipelines::processors::transforms::RuntimeFilterSourceProcessor;
20+
use crate::pipelines::processors::transforms::BroadcastSinkProcessor;
21+
use crate::pipelines::processors::transforms::BroadcastSourceProcessor;
2222
use crate::pipelines::PipelineBuilder;
2323

2424
impl PipelineBuilder {
25-
pub(crate) fn build_runtime_filter_source(
26-
&mut self,
27-
_source: &RuntimeFilterSource,
28-
) -> Result<()> {
29-
let receiver = self.ctx.rf_src_recv(_source.join_id);
25+
pub(crate) fn build_broadcast_source(&mut self, source: &BroadcastSource) -> Result<()> {
26+
let receiver = self.ctx.broadcast_source_receiver(source.broadcast_id);
3027
self.main_pipeline.add_source(
31-
|output| {
32-
RuntimeFilterSourceProcessor::create(self.ctx.clone(), receiver.clone(), output)
33-
},
28+
|output| BroadcastSourceProcessor::create(self.ctx.clone(), receiver.clone(), output),
3429
1,
3530
)
3631
}
3732

38-
pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> {
33+
pub(crate) fn build_broadcast_sink(&mut self, sink: &BroadcastSink) -> Result<()> {
3934
self.build_pipeline(&sink.input)?;
4035
self.main_pipeline.resize(1, true)?;
4136
let node_num = self.ctx.get_cluster().nodes.len();
4237
self.main_pipeline.add_sink(|input| {
43-
RuntimeFilterSinkProcessor::create(input, node_num, self.ctx.rf_sink_send(sink.join_id))
38+
BroadcastSinkProcessor::create(
39+
input,
40+
node_num,
41+
self.ctx.broadcast_sink_sender(sink.broadcast_id),
42+
)
4443
})
4544
}
4645
}

0 commit comments

Comments
 (0)