Skip to content

Commit 1dedac5

Browse files
adragomirccciudatu
authored and committed
[HSTACK] Delta table support
1 parent 0211066 commit 1dedac5

File tree

4 files changed

+38
-7
lines changed

4 files changed

+38
-7
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ futures = "0.3"
5151
object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
5252
url = "2"
5353
log = "0.4.27"
54+
deltalake = { version = "0.28.0", features = ["datafusion", "azure", "s3"] }
55+
env_logger = "0.11.7"
5456

5557
[build-dependencies]
5658
prost-types = "0.13.1" # keep in line with `datafusion-substrait`

python/datafusion/context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,9 @@ def register_listing_table(
585585
self._convert_file_sort_order(file_sort_order),
586586
)
587587

588+
def register_delta_table(self, name: str, table_uri: str, storage_opts: dict[str, str] = {}):
589+
self.ctx.register_delta_table(name, table_uri, storage_opts)
590+
588591
def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
589592
"""Create a :py:class:`~datafusion.DataFrame` from SQL query text.
590593

src/context.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use pyo3::prelude::*;
3434
use crate::catalog::{PyCatalog, PyTable, RustWrappedPyCatalogProvider};
3535
use crate::dataframe::PyDataFrame;
3636
use crate::dataset::Dataset;
37-
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult};
37+
use crate::errors::{py_datafusion_err, py_runtime_err, to_datafusion_err, PyDataFusionResult};
3838
use crate::expr::sort_expr::PySortExpr;
3939
use crate::physical_plan::PyExecutionPlan;
4040
use crate::record_batch::PyRecordBatchStream;
@@ -320,6 +320,13 @@ impl PySessionContext {
320320
} else {
321321
RuntimeEnvBuilder::default()
322322
};
323+
deltalake::azure::register_handlers(None);
324+
deltalake::aws::register_handlers(None);
325+
let _ = env_logger::try_init();
326+
327+
let config = config.set_bool("datafusion.sql_parser.enable_ident_normalization", false);
328+
329+
323330
let runtime = Arc::new(runtime_env_builder.build()?);
324331
let session_state = SessionStateBuilder::new()
325332
.with_config(config)
@@ -875,6 +882,22 @@ impl PySessionContext {
875882
Ok(())
876883
}
877884

885+
pub fn register_delta_table(
886+
&self,
887+
name: &str,
888+
table_uri: &str,
889+
storage_opts: HashMap<String, String>,
890+
py: Python,
891+
) -> PyResult<()> {
892+
deltalake::ensure_initialized();
893+
let table_uri = Url::parse(table_uri).map_err(py_runtime_err)?;
894+
let table = deltalake::open_table_with_storage_options(table_uri, storage_opts);
895+
let table = wait_for_future(py, table)?.map_err(py_datafusion_err)?;
896+
self.ctx.register_table(name, Arc::new(table)).map_err(py_datafusion_err)?;
897+
Ok(())
898+
}
899+
900+
878901
#[pyo3(signature = (name="datafusion"))]
879902
pub fn catalog(&self, name: &str) -> PyResult<PyObject> {
880903
let catalog = self.ctx.catalog(name).ok_or(PyKeyError::new_err(format!(

src/physical_plan.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
// under the License.
1717

1818
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
19-
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
19+
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
2020
use prost::Message;
2121
use std::sync::Arc;
22-
22+
use deltalake::delta_datafusion::DeltaPhysicalCodec;
2323
use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes};
2424

2525
use crate::{context::PySessionContext, errors::PyDataFusionResult};
@@ -59,10 +59,9 @@ impl PyExecutionPlan {
5959
}
6060

6161
pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
62-
let codec = DefaultPhysicalExtensionCodec {};
6362
let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
6463
self.plan.clone(),
65-
&codec,
64+
codec(),
6665
)?;
6766

6867
let bytes = proto.encode_to_vec();
@@ -82,8 +81,7 @@ impl PyExecutionPlan {
8281
))
8382
})?;
8483

85-
let codec = DefaultPhysicalExtensionCodec {};
86-
let plan = proto_plan.try_into_physical_plan(&ctx.ctx, &ctx.ctx.runtime_env(), &codec)?;
84+
let plan = proto_plan.try_into_physical_plan(&ctx.ctx, &ctx.ctx.runtime_env(), codec())?;
8785
Ok(Self::new(plan))
8886
}
8987

@@ -97,6 +95,11 @@ impl PyExecutionPlan {
9795
}
9896
}
9997

98+
pub(crate) fn codec() -> &'static dyn PhysicalExtensionCodec {
99+
static CODEC: DeltaPhysicalCodec = DeltaPhysicalCodec {};
100+
&CODEC
101+
}
102+
100103
impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
101104
fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
102105
plan.plan.clone()

0 commit comments

Comments
 (0)