Skip to content

Commit 6d088df

Browse files
author
Brent Gardner
committed
Add roundtrip test
1 parent 26c89c7 commit 6d088df

File tree

4 files changed

+103
-53
lines changed

4 files changed

+103
-53
lines changed

datafusion/core/src/datasource/custom.rs

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
//! A TableProvider that wraps tables registered through TableProviderFactories so they can be reified during serde
19+
1820
use crate::datasource::TableProvider;
1921
use crate::execution::context::SessionState;
2022
use crate::physical_plan::ExecutionPlan;

datafusion/core/src/test_util.rs

+73-1
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,20 @@
1717

1818
//! Utility functions to make testing DataFusion based crates easier
1919
20+
use std::any::Any;
2021
use std::collections::BTreeMap;
2122
use std::{env, error::Error, path::PathBuf, sync::Arc};
2223

23-
use crate::datasource::{empty::EmptyTable, provider_as_source};
24+
use crate::datasource::custom::CustomTable;
25+
use crate::datasource::datasource::TableProviderFactory;
26+
use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
27+
use crate::execution::context::SessionState;
2428
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
29+
use crate::physical_plan::ExecutionPlan;
2530
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
31+
use async_trait::async_trait;
2632
use datafusion_common::DataFusionError;
33+
use datafusion_expr::{Expr, TableType};
2734

2835
/// Compares formatted output of a record batch with an expected
2936
/// vector of strings, with the result of pretty formatting record
@@ -317,6 +324,71 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
317324
Arc::new(schema)
318325
}
319326

327+
/// TableFactory for tests
328+
pub struct TestTableFactory {}
329+
330+
#[async_trait]
331+
impl TableProviderFactory for TestTableFactory {
332+
async fn create(
333+
&self,
334+
_name: &str,
335+
url: &str,
336+
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
337+
Ok(Arc::new(CustomTable::new(
338+
"deltatable",
339+
url,
340+
Arc::new(TestTableProvider {}),
341+
)))
342+
}
343+
344+
fn with_schema(
345+
&self,
346+
_schema: SchemaRef,
347+
table_type: &str,
348+
url: &str,
349+
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
350+
Ok(Arc::new(CustomTable::new(
351+
table_type,
352+
url,
353+
Arc::new(TestTableProvider {}),
354+
)))
355+
}
356+
}
357+
358+
/// TableProvider for testing purposes
359+
pub struct TestTableProvider {}
360+
361+
impl TestTableProvider {}
362+
363+
#[async_trait]
364+
impl TableProvider for TestTableProvider {
365+
fn as_any(&self) -> &dyn Any {
366+
unimplemented!("TestTableProvider is a stub for testing.")
367+
}
368+
369+
fn schema(&self) -> SchemaRef {
370+
let schema = Schema::new(vec![
371+
Field::new("a", DataType::Int64, true),
372+
Field::new("b", DataType::Decimal128(15, 2), true),
373+
]);
374+
Arc::new(schema)
375+
}
376+
377+
fn table_type(&self) -> TableType {
378+
unimplemented!("TestTableProvider is a stub for testing.")
379+
}
380+
381+
async fn scan(
382+
&self,
383+
_ctx: &SessionState,
384+
_projection: &Option<Vec<usize>>,
385+
_filters: &[Expr],
386+
_limit: Option<usize>,
387+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
388+
unimplemented!("TestTableProvider is a stub for testing.")
389+
}
390+
}
391+
320392
#[cfg(test)]
321393
mod tests {
322394
use super::*;

datafusion/core/tests/sql/create_drop.rs

+1-51
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use async_trait::async_trait;
19-
use std::any::Any;
2018
use std::collections::HashMap;
2119
use std::io::Write;
2220

2321
use datafusion::datasource::datasource::TableProviderFactory;
24-
use datafusion::execution::context::SessionState;
2522
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
26-
use datafusion_expr::TableType;
23+
use datafusion::test_util::TestTableFactory;
2724
use tempfile::TempDir;
2825

2926
use super::*;
@@ -369,53 +366,6 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
369366
Ok(())
370367
}
371368

