Skip to content

Commit 05b5bc2

Browse files
committed
Reorganize the logical plan related code in proto to be consistent with the physical plan code
1 parent c772d6a commit 05b5bc2

File tree

8 files changed

+1642
-1710
lines changed

8 files changed

+1642
-1710
lines changed

datafusion/proto/src/bytes/mod.rs

Lines changed: 9 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
// under the License.
1717

1818
//! Serialization / Deserialization to Bytes
19-
use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
20-
use crate::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
21-
use crate::{from_proto::parse_expr, protobuf};
22-
use arrow::datatypes::SchemaRef;
23-
use datafusion::datasource::TableProvider;
19+
use crate::logical_plan::{
20+
self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
21+
};
22+
use crate::physical_plan::{
23+
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
24+
};
25+
use crate::protobuf;
2426
use datafusion::physical_plan::functions::make_scalar_function;
2527
use datafusion_common::{DataFusionError, Result};
26-
use datafusion_expr::{
27-
create_udaf, create_udf, Expr, Extension, LogicalPlan, Volatility,
28-
};
28+
use datafusion_expr::{create_udaf, create_udf, Expr, LogicalPlan, Volatility};
2929
use prost::{
3030
bytes::{Bytes, BytesMut},
3131
Message,
@@ -137,7 +137,7 @@ impl Serializeable for Expr {
137137
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e))
138138
})?;
139139

140-
parse_expr(&protobuf, registry).map_err(|e| {
140+
logical_plan::from_proto::parse_expr(&protobuf, registry).map_err(|e| {
141141
DataFusionError::Plan(format!("Error parsing protobuf into Expr: {}", e))
142142
})
143143
}
@@ -272,75 +272,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
272272
protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
273273
}
274274

275-
#[derive(Debug)]
276-
struct DefaultLogicalExtensionCodec {}
277-
278-
impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
279-
fn try_decode(
280-
&self,
281-
_buf: &[u8],
282-
_inputs: &[LogicalPlan],
283-
_ctx: &SessionContext,
284-
) -> Result<Extension> {
285-
Err(DataFusionError::NotImplemented(
286-
"No extension codec provided".to_string(),
287-
))
288-
}
289-
290-
fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
291-
Err(DataFusionError::NotImplemented(
292-
"No extension codec provided".to_string(),
293-
))
294-
}
295-
296-
fn try_decode_table_provider(
297-
&self,
298-
_buf: &[u8],
299-
_schema: SchemaRef,
300-
_ctx: &SessionContext,
301-
) -> std::result::Result<Arc<dyn TableProvider>, DataFusionError> {
302-
Err(DataFusionError::NotImplemented(
303-
"No codec provided to for TableProviders".to_string(),
304-
))
305-
}
306-
307-
fn try_encode_table_provider(
308-
&self,
309-
_node: Arc<dyn TableProvider>,
310-
_buf: &mut Vec<u8>,
311-
) -> std::result::Result<(), DataFusionError> {
312-
Err(DataFusionError::NotImplemented(
313-
"No codec provided to for TableProviders".to_string(),
314-
))
315-
}
316-
}
317-
318-
#[derive(Debug)]
319-
pub struct DefaultPhysicalExtensionCodec {}
320-
321-
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
322-
fn try_decode(
323-
&self,
324-
_buf: &[u8],
325-
_inputs: &[Arc<dyn ExecutionPlan>],
326-
_registry: &dyn FunctionRegistry,
327-
) -> Result<Arc<dyn ExecutionPlan>> {
328-
Err(DataFusionError::NotImplemented(
329-
"PhysicalExtensionCodec is not provided".to_string(),
330-
))
331-
}
332-
333-
fn try_encode(
334-
&self,
335-
_node: Arc<dyn ExecutionPlan>,
336-
_buf: &mut Vec<u8>,
337-
) -> Result<()> {
338-
Err(DataFusionError::NotImplemented(
339-
"PhysicalExtensionCodec is not provided".to_string(),
340-
))
341-
}
342-
}
343-
344275
#[cfg(test)]
345276
mod test {
346277
use super::*;

datafusion/proto/src/common.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,46 @@ pub fn str_to_byte(s: &String) -> Result<u8, DataFusionError> {
3333
Ok(s.as_bytes()[0])
3434
}
3535

36-
pub(crate) fn proto_error<S: Into<String>>(message: S) -> DataFusionError {
36+
pub fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
37+
let b = &[b];
38+
let b = std::str::from_utf8(b)
39+
.map_err(|_| DataFusionError::Internal("Invalid CSV delimiter".to_owned()))?;
40+
Ok(b.to_owned())
41+
}
42+
43+
#[macro_export]
44+
macro_rules! convert_required {
45+
($PB:expr) => {{
46+
if let Some(field) = $PB.as_ref() {
47+
Ok(field.try_into()?)
48+
} else {
49+
Err(proto_error("Missing required field in protobuf"))
50+
}
51+
}};
52+
}
53+
54+
#[macro_export]
55+
macro_rules! into_required {
56+
($PB:expr) => {{
57+
if let Some(field) = $PB.as_ref() {
58+
Ok(field.into())
59+
} else {
60+
Err(proto_error("Missing required field in protobuf"))
61+
}
62+
}};
63+
}
64+
65+
#[macro_export]
66+
macro_rules! convert_box_required {
67+
($PB:expr) => {{
68+
if let Some(field) = $PB.as_ref() {
69+
field.as_ref().try_into()
70+
} else {
71+
Err(proto_error("Missing required field in protobuf"))
72+
}
73+
}};
74+
}
75+
76+
pub fn proto_error<S: Into<String>>(message: S) -> DataFusionError {
3777
DataFusionError::Internal(message.into())
3878
}

0 commit comments

Comments
 (0)