Skip to content

Commit e335e68

Browse files
committed
Initial commit for example on using FFI Table provider in rust as a module loading system
1 parent 71ae880 commit e335e68

File tree

7 files changed

+149
-0
lines changed

7 files changed

+149
-0
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ members = [
4747
"datafusion/substrait",
4848
"datafusion/wasmtest",
4949
"datafusion-examples",
50+
"datafusion-examples/examples/ffi/ffi_example_table_provider",
51+
"datafusion-examples/examples/ffi/ffi_module_interface",
52+
"datafusion-examples/examples/ffi/ffi_module_loader",
5053
"test-utils",
5154
"benchmarks",
5255
]
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "ffi_example_table_provider"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
datafusion = { workspace = true }
8+
datafusion-ffi = { workspace = true }
9+
abi_stable = "0.11.3"
10+
arrow = { workspace = true }
11+
arrow-array = { workspace = true }
12+
arrow-schema = { workspace = true }
13+
ffi_module_interface = { path = "../ffi_module_interface" }
14+
15+
[lib]
16+
name = "ffi_example_table_provider"
17+
crate-type = ["cdylib",'rlib']
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::sync::Arc;
2+
3+
use abi_stable::{export_root_module, prefix_type::PrefixTypeTrait};
4+
use arrow_array::RecordBatch;
5+
use datafusion::{
6+
arrow::datatypes::{DataType, Field, Schema},
7+
common::record_batch,
8+
datasource::MemTable,
9+
};
10+
use datafusion_ffi::table_provider::FFI_TableProvider;
11+
use ffi_module_interface::{TableProviderModule, TableProviderModuleRef};
12+
13+
fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
14+
let end_value = start_value + num_values as i32;
15+
let a_vals: Vec<i32> = (start_value..end_value).collect();
16+
let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
17+
18+
record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
19+
}
20+
21+
extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {
22+
let schema = Arc::new(Schema::new(vec![
23+
Field::new("a", DataType::Int32, true),
24+
Field::new("b", DataType::Float64, true),
25+
]));
26+
27+
let batches = vec![
28+
create_record_batch(1, 5),
29+
create_record_batch(6, 1),
30+
create_record_batch(7, 5),
31+
];
32+
33+
let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();
34+
35+
FFI_TableProvider::new(Arc::new(table_provider), true)
36+
}
37+
38+
#[export_root_module]
39+
pub fn get_simple_memory_table() -> TableProviderModuleRef {
40+
TableProviderModule {
41+
create_table: construct_simple_table_provider,
42+
}
43+
.leak_into_prefix()
44+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "ffi_module_interface"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
abi_stable = "0.11.3"
8+
datafusion-ffi = { workspace = true }
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use abi_stable::{
2+
declare_root_module_statics,
3+
library::{LibraryError, RootModule},
4+
package_version_strings,
5+
sabi_types::VersionStrings,
6+
StableAbi,
7+
};
8+
use datafusion_ffi::table_provider::FFI_TableProvider;
9+
10+
#[repr(C)]
11+
#[derive(StableAbi)]
12+
#[sabi(kind(Prefix(prefix_ref = TableProviderModuleRef)))]
13+
pub struct TableProviderModule {
14+
/// Constructs the table provider
15+
pub create_table: extern "C" fn() -> FFI_TableProvider,
16+
}
17+
18+
impl RootModule for TableProviderModuleRef {
19+
declare_root_module_statics! {TableProviderModuleRef}
20+
const BASE_NAME: &'static str = "ffi_example_table_provider";
21+
const NAME: &'static str = "ffi_example_table_provider";
22+
const VERSION_STRINGS: VersionStrings = package_version_strings!();
23+
24+
fn initialization(self) -> Result<Self, LibraryError> {
25+
Ok(self)
26+
}
27+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "ffi_module_loader"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
8+
datafusion = { workspace = true }
9+
datafusion-ffi = { workspace = true }
10+
ffi_module_interface = { path = "../ffi_module_interface" }
11+
abi_stable = "0.11.3"
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use std::sync::Arc;
2+
3+
use datafusion::{
4+
error::{DataFusionError, Result},
5+
prelude::SessionContext,
6+
};
7+
8+
use abi_stable::library::{development_utils::compute_library_path, RootModule};
9+
use datafusion_ffi::table_provider::ForeignTableProvider;
10+
use ffi_module_interface::TableProviderModuleRef;
11+
12+
#[tokio::main]
13+
async fn main() -> Result<()> {
14+
let target: &std::path::Path = "../../../../target/".as_ref();
15+
let library_path = compute_library_path::<TableProviderModuleRef>(target).unwrap();
16+
17+
let table_provider_module =
18+
TableProviderModuleRef::load_from_directory(&library_path)
19+
.unwrap_or_else(|e| panic!("{}", e));
20+
21+
let ffi_table_provider =
22+
table_provider_module
23+
.create_table()
24+
.ok_or(DataFusionError::NotImplemented(
25+
"External table provider failed to implement create_table".to_string(),
26+
))?();
27+
28+
let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into();
29+
30+
let ctx = SessionContext::new();
31+
32+
ctx.register_table("external_table", Arc::new(foreign_table_provider))?;
33+
34+
let df = ctx.table("external_table").await?;
35+
36+
df.show().await?;
37+
38+
Ok(())
39+
}

0 commit comments

Comments
 (0)