Skip to content

Commit e595ba1

Browse files
committed
Move arrow wrappers into their own file
1 parent 232830d commit e595ba1

File tree

6 files changed

+76
-54
lines changed

6 files changed

+76
-54
lines changed

datafusion/ffi/src/arrow_wrappers.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use std::sync::Arc;
2+
3+
use abi_stable::StableAbi;
4+
use arrow::{
5+
datatypes::{Schema, SchemaRef},
6+
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
7+
};
8+
use log::error;
9+
10+
/// This is a wrapper struct around FFI_ArrowSchema simply to indicate
11+
/// to the StableAbi macros that the underlying struct is FFI safe.
12+
#[repr(C)]
13+
#[derive(Debug, StableAbi)]
14+
pub struct WrappedSchema(#[sabi(unsafe_opaque_field)] pub FFI_ArrowSchema);
15+
16+
impl From<SchemaRef> for WrappedSchema {
17+
fn from(value: SchemaRef) -> Self {
18+
let ffi_schema = match FFI_ArrowSchema::try_from(value.as_ref()) {
19+
Ok(s) => s,
20+
Err(e) => {
21+
error!("Unable to convert DataFusion Schema to FFI_ArrowSchema in FFI_PlanProperties. {}", e);
22+
FFI_ArrowSchema::empty()
23+
}
24+
};
25+
26+
WrappedSchema(ffi_schema)
27+
}
28+
}
29+
30+
impl From<WrappedSchema> for SchemaRef {
31+
fn from(value: WrappedSchema) -> Self {
32+
let schema = match Schema::try_from(&value.0) {
33+
Ok(s) => s,
34+
Err(e) => {
35+
error!("Unable to convert from FFI_ArrowSchema to DataFusion Schema in FFI_PlanProperties. {}", e);
36+
Schema::empty()
37+
}
38+
};
39+
Arc::new(schema)
40+
}
41+
}
42+
43+
/// This is a wrapper struct for FFI_ArrowArray to indicate to StableAbi
44+
/// that the struct is FFI Safe. For convenience, we also include the
45+
/// schema needed to create a record batch from the array.
46+
#[repr(C)]
47+
#[derive(Debug, StableAbi)]
48+
pub struct WrappedArray {
49+
#[sabi(unsafe_opaque_field)]
50+
pub array: FFI_ArrowArray,
51+
52+
pub schema: WrappedSchema,
53+
}

datafusion/ffi/src/execution_plan.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use abi_stable::{
2121
std_types::{RResult, RString, RVec},
2222
StableAbi,
2323
};
24-
use arrow::ffi_stream::FFI_ArrowArrayStream;
2524
use datafusion::error::Result;
2625
use datafusion::{
2726
error::DataFusionError,
@@ -33,28 +32,36 @@ use crate::{
3332
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
3433
};
3534

35+
/// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries.
3636
#[repr(C)]
3737
#[derive(Debug, StableAbi)]
38-
pub struct WrappedArrayStream(#[sabi(unsafe_opaque_field)] pub FFI_ArrowArrayStream);
39-
40-
#[repr(C)]
41-
#[derive(Debug, StableAbi)]
42-
#[allow(missing_docs)]
4338
#[allow(non_camel_case_types)]
4439
pub struct FFI_ExecutionPlan {
40+
/// Return the plan properties
4541
pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,
4642

43+
/// Return a vector of children plans
4744
pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,
4845

46+
/// Return the plan name.
4947
pub name: unsafe extern "C" fn(plan: &Self) -> RString,
5048

49+
/// Execute the plan and return a record batch stream. Errors
50+
/// will be returned as a string.
5151
pub execute: unsafe extern "C" fn(
5252
plan: &Self,
5353
partition: usize,
5454
) -> RResult<FFI_RecordBatchStream, RString>,
5555

56+
/// Used to create a clone on the provider of the execution plan. This should
57+
/// only need to be called by the receiver of the plan.
5658
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
59+
60+
/// Release the memory of the private data when it is no longer being used.
5761
pub release: unsafe extern "C" fn(arg: &mut Self),
62+
63+
/// Internal data. This is only to be accessed by the provider of the plan.
64+
/// A [`ForeignExecutionPlan`] should never attempt to access this data.
5865
pub private_data: *mut c_void,
5966
}
6067

@@ -154,6 +161,9 @@ impl Drop for FFI_ExecutionPlan {
154161
}
155162
}
156163

164+
/// This struct is used to access an execution plan provided by a foreign
165+
/// library across a FFI boundary.
166+
///
157167
/// The ForeignExecutionPlan is to be used by the caller of the plan, so it has
158168
/// no knowledge or access to the private data. All interaction with the plan
159169
/// must occur through the functions defined in FFI_ExecutionPlan.

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
1818
#![deny(clippy::clone_on_ref_ptr)]
1919

20+
pub mod arrow_wrappers;
2021
pub mod execution_plan;
2122
pub mod plan_properties;
2223
pub mod record_batch_stream;

datafusion/ffi/src/plan_properties.rs

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ use abi_stable::{
2424
},
2525
StableAbi,
2626
};
27-
use arrow::{
28-
datatypes::{Schema, SchemaRef},
29-
ffi::FFI_ArrowSchema,
30-
};
27+
use arrow::datatypes::SchemaRef;
3128
use datafusion::{
3229
error::{DataFusionError, Result},
3330
physical_expr::EquivalenceProperties,
@@ -42,9 +39,10 @@ use datafusion_proto::{
4239
},
4340
protobuf::{Partitioning, PhysicalSortExprNodeCollection},
4441
};
45-
use log::error;
4642
use prost::Message;
4743

44+
use crate::arrow_wrappers::WrappedSchema;
45+
4846
// TODO: should we just make ExecutionMode repr(C)?
4947
#[repr(C)]
5048
#[allow(non_camel_case_types)]
@@ -75,37 +73,6 @@ impl From<FFI_ExecutionMode> for ExecutionMode {
7573
}
7674
}
7775

