Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added optimize delta tables to support all delta tables #386

Merged
merged 7 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 89 additions & 46 deletions src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ def create_abfss_path(
return f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{delta_table_name}"


def _get_default_file_path() -> str:
    """
    Returns the part of the "fs.defaultFS" Fabric context setting that follows the last "@".

    The final character of that part is dropped (presumably a trailing "/" - TODO confirm
    against the actual value of the setting).

    Returns
    -------
    str
        The text after the last "@" in the setting, minus its final character.
    """

    storage_root = _get_fabric_context_setting(name="fs.defaultFS")
    after_at = storage_root.rpartition("@")[2]

    return after_at[:-1]


def _split_abfss_path(path: str) -> Tuple[UUID, UUID, str]:

# Extracts the workspace ID, item ID and delta table name from the abfss path.

workspace_id = path.split("abfss://")[1].split("@")[0]
m-kovalsky marked this conversation as resolved.
Show resolved Hide resolved
item_id = path.split(".com/")[1].split("/")[0]
delta_table_name = path.split("/")[-1]

return workspace_id, item_id, delta_table_name


def format_dax_object_name(table: str, column: str) -> str:
"""
Formats a table/column combination to the 'Table Name'[Column Name] format.
Expand Down Expand Up @@ -172,23 +190,40 @@ def resolve_item_name_and_id(
return item_name, item_id


def resolve_dataset_name_and_id(
dataset: str | UUID, workspace: Optional[str | UUID] = None
def resolve_lakehouse_name_and_id(
lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
) -> Tuple[str, UUID]:

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
type = "Lakehouse"

if _is_valid_uuid(dataset):
dataset_id = dataset
dataset_name = fabric.resolve_item_name(
item_id=dataset_id, type="SemanticModel", workspace=workspace_id
if lakehouse is None:
lakehouse_id = fabric.get_lakehouse_id()
lakehouse_name = fabric.resolve_item_name(
item_id=lakehouse_id, type=type, workspace=workspace_id
)
elif _is_valid_uuid(lakehouse):
lakehouse_id = lakehouse
lakehouse_name = fabric.resolve_item_name(
item_id=lakehouse_id, type=type, workspace=workspace_id
)
else:
dataset_name = dataset
dataset_id = fabric.resolve_item_id(
item_name=dataset, type="SemanticModel", workspace=workspace_id
lakehouse_name = lakehouse
lakehouse_id = fabric.resolve_item_id(
item_name=lakehouse, type=type, workspace=workspace_id
)

return lakehouse_name, lakehouse_id


def resolve_dataset_name_and_id(
dataset: str | UUID, workspace: Optional[str | UUID] = None
) -> Tuple[str, UUID]:

(dataset_name, dataset_id) = resolve_item_name_and_id(
item=dataset, type="SemanticModel", workspace=workspace
)

return dataset_name, dataset_id


Expand Down Expand Up @@ -280,15 +315,15 @@ def resolve_lakehouse_name(


def resolve_lakehouse_id(
lakehouse: str, workspace: Optional[str | UUID] = None
lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
) -> UUID:
"""
Obtains the ID of the Fabric lakehouse.

Parameters
----------
lakehouse : str
The name of the Fabric lakehouse.
lakehouse : str | uuid.UUID, default=None
The name or ID of the Fabric lakehouse.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Defaults to None which resolves to the workspace of the attached lakehouse
Expand All @@ -300,9 +335,14 @@ def resolve_lakehouse_id(
The ID of the Fabric lakehouse.
"""

return fabric.resolve_item_id(
item_name=lakehouse, type="Lakehouse", workspace=workspace
)
if lakehouse is None:
return fabric.get_lakehouse_id()
elif _is_valid_uuid(lakehouse):
return lakehouse
else:
fabric.resolve_item_id(
item_name=lakehouse, type="Lakehouse", workspace=workspace
)


def get_direct_lake_sql_endpoint(
Expand Down Expand Up @@ -426,7 +466,7 @@ def save_as_delta_table(
write_mode: str,
merge_schema: bool = False,
schema: Optional[dict] = None,
lakehouse: Optional[str] = None,
lakehouse: Optional[str | UUID] = None,
workspace: Optional[str | UUID] = None,
):
"""
Expand All @@ -444,8 +484,8 @@ def save_as_delta_table(
Merges the schemas of the dataframe to the delta table.
schema : dict, default=None
A dictionary showing the schema of the columns and their data types.
lakehouse : str, default=None
The Fabric lakehouse used by the Direct Lake semantic model.
lakehouse : str | uuid.UUID, default=None
The Fabric lakehouse name or ID.
Defaults to None which resolves to the lakehouse attached to the notebook.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Expand All @@ -468,21 +508,16 @@ def save_as_delta_table(
)

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
(lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
lakehouse=lakehouse, workspace=workspace_id
)

if lakehouse is None:
lakehouse_id = fabric.get_lakehouse_id()
lakehouse = resolve_lakehouse_name(
lakehouse_id=lakehouse_id, workspace=workspace_id
)
else:
lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id)

writeModes = ["append", "overwrite"]
write_modes = ["append", "overwrite"]
write_mode = write_mode.lower()

if write_mode not in writeModes:
if write_mode not in write_modes:
raise ValueError(
f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {writeModes}."
f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {write_modes}."
)

if " " in delta_table_name:
Expand All @@ -507,16 +542,19 @@ def save_as_delta_table(
"timestamp": TimestampType(),
}

if schema is None:
spark_df = spark.createDataFrame(dataframe)
if isinstance(dataframe, pd.DataFrame):
if schema is None:
spark_df = spark.createDataFrame(dataframe)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
spark_df = dataframe

filePath = create_abfss_path(
lakehouse_id=lakehouse_id,
Expand All @@ -531,7 +569,7 @@ def save_as_delta_table(
else:
spark_df.write.mode(write_mode).format("delta").save(filePath)
print(
f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace_name}' workspace."
f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse_name}' lakehouse within the '{workspace_name}' workspace."
)


Expand Down Expand Up @@ -1024,28 +1062,33 @@ def _get_adls_client(account_name):
return service_client


def resolve_warehouse_id(warehouse: str, workspace: Optional[str | UUID]) -> UUID:
def resolve_warehouse_id(
warehouse: str | UUID, workspace: Optional[str | UUID]
) -> UUID:
"""
Obtains the Id for a given warehouse.

Parameters
----------
warehouse : str
The warehouse name
warehouse : str | uuid.UUID
The warehouse name or ID.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID in which the semantic model resides.
Defaults to None which resolves to the workspace of the attached lakehouse
or if no lakehouse attached, resolves to the workspace of the notebook.

Returns
-------
UUID
uuid.UUID
The warehouse Id.
"""

return fabric.resolve_item_id(
item_name=warehouse, type="Warehouse", workspace=workspace
)
if _is_valid_uuid(warehouse):
return warehouse
else:
return fabric.resolve_item_id(
item_name=warehouse, type="Warehouse", workspace=workspace
)


def get_language_codes(languages: str | List[str]):
Expand Down
80 changes: 80 additions & 0 deletions src/sempy_labs/_optimize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import pandas as pd
from typing import Optional, Union, List
from uuid import UUID
from sempy_labs._helper_functions import (
resolve_workspace_name_and_id,
resolve_lakehouse_name_and_id,
resolve_item_name_and_id,
create_abfss_path,
)
from tqdm.auto import tqdm


def optimize_delta_tables(
    tables: Optional[Union[str, List[str]]] = None,
    source: Optional[str | UUID] = None,
    source_type: str = "Lakehouse",
    workspace: Optional[str | UUID] = None,
):
    """
    Runs the `OPTIMIZE <https://docs.delta.io/latest/optimizations-oss.html>`_ function over the specified delta tables.

    Parameters
    ----------
    tables : str | List[str], default=None
        The table(s) to optimize.
        Defaults to None which resolves to optimizing all delta tables within the lakehouse.
        Must be specified when ``source_type`` is not "Lakehouse", since the list of
        tables is only looked up for lakehouses.
    source : str | uuid.UUID, default=None
        The source location of the delta table (i.e. lakehouse).
        Defaults to None which resolves to the lakehouse attached to the notebook.
    source_type : str, default="Lakehouse"
        The source type (i.e. "Lakehouse", "SemanticModel")
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID used by the lakehouse.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Raises
    ------
    ValueError
        If ``tables`` is None and ``source_type`` is not "Lakehouse".
    """

    from pyspark.sql import SparkSession
    from sempy_labs.lakehouse._get_lakehouse_tables import get_lakehouse_tables
    from delta import DeltaTable

    if isinstance(tables, str):
        tables = [tables]

    # Only lakehouses can enumerate their own tables; for any other source type
    # the caller has to name the tables explicitly. Fail early with a clear
    # message instead of a TypeError from iterating over None further down.
    if tables is None and source_type != "Lakehouse":
        raise ValueError(
            "The 'tables' parameter must be specified when 'source_type' is not 'Lakehouse'."
        )

    (_, workspace_id) = resolve_workspace_name_and_id(workspace)

    if source is None:
        # NOTE(review): the attached lakehouse is resolved without the 'workspace'
        # argument, while the table lookup below uses 'workspace_id' - confirm
        # this is intended when a non-default workspace is passed.
        (item_name, item_id) = resolve_lakehouse_name_and_id()
    else:
        (item_name, item_id) = resolve_item_name_and_id(
            item=source, type=source_type, workspace=workspace_id
        )

    if source_type == "Lakehouse":
        dfL = get_lakehouse_tables(lakehouse=item_name, workspace=workspace_id)
        delta_tables = dfL[dfL["Format"] == "delta"]

        if tables is not None:
            delta_tables = delta_tables[delta_tables["Table Name"].isin(tables)]
    else:
        # Pass the IDs by keyword so the workspace ID and the item ID cannot be
        # transposed in the generated abfss path.
        delta_tables = pd.DataFrame(
            [
                {
                    "Table Name": t,
                    "Location": create_abfss_path(
                        lakehouse_id=item_id,
                        lakehouse_workspace_id=workspace_id,
                        delta_table_name=t,
                    ),
                }
                for t in tables
            ]
        )

    spark = SparkSession.builder.getOrCreate()

    # iterrows() is a generator without a length, so give tqdm the total explicitly.
    for _, row in (bar := tqdm(delta_tables.iterrows(), total=len(delta_tables))):
        table_name = row["Table Name"]
        table_path = row["Location"]
        bar.set_description(f"Optimizing the '{table_name}' table...")
        DeltaTable.forPath(spark, table_path).optimize().executeCompaction()
m-kovalsky marked this conversation as resolved.
Show resolved Hide resolved
18 changes: 11 additions & 7 deletions src/sempy_labs/_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from itertools import chain, repeat
from sempy.fabric.exceptions import FabricHTTPException
from sempy_labs._helper_functions import (
resolve_warehouse_id,
resolve_lakehouse_id,
resolve_lakehouse_name_and_id,
resolve_item_name_and_id,
resolve_workspace_name_and_id,
)
from uuid import UUID
Expand Down Expand Up @@ -35,7 +35,7 @@ def _bytes2mswin_bstr(value: bytes) -> bytes:
class ConnectBase:
def __init__(
self,
name: str,
item: str,
workspace: Optional[Union[str, UUID]] = None,
timeout: Optional[int] = None,
endpoint_type: str = "warehouse",
Expand All @@ -45,11 +45,15 @@ def __init__(

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)

# Resolve the appropriate ID (warehouse or lakehouse)
# Resolve the appropriate ID and name (warehouse or lakehouse)
if endpoint_type == "warehouse":
resource_id = resolve_warehouse_id(warehouse=name, workspace=workspace_id)
# resolve_item_name_and_id returns (name, id); unpack in that order so the
# ID is used for the endpoint lookup and the name for DATABASE=.
(resource_name, resource_id) = resolve_item_name_and_id(
    item=item, type=endpoint_type.capitalize(), workspace=workspace_id
)
else:
resource_id = resolve_lakehouse_id(lakehouse=name, workspace=workspace_id)
# resolve_lakehouse_name_and_id returns (name, id); unpack in that order so
# the ID is used for the endpoint lookup and the name for DATABASE=.
(resource_name, resource_id) = resolve_lakehouse_name_and_id(
    lakehouse=item, workspace=workspace_id
)

# Get the TDS endpoint
client = fabric.FabricRestClient()
Expand All @@ -72,7 +76,7 @@ def __init__(
# Set up the connection string
access_token = SynapseTokenProvider()()
tokenstruct = _bytes2mswin_bstr(access_token.encode())
conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={name};Encrypt=Yes;"
conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={resource_name};Encrypt=Yes;"

if timeout is not None:
conn_str += f"Connect Timeout={timeout};"
Expand Down
Loading
Loading