Skip to content

Commit 119cf6f

Browse files
author
Brent Gardner
committed
Passing deltalake test
1 parent 6d088df commit 119cf6f

File tree

7 files changed

+69
-149
lines changed

7 files changed

+69
-149
lines changed

datafusion/core/src/datasource/custom.rs

-84
This file was deleted.

datafusion/core/src/datasource/datasource.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,12 @@ pub trait TableProvider: Sync + Send {
8080
#[async_trait]
8181
pub trait TableProviderFactory: Sync + Send {
8282
/// Create a TableProvider given name and url
83-
async fn create(&self, name: &str, url: &str) -> Result<Arc<dyn TableProvider>>;
83+
async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
8484

8585
/// Create a TableProvider during execution with schema already known from planning
8686
fn with_schema(
8787
&self,
8888
schema: SchemaRef,
89-
table_type: &str,
9089
url: &str,
9190
) -> Result<Arc<dyn TableProvider>>;
9291
}

datafusion/core/src/datasource/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! DataFusion data sources
1919
2020
#![allow(clippy::module_inception)]
21-
pub mod custom;
2221
pub mod datasource;
2322
pub mod default_table_source;
2423
pub mod empty;

datafusion/core/src/execution/context.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ impl SessionContext {
430430
))
431431
})?;
432432
let table = (*factory)
433-
.create(cmd.name.as_str(), cmd.location.as_str())
433+
.create(cmd.location.as_str())
434434
.await?;
435435
self.register_table(cmd.name.as_str(), table)?;
436436
let plan = LogicalPlanBuilder::empty(false).build()?;

datafusion/proto/proto/datafusion.proto

+4-5
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,10 @@ message ViewTableScanNode {
122122
// Logical Plan to Scan a CustomTableProvider registered at runtime
123123
message CustomTableScanNode {
124124
string table_name = 1;
125-
string path = 2;
126-
string table_type = 3;
127-
ProjectionColumns projection = 4;
128-
datafusion.Schema schema = 5;
129-
repeated datafusion.LogicalExprNode filters = 6;
125+
ProjectionColumns projection = 2;
126+
datafusion.Schema schema = 3;
127+
repeated datafusion.LogicalExprNode filters = 4;
128+
bytes custom_table_data = 5;
130129
}
131130

132131
message ProjectionNode {

datafusion/proto/src/bytes/mod.rs

+15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
// under the License.
1717

1818
//! Serialization / Deserialization to Bytes
19+
use std::sync::Arc;
20+
use arrow::datatypes::SchemaRef;
1921
use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
2022
use crate::{from_proto::parse_expr, protobuf};
2123
use datafusion_common::{DataFusionError, Result};
@@ -24,6 +26,7 @@ use prost::{
2426
bytes::{Bytes, BytesMut},
2527
Message,
2628
};
29+
use datafusion::datasource::TableProvider;
2730

2831
// Reexport Bytes which appears in the API
2932
use datafusion::execution::registry::FunctionRegistry;
@@ -180,6 +183,18 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
180183
"No extension codec provided".to_string(),
181184
))
182185
}
186+
187+
fn try_decode_table_provider(&self, _buf: &[u8], _schema: SchemaRef, _ctx: &SessionContext) -> std::result::Result<Arc<dyn TableProvider>, DataFusionError> {
188+
Err(DataFusionError::NotImplemented(
189+
"No extension codec provided".to_string(),
190+
))
191+
}
192+
193+
fn try_encode_table_provider(&self, _node: Arc<dyn TableProvider>, _buf: &mut Vec<u8>) -> std::result::Result<(), DataFusionError> {
194+
Err(DataFusionError::NotImplemented(
195+
"No extension codec provided".to_string(),
196+
))
197+
}
183198
}
184199

185200
#[cfg(test)]

datafusion/proto/src/logical_plan.rs

+48-56
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::protobuf::{LogicalExprNode, ProjectionColumns};
18+
use crate::protobuf::{CustomTableScanNode, LogicalExprNode, ProjectionColumns};
1919
use crate::{
2020
from_proto::{self, parse_expr},
2121
protobuf::{
@@ -24,8 +24,7 @@ use crate::{
2424
},
2525
to_proto,
2626
};
27-
use arrow::datatypes::Schema;
28-
use datafusion::datasource::custom::CustomTable;
27+
use arrow::datatypes::{Schema, SchemaRef};
2928
use datafusion::{
3029
datasource::{
3130
file_format::{
@@ -37,7 +36,7 @@ use datafusion::{
3736
datasource::{provider_as_source, source_as_provider},
3837
prelude::SessionContext,
3938
};
40-
use datafusion_common::{Column, DataFusionError};
39+
use datafusion_common::{Column, context, DataFusionError};
4140
use datafusion_expr::{
4241
logical_plan::{
4342
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
@@ -50,6 +49,8 @@ use prost::bytes::BufMut;
5049
use prost::Message;
5150
use std::fmt::Debug;
5251
use std::sync::Arc;
52+
use datafusion::datasource::TableProvider;
53+
use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
5354

5455
fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
5556
let b = &[b];
@@ -108,6 +109,19 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
108109
node: &Extension,
109110
buf: &mut Vec<u8>,
110111
) -> Result<(), DataFusionError>;
112+
113+
fn try_decode_table_provider(
114+
&self,
115+
buf: &[u8],
116+
schema: SchemaRef,
117+
ctx: &SessionContext,
118+
) -> Result<Arc<dyn TableProvider>, DataFusionError>;
119+
120+
fn try_encode_table_provider(
121+
&self,
122+
node: Arc<dyn TableProvider>,
123+
buf: &mut Vec<u8>,
124+
) -> Result<(), DataFusionError>;
111125
}
112126

113127
#[derive(Debug, Clone)]
@@ -134,6 +148,18 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
134148
"LogicalExtensionCodec is not provided".to_string(),
135149
))
136150
}
151+
152+
fn try_decode_table_provider(&self, _buf: &[u8], _schema: SchemaRef, _ctx: &SessionContext) -> Result<Arc<dyn TableProvider>, DataFusionError> {
153+
Err(DataFusionError::NotImplemented(
154+
"LogicalExtensionCodec is not provided".to_string(),
155+
))
156+
}
157+
158+
fn try_encode_table_provider(&self, _node: Arc<dyn TableProvider>, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
159+
Err(DataFusionError::NotImplemented(
160+
"LogicalExtensionCodec is not provided".to_string(),
161+
))
162+
}
137163
}
138164

