Fix Ballista executing during plan #2428

Merged (2 commits, May 4, 2022)
195 changes: 101 additions & 94 deletions ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -30,9 +30,8 @@ use crate::serde::protobuf::{
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
PartitionLocation,
};
use crate::utils::WrappedStream;

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -43,12 +42,14 @@ use datafusion::physical_plan::{
use crate::serde::protobuf::execute_query_params::OptionalSessionId;
use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
use async_trait::async_trait;
use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::context::TaskContext;
use futures::future;
use futures::StreamExt;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use log::{error, info};

/// This operator sends a logial plan to a Ballista scheduler for execution and
/// This operator sends a logical plan to a Ballista scheduler for execution and
/// polls the scheduler until the query is complete and then fetches the resulting
/// batches directly from the executors that hold the results from the final
/// query stage.
@@ -168,15 +169,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
) -> Result<SendableRecordBatchStream> {
assert_eq!(0, partition);

info!("Connecting to Ballista scheduler at {}", self.scheduler_url);
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again

let mut scheduler = SchedulerGrpcClient::connect(self.scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

let schema: Schema = self.plan.schema().as_ref().clone().into();

let mut buf: Vec<u8> = vec![];
Review comment (Contributor Author): I recommend reviewing this without whitespace:
https://github.com/apache/arrow-datafusion/pull/2428/files?w=1

let plan_message =
T::try_from_logical_plan(&self.plan, self.extension_codec.as_ref()).map_err(
@@ -191,88 +183,30 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
DataFusionError::Execution(format!("failed to encode logical plan: {:?}", e))
})?;

let query_result = scheduler
.execute_query(ExecuteQueryParams {
query: Some(Query::LogicalPlan(buf)),
settings: self
.config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
optional_session_id: Some(OptionalSessionId::SessionId(
self.session_id.clone(),
)),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.into_inner();

let response_session_id = query_result.session_id;
assert_eq!(
self.session_id.clone(),
response_session_id,
"Session id inconsistent between Client and Server side in DistributedQueryExec."
);
let query = ExecuteQueryParams {
query: Some(Query::LogicalPlan(buf)),
settings: self
.config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
optional_session_id: Some(OptionalSessionId::SessionId(
self.session_id.clone(),
)),
};

let job_id = query_result.job_id;
let mut prev_status: Option<job_status::Status> = None;
let stream = futures::stream::once(
execute_query(self.scheduler_url.clone(), self.session_id.clone(), query)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
.try_flatten();

loop {
let GetJobStatusResult { status } = scheduler
.get_job_status(GetJobStatusParams {
job_id: job_id.clone(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.into_inner();
let status = status.and_then(|s| s.status).ok_or_else(|| {
DataFusionError::Internal("Received empty status message".to_owned())
})?;
let wait_future = tokio::time::sleep(Duration::from_millis(100));
let has_status_change = prev_status.map(|x| x != status).unwrap_or(true);
match status {
job_status::Status::Queued(_) => {
if has_status_change {
info!("Job {} still queued...", job_id);
}
wait_future.await;
prev_status = Some(status);
}
job_status::Status::Running(_) => {
if has_status_change {
info!("Job {} is running...", job_id);
}
wait_future.await;
prev_status = Some(status);
}
job_status::Status::Failed(err) => {
let msg = format!("Job {} failed: {}", job_id, err.error);
error!("{}", msg);
break Err(DataFusionError::Execution(msg));
}
job_status::Status::Completed(completed) => {
let result = future::join_all(
completed
.partition_location
.into_iter()
.map(fetch_partition),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
Arc::new(schema),
);
break Ok(Box::pin(result));
}
};
}
let schema = self.schema();
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

fn fmt_as(
@@ -299,6 +233,79 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
}
}

async fn execute_query(
scheduler_url: String,
session_id: String,
query: ExecuteQueryParams,
) -> Result<impl Stream<Item = ArrowResult<RecordBatch>> + Send> {
info!("Connecting to Ballista scheduler at {}", scheduler_url);
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again

let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

let query_result = scheduler
.execute_query(query)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.into_inner();

assert_eq!(
session_id, query_result.session_id,
"Session id inconsistent between Client and Server side in DistributedQueryExec."
);

let job_id = query_result.job_id;
let mut prev_status: Option<job_status::Status> = None;

loop {
let GetJobStatusResult { status } = scheduler
.get_job_status(GetJobStatusParams {
job_id: job_id.clone(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.into_inner();
let status = status.and_then(|s| s.status).ok_or_else(|| {
DataFusionError::Internal("Received empty status message".to_owned())
})?;
let wait_future = tokio::time::sleep(Duration::from_millis(100));
let has_status_change = prev_status.map(|x| x != status).unwrap_or(true);
match status {
job_status::Status::Queued(_) => {
if has_status_change {
info!("Job {} still queued...", job_id);
}
wait_future.await;
prev_status = Some(status);
}
job_status::Status::Running(_) => {
if has_status_change {
info!("Job {} is running...", job_id);
}
wait_future.await;
prev_status = Some(status);
}
job_status::Status::Failed(err) => {
let msg = format!("Job {} failed: {}", job_id, err.error);
error!("{}", msg);
break Err(DataFusionError::Execution(msg));
}
job_status::Status::Completed(completed) => {
let streams = completed.partition_location.into_iter().map(|p| {
Review comment (Contributor Author): This is a breaking change. Previously the operator buffered up all records from all partitions and then yielded them in partition order; now it streams them, potentially interleaving results from different partitions. I'm fairly certain this is fine, but I want to draw attention to it (see the sketch after this file's diff).

let f = fetch_partition(p)
.map_err(|e| ArrowError::ExternalError(Box::new(e)));

futures::stream::once(f).try_flatten()
});

break Ok(futures::stream::iter(streams).flatten());
}
};
}
}

async fn fetch_partition(
location: PartitionLocation,
) -> Result<SendableRecordBatchStream> {
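The breaking-change note above is about the shape of the stream that execute() returns: the old code awaited every partition with future::join_all before yielding anything, while the new code builds one lazy stream per partition and flattens them, so batches can flow as soon as the first partition stream is polled. Below is a minimal sketch of the two shapes, not the PR's code: it uses plain integers in place of RecordBatches, a made-up fetch_partition stand-in, and assumes only the futures and tokio crates.

```rust
use futures::{future, stream, StreamExt};

// Hypothetical stand-in for fetching one partition's batches from an executor.
async fn fetch_partition(id: i32) -> Vec<i32> {
    vec![id * 10, id * 10 + 1]
}

#[tokio::main]
async fn main() {
    let partitions = vec![0, 1, 2];

    // Old shape: await every partition up front, then yield in partition order.
    let buffered: Vec<i32> =
        future::join_all(partitions.iter().copied().map(fetch_partition))
            .await
            .into_iter()
            .flatten()
            .collect();

    // New shape: a stream of lazy per-partition streams, flattened. Each
    // partition is fetched only when the flattened stream reaches it, so the
    // first batches are available before later partitions have been touched.
    let streamed: Vec<i32> = stream::iter(partitions.into_iter().map(|p| {
        stream::once(fetch_partition(p)).map(stream::iter).flatten()
    }))
    .flatten()
    .collect()
    .await;

    assert_eq!(buffered, streamed);
}
```

With futures' sequential flatten the final ordering in this toy example is the same either way; the practical difference is that nothing is buffered up front and the consumer drives when each partition is fetched.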
37 changes: 20 additions & 17 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -15,30 +15,28 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;
use std::{any::Any, pin::Pin};

use crate::client::BallistaClient;
use crate::serde::scheduler::{PartitionLocation, PartitionStats};

use crate::utils::WrappedStream;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;

use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
use futures::{future, StreamExt};
use futures::{StreamExt, TryStreamExt};

use datafusion::arrow::error::ArrowError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use log::info;

/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
@@ -112,18 +110,23 @@ impl ExecutionPlan for ShuffleReaderExec {

let fetch_time =
MetricBuilder::new(&self.metrics).subset_time("fetch_time", partition);
let timer = fetch_time.timer();

let partition_locations = &self.partition[partition];
let result = future::join_all(partition_locations.iter().map(fetch_partition))
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
timer.done();
let locations = self.partition[partition].clone();
let stream = locations.into_iter().map(move |p| {
let fetch_time = fetch_time.clone();
futures::stream::once(async move {
let timer = fetch_time.timer();
let r = fetch_partition(&p).await;
timer.done();

r.map_err(|e| ArrowError::ExternalError(Box::new(e)))
})
.try_flatten()
});

let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
let result = RecordBatchStreamAdapter::new(
Arc::new(self.schema.as_ref().clone()),
futures::stream::iter(stream).flatten(),
);
Ok(Box::pin(result))
}
@@ -201,7 +204,7 @@ fn stats_for_partitions(

async fn fetch_partition(
location: &PartitionLocation,
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>> {
) -> Result<SendableRecordBatchStream> {
let metadata = &location.executor_meta;
let partition_id = &location.partition_id;
let mut ballista_client =
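Both execute() methods now end the same way: pair the plan's schema with an ordinary stream of Arrow results via RecordBatchStreamAdapter, which replaces the removed WrappedStream. The sketch below shows that wrapping step in isolation; it is an illustration under stated assumptions, not the PR's code: the column name, values, and partition count are made up, and it assumes a DataFusion version contemporary with this PR, where the adapter accepts a stream of arrow Result<RecordBatch> items.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

fn partitions_to_sendable_stream() -> SendableRecordBatchStream {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "v",
        DataType::Int32,
        false,
    )]));

    // One lazy stream per "partition"; each yields a single Result<RecordBatch>.
    let partitions = (0..3).map({
        let schema = schema.clone();
        move |p: i32| {
            let schema = schema.clone();
            futures::stream::once(async move {
                let column: ArrayRef = Arc::new(Int32Array::from(vec![p, p + 10]));
                RecordBatch::try_new(schema, vec![column])
            })
        }
    });

    // Flatten the partition streams and attach the schema so the whole thing
    // implements RecordBatchStream and can be boxed as SendableRecordBatchStream.
    let batches = futures::stream::iter(partitions).flatten();
    Box::pin(RecordBatchStreamAdapter::new(schema, batches))
}
```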