Skip to content

Commit 8ab0661

Browse files
authored
feat: Add ScalarUDF support in FFI crate (#14579)
* initial commit for scalar udf in ffi crate
* Add utility functions for converting back and forth to RResult
* License text
* There is no need to repeat the trait doc strings
* Resolve clippy warning
* Add unit tests for ffi scalar udfs
* Add license text
* Switch over ffi modules to use the new macros for conversion back and forth between result and rresult
* Attempting to fix CI based on recommendation to try running clean, but this shouldn't be necessary
* Revert "Attempting to fix CI based on recommendation to try running clean, but this shouldn't be necessary" — this reverts commit 10248c2.
* arrow_schema was removed during rebase
* Switch from trying to expose the entire type signature to using the user_defined type
* Call function to get valid types for scalar udf
* Adding documentation
* Resolve doctest failure
1 parent ee2d2a4 commit 8ab0661

12 files changed

+741
-178
lines changed

datafusion/ffi/src/arrow_wrappers.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ use std::sync::Arc;
1919

2020
use abi_stable::StableAbi;
2121
use arrow::{
22+
array::{make_array, ArrayRef},
2223
datatypes::{Schema, SchemaRef},
23-
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
24+
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
2425
};
2526
use log::error;
2627

@@ -68,3 +69,13 @@ pub struct WrappedArray {
6869

6970
pub schema: WrappedSchema,
7071
}
72+
73+
impl TryFrom<WrappedArray> for ArrayRef {
74+
type Error = arrow::error::ArrowError;
75+
76+
fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
77+
let data = unsafe { from_ffi(value.array, &value.schema.0)? };
78+
79+
Ok(make_array(data))
80+
}
81+
}