372-
struct TestTableProvider {}
373-
374-
impl TestTableProvider {}
375-
376-
#[async_trait]
377-
impl TableProvider for TestTableProvider {
378-
fn as_any(&self) -> &dyn Any {
379-
unimplemented!("TestTableProvider is a stub for testing.")
380-
}
381-
382-
fn schema(&self) -> SchemaRef {
383-
unimplemented!("TestTableProvider is a stub for testing.")
384-
}
385-
386-
fn table_type(&self) -> TableType {
387-
unimplemented!("TestTableProvider is a stub for testing.")
388-
}
389-
390-
async fn scan(
391-
&self,
392-
_ctx: &SessionState,
393-
_projection: &Option<Vec<usize>>,
394-
_filters: &[Expr],
395-
_limit: Option<usize>,
396-
) -> Result<Arc<dyn ExecutionPlan>> {
397-
unimplemented!("TestTableProvider is a stub for testing.")
398-
}
399-
}
400-
401-
struct TestTableFactory {}
402-
403-
#[async_trait]
404-
impl TableProviderFactory for TestTableFactory {
405-
async fn create(&self, _name: &str, _url: &str) -> Result<Arc<dyn TableProvider>> {
406-
Ok(Arc::new(TestTableProvider {}))
407-
}
408-
409-
fn with_schema(
410-
&self,
411-
_schema: SchemaRef,
412-
_table_type: &str,
413-
_url: &str,
414-
) -> Result<Arc<dyn TableProvider>> {
415-
Ok(Arc::new(TestTableProvider {}))
416-
}
417-
}
418-
419369
#[tokio::test]
420370
async fn create_custom_table() -> Result<()> {
421371
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =

datafusion/proto/src/lib.rs

+27-1
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,13 @@ mod roundtrip_tests {
5959
TimeUnit, UnionMode,
6060
},
6161
};
62+
use datafusion::datasource::datasource::TableProviderFactory;
63+
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
6264
use datafusion::physical_plan::functions::make_scalar_function;
63-
use datafusion::prelude::{create_udf, CsvReadOptions, SessionContext};
65+
use datafusion::prelude::{
66+
create_udf, CsvReadOptions, SessionConfig, SessionContext,
67+
};
68+
use datafusion::test_util::TestTableFactory;
6469
use datafusion_common::{DFSchemaRef, DataFusionError, ScalarValue};
6570
use datafusion_expr::create_udaf;
6671
use datafusion_expr::expr::{Between, BinaryExpr, Case, GroupingSet, Like};
@@ -71,6 +76,7 @@ mod roundtrip_tests {
7176
};
7277
use prost::Message;
7378
use std::any::Any;
79+
use std::collections::HashMap;
7480
use std::fmt;
7581
use std::fmt::Debug;
7682
use std::fmt::Formatter;
@@ -129,6 +135,26 @@ mod roundtrip_tests {
129135
Ok(())
130136
}
131137

138+
#[tokio::test]
139+
async fn roundtrip_custom_tables() -> Result<(), DataFusionError> {
140+
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
141+
HashMap::new();
142+
table_factories.insert("deltatable".to_string(), Arc::new(TestTableFactory {}));
143+
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
144+
let env = RuntimeEnv::new(cfg).unwrap();
145+
let ses = SessionConfig::new();
146+
let ctx = SessionContext::with_config_rt(ses, Arc::new(env));
147+
148+
let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
149+
ctx.sql(sql).await.unwrap();
150+
151+
let scan = ctx.table("dt")?.to_logical_plan()?;
152+
let bytes = logical_plan_to_bytes(&scan)?;
153+
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
154+
assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip));
155+
Ok(())
156+
}
157+
132158
#[tokio::test]
133159
async fn roundtrip_logical_plan_aggregation() -> Result<(), DataFusionError> {
134160
let ctx = SessionContext::new();

0 commit comments

Comments
 (0)