
Commit f70741b

Update test
1 parent: 34db3aa

File tree

2 files changed (+182, -257 lines)


datafusion/core/src/physical_plan/memory.rs

Lines changed: 0 additions & 257 deletions
@@ -368,261 +368,4 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_insert_into() -> Result<()> {
-        // Create session context
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let testdata = test_util::arrow_test_data();
-        let schema = test_util::aggr_test_schema();
-        ctx.register_csv(
-            "aggregate_test_100",
-            &format!("{testdata}/csv/aggregate_test_100.csv"),
-            CsvReadOptions::new().schema(&schema),
-        )
-        .await?;
-        ctx.sql(
-            "CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)",
-        )
-        .await?;
-
-        let sql = "INSERT INTO table_without_values SELECT
-            SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
-            COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
-            FROM aggregate_test_100
-            ORDER by c1
-        ";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "MemoryWriteExec: partitions=1, input_partition=1",
-                "  ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]",
-                "    SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
-                "      ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]",
-                "        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]",
-                "          SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
-                "            CoalesceBatchesExec: target_batch_size=8192",
-                "              RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
-                "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_insert_into_as_select_multi_partitioned() -> Result<()> {
-        // Create session context
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let testdata = test_util::arrow_test_data();
-        let schema = test_util::aggr_test_schema();
-        ctx.register_csv(
-            "aggregate_test_100",
-            &format!("{testdata}/csv/aggregate_test_100.csv"),
-            CsvReadOptions::new().schema(&schema),
-        )
-        .await?;
-        ctx.sql(
-            "CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)",
-        )
-        .await?;
-
-        let sql = "INSERT INTO table_without_values SELECT
-            SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
-            COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
-            FROM aggregate_test_100";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "MemoryWriteExec: partitions=1, input_partition=1",
-                "  CoalescePartitionsExec",
-                "    ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]",
-                "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]",
-                "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
-                "          CoalesceBatchesExec: target_batch_size=8192",
-                "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
-                "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    // TODO: The generated plan is suboptimal since SortExec is in global state.
-    #[tokio::test]
-    async fn test_insert_into_as_select_single_partition() -> Result<()> {
-        // Create session context
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let testdata = test_util::arrow_test_data();
-        let schema = test_util::aggr_test_schema();
-        ctx.register_csv(
-            "aggregate_test_100",
-            &format!("{testdata}/csv/aggregate_test_100.csv"),
-            CsvReadOptions::new().schema(&schema),
-        )
-        .await?;
-        ctx.sql("CREATE TABLE table_without_values AS SELECT
-            SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
-            COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
-            FROM aggregate_test_100")
-        .await?;
-
-        let sql = "INSERT INTO table_without_values SELECT
-            SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
-            COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
-            FROM aggregate_test_100
-            ORDER BY c1";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "MemoryWriteExec: partitions=8, input_partition=8",
-                "  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-                "    ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]",
-                "      SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
-                "        ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]",
-                "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]",
-                "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
-                "              CoalesceBatchesExec: target_batch_size=8192",
-                "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
-                "                  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    // DummyPartition is a simple implementation of the PartitionStream trait.
-    // It produces a stream of record batches with a fixed schema and the same content.
-    struct DummyPartition {
-        schema: SchemaRef,
-        batch: RecordBatch,
-        num_batches: usize,
-    }
-
-    impl PartitionStream for DummyPartition {
-        // Return a reference to the schema of this partition.
-        fn schema(&self) -> &SchemaRef {
-            &self.schema
-        }
-
-        // Execute the partition stream, producing a stream of record batches.
-        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-            let batches = itertools::repeat_n(self.batch.clone(), self.num_batches);
-            Box::pin(RecordBatchStreamAdapter::new(
-                self.schema.clone(),
-                futures::stream::iter(batches).map(Ok),
-            ))
-        }
-    }
-
-    // Test the lock-free (one-to-one) mode by inserting a large number of batches into a table.
-    #[tokio::test]
-    async fn test_one_to_one_mode() -> Result<()> {
-        let num_batches = 10000;
-        // Create a new session context
-        let session_ctx = SessionContext::new();
-        // Create a new schema with one field called "a" of type Int32
-        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
-        // Create a new batch of data to insert into the table
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
-        )?;
-        let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
-
-        let single_partition = Arc::new(DummyPartition {
-            schema: schema.clone(),
-            batch,
-            num_batches,
-        });
-        let input = Arc::new(StreamingTableExec::try_new(
-            schema.clone(),
-            vec![single_partition],
-            None,
-            false,
-        )?);
-        let plan = initial_table
-            .insert_into(&session_ctx.state(), input)
-            .await?;
-        let res = collect(plan, session_ctx.task_ctx()).await?;
-        assert!(res.is_empty());
-        // Ensure that the table's first partition now contains all `num_batches` inserted batches
-        assert_eq!(initial_table.batches[0].read().await.len(), num_batches);
-        Ok(())
-    }
-
-    // Test the locked mode by inserting a large number of batches into a table. It covers
-    // the case where the table partition count is not equal to the input's output partition count.
-    #[tokio::test]
-    async fn test_locked_mode() -> Result<()> {
-        let num_batches = 10000;
-        // Create a new session context
-        let session_ctx = SessionContext::new();
-        // Create a new schema with one field called "a" of type Int32
-        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
-        // Create a new batch of data to insert into the table
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
-        )?;
-        let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
-
-        let single_partition = Arc::new(DummyPartition {
-            schema: schema.clone(),
-            batch,
-            num_batches,
-        });
-        let input = Arc::new(StreamingTableExec::try_new(
-            schema.clone(),
-            vec![
-                single_partition.clone(),
-                single_partition.clone(),
-                single_partition,
-            ],
-            None,
-            false,
-        )?);
-        let plan = initial_table
-            .insert_into(&session_ctx.state(), input)
-            .await?;
-        let res = collect(plan, session_ctx.task_ctx()).await?;
-        assert!(res.is_empty());
-        // Ensure that the table's first partition now contains the batches from all three inputs
-        assert_eq!(initial_table.batches[0].read().await.len(), num_batches * 3);
-        Ok(())
-    }
 }
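
For context, the removed tests exercise the MemTable insert path end to end: build an input ExecutionPlan, hand it to MemTable::insert_into, execute the returned MemoryWriteExec, and inspect the table's batches. A rough SQL-level sketch of the same round trip using only public DataFusion APIs is below; the table name `t` and column `a` are illustrative, and it assumes a DataFusion build where INSERT INTO a registered MemTable is supported, as the deleted tests themselves demonstrate. This is a simplified stand-in, not a replacement for the removed coverage.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Seed the table with one three-row batch.
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("t", Arc::new(table))?;

    // INSERT INTO routes through the same insert_into path the removed tests cover.
    ctx.sql("INSERT INTO t VALUES (4), (5)").await?.collect().await?;

    // Read back and count rows: the original three plus the two just inserted.
    let batches = ctx.sql("SELECT a FROM t").await?.collect().await?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 5);
    Ok(())
}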