datafusion/ffi/src/execution_plan.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ use datafusion::{
3030
use tokio::runtime::Handle;
3131

3232
use crate::{
33-
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
33+
df_result, plan_properties::FFI_PlanProperties,
34+
record_batch_stream::FFI_RecordBatchStream, rresult,
3435
};
3536

3637
/// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries.
@@ -112,13 +113,11 @@ unsafe extern "C" fn execute_fn_wrapper(
112113
let ctx = &(*private_data).context;
113114
let runtime = (*private_data).runtime.clone();
114115

115-
match plan.execute(partition, Arc::clone(ctx)) {
116-
Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
117-
Err(e) => RResult::RErr(
118-
format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
119-
),
120-
}
116+
rresult!(plan
117+
.execute(partition, Arc::clone(ctx))
118+
.map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
121119
}
120+
122121
unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
123122
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
124123
let plan = &(*private_data).plan;
@@ -274,16 +273,8 @@ impl ExecutionPlan for ForeignExecutionPlan {
274273
_context: Arc<TaskContext>,
275274
) -> Result<SendableRecordBatchStream> {
276275
unsafe {
277-
match (self.plan.execute)(&self.plan, partition) {
278-
RResult::ROk(stream) => {
279-
let stream = Pin::new(Box::new(stream)) as SendableRecordBatchStream;
280-
Ok(stream)
281-
}
282-
RResult::RErr(e) => Err(DataFusionError::Execution(format!(
283-
"Error occurred during FFI call to FFI_ExecutionPlan execute. {}",
284-
e
285-
))),
286-
}
276+
df_result!((self.plan.execute)(&self.plan, partition))
277+
.map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
287278
}
288279
}
289280
}

datafusion/ffi/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ pub mod record_batch_stream;
2626
pub mod session_config;
2727
pub mod table_provider;
2828
pub mod table_source;
29+
pub mod udf;
30+
pub mod util;
31+
pub mod volatility;
2932

3033
#[cfg(feature = "integration-tests")]
3134
pub mod tests;

datafusion/ffi/src/plan_properties.rs

Lines changed: 46 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use std::{ffi::c_void, sync::Arc};
1919

2020
use abi_stable::{
2121
std_types::{
22-
RResult::{self, RErr, ROk},
23-
RStr, RVec,
22+
RResult::{self, ROk},
23+
RString, RVec,
2424
},
2525
StableAbi,
2626
};
@@ -44,7 +44,7 @@ use datafusion_proto::{
4444
};
4545
use prost::Message;
4646

47-
use crate::arrow_wrappers::WrappedSchema;
47+
use crate::{arrow_wrappers::WrappedSchema, df_result, rresult_return};
4848

4949
/// A stable struct for sharing [`PlanProperties`] across FFI boundaries.
5050
#[repr(C)]
@@ -54,7 +54,7 @@ pub struct FFI_PlanProperties {
5454
/// The output partitioning is a [`Partitioning`] protobuf message serialized
5555
/// into bytes to pass across the FFI boundary.
5656
pub output_partitioning:
57-
unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
57+
unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RString>,
5858

5959
/// Return the emission type of the plan.
6060
pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType,
@@ -64,8 +64,7 @@ pub struct FFI_PlanProperties {
6464

6565
/// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message
6666
/// serialized into bytes to pass across the FFI boundary.
67-
pub output_ordering:
68-
unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
67+
pub output_ordering: unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RString>,
6968

7069
/// Return the schema of the plan.
7170
pub schema: unsafe extern "C" fn(plan: &Self) -> WrappedSchema,
@@ -84,21 +83,13 @@ struct PlanPropertiesPrivateData {
8483

8584
unsafe extern "C" fn output_partitioning_fn_wrapper(
8685
properties: &FFI_PlanProperties,
87-
) -> RResult<RVec<u8>, RStr<'static>> {
86+
) -> RResult<RVec<u8>, RString> {
8887
let private_data = properties.private_data as *const PlanPropertiesPrivateData;
8988
let props = &(*private_data).props;
9089

9190
let codec = DefaultPhysicalExtensionCodec {};
9291
let partitioning_data =
93-
match serialize_partitioning(props.output_partitioning(), &codec) {
94-
Ok(p) => p,
95-
Err(_) => {
96-
return RErr(
97-
"unable to serialize output_partitioning in FFI_PlanProperties"
98-
.into(),
99-
)
100-
}
101-
};
92+
rresult_return!(serialize_partitioning(props.output_partitioning(), &codec));
10293
let output_partitioning = partitioning_data.encode_to_vec();
10394

10495
ROk(output_partitioning.into())
@@ -122,31 +113,24 @@ unsafe extern "C" fn boundedness_fn_wrapper(
122113

123114
unsafe extern "C" fn output_ordering_fn_wrapper(
124115
properties: &FFI_PlanProperties,
125-
) -> RResult<RVec<u8>, RStr<'static>> {
116+
) -> RResult<RVec<u8>, RString> {
126117
let private_data = properties.private_data as *const PlanPropertiesPrivateData;
127118
let props = &(*private_data).props;
128119

129120
let codec = DefaultPhysicalExtensionCodec {};
130-
let output_ordering =
131-
match props.output_ordering() {
132-
Some(ordering) => {
133-
let physical_sort_expr_nodes =
134-
match serialize_physical_sort_exprs(ordering.to_owned(), &codec) {
135-
Ok(v) => v,
136-
Err(_) => return RErr(
137-
"unable to serialize output_ordering in FFI_PlanProperties"
138-
.into(),
139-
),
140-
};
141-
142-
let ordering_data = PhysicalSortExprNodeCollection {
143-
physical_sort_expr_nodes,
144-
};
145-
146-
ordering_data.encode_to_vec()
147-
}
148-
None => Vec::default(),
149-
};
121+
let output_ordering = match props.output_ordering() {
122+
Some(ordering) => {
123+
let physical_sort_expr_nodes = rresult_return!(
124+
serialize_physical_sort_exprs(ordering.to_owned(), &codec)
125+
);
126+
let ordering_data = PhysicalSortExprNodeCollection {
127+
physical_sort_expr_nodes,
128+
};
129+
130+
ordering_data.encode_to_vec()
131+
}
132+
None => Vec::default(),
133+
};
150134
ROk(output_ordering.into())
151135
}
152136

@@ -200,40 +184,32 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
200184
let codex = DefaultPhysicalExtensionCodec {};
201185

202186
let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };
203-
let orderings = match ffi_orderings {
204-
ROk(ordering_vec) => {
205-
let proto_output_ordering =
206-
PhysicalSortExprNodeCollection::decode(ordering_vec.as_ref())
207-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
208-
Some(parse_physical_sort_exprs(
209-
&proto_output_ordering.physical_sort_expr_nodes,
210-
&default_ctx,
211-
&schema,
212-
&codex,
213-
)?)
214-
}
215-
RErr(e) => return Err(DataFusionError::Plan(e.to_string())),
216-
};
217187

218-
let ffi_partitioning = unsafe { (ffi_props.output_partitioning)(&ffi_props) };
219-
let partitioning = match ffi_partitioning {
220-
ROk(partitioning_vec) => {
221-
let proto_output_partitioning =
222-
Partitioning::decode(partitioning_vec.as_ref())
223-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
224-
parse_protobuf_partitioning(
225-
Some(&proto_output_partitioning),
226-
&default_ctx,
227-
&schema,
228-
&codex,
229-
)?
230-
.ok_or(DataFusionError::Plan(
231-
"Unable to deserialize partitioning protobuf in FFI_PlanProperties"
232-
.to_string(),
233-
))
234-
}
235-
RErr(e) => Err(DataFusionError::Plan(e.to_string())),
236-
}?;
188+
let proto_output_ordering =
189+
PhysicalSortExprNodeCollection::decode(df_result!(ffi_orderings)?.as_ref())
190+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
191+
let orderings = Some(parse_physical_sort_exprs(
192+
&proto_output_ordering.physical_sort_expr_nodes,
193+
&default_ctx,
194+
&schema,
195+
&codex,
196+
)?);
197+
198+
let partitioning_vec =
199+
unsafe { df_result!((ffi_props.output_partitioning)(&ffi_props))? };
200+
let proto_output_partitioning =
201+
Partitioning::decode(partitioning_vec.as_ref())
202+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
203+
let partitioning = parse_protobuf_partitioning(
204+
Some(&proto_output_partitioning),
205+
&default_ctx,
206+
&schema,
207+
&codex,
208+
)?
209+
.ok_or(DataFusionError::Plan(
210+
"Unable to deserialize partitioning protobuf in FFI_PlanProperties"
211+
.to_string(),
212+
))?;
237213

238214
let eq_properties = match orderings {
239215
Some(ordering) => {

datafusion/ffi/src/record_batch_stream.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ use datafusion::{
3535
use futures::{Stream, TryStreamExt};
3636
use tokio::runtime::Handle;
3737

38-
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
38+
use crate::{
39+
arrow_wrappers::{WrappedArray, WrappedSchema},
40+
rresult,
41+
};
3942

4043
/// A stable struct for sharing [`RecordBatchStream`] across FFI boundaries.
4144
/// We use the async-ffi crate for handling async calls across libraries.
@@ -97,13 +100,12 @@ fn record_batch_to_wrapped_array(
97100
record_batch: RecordBatch,
98101
) -> RResult<WrappedArray, RString> {
99102
let struct_array = StructArray::from(record_batch);
100-
match to_ffi(&struct_array.to_data()) {
101-
Ok((array, schema)) => RResult::ROk(WrappedArray {
103+
rresult!(
104+
to_ffi(&struct_array.to_data()).map(|(array, schema)| WrappedArray {
102105
array,
103-
schema: WrappedSchema(schema),
104-
}),
105-
Err(e) => RResult::RErr(e.to_string().into()),
106-
}
106+
schema: WrappedSchema(schema)
107+
})
108+
)
107109
}
108110

109111
// probably want to use pub unsafe fn from_ffi(array: FFI_ArrowArray, schema: &FFI_ArrowSchema) -> Result<ArrayData> {

0 commit comments

Comments (0)