Skip to content

Commit dd430eb

Browse files
committed
Update example naming and split into different files for table provider and table function
1 parent dd36787 commit dd430eb

File tree

5 files changed

+167
-128
lines changed

5 files changed

+167
-128
lines changed

examples/datafusion-ffi-example/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,5 @@ arrow-schema = { version = "55" }
3232
pyo3-build-config = "0.23"
3333

3434
[lib]
35-
name = "ffi_table_provider"
35+
name = "datafusion_ffi_example"
3636
crate-type = ["cdylib", "rlib"]

examples/datafusion-ffi-example/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ requires = ["maturin>=1.6,<2.0"]
2020
build-backend = "maturin"
2121

2222
[project]
23-
name = "ffi_library"
23+
name = "datafusion_ffi_example"
2424
requires-python = ">=3.9"
2525
classifiers = [
2626
"Programming Language :: Rust",

examples/datafusion-ffi-example/src/lib.rs

Lines changed: 6 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -15,135 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{ffi::CString, sync::Arc};
18+
use crate::table_function::MyTableFunction;
19+
use crate::table_provider::MyTableProvider;
20+
use pyo3::prelude::*;
1921

20-
use arrow_array::ArrayRef;
21-
use datafusion::catalog::{TableFunctionImpl, TableProvider};
22-
use datafusion::logical_expr::Expr;
23-
use datafusion::{
24-
arrow::{
25-
array::RecordBatch,
26-
datatypes::{DataType, Field, Schema},
27-
},
28-
datasource::MemTable,
29-
error::{DataFusionError, Result},
30-
};
31-
use datafusion_ffi::table_provider::FFI_TableProvider;
32-
use datafusion_ffi::udtf::FFI_TableFunction;
33-
use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyCapsule};
34-
35-
/// In order to provide a test that demonstrates different sized record batches,
36-
/// the first batch will have num_rows, the second batch num_rows+1, and so on.
37-
#[pyclass(name = "MyTableProvider", module = "ffi_table_provider", subclass)]
38-
#[derive(Clone)]
39-
struct MyTableProvider {
40-
num_cols: usize,
41-
num_rows: usize,
42-
num_batches: usize,
43-
}
44-
45-
fn create_record_batch(
46-
schema: &Arc<Schema>,
47-
num_cols: usize,
48-
start_value: i32,
49-
num_values: usize,
50-
) -> Result<RecordBatch> {
51-
let end_value = start_value + num_values as i32;
52-
let row_values: Vec<i32> = (start_value..end_value).collect();
53-
54-
let columns: Vec<_> = (0..num_cols)
55-
.map(|_| {
56-
std::sync::Arc::new(arrow::array::Int32Array::from(row_values.clone())) as ArrayRef
57-
})
58-
.collect();
59-
60-
RecordBatch::try_new(Arc::clone(schema), columns).map_err(DataFusionError::from)
61-
}
62-
63-
impl MyTableProvider {
64-
fn create_table(&self) -> Result<MemTable> {
65-
let fields: Vec<_> = (0..self.num_cols)
66-
.map(|idx| (b'A' + idx as u8) as char)
67-
.map(|col_name| Field::new(col_name, DataType::Int32, true))
68-
.collect();
69-
70-
let schema = Arc::new(Schema::new(fields));
71-
72-
let batches: Result<Vec<_>> = (0..self.num_batches)
73-
.map(|batch_idx| {
74-
let start_value = batch_idx * self.num_rows;
75-
create_record_batch(
76-
&schema,
77-
self.num_cols,
78-
start_value as i32,
79-
self.num_rows + batch_idx,
80-
)
81-
})
82-
.collect();
83-
84-
MemTable::try_new(schema, vec![batches?])
85-
}
86-
}
87-
88-
#[pymethods]
89-
impl MyTableProvider {
90-
#[new]
91-
fn new(num_cols: usize, num_rows: usize, num_batches: usize) -> Self {
92-
Self {
93-
num_cols,
94-
num_rows,
95-
num_batches,
96-
}
97-
}
98-
99-
fn __datafusion_table_provider__<'py>(
100-
&self,
101-
py: Python<'py>,
102-
) -> PyResult<Bound<'py, PyCapsule>> {
103-
let name = CString::new("datafusion_table_provider").unwrap();
104-
105-
let provider = self
106-
.create_table()
107-
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
108-
let provider = FFI_TableProvider::new(Arc::new(provider), false, None);
109-
110-
PyCapsule::new_bound(py, provider, Some(name.clone()))
111-
}
112-
}
113-
114-
#[pyclass(name = "MyTableFunction", module = "ffi_table_provider", subclass)]
115-
#[derive(Debug, Clone)]
116-
struct MyTableFunction {}
117-
118-
#[pymethods]
119-
impl MyTableFunction {
120-
#[new]
121-
fn new() -> Self {
122-
Self {}
123-
}
124-
125-
fn __datafusion_table_function__<'py>(
126-
&self,
127-
py: Python<'py>,
128-
) -> PyResult<Bound<'py, PyCapsule>> {
129-
let name = cr"datafusion_table_function".into();
130-
131-
let func = self.clone();
132-
let provider = FFI_TableFunction::new(Arc::new(func), None);
133-
134-
PyCapsule::new(py, provider, Some(name))
135-
}
136-
}
137-
138-
impl TableFunctionImpl for MyTableFunction {
139-
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
140-
let provider = MyTableProvider::new(10, 3, 2).create_table()?;
141-
Ok(Arc::new(provider))
142-
}
143-
}
22+
pub(crate) mod table_function;
23+
pub(crate) mod table_provider;
14424

