Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

178 changes: 176 additions & 2 deletions python/sedonadb/python/sedonadb/_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,82 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
import os
from typing import Literal, Optional, Union

from sedonadb.utility import sedona # noqa: F401


class Options:
"""Global SedonaDB options"""
"""Global SedonaDB options

Options are divided into two categories:

**Display options** can be changed at any time and affect how results are
presented:

- `interactive`: Enable/disable auto-display of DataFrames.
- `width`: Override the detected terminal width.

**Runtime options** configure the execution environment and must be set
*before* the first query is executed (i.e., before the internal context
is initialized). Attempting to change these after the context has been
created will raise a `RuntimeError`:

- `memory_limit`: Maximum memory for execution, in bytes or as a
human-readable string (e.g., `"4gb"`, `"512m"`).
- `temp_dir`: Directory for temporary/spill files.
- `memory_pool_type`: Memory pool type (`"greedy"` or `"fair"`).
- `unspillable_reserve_ratio`: Fraction of memory reserved for
unspillable consumers (only applies to the `"fair"` pool type).

Examples:

>>> sd = sedona.db.connect()
>>> sd.options.memory_limit = "4gb"
>>> sd.options.memory_pool_type = "fair"
>>> sd.options.temp_dir = "/tmp/sedona-spill"
>>> sd.options.interactive = True
>>> sd.sql("SELECT 1 as one")
┌───────┐
│ one │
│ int64 │
╞═══════╡
│ 1 │
└───────┘
"""

def __init__(self):
# Display options (can be changed at any time)
self._interactive = False
self._width = None

# Runtime options (must be set before first query)
self._memory_limit = None
self._temp_dir = None
self._memory_pool_type = "greedy"
self._unspillable_reserve_ratio = None

# Set to True once the internal context is created; after this,
# runtime options become read-only.
self._runtime_frozen = False

def freeze_runtime(self) -> None:
"""Mark runtime options as read-only.

Called after the internal context has been successfully created.
"""
self._runtime_frozen = True

def _check_runtime_mutable(self, name: str) -> None:
if self._runtime_frozen:
raise RuntimeError(
f"Cannot change '{name}' after the context has been initialized. "
f"Set this option before executing your first query."
)

# --- Display options ---

@property
def interactive(self) -> bool:
"""Use interactive mode
Expand Down Expand Up @@ -52,3 +118,111 @@ def width(self) -> Optional[int]:
@width.setter
def width(self, value: Optional[int]):
self._width = value

# --- Runtime options (must be set before first query) ---

@property
def memory_limit(self) -> Union[int, str, None]:
"""Maximum memory for query execution.

Accepts an integer (bytes) or a human-readable string such as
`"4gb"`, `"512m"`, or `"1.5g"`. When set, a bounded memory pool is
created to enforce this limit. Without a memory limit, DataFusion's
default unbounded pool is used.

Must be set before the first query is executed.

Examples:

>>> sd = sedona.db.connect()
>>> sd.options.memory_limit = "4gb"
>>> sd.options.memory_limit = 4 * 1024 * 1024 * 1024 # equivalent
"""
return self._memory_limit

@memory_limit.setter
def memory_limit(self, value: Union[int, str, None]) -> None:
self._check_runtime_mutable("memory_limit")
if value is not None and not isinstance(value, (int, str)):
raise TypeError(
f"memory_limit must be an int, str, or None, got {type(value).__name__}"
)
self._memory_limit = value

@property
def temp_dir(self) -> Optional[str]:
"""Directory for temporary/spill files.

When set, disk-based spilling will use this directory. When `None`,
DataFusion's default temporary directory is used.

Must be set before the first query is executed.
"""
return self._temp_dir

@temp_dir.setter
def temp_dir(self, value: "Optional[Union[str, os.PathLike[str]]]") -> None:
self._check_runtime_mutable("temp_dir")
if value is None:
self._temp_dir = None
elif isinstance(value, os.PathLike):
self._temp_dir = os.fspath(value)
elif isinstance(value, str):
self._temp_dir = value
else:
raise TypeError(
f"temp_dir must be a str, PathLike, or None, got {type(value).__name__}"
)

@property
def memory_pool_type(self) -> str:
"""Memory pool type: `"greedy"` or `"fair"`.

- `"greedy"`: A simple pool that grants reservations on a
first-come-first-served basis. This is the default.
- `"fair"`: A pool that fairly distributes memory among spillable
consumers and reserves a fraction for unspillable consumers
(configured via `unspillable_reserve_ratio`).

Only takes effect when `memory_limit` is set.
Must be set before the first query is executed.
"""
return self._memory_pool_type

@memory_pool_type.setter
def memory_pool_type(self, value: Literal["greedy", "fair"]) -> None:
self._check_runtime_mutable("memory_pool_type")
if value not in ("greedy", "fair"):
raise ValueError(
f"memory_pool_type must be 'greedy' or 'fair', got '{value}'"
)
self._memory_pool_type = value

@property
def unspillable_reserve_ratio(self) -> Optional[float]:
"""Fraction of memory reserved for unspillable consumers (0.0 - 1.0).

Only applies when `memory_pool_type` is `"fair"` and
`memory_limit` is set. Defaults to 0.2 when not explicitly set.

