
Commit 711aa43

Author: Brent Gardner

Can compile and run test

Failing on scheduler due to no factories
Tests pass
Back to "no object store available for delta-rs://home-bgardner-workspace"
Switch back to git refs
1 parent 6bcee13 commit 711aa43

6 files changed: 223 additions & 52 deletions
datafusion/core/src/datasource/custom.rs

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+use crate::datasource::TableProvider;
+use crate::execution::context::SessionState;
+use crate::physical_plan::ExecutionPlan;
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use datafusion_expr::{Expr, TableType};
+use std::any::Any;
+
+use std::sync::Arc;
+
+/// Exists to pass table_type, path, and options from parsing to serialization
+pub struct CustomTable {
+    /// File/Table type used for factory to create TableProvider
+    table_type: String,
+    /// Path used for factory to create TableProvider
+    path: String,
+    /// TableProvider that was created
+    provider: Arc<dyn TableProvider>,
+}
+
+impl CustomTable {
+    /// Creates a new CustomTable
+    pub fn new(table_type: &str, path: &str, provider: Arc<dyn TableProvider>) -> Self {
+        Self {
+            table_type: table_type.to_string(),
+            path: path.to_string(),
+            provider,
+        }
+    }
+
+    /// Returns the table type
+    pub fn get_table_type(&self) -> String {
+        self.table_type.clone()
+    }
+
+    /// Returns the table path
+    pub fn get_path(&self) -> String {
+        self.path.clone()
+    }
+}
+
+#[async_trait]
+impl TableProvider for CustomTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.provider.schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        self.provider.table_type()
+    }
+
+    async fn scan(
+        &self,
+        ctx: &SessionState,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        self.provider.scan(ctx, projection, filters, limit).await
+    }
+}
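
A minimal usage sketch (not part of the commit) of how the wrapper might be constructed: MemTable stands in for whatever provider a real factory would build, and the "deltatable" type and path are hypothetical. CustomTable only records the table_type and path alongside the provider so the scan can later be serialized as a CustomTableScanNode and re-created elsewhere.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::custom::CustomTable;
use datafusion::datasource::MemTable;

fn wrap_provider() -> datafusion::error::Result<Arc<CustomTable>> {
    // Schema a factory would normally discover from the table location.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    // Empty in-memory table standing in for the factory-built provider.
    let inner = MemTable::try_new(schema, vec![vec![]])?;
    // "deltatable" and the path are placeholders for whatever the factory was registered under.
    Ok(Arc::new(CustomTable::new(
        "deltatable",
        "/tmp/my_table",
        Arc::new(inner),
    )))
}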

datafusion/core/src/datasource/datasource.rs

Lines changed: 8 additions & 0 deletions
@@ -81,4 +81,12 @@ pub trait TableProvider: Sync + Send {
 pub trait TableProviderFactory: Sync + Send {
     /// Create a TableProvider given name and url
     async fn create(&self, name: &str, url: &str) -> Result<Arc<dyn TableProvider>>;
+
+    /// Create a TableProvider during execution with schema already known from planning
+    fn with_schema(
+        &self,
+        schema: SchemaRef,
+        table_type: &str,
+        url: &str,
+    ) -> Result<Arc<dyn TableProvider>>;
 }
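
For context, a sketch (not part of this commit) of a factory implementing both methods. It assumes the trait is reachable as datafusion::datasource::datasource::TableProviderFactory and is annotated with #[async_trait]; MemTable stands in for a real provider, and the hard-coded schema and "deltatable" key are placeholders. create() is the planning-time path where the schema still has to be discovered, while with_schema() is the execution-time path where the schema already travelled with the serialized plan.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::datasource::custom::CustomTable;
use datafusion::datasource::datasource::{TableProvider, TableProviderFactory};
use datafusion::datasource::MemTable;
use datafusion::error::Result;

struct ExampleTableFactory;

#[async_trait]
impl TableProviderFactory for ExampleTableFactory {
    // Planning path: no schema is known yet, so discover one (hard-coded here).
    async fn create(&self, _name: &str, url: &str) -> Result<Arc<dyn TableProvider>> {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        self.with_schema(schema, "deltatable", url)
    }

    // Execution path: rebuild the provider from the schema carried in the plan.
    fn with_schema(
        &self,
        schema: SchemaRef,
        table_type: &str,
        url: &str,
    ) -> Result<Arc<dyn TableProvider>> {
        let inner = MemTable::try_new(schema, vec![vec![]])?;
        Ok(Arc::new(CustomTable::new(table_type, url, Arc::new(inner))))
    }
}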

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@
 //! DataFusion data sources

 #![allow(clippy::module_inception)]
+pub mod custom;
 pub mod datasource;
 pub mod default_table_source;
 pub mod empty;

datafusion/core/src/execution/context.rs

Lines changed: 2 additions & 1 deletion
@@ -418,10 +418,11 @@ impl SessionContext {
         cmd: &CreateExternalTable,
     ) -> Result<Arc<DataFrame>> {
         let state = self.state.read().clone();
+        let file_type = cmd.file_type.to_lowercase();
         let factory = &state
             .runtime_env
             .table_factories
-            .get(&cmd.file_type)
+            .get(file_type.as_str())
             .ok_or_else(|| {
                 DataFusionError::Execution(format!(
                     "Unable to find factory for {}",

datafusion/proto/proto/datafusion.proto

Lines changed: 11 additions & 0 deletions
@@ -70,6 +70,7 @@ message LogicalPlanNode {
     CreateViewNode create_view = 22;
     DistinctNode distinct = 23;
     ViewTableScanNode view_scan = 24;
+    CustomTableScanNode custom_scan = 25;
   }
 }

@@ -118,6 +119,16 @@ message ViewTableScanNode {
   string definition = 5;
 }

+// Logical Plan to Scan a CustomTableProvider registered at runtime
+message CustomTableScanNode {
+  string table_name = 1;
+  string path = 2;
+  string table_type = 3;
+  ProjectionColumns projection = 4;
+  datafusion.Schema schema = 5;
+  repeated datafusion.LogicalExprNode filters = 6;
+}
+
 message ProjectionNode {
   LogicalPlanNode input = 1;
   repeated datafusion.LogicalExprNode expr = 2;

datafusion/proto/src/logical_plan.rs

Lines changed: 136 additions & 51 deletions
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

+use crate::protobuf::{LogicalExprNode, ProjectionColumns};
 use crate::{
     from_proto::{self, parse_expr},
     protobuf::{
@@ -24,6 +25,7 @@ use crate::{
     to_proto,
 };
 use arrow::datatypes::Schema;
+use datafusion::datasource::custom::CustomTable;
 use datafusion::{
     datasource::{
         file_format::{
@@ -410,6 +412,48 @@ impl AsLogicalPlan for LogicalPlanNode {
                 )?
                 .build()
             }
+            LogicalPlanType::CustomScan(scan) => {
+                let schema: Schema = convert_required!(scan.schema)?;
+                let mut projection = None;
+                if let Some(columns) = &scan.projection {
+                    let column_indices = columns
+                        .columns
+                        .iter()
+                        .map(|name| schema.index_of(name))
+                        .collect::<Result<Vec<usize>, _>>()?;
+                    projection = Some(column_indices);
+                }
+
+                let filters = scan
+                    .filters
+                    .iter()
+                    .map(|expr| parse_expr(expr, ctx))
+                    .collect::<Result<Vec<_>, _>>()?;
+                let state = ctx.state.read();
+                let factory = state
+                    .runtime_env
+                    .table_factories
+                    .get(&scan.table_type)
+                    .ok_or_else(|| {
+                        DataFusionError::Plan(format!(
+                            "Planner unable to find factory for {}",
+                            scan.table_type
+                        ))
+                    })?;
+                let provider = (*factory).with_schema(
+                    Arc::new(schema),
+                    scan.table_type.as_str(),
+                    scan.path.as_str(),
+                )?;
+
+                LogicalPlanBuilder::scan_with_filters(
+                    &scan.table_name,
+                    provider_as_source(provider),
+                    projection,
+                    filters,
+                )?
+                .build()
+            }
             LogicalPlanType::Sort(sort) => {
                 let input: LogicalPlan =
                     into_logical_plan!(sort.input, ctx, extension_codec)?;
@@ -760,55 +804,21 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .collect::<Result<Vec<_>, _>>()?;

                 if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
-                    let any = listing_table.options().format.as_any();
-                    let file_format_type = if let Some(parquet) =
-                        any.downcast_ref::<ParquetFormat>()
-                    {
-                        FileFormatType::Parquet(protobuf::ParquetFormat {
-                            enable_pruning: parquet.enable_pruning(),
-                        })
-                    } else if let Some(csv) = any.downcast_ref::<CsvFormat>() {
-                        FileFormatType::Csv(protobuf::CsvFormat {
-                            delimiter: byte_to_string(csv.delimiter())?,
-                            has_header: csv.has_header(),
-                        })
-                    } else if any.is::<AvroFormat>() {
-                        FileFormatType::Avro(protobuf::AvroFormat {})
-                    } else {
-                        return Err(proto_error(format!(
-                            "Error converting file format, {:?} is invalid as a datafusion foramt.",
-                            listing_table.options().format
-                        )));
-                    };
-                    Ok(protobuf::LogicalPlanNode {
-                        logical_plan_type: Some(LogicalPlanType::ListingScan(
-                            protobuf::ListingTableScanNode {
-                                file_format_type: Some(file_format_type),
-                                table_name: table_name.to_owned(),
-                                collect_stat: listing_table.options().collect_stat,
-                                file_extension: listing_table
-                                    .options()
-                                    .file_extension
-                                    .clone(),
-                                table_partition_cols: listing_table
-                                    .options()
-                                    .table_partition_cols
-                                    .clone(),
-                                paths: listing_table
-                                    .table_paths()
-                                    .iter()
-                                    .map(|x| x.to_string())
-                                    .collect(),
-                                schema: Some(schema),
-                                projection,
-                                filters,
-                                target_partitions: listing_table
-                                    .options()
-                                    .target_partitions
-                                    as u32,
-                            },
-                        )),
-                    })
+                    Self::serialize_listing_table(
+                        table_name,
+                        projection,
+                        schema,
+                        filters,
+                        listing_table,
+                    )
+                } else if let Some(custom_table) = source.downcast_ref::<CustomTable>() {
+                    Self::serialize_custom_table(
+                        table_name,
+                        projection,
+                        schema,
+                        filters,
+                        custom_table,
+                    )
                 } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
                     Ok(protobuf::LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
@@ -831,8 +841,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                     })
                 } else {
                     Err(DataFusionError::Internal(format!(
-                        "logical plan to_proto unsupported table provider {:?}",
-                        source
+                        "logical plan to_proto unsupported table provider {:?} {}",
+                        source,
+                        plan.display_indent()
                     )))
                 }
             }
@@ -1253,3 +1264,77 @@ impl AsLogicalPlan for LogicalPlanNode {
         }
     }
 }
+
+impl LogicalPlanNode {
+    fn serialize_custom_table(
+        table_name: &String,
+        projection: Option<ProjectionColumns>,
+        schema: protobuf::Schema,
+        filters: Vec<LogicalExprNode>,
+        custom_table: &CustomTable,
+    ) -> Result<LogicalPlanNode, DataFusionError> {
+        Ok(protobuf::LogicalPlanNode {
+            logical_plan_type: Some(LogicalPlanType::CustomScan(
+                protobuf::CustomTableScanNode {
+                    table_name: table_name.to_owned(),
+                    table_type: custom_table.get_table_type(),
+                    path: custom_table.get_path(),
+                    schema: Some(schema),
+                    projection,
+                    filters,
+                },
+            )),
+        })
+    }
+
+    fn serialize_listing_table(
+        table_name: &String,
+        projection: Option<ProjectionColumns>,
+        schema: protobuf::Schema,
+        filters: Vec<LogicalExprNode>,
+        listing_table: &ListingTable,
+    ) -> Result<LogicalPlanNode, DataFusionError> {
+        let any = listing_table.options().format.as_any();
+        let file_format_type = if let Some(parquet) = any.downcast_ref::<ParquetFormat>()
+        {
+            FileFormatType::Parquet(protobuf::ParquetFormat {
+                enable_pruning: parquet.enable_pruning(),
+            })
+        } else if let Some(csv) = any.downcast_ref::<CsvFormat>() {
+            FileFormatType::Csv(protobuf::CsvFormat {
+                delimiter: byte_to_string(csv.delimiter())?,
+                has_header: csv.has_header(),
+            })
+        } else if any.is::<AvroFormat>() {
+            FileFormatType::Avro(protobuf::AvroFormat {})
+        } else {
+            return Err(proto_error(format!(
+                "Error converting file format, {:?} is invalid as a datafusion foramt.",
+                listing_table.options().format
+            )));
+        };
+        Ok(protobuf::LogicalPlanNode {
+            logical_plan_type: Some(LogicalPlanType::ListingScan(
+                protobuf::ListingTableScanNode {
+                    file_format_type: Some(file_format_type),
+                    table_name: table_name.to_owned(),
+                    collect_stat: listing_table.options().collect_stat,
+                    file_extension: listing_table.options().file_extension.clone(),
+                    table_partition_cols: listing_table
+                        .options()
+                        .table_partition_cols
+                        .clone(),
+                    paths: listing_table
+                        .table_paths()
+                        .iter()
+                        .map(|x| x.to_string())
+                        .collect(),
+                    schema: Some(schema),
+                    projection,
+                    filters,
+                    target_partitions: listing_table.options().target_partitions as u32,
+                },
+            )),
+        })
+    }
+}