14525
#[pymodule]
146-
fn ffi_table_provider(m: &Bound<'_, PyModule>) -> PyResult<()> {
26+
fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
14727
m.add_class::<MyTableProvider>()?;
14828
m.add_class::<MyTableFunction>()?;
14929
Ok(())
examples/datafusion-ffi-example/src/table_function.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::table_provider::MyTableProvider;
19+
use datafusion::catalog::{TableFunctionImpl, TableProvider};
20+
use datafusion::error::Result as DataFusionResult;
21+
use datafusion::prelude::Expr;
22+
use datafusion_ffi::udtf::FFI_TableFunction;
23+
use pyo3::types::PyCapsule;
24+
use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
25+
use std::sync::Arc;
26+
27+
#[pyclass(name = "MyTableFunction", module = "datafusion_ffi_example", subclass)]
28+
#[derive(Debug, Clone)]
29+
pub(crate) struct MyTableFunction {}
30+
31+
#[pymethods]
32+
impl MyTableFunction {
33+
#[new]
34+
fn new() -> Self {
35+
Self {}
36+
}
37+
38+
fn __datafusion_table_function__<'py>(
39+
&self,
40+
py: Python<'py>,
41+
) -> PyResult<Bound<'py, PyCapsule>> {
42+
let name = cr"datafusion_table_function".into();
43+
44+
let func = self.clone();
45+
let provider = FFI_TableFunction::new(Arc::new(func), None);
46+
47+
PyCapsule::new(py, provider, Some(name))
48+
}
49+
}
50+
51+
impl TableFunctionImpl for MyTableFunction {
52+
fn call(&self, _args: &[Expr]) -> DataFusionResult<Arc<dyn TableProvider>> {
53+
let provider = MyTableProvider::new(10, 3, 2).create_table()?;
54+
Ok(Arc::new(provider))
55+
}
56+
}
examples/datafusion-ffi-example/src/table_provider.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_array::{ArrayRef, RecordBatch};
19+
use arrow_schema::{DataType, Field, Schema};
20+
use datafusion::catalog::MemTable;
21+
use datafusion::error::{DataFusionError, Result as DataFusionResult};
22+
use datafusion_ffi::table_provider::FFI_TableProvider;
23+
use pyo3::exceptions::PyRuntimeError;
24+
use pyo3::types::PyCapsule;
25+
use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
26+
use std::sync::Arc;
27+
28+
/// In order to provide a test that demonstrates different sized record batches,
29+
/// the first batch will have num_rows, the second batch num_rows+1, and so on.
30+
#[pyclass(name = "MyTableProvider", module = "datafusion_ffi_example", subclass)]
31+
#[derive(Clone)]
32+
pub(crate) struct MyTableProvider {
33+
num_cols: usize,
34+
num_rows: usize,
35+
num_batches: usize,
36+
}
37+
38+
fn create_record_batch(
39+
schema: &Arc<Schema>,
40+
num_cols: usize,
41+
start_value: i32,
42+
num_values: usize,
43+
) -> DataFusionResult<RecordBatch> {
44+
let end_value = start_value + num_values as i32;
45+
let row_values: Vec<i32> = (start_value..end_value).collect();
46+
47+
let columns: Vec<_> = (0..num_cols)
48+
.map(|_| Arc::new(arrow::array::Int32Array::from(row_values.clone())) as ArrayRef)
49+
.collect();
50+
51+
RecordBatch::try_new(Arc::clone(schema), columns).map_err(DataFusionError::from)
52+
}
53+
54+
impl MyTableProvider {
55+
pub fn create_table(&self) -> DataFusionResult<MemTable> {
56+
let fields: Vec<_> = (0..self.num_cols)
57+
.map(|idx| (b'A' + idx as u8) as char)
58+
.map(|col_name| Field::new(col_name, DataType::Int32, true))
59+
.collect();
60+
61+
let schema = Arc::new(Schema::new(fields));
62+
63+
let batches: DataFusionResult<Vec<_>> = (0..self.num_batches)
64+
.map(|batch_idx| {
65+
let start_value = batch_idx * self.num_rows;
66+
create_record_batch(
67+
&schema,
68+
self.num_cols,
69+
start_value as i32,
70+
self.num_rows + batch_idx,
71+
)
72+
})
73+
.collect();
74+
75+
MemTable::try_new(schema, vec![batches?])
76+
}
77+
}
78+
79+
#[pymethods]
80+
impl MyTableProvider {
81+
#[new]
82+
pub fn new(num_cols: usize, num_rows: usize, num_batches: usize) -> Self {
83+
Self {
84+
num_cols,
85+
num_rows,
86+
num_batches,
87+
}
88+
}
89+
90+
pub fn __datafusion_table_provider__<'py>(
91+
&self,
92+
py: Python<'py>,
93+
) -> PyResult<Bound<'py, PyCapsule>> {
94+
let name = cr"datafusion_table_provider".into();
95+
96+
let provider = self
97+
.create_table()
98+
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
99+
let provider = FFI_TableProvider::new(Arc::new(provider), false, None);
100+
101+
PyCapsule::new(py, provider, Some(name))
102+
}
103+
}

0 commit comments

Comments
 (0)