feat: Add ScalarUDF support in FFI crate #14579

Merged · 15 commits · Feb 19, 2025

datafusion/ffi/src/arrow_wrappers.rs (12 additions, 1 deletion)
@@ -19,8 +19,9 @@ use std::sync::Arc;
 
 use abi_stable::StableAbi;
 use arrow::{
+    array::{make_array, ArrayRef},
     datatypes::{Schema, SchemaRef},
-    ffi::{FFI_ArrowArray, FFI_ArrowSchema},
+    ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
 };
 use log::error;
@@ -68,3 +69,13 @@ pub struct WrappedArray {
 
     pub schema: WrappedSchema,
 }
+
+impl TryFrom<WrappedArray> for ArrayRef {
+    type Error = arrow::error::ArrowError;
+
+    fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
+        let data = unsafe { from_ffi(value.array, &value.schema.0)? };
+
+        Ok(make_array(data))
+    }
+}
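
For reference, a round-trip through this new conversion might look like the sketch below. This is not code from the PR: it assumes the `datafusion_ffi` crate name, that both fields of `WrappedArray` are public, and that the caller upholds the usual C Data Interface safety contract.

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int32Array};
    use arrow::ffi::to_ffi;
    use datafusion_ffi::arrow_wrappers::{WrappedArray, WrappedSchema};

    fn roundtrip() -> Result<(), arrow::error::ArrowError> {
        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

        // Export the array through Arrow's C Data Interface structs...
        let (ffi_array, ffi_schema) = to_ffi(&array.to_data())?;
        let wrapped = WrappedArray {
            array: ffi_array,
            schema: WrappedSchema(ffi_schema),
        };

        // ...and import it back with the TryFrom impl added above.
        let imported: ArrayRef = wrapped.try_into()?;
        assert_eq!(imported.len(), 3);
        Ok(())
    }
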
datafusion/ffi/src/execution_plan.rs (8 additions, 17 deletions)
@@ -30,7 +30,8 @@ use datafusion::{
 use tokio::runtime::Handle;
 
 use crate::{
-    plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
+    df_result, plan_properties::FFI_PlanProperties,
+    record_batch_stream::FFI_RecordBatchStream, rresult,
 };
 
 /// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries.
@@ -112,13 +113,11 @@ unsafe extern "C" fn execute_fn_wrapper(
     let ctx = &(*private_data).context;
     let runtime = (*private_data).runtime.clone();
 
-    match plan.execute(partition, Arc::clone(ctx)) {
-        Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
-        Err(e) => RResult::RErr(
-            format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
-        ),
-    }
+    rresult!(plan
+        .execute(partition, Arc::clone(ctx))
+        .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
 }
 
 unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
     let private_data = plan.private_data as *const ExecutionPlanPrivateData;
     let plan = &(*private_data).plan;
@@ -274,16 +273,8 @@ impl ExecutionPlan for ForeignExecutionPlan {
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         unsafe {
-            match (self.plan.execute)(&self.plan, partition) {
-                RResult::ROk(stream) => {
-                    let stream = Pin::new(Box::new(stream)) as SendableRecordBatchStream;
-                    Ok(stream)
-                }
-                RResult::RErr(e) => Err(DataFusionError::Execution(format!(
-                    "Error occurred during FFI call to FFI_ExecutionPlan execute. {}",
-                    e
-                ))),
-            }
+            df_result!((self.plan.execute)(&self.plan, partition))
+                .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
         }
     }
 }
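
The rewrites above lean on helper macros from the new `util` module (see the `pub mod util;` addition in lib.rs below). Their definitions are not part of this excerpt; the following is a minimal sketch of the shape the call sites imply — `rresult!` folds a `Result` into an FFI-safe `RResult` with a stringified error, and `df_result!` unfolds it back into a DataFusion `Result` — not the crate's actual code. The plan_properties.rs changes below also use an `rresult_return!` variant that early-returns the `RErr` case.

    use abi_stable::std_types::{RResult, RString};
    use datafusion::error::DataFusionError;

    // Sketch: wrap any Result<T, E: Display> into RResult<T, RString>.
    macro_rules! rresult {
        ($e:expr) => {
            match $e {
                Ok(v) => RResult::ROk(v),
                Err(e) => RResult::RErr(RString::from(e.to_string())),
            }
        };
    }

    // Sketch: unwrap an RResult<T, RString> back into a DataFusion Result<T>.
    macro_rules! df_result {
        ($e:expr) => {
            match $e {
                RResult::ROk(v) => Ok(v),
                RResult::RErr(e) => Err(DataFusionError::Execution(e.to_string())),
            }
        };
    }

    fn demo() -> Result<i32, DataFusionError> {
        // Cross the "FFI boundary" and come back.
        let over_ffi: RResult<i32, RString> = rresult!("42".parse::<i32>());
        df_result!(over_ffi)
    }
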
datafusion/ffi/src/lib.rs (3 additions, 0 deletions)
@@ -26,6 +26,9 @@ pub mod record_batch_stream;
 pub mod session_config;
 pub mod table_provider;
 pub mod table_source;
+pub mod udf;
+pub mod util;
+pub mod volatility;
 
 #[cfg(feature = "integration-tests")]
 pub mod tests;

datafusion/ffi/src/plan_properties.rs (46 additions, 70 deletions)
@@ -19,8 +19,8 @@ use std::{ffi::c_void, sync::Arc};
 
 use abi_stable::{
     std_types::{
-        RResult::{self, RErr, ROk},
-        RStr, RVec,
+        RResult::{self, ROk},
+        RString, RVec,
     },
     StableAbi,
 };
@@ -44,7 +44,7 @@ use datafusion_proto::{
 };
 use prost::Message;
 
-use crate::arrow_wrappers::WrappedSchema;
+use crate::{arrow_wrappers::WrappedSchema, df_result, rresult_return};
 
 /// A stable struct for sharing [`PlanProperties`] across FFI boundaries.
 #[repr(C)]
@@ -54,7 +54,7 @@ pub struct FFI_PlanProperties {
     /// The output partitioning is a [`Partitioning`] protobuf message serialized
     /// into bytes to pass across the FFI boundary.
     pub output_partitioning:
-        unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
+        unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RString>,
 
     /// Return the emission type of the plan.
     pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType,
@@ -64,8 +64,7 @@
 
     /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message
     /// serialized into bytes to pass across the FFI boundary.
-    pub output_ordering:
-        unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RStr<'static>>,
+    pub output_ordering: unsafe extern "C" fn(plan: &Self) -> RResult<RVec<u8>, RString>,
 
     /// Return the schema of the plan.
     pub schema: unsafe extern "C" fn(plan: &Self) -> WrappedSchema,
@@ -84,21 +83,13 @@ struct PlanPropertiesPrivateData {
 
 unsafe extern "C" fn output_partitioning_fn_wrapper(
     properties: &FFI_PlanProperties,
-) -> RResult<RVec<u8>, RStr<'static>> {
+) -> RResult<RVec<u8>, RString> {
     let private_data = properties.private_data as *const PlanPropertiesPrivateData;
     let props = &(*private_data).props;
 
     let codec = DefaultPhysicalExtensionCodec {};
     let partitioning_data =
-        match serialize_partitioning(props.output_partitioning(), &codec) {
-            Ok(p) => p,
-            Err(_) => {
-                return RErr(
-                    "unable to serialize output_partitioning in FFI_PlanProperties"
-                        .into(),
-                )
-            }
-        };
+        rresult_return!(serialize_partitioning(props.output_partitioning(), &codec));
     let output_partitioning = partitioning_data.encode_to_vec();
 
     ROk(output_partitioning.into())
@@ -122,31 +113,24 @@ unsafe extern "C" fn boundedness_fn_wrapper(
 
 unsafe extern "C" fn output_ordering_fn_wrapper(
     properties: &FFI_PlanProperties,
-) -> RResult<RVec<u8>, RStr<'static>> {
+) -> RResult<RVec<u8>, RString> {
     let private_data = properties.private_data as *const PlanPropertiesPrivateData;
     let props = &(*private_data).props;
 
     let codec = DefaultPhysicalExtensionCodec {};
-    let output_ordering =
-        match props.output_ordering() {
-            Some(ordering) => {
-                let physical_sort_expr_nodes =
-                    match serialize_physical_sort_exprs(ordering.to_owned(), &codec) {
-                        Ok(v) => v,
-                        Err(_) => return RErr(
-                            "unable to serialize output_ordering in FFI_PlanProperties"
-                                .into(),
-                        ),
-                    };
-
-                let ordering_data = PhysicalSortExprNodeCollection {
-                    physical_sort_expr_nodes,
-                };
-
-                ordering_data.encode_to_vec()
-            }
-            None => Vec::default(),
-        };
+    let output_ordering = match props.output_ordering() {
+        Some(ordering) => {
+            let physical_sort_expr_nodes = rresult_return!(
+                serialize_physical_sort_exprs(ordering.to_owned(), &codec)
+            );
+            let ordering_data = PhysicalSortExprNodeCollection {
+                physical_sort_expr_nodes,
+            };
+
+            ordering_data.encode_to_vec()
+        }
+        None => Vec::default(),
+    };
     ROk(output_ordering.into())
 }

@@ -200,40 +184,32 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
         let codex = DefaultPhysicalExtensionCodec {};
 
         let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };
-        let orderings = match ffi_orderings {
-            ROk(ordering_vec) => {
-                let proto_output_ordering =
-                    PhysicalSortExprNodeCollection::decode(ordering_vec.as_ref())
-                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
-                Some(parse_physical_sort_exprs(
-                    &proto_output_ordering.physical_sort_expr_nodes,
-                    &default_ctx,
-                    &schema,
-                    &codex,
-                )?)
-            }
-            RErr(e) => return Err(DataFusionError::Plan(e.to_string())),
-        };
-
-        let ffi_partitioning = unsafe { (ffi_props.output_partitioning)(&ffi_props) };
-        let partitioning = match ffi_partitioning {
-            ROk(partitioning_vec) => {
-                let proto_output_partitioning =
-                    Partitioning::decode(partitioning_vec.as_ref())
-                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
-                parse_protobuf_partitioning(
-                    Some(&proto_output_partitioning),
-                    &default_ctx,
-                    &schema,
-                    &codex,
-                )?
-                .ok_or(DataFusionError::Plan(
-                    "Unable to deserialize partitioning protobuf in FFI_PlanProperties"
-                        .to_string(),
-                ))
-            }
-            RErr(e) => Err(DataFusionError::Plan(e.to_string())),
-        }?;
+        let proto_output_ordering =
+            PhysicalSortExprNodeCollection::decode(df_result!(ffi_orderings)?.as_ref())
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let orderings = Some(parse_physical_sort_exprs(
+            &proto_output_ordering.physical_sort_expr_nodes,
+            &default_ctx,
+            &schema,
+            &codex,
+        )?);
+
+        let partitioning_vec =
+            unsafe { df_result!((ffi_props.output_partitioning)(&ffi_props))? };
+        let proto_output_partitioning =
+            Partitioning::decode(partitioning_vec.as_ref())
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let partitioning = parse_protobuf_partitioning(
+            Some(&proto_output_partitioning),
+            &default_ctx,
+            &schema,
+            &codex,
+        )?
+        .ok_or(DataFusionError::Plan(
+            "Unable to deserialize partitioning protobuf in FFI_PlanProperties"
+                .to_string(),
+        ))?;
 
         let eq_properties = match orderings {
             Some(ordering) => {
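
With the error plumbing collapsed into `df_result!`, recovering native `PlanProperties` on the consuming side stays an ordinary `try_into()`. A hypothetical call site (the function and variable names here are invented for illustration):

    use datafusion::error::Result;
    use datafusion::physical_plan::PlanProperties;
    use datafusion_ffi::plan_properties::FFI_PlanProperties;

    // Hypothetical: `ffi_props` was produced by a dynamically loaded plugin.
    fn import_properties(ffi_props: FFI_PlanProperties) -> Result<PlanProperties> {
        let props: PlanProperties = ffi_props.try_into()?;
        Ok(props)
    }
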
datafusion/ffi/src/record_batch_stream.rs (9 additions, 7 deletions)
@@ -35,7 +35,10 @@ use datafusion::{
 use futures::{Stream, TryStreamExt};
 use tokio::runtime::Handle;
 
-use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
+use crate::{
+    arrow_wrappers::{WrappedArray, WrappedSchema},
+    rresult,
+};
 
 /// A stable struct for sharing [`RecordBatchStream`] across FFI boundaries.
 /// We use the async-ffi crate for handling async calls across libraries.
@@ -97,13 +100,12 @@ fn record_batch_to_wrapped_array(
     record_batch: RecordBatch,
 ) -> RResult<WrappedArray, RString> {
     let struct_array = StructArray::from(record_batch);
-    match to_ffi(&struct_array.to_data()) {
-        Ok((array, schema)) => RResult::ROk(WrappedArray {
+    rresult!(
+        to_ffi(&struct_array.to_data()).map(|(array, schema)| WrappedArray {
             array,
-            schema: WrappedSchema(schema),
-        }),
-        Err(e) => RResult::RErr(e.to_string().into()),
-    }
+            schema: WrappedSchema(schema)
+        })
+    )
 }
 
 // probably want to use pub unsafe fn from_ffi(array: FFI_ArrowArray, schema: &FFI_ArrowSchema) -> Result<ArrayData> {
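
The wrapper above relies on the lossless RecordBatch-to-StructArray correspondence: a batch crosses the C Data Interface as a single struct array plus its schema. A standalone illustration of that underlying conversion (not PR code; only stock arrow APIs are used):

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int32Array, StructArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::ffi::to_ffi;
    use arrow::record_batch::RecordBatch;

    fn export_batch() -> Result<(), arrow::error::ArrowError> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![col])?;

        // A RecordBatch converts losslessly into a StructArray...
        let struct_array = StructArray::from(batch);

        // ...which the C Data Interface exports as one array/schema pair.
        let (_ffi_array, _ffi_schema) = to_ffi(&struct_array.to_data())?;
        Ok(())
    }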