Skip to content

Commit 0aea0df

Browse files
authored
Implement fmt_as for ShuffleReaderExec (apache#400)
1 parent 8b31714 commit 0aea0df

File tree

1 file changed

+29
-1
lines changed

1 file changed

+29
-1
lines changed

ballista/rust/core/src/execution_plans/shuffle_reader.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,13 @@ use crate::serde::scheduler::PartitionLocation;
2424

2525
use arrow::datatypes::SchemaRef;
2626
use async_trait::async_trait;
27-
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
27+
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
2828
use datafusion::{
2929
error::{DataFusionError, Result},
3030
physical_plan::RecordBatchStream,
3131
};
3232
use log::info;
33+
use std::fmt::Formatter;
3334

3435
/// ShuffleReaderExec reads partitions that have already been materialized by an executor.
3536
#[derive(Debug, Clone)]
@@ -103,4 +104,31 @@ impl ExecutionPlan for ShuffleReaderExec {
103104
.await
104105
.map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
105106
}
107+
108+
fn fmt_as(
109+
&self,
110+
t: DisplayFormatType,
111+
f: &mut std::fmt::Formatter,
112+
) -> std::fmt::Result {
113+
match t {
114+
DisplayFormatType::Default => {
115+
let loc_str = self
116+
.partition_location
117+
.iter()
118+
.map(|l| {
119+
format!(
120+
"[executor={} part={}:{}:{} stats={:?}]",
121+
l.executor_meta.id,
122+
l.partition_id.job_id,
123+
l.partition_id.stage_id,
124+
l.partition_id.partition_id,
125+
l.partition_stats
126+
)
127+
})
128+
.collect::<Vec<String>>()
129+
.join(",");
130+
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
131+
}
132+
}
133+
}
106134
}

0 commit comments

Comments
 (0)