139165
#[macro_export]
@@ -414,6 +440,7 @@ impl AsLogicalPlan for LogicalPlanNode {
414440
}
415441
LogicalPlanType::CustomScan(scan) => {
416442
let schema: Schema = convert_required!(scan.schema)?;
443+
let schema = Arc::new(schema);
417444
let mut projection = None;
418445
if let Some(columns) = &scan.projection {
419446
let column_indices = columns
@@ -429,22 +456,7 @@ impl AsLogicalPlan for LogicalPlanNode {
429456
.iter()
430457
.map(|expr| parse_expr(expr, ctx))
431458
.collect::<Result<Vec<_>, _>>()?;
432-
let state = ctx.state.read();
433-
let factory = state
434-
.runtime_env
435-
.table_factories
436-
.get(&scan.table_type)
437-
.ok_or_else(|| {
438-
DataFusionError::Plan(format!(
439-
"Planner unable to find factory for {}",
440-
scan.table_type
441-
))
442-
})?;
443-
let provider = (*factory).with_schema(
444-
Arc::new(schema),
445-
scan.table_type.as_str(),
446-
scan.path.as_str(),
447-
)?;
459+
let provider = extension_codec.try_decode_table_provider(&scan.custom_table_data, schema, ctx)?;
448460

449461
LogicalPlanBuilder::scan_with_filters(
450462
&scan.table_name,
@@ -780,8 +792,8 @@ impl AsLogicalPlan for LogicalPlanNode {
780792
projection,
781793
..
782794
}) => {
783-
let source = source_as_provider(source)?;
784-
let schema = source.schema();
795+
let provider = source_as_provider(source)?;
796+
let schema = provider.schema();
785797
let source = source.as_any();
786798

787799
let projection = match projection {
@@ -811,14 +823,6 @@ impl AsLogicalPlan for LogicalPlanNode {
811823
filters,
812824
listing_table,
813825
)
814-
} else if let Some(custom_table) = source.downcast_ref::<CustomTable>() {
815-
Self::serialize_custom_table(
816-
table_name,
817-
projection,
818-
schema,
819-
filters,
820-
custom_table,
821-
)
822826
} else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
823827
Ok(protobuf::LogicalPlanNode {
824828
logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
@@ -840,11 +844,20 @@ impl AsLogicalPlan for LogicalPlanNode {
840844
))),
841845
})
842846
} else {
843-
Err(DataFusionError::Internal(format!(
844-
"logical plan to_proto unsupported table provider {:?} {}",
845-
source,
846-
plan.display_indent()
847-
)))
847+
let mut bytes = vec![];
848+
extension_codec.try_encode_table_provider(provider, &mut bytes)
849+
.map_err(|e| context!("Error serializing custom table", e))?;
850+
let scan = CustomScan(CustomTableScanNode {
851+
table_name: table_name.clone(),
852+
projection,
853+
schema: Some(schema),
854+
filters,
855+
custom_table_data: bytes
856+
});
857+
let node = LogicalPlanNode {
858+
logical_plan_type: Some(scan)
859+
};
860+
Ok(node)
848861
}
849862
}
850863
LogicalPlan::Projection(Projection {
@@ -1266,27 +1279,6 @@ impl AsLogicalPlan for LogicalPlanNode {
12661279
}
12671280

12681281
impl LogicalPlanNode {
1269-
fn serialize_custom_table(
1270-
table_name: &String,
1271-
projection: Option<ProjectionColumns>,
1272-
schema: protobuf::Schema,
1273-
filters: Vec<LogicalExprNode>,
1274-
custom_table: &CustomTable,
1275-
) -> Result<LogicalPlanNode, DataFusionError> {
1276-
Ok(protobuf::LogicalPlanNode {
1277-
logical_plan_type: Some(LogicalPlanType::CustomScan(
1278-
protobuf::CustomTableScanNode {
1279-
table_name: table_name.to_owned(),
1280-
table_type: custom_table.get_table_type(),
1281-
path: custom_table.get_path(),
1282-
schema: Some(schema),
1283-
projection,
1284-
filters,
1285-
},
1286-
)),
1287-
})
1288-
}
1289-
12901282
fn serialize_listing_table(
12911283
table_name: &String,
12921284
projection: Option<ProjectionColumns>,

0 commit comments

Comments
 (0)