Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
945 changes: 885 additions & 60 deletions Cargo.lock

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,65 @@ result = query.execute({"Person": people})
print(result.to_pydict()) # {'name': ['Bob', 'David'], 'age': [34, 42]}
```

## Python example: Direct SQL query

For data analytics workflows where you prefer standard SQL, use `SqlQuery` or `SqlEngine`. No `GraphConfig` is needed:

```python
import pyarrow as pa
from lance_graph import SqlQuery, SqlEngine

person = pa.table({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Carol"],
"age": [28, 34, 29],
})

# One-off query
result = SqlQuery(
"SELECT name, age FROM person WHERE age > 30"
).execute({"person": person})
print(result.to_pydict()) # {'name': ['Bob'], 'age': [34]}

# Multi-query with cached context
engine = SqlEngine({"person": person})
r1 = engine.execute("SELECT COUNT(*) AS cnt FROM person")
r2 = engine.execute("SELECT name FROM person ORDER BY age DESC LIMIT 2")
```

## Python example: Unity Catalog integration

Connect to [Unity Catalog](https://github.com/unitycatalog/unitycatalog) (OSS) to discover and query Delta Lake or Parquet tables directly:

```python
from lance_graph import UnityCatalog

# Connect to Unity Catalog
uc = UnityCatalog("http://localhost:8080/api/2.1/unity-catalog")

# Browse catalog metadata
catalogs = uc.list_catalogs()
schemas = uc.list_schemas("unity")
tables = uc.list_tables("unity", "default")
table = uc.get_table("unity", "default", "marksheet")
print(table.columns()) # [{"name": "id", "type_name": "INT", ...}, ...]

# Auto-register tables (Delta + Parquet) and query via SQL
engine = uc.create_sql_engine("unity", "default")
result = engine.execute("SELECT * FROM marksheet WHERE mark > 80")
print(result.to_pandas())

