added optimize delta tables to support all delta tables (#386)
* Extended optimize delta tables to support all delta tables

* Added ID support for the warehouse parameter

* Added _split_abfss_path and _get_default_file_path

* Added ID support for the lakehouse parameter

* Removed from init

* Removed

* Fixed per review comments
m-kovalsky authored Jan 13, 2025
1 parent ebc659f commit 380064b
Showing 6 changed files with 147 additions and 141 deletions.
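In practice, the lakehouse and warehouse parameters touched in this commit accept either an item name or an item ID after the change. A minimal usage sketch, importing straight from the changed helper module (the item names are illustrative and the UUID is a placeholder, not a real item ID):

from sempy_labs._helper_functions import resolve_lakehouse_id, resolve_warehouse_id

# Name and ID now resolve to the same lakehouse.
lakehouse_id = resolve_lakehouse_id(lakehouse="SalesLakehouse")
lakehouse_id = resolve_lakehouse_id(lakehouse="12345678-1234-1234-1234-123456789abc")

# resolve_warehouse_id gains the same behaviour; note its workspace parameter has no default.
warehouse_id = resolve_warehouse_id(warehouse="SalesWarehouse", workspace=None)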
135 changes: 89 additions & 46 deletions src/sempy_labs/_helper_functions.py
@@ -52,6 +52,24 @@ def create_abfss_path(
return f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{delta_table_name}"


def _get_default_file_path() -> str:

default_file_storage = _get_fabric_context_setting(name="fs.defaultFS")

return default_file_storage.split("@")[-1][:-1]


def _split_abfss_path(path: str) -> Tuple[UUID, UUID, str]:

parsed_url = urllib.parse.urlparse(path)

workspace_id = parsed_url.netloc.split("@")[0]
item_id = parsed_url.path.lstrip("/").split("/")[0]
delta_table_name = parsed_url.path.split("/")[-1]

return workspace_id, item_id, delta_table_name


def format_dax_object_name(table: str, column: str) -> str:
    """
    Formats a table/column combination to the 'Table Name'[Column Name] format.
@@ -172,23 +190,40 @@ def resolve_item_name_and_id(
    return item_name, item_id


-def resolve_dataset_name_and_id(
-    dataset: str | UUID, workspace: Optional[str | UUID] = None
+def resolve_lakehouse_name_and_id(
+    lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
) -> Tuple[str, UUID]:

    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    type = "Lakehouse"

-    if _is_valid_uuid(dataset):
-        dataset_id = dataset
-        dataset_name = fabric.resolve_item_name(
-            item_id=dataset_id, type="SemanticModel", workspace=workspace_id
+    if lakehouse is None:
+        lakehouse_id = fabric.get_lakehouse_id()
+        lakehouse_name = fabric.resolve_item_name(
+            item_id=lakehouse_id, type=type, workspace=workspace_id
+        )
+    elif _is_valid_uuid(lakehouse):
+        lakehouse_id = lakehouse
+        lakehouse_name = fabric.resolve_item_name(
+            item_id=lakehouse_id, type=type, workspace=workspace_id
        )
    else:
-        dataset_name = dataset
-        dataset_id = fabric.resolve_item_id(
-            item_name=dataset, type="SemanticModel", workspace=workspace_id
+        lakehouse_name = lakehouse
+        lakehouse_id = fabric.resolve_item_id(
+            item_name=lakehouse, type=type, workspace=workspace_id
        )

+    return lakehouse_name, lakehouse_id
+
+
+def resolve_dataset_name_and_id(
+    dataset: str | UUID, workspace: Optional[str | UUID] = None
+) -> Tuple[str, UUID]:
+
+    (dataset_name, dataset_id) = resolve_item_name_and_id(
+        item=dataset, type="SemanticModel", workspace=workspace
+    )

    return dataset_name, dataset_id


@@ -280,15 +315,15 @@ def resolve_lakehouse_name(


def resolve_lakehouse_id(
-    lakehouse: str, workspace: Optional[str | UUID] = None
+    lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
) -> UUID:
    """
    Obtains the ID of the Fabric lakehouse.
    Parameters
    ----------
-    lakehouse : str
-        The name of the Fabric lakehouse.
+    lakehouse : str | uuid.UUID, default=None
+        The name or ID of the Fabric lakehouse.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
@@ -300,9 +335,14 @@ def resolve_lakehouse_id(
        The ID of the Fabric lakehouse.
    """

-    return fabric.resolve_item_id(
-        item_name=lakehouse, type="Lakehouse", workspace=workspace
-    )
+    if lakehouse is None:
+        return fabric.get_lakehouse_id()
+    elif _is_valid_uuid(lakehouse):
+        return lakehouse
+    else:
+        return fabric.resolve_item_id(
+            item_name=lakehouse, type="Lakehouse", workspace=workspace
+        )


def get_direct_lake_sql_endpoint(
@@ -426,7 +466,7 @@ def save_as_delta_table(
    write_mode: str,
    merge_schema: bool = False,
    schema: Optional[dict] = None,
-    lakehouse: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
    workspace: Optional[str | UUID] = None,
):
"""
Expand All @@ -444,8 +484,8 @@ def save_as_delta_table(
Merges the schemas of the dataframe to the delta table.
schema : dict, default=None
A dictionary showing the schema of the columns and their data types.
lakehouse : str, default=None
The Fabric lakehouse used by the Direct Lake semantic model.
lakehouse : str | uuid.UUID, default=None
The Fabric lakehouse name or ID.
Defaults to None which resolves to the lakehouse attached to the notebook.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Expand All @@ -468,21 +508,16 @@ def save_as_delta_table(
)

    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
+        lakehouse=lakehouse, workspace=workspace_id
+    )

-    if lakehouse is None:
-        lakehouse_id = fabric.get_lakehouse_id()
-        lakehouse = resolve_lakehouse_name(
-            lakehouse_id=lakehouse_id, workspace=workspace_id
-        )
-    else:
-        lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id)

-    writeModes = ["append", "overwrite"]
+    write_modes = ["append", "overwrite"]
    write_mode = write_mode.lower()

-    if write_mode not in writeModes:
+    if write_mode not in write_modes:
        raise ValueError(
-            f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {writeModes}."
+            f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {write_modes}."
        )

    if " " in delta_table_name:
@@ -507,16 +542,19 @@ def save_as_delta_table(
"timestamp": TimestampType(),
}

if schema is None:
spark_df = spark.createDataFrame(dataframe)
if isinstance(dataframe, pd.DataFrame):
if schema is None:
spark_df = spark.createDataFrame(dataframe)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
spark_df = dataframe

filePath = create_abfss_path(
lakehouse_id=lakehouse_id,
Expand All @@ -531,7 +569,7 @@ def save_as_delta_table(
    else:
        spark_df.write.mode(write_mode).format("delta").save(filePath)
    print(
-        f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace_name}' workspace."
+        f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse_name}' lakehouse within the '{workspace_name}' workspace."
    )


@@ -1024,28 +1062,33 @@ def _get_adls_client(account_name):
    return service_client


-def resolve_warehouse_id(warehouse: str, workspace: Optional[str | UUID]) -> UUID:
+def resolve_warehouse_id(
+    warehouse: str | UUID, workspace: Optional[str | UUID]
+) -> UUID:
    """
    Obtains the Id for a given warehouse.
    Parameters
    ----------
-    warehouse : str
-        The warehouse name
+    warehouse : str | uuid.UUID
+        The warehouse name or ID.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID in which the semantic model resides.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.
    Returns
    -------
-    UUID
+    uuid.UUID
        The warehouse Id.
    """

-    return fabric.resolve_item_id(
-        item_name=warehouse, type="Warehouse", workspace=workspace
-    )
+    if _is_valid_uuid(warehouse):
+        return warehouse
+    else:
+        return fabric.resolve_item_id(
+            item_name=warehouse, type="Warehouse", workspace=workspace
+        )


def get_language_codes(languages: str | List[str]):
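Taken together, the helper changes in this file let a dataframe be written by lakehouse ID as well as by name, and let an abfss:// table path be decomposed again. A rough sketch with illustrative table names and placeholder UUIDs; the dataframe and delta_table_name parameters of save_as_delta_table and the keyword names of create_abfss_path are inferred from the function bodies above:

import pandas as pd
from sempy_labs._helper_functions import (
    create_abfss_path,
    _split_abfss_path,
    save_as_delta_table,
)

df = pd.DataFrame({"Region": ["East", "West"], "Sales": [100, 250]})

# lakehouse may be a name or an ID after this change (placeholder UUID shown).
save_as_delta_table(
    dataframe=df,
    delta_table_name="regional_sales",
    write_mode="overwrite",
    lakehouse="11111111-2222-3333-4444-555555555555",
)

# _split_abfss_path reverses create_abfss_path: it recovers the workspace ID,
# item ID and table name encoded in an abfss:// table path.
path = create_abfss_path(
    lakehouse_id="11111111-2222-3333-4444-555555555555",
    lakehouse_workspace_id="99999999-8888-7777-6666-555555555555",
    delta_table_name="regional_sales",
)
workspace_id, item_id, table_name = _split_abfss_path(path)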
18 changes: 11 additions & 7 deletions src/sempy_labs/_sql.py
@@ -6,8 +6,8 @@
from itertools import chain, repeat
from sempy.fabric.exceptions import FabricHTTPException
from sempy_labs._helper_functions import (
-    resolve_warehouse_id,
-    resolve_lakehouse_id,
+    resolve_lakehouse_name_and_id,
+    resolve_item_name_and_id,
    resolve_workspace_name_and_id,
)
from uuid import UUID
@@ -35,7 +35,7 @@ def _bytes2mswin_bstr(value: bytes) -> bytes:
class ConnectBase:
    def __init__(
        self,
-        name: str,
+        item: str,
        workspace: Optional[Union[str, UUID]] = None,
        timeout: Optional[int] = None,
        endpoint_type: str = "warehouse",
@@ -45,11 +45,15 @@ def __init__(

        (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)

-        # Resolve the appropriate ID (warehouse or lakehouse)
+        # Resolve the appropriate ID and name (warehouse or lakehouse)
        if endpoint_type == "warehouse":
-            resource_id = resolve_warehouse_id(warehouse=name, workspace=workspace_id)
+            (resource_name, resource_id) = resolve_item_name_and_id(
+                item=item, type=endpoint_type.capitalize(), workspace=workspace_id
+            )
        else:
-            resource_id = resolve_lakehouse_id(lakehouse=name, workspace=workspace_id)
+            (resource_name, resource_id) = resolve_lakehouse_name_and_id(
+                lakehouse=item, workspace=workspace_id
+            )

        # Get the TDS endpoint
        client = fabric.FabricRestClient()
@@ -72,7 +76,7 @@ def __init__(
        # Set up the connection string
        access_token = SynapseTokenProvider()()
        tokenstruct = _bytes2mswin_bstr(access_token.encode())
-        conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={name};Encrypt=Yes;"
+        conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={resource_name};Encrypt=Yes;"

        if timeout is not None:
            conn_str += f"Connect Timeout={timeout};"
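With this change a SQL connection can be opened against a warehouse or lakehouse by ID as well as by name. A minimal sketch against the base class shown above (in practice callers go through the public wrapper classes that subclass ConnectBase; the item name and timeout below are illustrative):

from sempy_labs._sql import ConnectBase

# The first argument is now `item` and may be a warehouse/lakehouse name or ID;
# endpoint_type selects the resolver: "warehouse" uses resolve_item_name_and_id,
# anything else falls through to the lakehouse branch.
conn = ConnectBase(
    item="SalesWarehouse",  # or a UUID string
    endpoint_type="warehouse",
    timeout=60,
)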
12 changes: 6 additions & 6 deletions src/sempy_labs/_warehouses.py
@@ -150,15 +150,15 @@ def delete_warehouse(name: str, workspace: Optional[str | UUID] = None):


def get_warehouse_tables(
-    warehouse: str, workspace: Optional[str | UUID] = None
+    warehouse: str | UUID, workspace: Optional[str | UUID] = None
) -> pd.DataFrame:
    """
    Shows a list of the tables in the Fabric warehouse. This function is based on INFORMATION_SCHEMA.TABLES.
    Parameters
    ----------
-    warehouse : str
-        Name of the Fabric warehouse.
+    warehouse : str | uuid.UUID
+        Name or ID of the Fabric warehouse.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
@@ -185,15 +185,15 @@ def get_warehouse_tables(


def get_warehouse_columns(
-    warehouse: str, workspace: Optional[str | UUID] = None
+    warehouse: str | UUID, workspace: Optional[str | UUID] = None
) -> pd.DataFrame:
    """
    Shows a list of the columns in each table within the Fabric warehouse. This function is based on INFORMATION_SCHEMA.COLUMNS.
    Parameters
    ----------
-    warehouse : str
-        Name of the Fabric warehouse.
+    warehouse : str | uuid.UUID
+        Name or ID of the Fabric warehouse.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
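Both warehouse metadata helpers in this file now take the warehouse ID as well as its name. A short sketch with illustrative names and a placeholder UUID:

from sempy_labs._warehouses import get_warehouse_tables, get_warehouse_columns

# Resolve by name or by ID; the workspace argument remains optional.
tables_df = get_warehouse_tables(warehouse="SalesWarehouse")
columns_df = get_warehouse_columns(
    warehouse="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    workspace="Sales Workspace",
)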