Must be set before the first query is executed.
"""
return self._unspillable_reserve_ratio

@unspillable_reserve_ratio.setter
def unspillable_reserve_ratio(self, value: Optional[float]) -> None:
self._check_runtime_mutable("unspillable_reserve_ratio")
if value is None:
self._unspillable_reserve_ratio = None
return
if not isinstance(value, (int, float)):
raise TypeError(
"unspillable_reserve_ratio must be a number between 0.0 and 1.0 "
f"or None, got {type(value).__name__}"
)
value = float(value)
if not (0.0 <= value <= 1.0):
raise ValueError(
f"unspillable_reserve_ratio must be between 0.0 and 1.0, got {value}"
)
self._unspillable_reserve_ratio = value
54 changes: 52 additions & 2 deletions python/sedonadb/python/sedonadb/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class SedonaContext:
registered tables, and available memory. This is similar to a
Spark SessionContext or a database connection.

Runtime configuration (memory limits, spill directory, pool type) can
be set via `options` before executing the first query. Once the
first query runs, the internal execution context is created and
runtime options become read-only.

Examples:

>>> sd = sedona.db.connect()
Expand All @@ -47,12 +52,47 @@ class SedonaContext:
╞═══════╡
│ 1 │
└───────┘

Configuring memory limits:

>>> sd = sedona.db.connect()
>>> sd.options.memory_limit = "4gb"
>>> sd.options.memory_pool_type = "fair"
"""

def __init__(self):
self._impl = InternalContext()
self.__impl = None
self.options = Options()

@property
def _impl(self):
"""Lazily initialize the internal Rust context on first use.

This allows runtime options (memory_limit, temp_dir, etc.) to be
configured via `self.options` before the context is created.
Once created, runtime options are frozen.
"""
if self.__impl is None:
# Build a dict[str, str] of non-None runtime options
opts = {}
if self.options.memory_limit is not None:
opts["memory_limit"] = str(self.options.memory_limit)
if self.options.temp_dir is not None:
opts["temp_dir"] = self.options.temp_dir
if self.options.memory_pool_type is not None:
opts["memory_pool_type"] = self.options.memory_pool_type
if self.options.unspillable_reserve_ratio is not None:
opts["unspillable_reserve_ratio"] = str(
self.options.unspillable_reserve_ratio
)

# Create the context first, then freeze options. If creation
# fails the user can still correct options and retry.
impl = InternalContext(opts)
self.__impl = impl
self.options.freeze_runtime()
return self.__impl

def create_data_frame(self, obj: Any, schema: Any = None) -> DataFrame:
"""Create a DataFrame from an in-memory or protocol-enabled object.

Expand Down Expand Up @@ -381,7 +421,17 @@ def funcs(self) -> Functions:


def connect() -> SedonaContext:
"""Create a new [SedonaContext][sedonadb.context.SedonaContext]"""
"""Create a new [SedonaContext][sedonadb.context.SedonaContext]

Runtime configuration (memory limits, spill directory, pool type)
can be set via `options` on the returned context before executing
the first query::

sd = sedona.db.connect()
sd.options.memory_limit = "4gb"
sd.options.memory_pool_type = "fair"
sd.options.temp_dir = "/tmp/sedona-spill"
"""
return SedonaContext()


Expand Down
2 changes: 1 addition & 1 deletion python/sedonadb/python/sedonadb/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
class Sedona:
"""Mock sedona Python module

The Apache Sedaona Python ecosystem is centered around the apache-sedona Python
The Apache Sedona Python ecosystem is centered around the apache-sedona Python
package which provides the `sedona` modules. To decouple the maintenance and
provide fine-grained dependency control for projects that need it, sedonadb
is distributed as a standalone package. This mock `sedona` module lets us write
Expand Down
9 changes: 7 additions & 2 deletions python/sedonadb/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{collections::HashMap, sync::Arc};
use datafusion_expr::ScalarUDFImpl;
use pyo3::prelude::*;
use sedona::context::SedonaContext;
use sedona::context_builder::SedonaContextBuilder;
use tokio::runtime::Runtime;

use crate::{
Expand All @@ -39,15 +40,19 @@ pub struct InternalContext {
#[pymethods]
impl InternalContext {
#[new]
fn new(py: Python) -> Result<Self, PySedonaError> {
#[pyo3(signature = (options=HashMap::new()))]
fn new(py: Python, options: HashMap<String, String>) -> Result<Self, PySedonaError> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.map_err(|e| {
PySedonaError::SedonaPython(format!("Failed to build multithreaded runtime: {e}"))
})?;

let inner = wait_for_future(py, &runtime, SedonaContext::new_local_interactive())??;
let builder = SedonaContextBuilder::from_options(&options)
.map_err(|e| PySedonaError::SedonaPython(e.to_string()))?;

let inner = wait_for_future(py, &runtime, builder.build())??;

Ok(Self {
inner,
Expand Down
2 changes: 1 addition & 1 deletion rust/sedona-geo-generic-alg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ float_next_after = { workspace = true }
geo-traits = { workspace = true }
geo-types = { workspace = true, features = ["approx", "use-rstar_0_12"] }
sedona-geo-traits-ext = { workspace = true }
log = "0.4.11"
log = { workspace = true }
num-traits = { workspace = true }
robust = "1.1.0"
rstar = "0.12.0"
Expand Down
1 change: 1 addition & 0 deletions rust/sedona/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ geo-traits = { workspace = true }
geo-types = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
regex = { workspace = true }
sedona-common = { workspace = true }
sedona-datasource = { workspace = true }
sedona-expr = { workspace = true }
Expand Down
Loading