Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added optimize delta tables to support all delta tables #386

Merged
merged 7 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 89 additions & 46 deletions src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ def create_abfss_path(
return f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{delta_table_name}"


def _get_default_file_path() -> str:

default_file_storage = _get_fabric_context_setting(name="fs.defaultFS")

return default_file_storage.split("@")[-1][:-1]


def _split_abfss_path(path: str) -> Tuple[UUID, UUID, str]:

parsed_url = urllib.parse.urlparse(path)

workspace_id = parsed_url.netloc.split("@")[0]
item_id = parsed_url.path.lstrip("/").split("/")[0]
delta_table_name = parsed_url.path.split("/")[-1]

return workspace_id, item_id, delta_table_name


def format_dax_object_name(table: str, column: str) -> str:
"""
Formats a table/column combination to the 'Table Name'[Column Name] format.
Expand Down Expand Up @@ -172,23 +190,40 @@ def resolve_item_name_and_id(
return item_name, item_id


def resolve_dataset_name_and_id(
dataset: str | UUID, workspace: Optional[str | UUID] = None
def resolve_lakehouse_name_and_id(
lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
) -> Tuple[str, UUID]:

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
type = "Lakehouse"

if _is_valid_uuid(dataset):
dataset_id = dataset
dataset_name = fabric.resolve_item_name(
item_id=dataset_id, type="SemanticModel", workspace=workspace_id
if lakehouse is None:
lakehouse_id = fabric.get_lakehouse_id()
lakehouse_name = fabric.resolve_item_name(
item_id=lakehouse_id, type=type, workspace=workspace_id
)
elif _is_valid_uuid(lakehouse):
lakehouse_id = lakehouse
lakehouse_name = fabric.resolve_item_name(
item_id=lakehouse_id, type=type, workspace=workspace_id
)
else:
dataset_name = dataset
dataset_id = fabric.resolve_item_id(
item_name=dataset, type="SemanticModel", workspace=workspace_id
lakehouse_name = lakehouse
lakehouse_id = fabric.resolve_item_id(
item_name=lakehouse, type=type, workspace=workspace_id
)

return lakehouse_name, lakehouse_id


def resolve_dataset_name_and_id(
dataset: str | UUID, workspace: Optional[str | UUID] = None
) -> Tuple[str, UUID]:

(dataset_name, dataset_id) = resolve_item_name_and_id(
item=dataset, type="SemanticModel", workspace=workspace
)

return dataset_name, dataset_id


Expand Down Expand Up @@ -280,15 +315,15 @@ def resolve_lakehouse_name(


def resolve_lakehouse_id(
lakehouse: str, workspace: Optional[str | UUID] = None
lakehouse: Optional[str | UUID] = None, workspace: Optional[str | UUID] = None
) -> UUID:
"""
Obtains the ID of the Fabric lakehouse.

Parameters
----------
lakehouse : str
The name of the Fabric lakehouse.
lakehouse : str | uuid.UUID, default=None
The name or ID of the Fabric lakehouse.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Defaults to None which resolves to the workspace of the attached lakehouse
Expand All @@ -300,9 +335,14 @@ def resolve_lakehouse_id(
The ID of the Fabric lakehouse.
"""

return fabric.resolve_item_id(
item_name=lakehouse, type="Lakehouse", workspace=workspace
)
if lakehouse is None:
return fabric.get_lakehouse_id()
elif _is_valid_uuid(lakehouse):
return lakehouse
else:
fabric.resolve_item_id(
item_name=lakehouse, type="Lakehouse", workspace=workspace
)


def get_direct_lake_sql_endpoint(
Expand Down Expand Up @@ -426,7 +466,7 @@ def save_as_delta_table(
write_mode: str,
merge_schema: bool = False,
schema: Optional[dict] = None,
lakehouse: Optional[str] = None,
lakehouse: Optional[str | UUID] = None,
workspace: Optional[str | UUID] = None,
):
"""
Expand All @@ -444,8 +484,8 @@ def save_as_delta_table(
Merges the schemas of the dataframe to the delta table.
schema : dict, default=None
A dictionary showing the schema of the columns and their data types.
lakehouse : str, default=None
The Fabric lakehouse used by the Direct Lake semantic model.
lakehouse : str | uuid.UUID, default=None
The Fabric lakehouse name or ID.
Defaults to None which resolves to the lakehouse attached to the notebook.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Expand All @@ -468,21 +508,16 @@ def save_as_delta_table(
)

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
(lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
lakehouse=lakehouse, workspace=workspace_id
)

if lakehouse is None:
lakehouse_id = fabric.get_lakehouse_id()
lakehouse = resolve_lakehouse_name(
lakehouse_id=lakehouse_id, workspace=workspace_id
)
else:
lakehouse_id = resolve_lakehouse_id(lakehouse, workspace_id)

writeModes = ["append", "overwrite"]
write_modes = ["append", "overwrite"]
write_mode = write_mode.lower()

if write_mode not in writeModes:
if write_mode not in write_modes:
raise ValueError(
f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {writeModes}."
f"{icons.red_dot} Invalid 'write_type' parameter. Choose from one of the following values: {write_modes}."
)

if " " in delta_table_name:
Expand All @@ -507,16 +542,19 @@ def save_as_delta_table(
"timestamp": TimestampType(),
}

if schema is None:
spark_df = spark.createDataFrame(dataframe)
if isinstance(dataframe, pd.DataFrame):
if schema is None:
spark_df = spark.createDataFrame(dataframe)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
else:
schema_map = StructType(
[
StructField(column_name, type_mapping[data_type], True)
for column_name, data_type in schema.items()
]
)
spark_df = spark.createDataFrame(dataframe, schema_map)
spark_df = dataframe

filePath = create_abfss_path(
lakehouse_id=lakehouse_id,
Expand All @@ -531,7 +569,7 @@ def save_as_delta_table(
else:
spark_df.write.mode(write_mode).format("delta").save(filePath)
print(
f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse}' lakehouse within the '{workspace_name}' workspace."
f"{icons.green_dot} The dataframe has been saved as the '{delta_table_name}' table in the '{lakehouse_name}' lakehouse within the '{workspace_name}' workspace."
)


Expand Down Expand Up @@ -1024,28 +1062,33 @@ def _get_adls_client(account_name):
return service_client


def resolve_warehouse_id(warehouse: str, workspace: Optional[str | UUID]) -> UUID:
def resolve_warehouse_id(
warehouse: str | UUID, workspace: Optional[str | UUID]
) -> UUID:
"""
Obtains the Id for a given warehouse.

Parameters
----------
warehouse : str
The warehouse name
warehouse : str | uuid.UUID
The warehouse name or ID.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID in which the semantic model resides.
Defaults to None which resolves to the workspace of the attached lakehouse
or if no lakehouse attached, resolves to the workspace of the notebook.

Returns
-------
UUID
uuid.UUID
The warehouse Id.
"""

return fabric.resolve_item_id(
item_name=warehouse, type="Warehouse", workspace=workspace
)
if _is_valid_uuid(warehouse):
return warehouse
else:
return fabric.resolve_item_id(
item_name=warehouse, type="Warehouse", workspace=workspace
)


def get_language_codes(languages: str | List[str]):
Expand Down
18 changes: 11 additions & 7 deletions src/sempy_labs/_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from itertools import chain, repeat
from sempy.fabric.exceptions import FabricHTTPException
from sempy_labs._helper_functions import (
resolve_warehouse_id,
resolve_lakehouse_id,
resolve_lakehouse_name_and_id,
resolve_item_name_and_id,
resolve_workspace_name_and_id,
)
from uuid import UUID
Expand Down Expand Up @@ -35,7 +35,7 @@ def _bytes2mswin_bstr(value: bytes) -> bytes:
class ConnectBase:
def __init__(
self,
name: str,
item: str,
workspace: Optional[Union[str, UUID]] = None,
timeout: Optional[int] = None,
endpoint_type: str = "warehouse",
Expand All @@ -45,11 +45,15 @@ def __init__(

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)

# Resolve the appropriate ID (warehouse or lakehouse)
# Resolve the appropriate ID and name (warehouse or lakehouse)
if endpoint_type == "warehouse":
resource_id = resolve_warehouse_id(warehouse=name, workspace=workspace_id)
(resource_id, resource_name) = resolve_item_name_and_id(
item=item, type=endpoint_type.capitalize(), workspace=workspace_id
)
else:
resource_id = resolve_lakehouse_id(lakehouse=name, workspace=workspace_id)
(resource_id, resource_name) = resolve_lakehouse_name_and_id(
lakehouse=item, workspace=workspace_id
)

# Get the TDS endpoint
client = fabric.FabricRestClient()
Expand All @@ -72,7 +76,7 @@ def __init__(
# Set up the connection string
access_token = SynapseTokenProvider()()
tokenstruct = _bytes2mswin_bstr(access_token.encode())
conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={name};Encrypt=Yes;"
conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={resource_name};Encrypt=Yes;"

if timeout is not None:
conn_str += f"Connect Timeout={timeout};"
Expand Down
12 changes: 6 additions & 6 deletions src/sempy_labs/_warehouses.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ def delete_warehouse(name: str, workspace: Optional[str | UUID] = None):


def get_warehouse_tables(
warehouse: str, workspace: Optional[str | UUID] = None
warehouse: str | UUID, workspace: Optional[str | UUID] = None
) -> pd.DataFrame:
"""
Shows a list of the tables in the Fabric warehouse. This function is based on INFORMATION_SCHEMA.TABLES.
Parameters
----------
warehouse : str
Name of the Fabric warehouse.
warehouse : str | uuid.UUID
Name or ID of the Fabric warehouse.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Defaults to None which resolves to the workspace of the attached lakehouse
Expand All @@ -185,15 +185,15 @@ def get_warehouse_tables(


def get_warehouse_columns(
warehouse: str, workspace: Optional[str | UUID] = None
warehouse: str | UUID, workspace: Optional[str | UUID] = None
) -> pd.DataFrame:
"""
Shows a list of the columns in each table within the Fabric warehouse. This function is based on INFORMATION_SCHEMA.COLUMNS.
Parameters
----------
warehouse : str
Name of the Fabric warehouse.
warehouse : str | uuid.UUID
Name or ID of the Fabric warehouse.
workspace : str | uuid.UUID, default=None
The Fabric workspace name or ID.
Defaults to None which resolves to the workspace of the attached lakehouse
Expand Down
Loading
Loading