Skip to content

Commit 9280901

Browse files
authored
Implement UDF linking (#78)
## Summary of changes This PR implements linking of UDFs by passing in a table of function pointers into `compile_hir`. This PR also adds the `Catalog` field to both the rule engine and the optimizer as a trait object (`Arc<dyn Catalog>`). I strongly considered making this a generic parameter, but the problem with that is then the `C: Catalog` type parameter would need to be added to literally every structures and function, and I that would significantly reduce readability. The performance is essentially negligible because it is unlikely the compiler could make significant optimizations based on the implementation of the `Catalog`. Ideally I would add tests that test the functionality of the catalog as well but I don't have time (and also I don't have any reason to think that it wouldn't work now). Once I get an interface that the optimizer needs from the catalog from @SarveshOO7 and @yliang412, I can add those tests easily.
1 parent 7202b53 commit 9280901

File tree

15 files changed

+473
-138
lines changed

15 files changed

+473
-138
lines changed

Cargo.lock

Lines changed: 159 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

optd/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ colored = "3.0.0"
1818
enum_dispatch = "0.3.13"
1919
futures = "0.3.31"
2020
iceberg = { version = "0.4.0", default-features = false }
21+
iceberg-catalog-memory = "0.4.0"
2122
ordered-float = "5.0.0"
2223
tokio = { version = "1.44.2", features = ["macros", "rt"] }
2324
trait-variant = "0.1.2"

optd/src/catalog/iceberg.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,27 @@
11
use super::{Catalog as OptdCatalog, CatalogError};
22
use async_trait::async_trait;
3-
use iceberg::{Catalog as IcebergCatalog, NamespaceIdent, TableIdent, table::Table};
4-
use std::collections::HashMap;
3+
use iceberg::{
4+
Catalog as IcebergCatalog, NamespaceIdent, TableIdent, io::FileIOBuilder, table::Table,
5+
};
6+
use iceberg_catalog_memory::MemoryCatalog;
7+
use std::{collections::HashMap, sync::Arc};
58

69
/// The default namespace for the Iceberg catalog.
710
///
811
/// TODO(connor): For now, keep everything in the default namespace for simplicity.
912
static DEFAULT_NAMESPACE: &str = "default";
1013

11-
/// A wrapper around an arbitrary Iceberg catalog.
12-
#[derive(Debug)]
13-
pub struct OptdIcebergCatalog<C: IcebergCatalog>(C);
14+
/// A wrapper around an arbitrary Iceberg Catalog.
15+
#[derive(Debug, Clone)]
16+
pub struct OptdIcebergCatalog<C: IcebergCatalog>(Arc<C>);
1417

1518
impl<C: IcebergCatalog> OptdIcebergCatalog<C> {
1619
/// Creates a new catalog.
1720
pub fn new(catalog: C) -> Self {
21+
Self(Arc::new(catalog))
22+
}
23+
24+
pub fn new_from_arc(catalog: Arc<C>) -> Self {
1825
Self(catalog)
1926
}
2027

@@ -23,14 +30,23 @@ impl<C: IcebergCatalog> OptdIcebergCatalog<C> {
2330
let namespace_ident = NamespaceIdent::new(DEFAULT_NAMESPACE.to_string());
2431
let table_ident = TableIdent::new(namespace_ident, table_name.to_string());
2532

26-
// TODO(connor): FIX ERROR HANDLING.
2733
self.0
2834
.load_table(&table_ident)
2935
.await
3036
.map_err(|e| CatalogError::Unknown(e.to_string()))
3137
}
3238
}
3339

40+
/// Creates a basic in-memory catalog. Used for testing.
41+
pub fn memory_catalog() -> OptdIcebergCatalog<MemoryCatalog> {
42+
let file_io = FileIOBuilder::new("memory")
43+
.build()
44+
.expect("unable to create file");
45+
let catalog = Arc::new(MemoryCatalog::new(file_io, Some("mock".to_string())));
46+
47+
OptdIcebergCatalog(catalog)
48+
}
49+
3450
#[async_trait]
3551
impl<C: IcebergCatalog> OptdCatalog for OptdIcebergCatalog<C> {
3652
async fn get_table_properties(

optd/src/core/optimizer/jobs.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl<M: Memoize> Optimizer<M> {
269269
expression_id: LogicalExpressionId,
270270
job_id: JobId,
271271
) -> Result<(), Error> {
272-
let engine = Engine::new(self.hir_context.clone());
272+
let engine = Engine::new(self.hir_context.clone(), self.catalog.clone());
273273

274274
let plan: PartialLogicalPlan = self
275275
.memo
@@ -311,7 +311,7 @@ impl<M: Memoize> Optimizer<M> {
311311
group_id: GroupId,
312312
job_id: JobId,
313313
) -> Result<(), Error> {
314-
let engine = Engine::new(self.hir_context.clone());
314+
let engine = Engine::new(self.hir_context.clone(), self.catalog.clone());
315315

316316
let plan: PartialLogicalPlan = self
317317
.memo
@@ -350,7 +350,7 @@ impl<M: Memoize> Optimizer<M> {
350350
goal_id: GoalId,
351351
job_id: JobId,
352352
) -> Result<(), Error> {
353-
let engine = Engine::new(self.hir_context.clone());
353+
let engine = Engine::new(self.hir_context.clone(), self.catalog.clone());
354354

355355
let Goal(_, physical_props) = self.memo.materialize_goal(goal_id).await?;
356356
let plan = self
@@ -389,7 +389,7 @@ impl<M: Memoize> Optimizer<M> {
389389
expression_id: PhysicalExpressionId,
390390
job_id: JobId,
391391
) -> Result<(), Error> {
392-
let engine = Engine::new(self.hir_context.clone());
392+
let engine = Engine::new(self.hir_context.clone(), self.catalog.clone());
393393

394394
let plan = self.egest_partial_plan(expression_id).await?;
395395

0 commit comments

Comments
 (0)