108 changes: 84 additions & 24 deletions native/core/benches/shuffle_writer.rs
@@ -15,36 +15,82 @@
// specific language governing permissions and limitations
// under the License.

use arrow::ipc::writer::StreamWriter;
use arrow_array::builder::Int32Builder;
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::shuffle::ShuffleWriterExec;
use comet::execution::shuffle::{calculate_partition_ids, write_ipc_compressed, ShuffleWriterExec};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::physical_plan::metrics::Time;
use datafusion::{
physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
prelude::SessionContext,
};
use datafusion_physical_expr::{expressions::Column, Partitioning};
use std::io::Cursor;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn criterion_benchmark(c: &mut Criterion) {
let batch = create_batch();
let mut batches = Vec::new();
for _ in 0..10 {
batches.push(batch.clone());
}
let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap();

let mut group = c.benchmark_group("shuffle_writer");
group.bench_function("shuffle_writer", |b| {
group.bench_function("shuffle_writer: calculate partition ids", |b| {
let batch = create_batch(8192, true);
let arrays = batch.columns().to_vec();
let mut hashes_buf = vec![0; batch.num_rows()];
let mut partition_ids = vec![0; batch.num_rows()];
b.iter(|| {
calculate_partition_ids(&arrays, 200, &mut hashes_buf, &mut partition_ids).unwrap();
});
});
group.bench_function("shuffle_writer: encode with new writer each time", |b| {
let batches = create_batches();
let schema = batches[0].schema();
let mut output = vec![];
b.iter(|| {
output.clear();
for batch in &batches {
let cursor = Cursor::new(&mut output);
let mut arrow_writer =
StreamWriter::try_new(zstd::Encoder::new(cursor, 1).unwrap(), &schema).unwrap();
arrow_writer.write(batch).unwrap();
arrow_writer.finish().unwrap();
}
});
});
group.bench_function("shuffle_writer: encode with single writer", |b| {
let batches = create_batches();
let schema = batches[0].schema();
let mut output = vec![];
b.iter(|| {
output.clear();
let cursor = Cursor::new(&mut output);
let mut arrow_writer =
StreamWriter::try_new(zstd::Encoder::new(cursor, 1).unwrap(), &schema).unwrap();
for batch in &batches {
arrow_writer.write(batch).unwrap();
}
arrow_writer.finish().unwrap();
});
});
group.bench_function("shuffle_writer: encode and compress", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let mut cursor = Cursor::new(&mut buffer);
let ipc_time = Time::default();
b.iter(|| write_ipc_compressed(&batch, &mut cursor, &ipc_time));
});
group.bench_function("shuffle_writer: end to end", |b| {
let ctx = SessionContext::new();
let batches = create_batches();
let schema = batches[0].schema();
let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap();
b.iter(|| {
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
@@ -54,19 +100,33 @@ fn criterion_benchmark(c: &mut Criterion) {
});
}

fn create_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
fn create_batches() -> Vec<RecordBatch> {
let batch = create_batch(8192, true);
let mut batches = Vec::new();
for _ in 0..10 {
batches.push(batch.clone());
}
batches
}

fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Int32, true),
Field::new("c1", DataType::Utf8, true),
]));
let mut a = Int32Builder::new();
let mut b = StringBuilder::new();
for i in 0..8192 {
if i % 10 == 0 {
for i in 0..num_rows {
a.append_value(i as i32);
if allow_nulls && i % 10 == 0 {
b.append_null();
} else {
b.append_value(format!("{i}"));
b.append_value(format!("this is string number {i}"));
}
}
let array = b.finish();

RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap()
let a = a.finish();
let b = b.finish();
RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap()
}

fn config() -> Criterion {
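Before the library-side changes below, here is a minimal usage sketch of the newly exported calculate_partition_ids, mirroring the "calculate partition ids" benchmark above. It is illustrative only and not part of this diff: the helper partition_ids_for and the sample data are hypothetical, while the call shape (caller-owned scratch buffers sized to the batch) follows the benchmark.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::shuffle::calculate_partition_ids;

// Hypothetical helper (not in the PR): compute the partition id for every row
// of a batch, using caller-owned scratch buffers exactly as the benchmark does.
fn partition_ids_for(batch: &RecordBatch, num_partitions: usize) -> Vec<u64> {
    // Scratch buffers are sized to the batch and could be reused across batches.
    let mut hashes_buf = vec![0u32; batch.num_rows()];
    let mut partition_ids = vec![0u64; batch.num_rows()];
    // Hashes the partition columns (murmur3, seed 42) and maps each hash to a bucket.
    calculate_partition_ids(batch.columns(), num_partitions, &mut hashes_buf, &mut partition_ids)
        .unwrap();
    partition_ids
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();
    let ids = partition_ids_for(&batch, 16);
    assert_eq!(ids.len(), 3);
}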
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/mod.rs
@@ -19,4 +19,4 @@ mod list;
mod map;
pub mod row;
mod shuffle_writer;
pub use shuffle_writer::ShuffleWriterExec;
pub use shuffle_writer::{calculate_partition_ids, write_ipc_compressed, ShuffleWriterExec};
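For context (commentary, not part of the diff): together with the visibility change to write_ipc_compressed in shuffle_writer.rs below, this re-export is what lets the benchmark import the helpers directly, as in its import line:

use comet::execution::shuffle::{calculate_partition_ids, write_ipc_compressed, ShuffleWriterExec};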
45 changes: 32 additions & 13 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -773,23 +773,23 @@ impl ShuffleRepartitioner {
Partitioning::Hash(exprs, _) => {
let (partition_starts, shuffled_partition_ids): (Vec<usize>, Vec<usize>) = {
let mut timer = self.metrics.repart_time.timer();

// evaluate partition expressions
let arrays = exprs
.iter()
.map(|expr| expr.evaluate(&input)?.into_array(input.num_rows()))
.collect::<Result<Vec<_>>>()?;

// use identical seed as spark hash partition
let hashes_buf = &mut self.hashes_buf[..arrays[0].len()];
hashes_buf.fill(42_u32);

// Hash arrays and compute buckets based on number of partitions
let partition_ids = &mut self.partition_ids[..arrays[0].len()];
create_murmur3_hashes(&arrays, hashes_buf)?
.iter()
.enumerate()
.for_each(|(idx, hash)| {
partition_ids[idx] = pmod(*hash, num_output_partitions) as u64
});
// calculate partition ids
let num_rows = input.num_rows();
let hashes_buf = &mut self.hashes_buf[..num_rows];
let partition_ids = &mut self.partition_ids[..num_rows];
calculate_partition_ids(
&arrays,
num_output_partitions,
hashes_buf,
partition_ids,
)?;

// count each partition size
let mut partition_counters = vec![0usize; num_output_partitions];
@@ -1085,6 +1085,25 @@
}
}

/// Calculate the partition ID for each row in a batch
#[inline]
pub fn calculate_partition_ids(
arrays: &[ArrayRef],
num_output_partitions: usize,
hashes_buf: &mut [u32],
partition_ids: &mut [u64],
) -> Result<(), DataFusionError> {
// use identical seed as spark hash partition
hashes_buf.fill(42_u32);

// Hash arrays and compute buckets based on number of partitions
create_murmur3_hashes(arrays, hashes_buf)?
.iter()
.enumerate()
.for_each(|(idx, hash)| partition_ids[idx] = pmod(*hash, num_output_partitions) as u64);
Ok(())
}

/// consume the `buffered_partitions` and do spill into a single temp shuffle output file
fn spill_into(
buffered_partitions: &mut [PartitionBuffer],
@@ -1528,7 +1547,7 @@ impl Checksum {

/// Writes given record batch as Arrow IPC bytes into given writer.
/// Returns number of bytes written.
pub(crate) fn write_ipc_compressed<W: Write + Seek>(
pub fn write_ipc_compressed<W: Write + Seek>(
batch: &RecordBatch,
output: &mut W,
ipc_time: &Time,
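As a closing usage note (commentary, not part of the diff): the now-public write_ipc_compressed can be exercised on its own against an in-memory cursor, the same way the "encode and compress" benchmark drives it. The sketch below assumes the same arguments as that benchmark; per the doc comment, the return value is the number of bytes written, which the sketch only binds and does not inspect.

use std::io::Cursor;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::shuffle::write_ipc_compressed;
use datafusion::physical_plan::metrics::Time;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();

    // Encode the batch as compressed Arrow IPC into an in-memory buffer,
    // as the "encode and compress" benchmark does.
    let mut buffer = vec![];
    let mut cursor = Cursor::new(&mut buffer);
    let ipc_time = Time::default();
    let _bytes_written = write_ipc_compressed(&batch, &mut cursor, &ipc_time);
}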