78-
#[repr(C)]
79-
#[derive(Debug, StableAbi)]
80-
pub struct WrappedSchema(#[sabi(unsafe_opaque_field)] pub FFI_ArrowSchema);
81-
82-
impl From<SchemaRef> for WrappedSchema {
83-
fn from(value: SchemaRef) -> Self {
84-
let ffi_schema = match FFI_ArrowSchema::try_from(value.as_ref()) {
85-
Ok(s) => s,
86-
Err(e) => {
87-
error!("Unable to convert DataFusion Schema to FFI_ArrowSchema in FFI_PlanProperties. {}", e);
88-
FFI_ArrowSchema::empty()
89-
}
90-
};
91-
92-
WrappedSchema(ffi_schema)
93-
}
94-
}
95-
96-
impl From<WrappedSchema> for SchemaRef {
97-
fn from(value: WrappedSchema) -> Self {
98-
let schema = match Schema::try_from(&value.0) {
99-
Ok(s) => s,
100-
Err(e) => {
101-
error!("Unable to convert from FFI_ArrowSchema to DataFusion Schema in FFI_PlanProperties. {}", e);
102-
Schema::empty()
103-
}
104-
};
105-
Arc::new(schema)
106-
}
107-
}
108-
10976
#[repr(C)]
11077
#[derive(Debug, StableAbi)]
11178
#[allow(missing_docs)]

datafusion/ffi/src/record_batch_stream.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use abi_stable::{
2424
use arrow::array::{Array, RecordBatch};
2525
use arrow::{
2626
array::{make_array, StructArray},
27-
ffi::{from_ffi, to_ffi, FFI_ArrowArray},
27+
ffi::{from_ffi, to_ffi},
2828
};
2929
use async_ffi::{ContextExt, FfiContext, FfiPoll};
3030
use datafusion::error::Result;
@@ -34,16 +34,7 @@ use datafusion::{
3434
};
3535
use futures::{Stream, TryStreamExt};
3636

37-
use crate::plan_properties::WrappedSchema;
38-
39-
#[repr(C)]
40-
#[derive(Debug, StableAbi)]
41-
pub struct WrappedArray {
42-
#[sabi(unsafe_opaque_field)]
43-
array: FFI_ArrowArray,
44-
45-
schema: WrappedSchema,
46-
}
37+
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
4738

4839
#[repr(C)]
4940
#[derive(Debug, StableAbi)]

datafusion/ffi/src/table_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use datafusion_proto::{
4242
use prost::Message;
4343

4444
use crate::{
45-
plan_properties::WrappedSchema,
45+
arrow_wrappers::WrappedSchema,
4646
session_config::ForeignSessionConfig,
4747
table_source::{FFI_TableProviderFilterPushDown, FFI_TableType},
4848
};

0 commit comments

Comments
 (0)