# For cloud storage (S3, Azure, GCS), pass storage options:
uc = UnityCatalog(
"http://localhost:8080/api/2.1/unity-catalog",
storage_options={
"aws_access_key_id": "...",
"aws_secret_access_key": "...",
"aws_region": "us-east-1",
}
)
```

## Knowledge Graph CLI & API

The `knowledge_graph` package layers a simple Lance-backed knowledge graph
Expand Down
8 changes: 8 additions & 0 deletions crates/lance-graph-catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ arrow-schema = "56.2"
async-trait = "0.1"
datafusion = { version = "50.3", default-features = false }
lance-namespace = "1.0.1"
reqwest = { version = "0.12", features = ["json"], optional = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
snafu = "0.8"

[features]
default = ["unity-catalog"]
unity-catalog = ["dep:reqwest"]

[dev-dependencies]
tokio = { version = "1.37", features = ["macros", "rt-multi-thread"] }
wiremock = "0.6"
172 changes: 172 additions & 0 deletions crates/lance-graph-catalog/src/catalog_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Catalog provider trait and data types for external catalog integration.
//!
//! Inspired by Presto's `ConnectorMetadata` SPI, this module defines the
//! abstract interface for browsing external catalogs (Unity Catalog, Hive
//! Metastore, AWS Glue, etc.).

use std::collections::HashMap;

use arrow_schema::SchemaRef;
use async_trait::async_trait;

/// Metadata about a catalog (top-level namespace).
#[derive(Debug, Clone)]
pub struct CatalogInfo {
pub name: String,
pub comment: Option<String>,
pub properties: HashMap<String, String>,
pub created_at: Option<i64>,
pub updated_at: Option<i64>,
}

/// Metadata about a schema (second-level namespace within a catalog).
#[derive(Debug, Clone)]
pub struct SchemaInfo {
pub name: String,
pub catalog_name: String,
pub comment: Option<String>,
pub properties: HashMap<String, String>,
pub created_at: Option<i64>,
pub updated_at: Option<i64>,
}

/// Metadata about a column in a table.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
pub name: String,
/// Human-readable type string (e.g., "INT", "VARCHAR(255)").
pub type_text: String,
/// Canonical type name from the catalog (e.g., "INT", "STRING").
pub type_name: String,
/// Column position (0-based).
pub position: i32,
pub nullable: bool,
pub comment: Option<String>,
}

/// Data format of the underlying storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataSourceFormat {
Delta,
Parquet,
Csv,
Json,
Avro,
Orc,
Text,
Other(String),
}

/// Type of table (managed vs external).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TableType {
Managed,
External,
}

/// Full table metadata including columns and storage information.
#[derive(Debug, Clone)]
pub struct TableInfo {
pub name: String,
pub catalog_name: String,
pub schema_name: String,
pub table_type: TableType,
pub data_source_format: DataSourceFormat,
pub columns: Vec<ColumnInfo>,
pub storage_location: Option<String>,
pub comment: Option<String>,
pub properties: HashMap<String, String>,
pub created_at: Option<i64>,
pub updated_at: Option<i64>,
}

/// Errors that can occur during catalog operations.
#[derive(Debug)]
pub enum CatalogError {
/// Network or HTTP error.
ConnectionError(String),
/// Resource not found (catalog, schema, or table).
NotFound(String),
/// Authentication or authorization failure.
AuthError(String),
/// Invalid or unparsable response from the catalog server.
InvalidResponse(String),
/// Failed to map a catalog type to an Arrow type.
TypeMappingError(String),
/// Other errors.
Other(String),
}

impl std::fmt::Display for CatalogError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ConnectionError(msg) => write!(f, "Catalog connection error: {}", msg),
Self::NotFound(msg) => write!(f, "Not found: {}", msg),
Self::AuthError(msg) => write!(f, "Auth error: {}", msg),
Self::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg),
Self::TypeMappingError(msg) => write!(f, "Type mapping error: {}", msg),
Self::Other(msg) => write!(f, "Catalog error: {}", msg),
}
}
}

impl std::error::Error for CatalogError {}

pub type CatalogResult<T> = std::result::Result<T, CatalogError>;

/// Abstract trait for browsing an external catalog.
///
/// Analogous to Presto's `ConnectorMetadata`. Implementations provide access
/// to catalog metadata (catalogs, schemas, tables, columns) without being
/// coupled to any specific data format or storage backend.
///
/// # Extensibility
///
/// Implement this trait to add support for new catalog backends:
/// - Unity Catalog (provided)
/// - Hive Metastore (future)
/// - AWS Glue (future)
/// - Iceberg REST Catalog (future)
#[async_trait]
pub trait CatalogProvider: Send + Sync {
/// Human-readable name of this catalog provider (e.g., "unity-catalog").
fn name(&self) -> &str;

/// List all catalogs available in this provider.
async fn list_catalogs(&self) -> CatalogResult<Vec<CatalogInfo>>;

/// Get information about a specific catalog.
async fn get_catalog(&self, name: &str) -> CatalogResult<CatalogInfo>;

/// List all schemas within a catalog.
async fn list_schemas(&self, catalog_name: &str) -> CatalogResult<Vec<SchemaInfo>>;

/// Get information about a specific schema.
async fn get_schema(&self, catalog_name: &str, schema_name: &str) -> CatalogResult<SchemaInfo>;

/// List all tables within a schema.
async fn list_tables(
&self,
catalog_name: &str,
schema_name: &str,
) -> CatalogResult<Vec<TableInfo>>;

/// Get detailed information about a specific table, including columns.
async fn get_table(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> CatalogResult<TableInfo>;

/// Convert a table's column definitions to an Arrow schema.
///
/// The default implementation uses the standard type mapping from
/// [`crate::type_mapping::columns_to_arrow_schema`].
fn table_to_arrow_schema(&self, table: &TableInfo) -> CatalogResult<SchemaRef> {
crate::type_mapping::columns_to_arrow_schema(&table.columns)
}
}
Loading
Loading