Skip to content

Commit 509fa2d

Browse files
committed
refactor
1 parent 7939cad commit 509fa2d

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -842,10 +842,19 @@ impl QueryCoordinator {
842842
if let Some(mut build_res) = coordinator.pipeline_build_res.take() {
843843
build_res.set_max_threads(max_threads as usize);
844844

845-
if let Some(params) = params {
845+
if build_res.main_pipeline.is_pulling_pipeline()? {
846+
let Some(params) = params else {
847+
return Err(ErrorCode::Internal(
848+
"pipeline is pulling pipeline, but exchange params is none",
849+
));
850+
};
846851
// Add exchange data publisher.
847852
ExchangeSink::via(&info.query_ctx, &params, &mut build_res.main_pipeline)?;
848-
}
853+
} else if build_res.main_pipeline.is_complete_pipeline()? && params.is_some() {
854+
return Err(ErrorCode::Internal(
855+
"pipeline is complete pipeline, but exchange params is some",
856+
));
857+
};
849858

850859
if !build_res.main_pipeline.is_complete_pipeline()? {
851860
return Err(ErrorCode::Internal("Logical error, It's a bug"));

0 commit comments

Comments
 (0)