Skip to content

Commit 1aa0cc6

Browse files
committed
fix
1 parent 0357265 commit 1aa0cc6

File tree

5 files changed

+16
-7
lines changed

5 files changed

+16
-7
lines changed

src/query/catalog/src/table_context.rs

+4
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,10 @@ pub trait TableContext: Send + Sync {
420420
}
421421

422422
fn get_next_broadcast_id(&self) -> u32;
423+
424+
fn reset_broadcast_id(&self) {
425+
unimplemented!()
426+
}
423427
}
424428

425429
pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

src/query/service/src/schedulers/scheduler.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ pub async fn build_distributed_pipeline(
101101
plan: &PhysicalPlan,
102102
) -> Result<PipelineBuildResult> {
103103
let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
104-
for plan in build_broadcast_plans(ctx.get_next_broadcast_id())?
104+
for plan in build_broadcast_plans(ctx.as_ref())?
105105
.iter()
106106
.chain(std::iter::once(plan))
107107
{

src/query/service/src/sessions/query_ctx.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -1861,7 +1861,13 @@ impl TableContext for QueryContext {
18611861
}
18621862

18631863
fn get_next_broadcast_id(&self) -> u32 {
1864-
self.shared.get_next_broadcast_id()
1864+
self.shared
1865+
.next_broadcast_id
1866+
.fetch_add(1, Ordering::Acquire)
1867+
}
1868+
1869+
fn reset_broadcast_id(&self) {
1870+
self.shared.next_broadcast_id.store(0, Ordering::Release);
18651871
}
18661872
}
18671873

src/query/service/src/sessions/query_ctx_shared.rs

-4
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,6 @@ impl QueryContextShared {
256256
}))
257257
}
258258

259-
pub fn get_next_broadcast_id(&self) -> u32 {
260-
self.next_broadcast_id.fetch_add(1, Ordering::AcqRel)
261-
}
262-
263259
pub fn broadcast_source_receiver(&self, broadcast_id: u32) -> Receiver<BlockMetaInfoPtr> {
264260
let mut broadcast_channels = self.broadcast_channels.lock();
265261
let entry = broadcast_channels.entry(broadcast_id).or_default();

src/query/sql/src/executor/physical_plans/physical_broadcast.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_catalog::table_context::TableContext;
1516
use databend_common_exception::Result;
1617

1718
use super::Exchange;
@@ -52,8 +53,10 @@ pub fn build_broadcast_plan(broadcast_id: u32) -> Result<PhysicalPlan> {
5253
Ok(broadcast_sink)
5354
}
5455

55-
pub fn build_broadcast_plans(next_broadcast_id: u32) -> Result<Vec<PhysicalPlan>> {
56+
pub fn build_broadcast_plans(ctx: &dyn TableContext) -> Result<Vec<PhysicalPlan>> {
5657
let mut plans = vec![];
58+
let next_broadcast_id = ctx.get_next_broadcast_id();
59+
ctx.reset_broadcast_id();
5760
for broadcast_id in 0..next_broadcast_id {
5861
plans.push(build_broadcast_plan(broadcast_id)?);
5962
}

0 commit comments

Comments
 (0)