added partition handling utils for managed folders #30

Draft · wants to merge 1 commit into base: main
7 changes: 4 additions & 3 deletions core/dku_io_utils/__init__.py
@@ -1,11 +1,12 @@
 ########################################################
-# ------------- dku_io_utils: 0.1.0 ----------------
+# ------------- dku_io_utils: 0.1.1 ----------------
 
 # For more information, see https://github.com/dataiku/dss-plugin-dkulib/tree/main/core/dku_io_utils
-# Library version: 0.1.0
-# Last update: 2021-07
+# Library version: 0.1.1
+# Last update: 2024-01
 # Author: Dataiku (Alex Combessie)
 #########################################################
 
 from .chunked_processing import count_records, process_dataset_chunks # noqa
 from .column_descriptions import set_column_descriptions # noqa
+from .partitions_handling import get_folder_partition_root, get_dimensions, get_partitions, complete_file_path_pattern, fix_date_elements_folder_path, complete_file_path_time_pattern, get_dimension_value_from_flow_variables, check_only_one_read_partition # noqa
164 changes: 164 additions & 0 deletions core/dku_io_utils/partitions_handling.py
@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
"""Module to get the root path of partitioned folders"""

import dataiku


TIME_DIMENSION_PATTERNS = {"YEAR": "%Y", "MONTH": "%M", "DAY": "%D", "HOUR": "%H"}


def get_folder_partition_root(folder, is_input=False):
    """Retrieve the partition root path using a dataiku.Folder.

    Args:
        folder (dataiku.Folder): Input or output folder of the recipe used to retrieve the partition path pattern.
        is_input (bool): True if the folder must be considered as an input, False if it is an output.

    Returns:
        Partition path prefix, or an empty string if the folder is not partitioned.
    """
    folder_id = folder.get_id()
    input_id = folder_id if is_input else None
    dku_flow_variables = dataiku.get_flow_variables()
    client = dataiku.api_client()
    project = client.get_project(dataiku.default_project_key())
    folder_config = project.get_managed_folder(folder_id).get_definition()
    partitioning_config = folder_config.get("partitioning")
    if not partitioning_config:
        return ""
    file_path_pattern = partitioning_config.get("filePathPattern", None)
    dimensions, types = get_dimensions(partitioning_config)
    partitions = get_partitions(dku_flow_variables, dimensions, input_id=input_id)
    file_path = complete_file_path_pattern(file_path_pattern, partitions, dimensions, types)
    file_path = complete_file_path_time_pattern(dku_flow_variables, file_path, input_id=input_id)
    return file_path
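
A minimal usage sketch for this entry point, assuming a plugin recipe with an output folder role named "output_folder". The role name, output file name and import path are illustrative assumptions, not defined by this module.

# Illustrative sketch: role name "output_folder" and the import path are assumptions
# that depend on how dku_io_utils is vendored into the plugin.
import dataiku
from dataiku.customrecipe import get_output_names_for_role
from dku_io_utils import get_folder_partition_root

output_folder = dataiku.Folder(get_output_names_for_role("output_folder")[0])
partition_root = get_folder_partition_root(output_folder)  # "" if the folder is not partitioned

# Write recipe results under the partition root so each partition gets its own sub-path.
output_path = f"{partition_root}/results.csv"
output_folder.upload_data(output_path, b"col_a,col_b\n1,2\n")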


def get_dimensions(partitioning_config):
    """Retrieve the partition dimension names and types.

    Args:
        partitioning_config (dict): Dictionary of partitioning settings from the folder definition.

    Returns:
        Tuple of two lists: dimension names and dimension types.
    """
    dimensions = []
    types = []
    for dimension in partitioning_config.get("dimensions"):
        dimensions.append(dimension.get("name"))
        types.append(dimension.get("type"))
    return dimensions, types
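
For illustration, with a hypothetical two-dimension partitioning config shaped like the "partitioning" section this helper consumes:

# Hypothetical config: one discrete ("value") dimension and one time dimension.
partitioning_config = {"dimensions": [{"name": "country", "type": "value"}, {"name": "date", "type": "time"}]}
get_dimensions(partitioning_config)
# -> (["country", "date"], ["value", "time"])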


def get_partitions(dku_flow_variables, dimensions, input_id=None):
    """Retrieve the list of partition values corresponding to the partition dimensions.

    Args:
        dku_flow_variables (dict): Dictionary of flow variables for a project.
        dimensions (list): List of partition dimensions.
        input_id (str): Folder id if the folder is an input, None for an output folder.

    Raises:
        ValueError: If the dimension value is not found in the input or output.

    Returns:
        List of partitions.
    """
    partitions = []
    for dimension in dimensions:
        dimension_value = get_dimension_value_from_flow_variables(dku_flow_variables, input_id, dimension)
        if not dimension_value:
            raise ValueError(
                f"Partition dimension '{dimension}' not found in output. "
                + "Please make sure your output has the same partition dimensions as your input."
            )
        partitions.append(dimension_value)
    return partitions
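
As a sketch, with mocked flow variables for an output folder (the DKU_DST_<dimension> names mirror what the helper looks up; the dimension names are illustrative):

# Mocked flow variables; in a real recipe these come from the project's flow variables.
mock_flow_variables = {"DKU_DST_country": "FR", "DKU_DST_date": "2024-01-15"}
get_partitions(mock_flow_variables, ["country", "date"])
# -> ["FR", "2024-01-15"]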


def complete_file_path_pattern(file_path_pattern, partitions, dimensions, types):
    """Fill the placeholders of the partition path pattern for the discrete dimensions with the right partition values.

    Args:
        file_path_pattern (str): Partition path pattern of the folder, or None if there is no explicit pattern.
        partitions (list): List of partition values corresponding to the partition dimensions.
        dimensions (list): List of partition dimension names.
        types (list): List of partition dimension types.

    Returns:
        File path prefix. Time dimension patterns are not filled.
    """
    if file_path_pattern is None:
        partitions = fix_date_elements_folder_path(partitions, types)
        return "/".join(partitions)
    file_path = file_path_pattern.replace(".*", "")
    for partition, dimension in zip(partitions, dimensions):
        file_path = file_path.replace(f"%{{{dimension}}}", partition)
    return file_path
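
Illustrative behaviour with a hypothetical pattern mixing one discrete and one time dimension:

# Hypothetical pattern: the discrete placeholder is filled, the ".*" suffix is stripped,
# and the time tokens are left for complete_file_path_time_pattern to resolve.
complete_file_path_pattern("%{country}/%Y/%M/%D/.*", ["FR", "2024-01-15"], ["country", "date"], ["value", "time"])
# -> "FR/%Y/%M/%D/"
# With no explicit pattern, the partition values themselves form the path:
complete_file_path_pattern(None, ["FR", "2024-01-15"], ["country", "date"], ["value", "time"])
# -> "FR/2024/01/15"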


def fix_date_elements_folder_path(partitions, types):
    """Replace the '-' separator in time dimension values with '/' so they can be used in folder paths."""
    fixed_partitions = []
    for partition, dimension_type in zip(partitions, types):
        if dimension_type == "time":
            fixed_partitions.append(partition.replace("-", "/"))
        else:
            fixed_partitions.append(partition)
    return fixed_partitions
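
For example:

fix_date_elements_folder_path(["2024-01-15", "FR"], ["time", "value"])
# -> ["2024/01/15", "FR"]: the time partition id keeps its "-" separators,
# which are rewritten to "/" so the value can be used as a folder sub-path.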


def complete_file_path_time_pattern(dku_flow_variables, file_path_pattern, input_id=None):
    """Fill the placeholders of the partition path pattern for the time dimensions with the right partition values.

    Args:
        dku_flow_variables (dict): Dictionary of flow variables for a project.
        file_path_pattern (str): Partition path prefix with unresolved time dimension patterns.
        input_id (str): Folder id if the folder is an input, None for an output folder.

    Returns:
        File path prefix.
    """
    file_path = file_path_pattern
    for time_dimension in TIME_DIMENSION_PATTERNS:
        time_value = get_dimension_value_from_flow_variables(dku_flow_variables, input_id, time_dimension)
        if time_value is not None:
            time_pattern = TIME_DIMENSION_PATTERNS.get(time_dimension)
            file_path = file_path.replace(time_pattern, time_value)
    return file_path
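
Continuing the sketch above, with mocked output-side flow variables for the time tokens:

# DKU_DST_YEAR / DKU_DST_MONTH / DKU_DST_DAY are the output-side time flow variables
# looked up through get_dimension_value_from_flow_variables.
mock_flow_variables = {"DKU_DST_YEAR": "2024", "DKU_DST_MONTH": "01", "DKU_DST_DAY": "15"}
complete_file_path_time_pattern(mock_flow_variables, "FR/%Y/%M/%D/")
# -> "FR/2024/01/15/" (DKU_DST_HOUR is absent, so an %H token would be left untouched)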


def get_dimension_value_from_flow_variables(dku_flow_variables, input_id, dimension):
    """Look up the value of a partition dimension in the DKU_SRC_* / DKU_DST_* flow variables."""
    if input_id:  # input folder, there can be multiple read partitions
        dimension_value = dku_flow_variables.get(f"DKU_SRC_{input_id}_{dimension}")
        dimension_values = dku_flow_variables.get(f"DKU_SRC_{input_id}_{dimension}_VALUES")
        if not dimension_value and dimension_values:
            check_only_one_read_partition(dimension_values, dataiku.Folder(input_id))
    else:  # output folder, there can be only one write partition
        dimension_value = dku_flow_variables.get(f"DKU_DST_{dimension}")
    return dimension_value


def check_only_one_read_partition(partition_root, dku_computable):
    """Check that the input only has one read partition.

    Args:
        partition_root (str): Partition root path of the output. None if there is no partitioning.
        dku_computable (dataiku.Folder/dataiku.Dataset): Input dataset or folder.

    Raises:
        ValueError: If the input is partitioned and has multiple read partitions.
    """
    if partition_root and dku_computable:
        if len(dku_computable.read_partitions) > 1:
            if isinstance(dku_computable, dataiku.Dataset):
                error_message_prefix = f"Input dataset '{dku_computable.short_name}' has multiple read partitions. "
            if isinstance(dku_computable, dataiku.Folder):
                error_message_prefix = f"Input folder '{dku_computable.get_name()}' has multiple read partitions. "
            raise ValueError(
                error_message_prefix
                + "Please specify 'Equals' partition dependencies in the Input / Output tab of the recipe."
            )
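
On the input side, a hedged sketch of how these helpers fit together in a recipe (the role name "input_folder" and the import path are assumptions):

# Illustrative: with is_input=True the DKU_SRC_* flow variables are used, and
# check_only_one_read_partition raises if several partitions are read at once.
import dataiku
from dataiku.customrecipe import get_input_names_for_role
from dku_io_utils import get_folder_partition_root

input_folder = dataiku.Folder(get_input_names_for_role("input_folder")[0])
input_partition_root = get_folder_partition_root(input_folder, is_input=True)
# Files of the partition being read can then be addressed relative to this prefix,
# e.g. f"{input_partition_root}/results.csv", mirroring the output-side sketch above.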