Skip to content

Commit 0357265

Browse files
committed
fix
1 parent 5d5c0ed commit 0357265

File tree

4 files changed

+15
-24
lines changed

4 files changed

+15
-24
lines changed

src/query/service/src/pipelines/builders/builder_broadcast.rs

+1-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use databend_common_exception::Result;
1616
use databend_common_sql::executor::physical_plans::BroadcastSink;
1717
use databend_common_sql::executor::physical_plans::BroadcastSource;
18-
use databend_common_storages_fuse::TableContext;
1918

2019
use crate::pipelines::processors::transforms::BroadcastSinkProcessor;
2120
use crate::pipelines::processors::transforms::BroadcastSourceProcessor;
@@ -33,13 +32,8 @@ impl PipelineBuilder {
3332
pub(crate) fn build_broadcast_sink(&mut self, sink: &BroadcastSink) -> Result<()> {
3433
self.build_pipeline(&sink.input)?;
3534
self.main_pipeline.resize(1, true)?;
36-
let node_num = self.ctx.get_cluster().nodes.len();
3735
self.main_pipeline.add_sink(|input| {
38-
BroadcastSinkProcessor::create(
39-
input,
40-
node_num,
41-
self.ctx.broadcast_sink_sender(sink.broadcast_id),
42-
)
36+
BroadcastSinkProcessor::create(input, self.ctx.broadcast_sink_sender(sink.broadcast_id))
4337
})
4438
}
4539
}

src/query/service/src/pipelines/processors/transforms/broadcast.rs

+8-11
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,13 @@ impl AsyncSource for BroadcastSourceProcessor {
6969
}
7070

7171
pub struct BroadcastSinkProcessor {
72-
expect_num: usize,
7372
received: Vec<BlockMetaInfoPtr>,
74-
sender: Sender<Vec<BlockMetaInfoPtr>>,
73+
sender: Sender<BlockMetaInfoPtr>,
7574
}
7675

7776
impl BroadcastSinkProcessor {
78-
pub fn create(
79-
input: Arc<InputPort>,
80-
node_num: usize,
81-
sender: Sender<Vec<BlockMetaInfoPtr>>,
82-
) -> Result<ProcessorPtr> {
77+
pub fn create(input: Arc<InputPort>, sender: Sender<BlockMetaInfoPtr>) -> Result<ProcessorPtr> {
8378
Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
84-
expect_num: node_num,
8579
received: vec![],
8680
sender,
8781
})))
@@ -95,6 +89,7 @@ impl AsyncSink for BroadcastSinkProcessor {
9589
const NAME: &'static str = "BroadcastSink";
9690

9791
async fn on_finish(&mut self) -> Result<()> {
92+
self.sender.close();
9893
Ok(())
9994
}
10095

@@ -103,8 +98,10 @@ impl AsyncSink for BroadcastSinkProcessor {
10398
.take_meta()
10499
.ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?;
105100
log::info!("BroadcastSinkProcessor recv meta: {:?}", meta);
106-
self.received.push(meta);
107-
let all_recv = self.expect_num == self.received.len();
108-
Ok(all_recv)
101+
self.sender
102+
.send(meta)
103+
.await
104+
.map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?;
105+
Ok(false)
109106
}
110107
}

src/query/service/src/sessions/query_ctx.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,10 @@ impl QueryContext {
295295
self.shared.broadcast_source_sender(broadcast_id)
296296
}
297297

298-
pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<Vec<BlockMetaInfoPtr>> {
298+
pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
299299
self.shared.broadcast_sink_receiver(broadcast_id)
300300
}
301-
pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<Vec<BlockMetaInfoPtr>> {
301+
pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
302302
self.shared.broadcast_sink_sender(broadcast_id)
303303
}
304304

src/query/service/src/sessions/query_ctx_shared.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ pub struct QueryContextShared {
183183
pub struct BroadcastChannel {
184184
pub source_sender: Option<Sender<BlockMetaInfoPtr>>,
185185
pub source_receiver: Option<Receiver<BlockMetaInfoPtr>>,
186-
pub sink_sender: Option<Sender<Vec<BlockMetaInfoPtr>>>,
187-
pub sink_receiver: Option<Receiver<Vec<BlockMetaInfoPtr>>>,
186+
pub sink_sender: Option<Sender<BlockMetaInfoPtr>>,
187+
pub sink_receiver: Option<Receiver<BlockMetaInfoPtr>>,
188188
}
189189

190190
impl QueryContextShared {
@@ -285,7 +285,7 @@ impl QueryContextShared {
285285
}
286286
}
287287

288-
pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<Vec<BlockMetaInfoPtr>> {
288+
pub fn broadcast_sink_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
289289
let mut broadcast_channels = self.broadcast_channels.lock();
290290
let entry = broadcast_channels.entry(broadcast_id).or_default();
291291
match entry.sink_receiver.take() {
@@ -297,7 +297,7 @@ impl QueryContextShared {
297297
}
298298
}
299299
}
300-
pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<Vec<BlockMetaInfoPtr>> {
300+
pub fn broadcast_sink_sender(&self, broadcast_id: u32) -> Sender<BlockMetaInfoPtr> {
301301
let mut broadcast_channels = self.broadcast_channels.lock();
302302
let entry = broadcast_channels.entry(broadcast_id).or_default();
303303
match entry.sink_sender.take() {

0 commit comments

Comments
 (0)