
Commit 925c01b

Update tests
1 parent 2b0ed37 commit 925c01b

2 files changed: 35 additions, 134 deletions


datafusion/core/src/datasource/memory.rs

Lines changed: 35 additions & 4 deletions
@@ -228,8 +228,8 @@ mod tests {
     use crate::from_slice::FromSlice;
     use crate::physical_plan::collect;
     use crate::prelude::SessionContext;
-    use arrow::array::Int32Array;
-    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::array::{AsArray, Int32Array};
+    use arrow::datatypes::{DataType, Field, Schema, UInt64Type};
     use arrow::error::ArrowError;
     use datafusion_expr::LogicalPlanBuilder;
     use futures::StreamExt;
@@ -467,6 +467,11 @@ mod tests {
         initial_data: Vec<Vec<RecordBatch>>,
         inserted_data: Vec<Vec<RecordBatch>>,
     ) -> Result<Vec<Vec<RecordBatch>>> {
+        let expected_count: u64 = inserted_data
+            .iter()
+            .flat_map(|batches| batches.iter().map(|batch| batch.num_rows() as u64))
+            .sum();
+
         // Create a new session context
         let session_ctx = SessionContext::new();
         // Create and register the initial table with the provided schema and data
@@ -490,8 +495,8 @@ mod tests {

         // Execute the physical plan and collect the results
         let res = collect(plan, session_ctx.task_ctx()).await?;
-        // Ensure the result is empty after the insert operation
-        assert!(res.is_empty());
+        assert_eq!(extract_count(res), expected_count);
+
         // Read the data from the initial table and store it in a vector of partitions
         let mut partitions = vec![];
         for partition in initial_table.batches.iter() {
@@ -501,6 +506,32 @@ mod tests {
         Ok(partitions)
     }

+    /// Returns the value of results:
+    ///
+    /// "+-------+",
+    /// "| count |",
+    /// "+-------+",
+    /// "| 6 |",
+    /// "+-------+",
+    fn extract_count(res: Vec<RecordBatch>) -> u64 {
+        assert_eq!(res.len(), 1, "expected one batch, got {}", res.len());
+        let batch = &res[0];
+        assert_eq!(
+            batch.num_columns(),
+            1,
+            "expected 1 column, got {}",
+            batch.num_columns()
+        );
+        let col = batch.column(0).as_primitive::<UInt64Type>();
+        assert_eq!(col.len(), 1, "expected 1 row, got {}", col.len());
+        let val = col
+            .iter()
+            .next()
+            .expect("had value")
+            .expect("expected non null");
+        val
+    }
+
     // Test inserting a single batch of data into a single partition
     #[tokio::test]
     async fn test_insert_into_single_partition() -> Result<()> {
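
Note: the updated helper now expects an INSERT to report the number of rows it wrote as a single one-row, one-column UInt64 `count` batch, which the new `extract_count` helper unwraps. Below is a minimal standalone sketch of that unwrapping, using the same arrow APIs the diff imports; the batch constructed here is illustrative, not taken from the commit.

```rust
use std::sync::Arc;

use arrow::array::{AsArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, UInt64Type};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // Stand-in for the batch an insert is expected to return: a single
    // "count" column holding the number of rows written (6 in the doc comment above).
    let schema = Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(UInt64Array::from(vec![6u64]))])?;

    // Same pattern as extract_count: downcast the column to a UInt64 array
    // and read its single value.
    let count = batch.column(0).as_primitive::<UInt64Type>().value(0);
    assert_eq!(count, 6);
    Ok(())
}
```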

datafusion/core/src/physical_plan/memory.rs

Lines changed: 0 additions & 130 deletions
@@ -238,133 +238,3 @@ impl RecordBatchStream for MemoryStream {
         self.schema.clone()
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::datasource::streaming::PartitionStream;
-    use crate::datasource::{MemTable, TableProvider};
-    use crate::from_slice::FromSlice;
-    use crate::physical_plan::stream::RecordBatchStreamAdapter;
-    use crate::physical_plan::streaming::StreamingTableExec;
-    use crate::physical_plan::ColumnStatistics;
-    use crate::physical_plan::{collect, displayable, SendableRecordBatchStream};
-    use crate::prelude::{CsvReadOptions, SessionContext};
-    use crate::test_util;
-    use arrow::array::Int32Array;
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::Result;
-    use datafusion_execution::config::SessionConfig;
-    use datafusion_execution::TaskContext;
-    use futures::StreamExt;
-    use std::sync::Arc;
-
-    fn mock_data() -> Result<(SchemaRef, RecordBatch)> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-            Field::new("c", DataType::Int32, true),
-            Field::new("d", DataType::Int32, false),
-        ]));
-
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(Int32Array::from_slice([1, 2, 3])),
-                Arc::new(Int32Array::from_slice([4, 5, 6])),
-                Arc::new(Int32Array::from(vec![None, None, Some(9)])),
-                Arc::new(Int32Array::from_slice([7, 8, 9])),
-            ],
-        )?;
-
-        Ok((schema, batch))
-    }
-
-    #[tokio::test]
-    async fn test_with_projection() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let (schema, batch) = mock_data()?;
-
-        let executor = MemoryExec::try_new(&[vec![batch]], schema, Some(vec![2, 1]))?;
-        let statistics = executor.statistics();
-
-        assert_eq!(statistics.num_rows, Some(3));
-        assert_eq!(
-            statistics.column_statistics,
-            Some(vec![
-                ColumnStatistics {
-                    null_count: Some(2),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-            ])
-        );
-
-        // scan with projection
-        let mut it = executor.execute(0, task_ctx)?;
-        let batch2 = it.next().await.unwrap()?;
-        assert_eq!(2, batch2.schema().fields().len());
-        assert_eq!("c", batch2.schema().field(0).name());
-        assert_eq!("b", batch2.schema().field(1).name());
-        assert_eq!(2, batch2.num_columns());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_without_projection() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let (schema, batch) = mock_data()?;
-
-        let executor = MemoryExec::try_new(&[vec![batch]], schema, None)?;
-        let statistics = executor.statistics();
-
-        assert_eq!(statistics.num_rows, Some(3));
-        assert_eq!(
-            statistics.column_statistics,
-            Some(vec![
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(2),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-            ])
-        );
-
-        let mut it = executor.execute(0, task_ctx)?;
-        let batch1 = it.next().await.unwrap()?;
-        assert_eq!(4, batch1.schema().fields().len());
-        assert_eq!(4, batch1.num_columns());
-
-        Ok(())
-    }
-}
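
Note: the `MemoryExec` tests in the diff above are removed outright rather than rewritten. For reference, a minimal standalone sketch of the projection behavior they exercised, where `Some(vec![2, 1])` selects and reorders columns by position ("c" first, then "b"); the external `datafusion::...` paths and the tokio setup are assumptions of this sketch, not part of the commit.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    // Three-column batch standing in for the removed mock_data fixture.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(Int32Array::from(vec![4, 5, 6])),
            Arc::new(Int32Array::from(vec![None, None, Some(9)])),
        ],
    )?;

    // Projection indices pick columns by position: index 2 is "c", index 1 is "b".
    let exec = MemoryExec::try_new(&[vec![batch]], schema, Some(vec![2, 1]))?;

    let ctx = SessionContext::new();
    let mut stream = exec.execute(0, ctx.task_ctx())?;
    let projected = stream.next().await.unwrap()?;
    assert_eq!("c", projected.schema().field(0).name());
    assert_eq!("b", projected.schema().field(1).name());
    assert_eq!(2, projected.num_columns());
    Ok(())
}
```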
