Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 35 additions & 34 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ use crate::{
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
use futures::StreamExt;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
Expand All @@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::sql::sqlparser;
use datafusion::sql::sqlparser::dialect::dialect_from_str;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::sync::Arc;
use tokio::signal;

/// run and execute SQL statements and commands, against a context with the given print options
Expand Down Expand Up @@ -230,18 +226,17 @@ pub(super) async fn exec_and_print(
for statement in statements {
let adjusted =
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);

let plan = create_plan(ctx, statement).await?;
let adjusted = adjusted.with_plan(&plan);

let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

// Track memory usage for the query result if it's bounded
let mut reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
let is_unbounded = physical_plan.boundedness().is_unbounded();
let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?;

if physical_plan.boundedness().is_unbounded() {
// Both bounded and unbounded streams are streaming prints
if is_unbounded {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
Expand All @@ -250,37 +245,43 @@ pub(super) async fn exec_and_print(
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to this PR, but I don't understand this comment about Memory safety

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's also confusing to me, maybe we can remove the message. It should be memory safe, we only keep one batch size.

let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
print_options
.print_stream(MaxRows::Unlimited, stream, now)
.await?;
} else {
// Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
let mut row_count = 0_usize;
let max_rows = match print_options.maxrows {
MaxRows::Unlimited => usize::MAX,
MaxRows::Limited(n) => n,
};
while let Some(batch) = stream.next().await {
let batch = batch?;
let curr_num_rows = batch.num_rows();
// Stop collecting results if the number of rows exceeds the limit
// results batch should include the last batch that exceeds the limit
if row_count < max_rows + curr_num_rows {
// Try to grow the reservation to accommodate the batch in memory
reservation.try_grow(get_record_batch_memory_size(&batch))?;
results.push(batch);
}
row_count += curr_num_rows;
let stdout = std::io::stdout();
let mut writer = stdout.lock();

// If we don't want to print the table, we should use the streaming print same as above
if print_options.format != PrintFormat::Table
&& print_options.format != PrintFormat::Automatic
{
print_options
.print_stream(print_options.maxrows, stream, now)
.await?;
continue;
}

// into_inner will finalize the print options to table if it's automatic
adjusted
.into_inner()
.print_batches(schema, &results, now, row_count)?;
reservation.free();
.print_table_batch(
print_options,
schema,
&mut stream,
max_rows,
&mut writer,
now,
)
.await?;
}
}

Ok(())
}

Expand Down
Loading