Skip to content

Commit

Permalink
Feature tslong (#7)
Browse files Browse the repository at this point in the history
* Refactor

* Make codespell and some pre-commit happy

* Make precommit quasi happy

* Update doc images
  • Loading branch information
ghiggi authored Apr 9, 2024
1 parent c2fbdfe commit 718714f
Show file tree
Hide file tree
Showing 35 changed files with 1,301 additions and 330 deletions.
Binary file modified docs/source/static/documentation_release.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/source/static/package_release.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions tstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

__all__ = [
"open_tsdf",
"open_tslong",
"TSArray",
"TSDtype",
"TS",
Expand Down
7 changes: 0 additions & 7 deletions tstore/archive/__init__.py

This file was deleted.

17 changes: 17 additions & 0 deletions tstore/archive/attributes/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,20 @@ def read_attributes(base_dir, tstore_ids=None):
# Subset tstore_ids
# TODO: maybe possible to pass filters to pd.read_parquet
return df


def write_attributes(df, base_dir):
    """Write the static attributes dataframe to the TStore archive.

    Assumes ``df`` is a pandas DataFrame whose index holds the tstore IDs.
    TODO: support polars/pyarrow (they do not have an index)
    --> Ensure tstore_id column

    Parameters
    ----------
    df : pandas.DataFrame
        Static attributes dataframe, indexed by tstore ID.
    base_dir : str
        Base directory of the TStore archive.
    """
    fpath = define_attributes_filepath(base_dir)
    # Work on a shallow copy so the caller's dataframe is not mutated
    # when the index is cast below.
    df = df.copy(deep=False)
    df.index = df.index.astype("string[pyarrow]")
    df.to_parquet(
        fpath,
        engine="pyarrow",
        compression="snappy",
        index=True,
    )
19 changes: 10 additions & 9 deletions tstore/archive/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@
import numpy as np


def get_available_ts_variables(base_dir):
    """Get available TStore timeseries."""
    from tstore.archive.readers import read_metadata

    return read_metadata(base_dir=base_dir)["ts_variables"]


def check_is_tstore(base_dir):
    """Check that ``base_dir`` points to a valid TStore archive.

    Currently a placeholder that returns ``base_dir`` unchanged.
    TODO: validate the archive directory structure and metadata.
    """
    return base_dir


def get_available_ts_variables(base_dir):
    """Get available TStore timeseries."""
    from tstore.archive.metadata.readers import read_tstore_metadata

    tstore_metadata = read_tstore_metadata(base_dir=base_dir)
    ts_variables = tstore_metadata["ts_variables"]
    return ts_variables


def check_ts_variables(ts_variables, base_dir):
"""Check valid ts_variables.
Expand All @@ -40,7 +40,7 @@ def check_ts_variables(ts_variables, base_dir):
unvalid_ts_variables = unvalid_ts_variables.tolist()
if len(unvalid_ts_variables) > 0:
raise ValueError(
f"Valid ts_variables are {ts_variables}. Invalid: {unvalid_ts_variables}.",
f"Valid ts_variables are {available_ts_variables}. Invalid: {unvalid_ts_variables}.",
)
return ts_variables

Expand All @@ -50,6 +50,7 @@ def check_tstore_ids(tstore_ids, base_dir):
If tstore_ids=None, return None.
"""
# get_available_ts_store_ids(base_dir) # look into disk (based on tstore_structure)
# TODO:
pass
return tstore_ids
Expand Down
33 changes: 30 additions & 3 deletions tstore/archive/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,38 @@ def define_metadata_filepath(base_dir):

def get_tstore_structure(base_dir):
    """Get the TStore directory structure identifier.

    Reads the archive metadata and returns the "tstore_structure" entry
    (e.g. 'id-var' or 'var-id').
    """
    # NOTE: the stale import of tstore.archive.readers.read_metadata was
    # removed; the metadata reader now lives in tstore.archive.metadata.readers.
    from tstore.archive.metadata.readers import read_tstore_metadata

    metadata = read_tstore_metadata(base_dir=base_dir)
    return metadata["tstore_structure"]


def get_time_var(base_dir):
    """Get TStore time variable."""
    from tstore.archive.metadata.readers import read_tstore_metadata

    tstore_metadata = read_tstore_metadata(base_dir=base_dir)
    return tstore_metadata["time_var"]


def get_id_var(base_dir):
    """Get TStore ID variable."""
    from tstore.archive.metadata.readers import read_tstore_metadata

    tstore_metadata = read_tstore_metadata(base_dir=base_dir)
    return tstore_metadata["id_var"]


def get_partitions(base_dir, ts_variable):
    """Get TStore time series partitioning."""
    from tstore.archive.metadata.readers import read_tstore_metadata

    tstore_metadata = read_tstore_metadata(base_dir=base_dir)
    partitioning_str = tstore_metadata["partitioning"][ts_variable]
    # A missing partitioning string means the variable is unpartitioned.
    if partitioning_str is None:
        return []
    return partitioning_str.split("/")


def get_ts_info(base_dir, ts_variable):
"""Retrieve filepaths and tstore_ids for a specific ts_variable."""
tstore_structure = get_tstore_structure(base_dir)
Expand All @@ -67,4 +93,5 @@ def get_ts_info(base_dir, ts_variable):
tstore_ids = [os.path.basename(fpath) for fpath in fpaths]
else:
raise ValueError("Valid tstore_structure are 'id-var' and 'var-id'.")
return fpaths, tstore_ids
partitions = get_partitions(base_dir, ts_variable)
return fpaths, tstore_ids, partitions
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Created on Mon Jun 12 17:58:15 2023.
Created on Mon Jun 12 22:23:06 2023.
@author: ghiggi
"""
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def _read_yaml_metadata(fpath):
return metadata


def read_metadata(base_dir):
def read_tstore_metadata(base_dir):
"""Read TStore metadata."""
metadata_fpath = define_metadata_filepath(base_dir)
metadata = _read_yaml_metadata(metadata_fpath)
Expand Down
27 changes: 27 additions & 0 deletions tstore/archive/metadata/writers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
"""
Created on Mon Apr 8 17:24:09 2024.
@author: ghiggi
"""
import yaml

from tstore.archive.io import define_metadata_filepath


def _write_yaml_metadata(metadata, fpath):
    """Write a metadata dictionary to a YAML file.

    Parameters
    ----------
    metadata : dict
        Metadata key/value pairs to serialize.
    fpath : str
        Destination filepath of the YAML file.
    """
    # Explicit encoding keeps the output portable across platforms.
    with open(fpath, "w", encoding="utf-8") as file:
        yaml.dump(metadata, file)


def write_tstore_metadata(base_dir, ts_variables, id_var, time_var, tstore_structure, partitioning):
    """Write TStore metadata file."""
    metadata = {
        "ts_variables": ts_variables,
        "time_var": time_var,
        "id_var": id_var,
        "tstore_structure": tstore_structure,
        "partitioning": partitioning,
    }
    metadata_fpath = define_metadata_filepath(base_dir)
    _write_yaml_metadata(metadata=metadata, fpath=metadata_fpath)
112 changes: 112 additions & 0 deletions tstore/archive/partitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""
Created on Mon Apr 8 17:05:26 2024.
@author: ghiggi
"""


def get_partitioning_mapping_dict(time_var, backend="pandas"):
    """Return a mapping from partitioning component name to an extractor callable.

    Each callable takes a dataframe and returns the corresponding datetime
    component of the ``time_var`` column.

    Parameters
    ----------
    time_var : str
        Name of the datetime column.
    backend : str, optional
        Dataframe backend, either "pandas" (default) or "polars".

    Returns
    -------
    dict
        Mapping of component name ("year", "month", "day", "doy", "dow",
        "hh", "mm", "ss") to an extractor function.

    Raises
    ------
    NotImplementedError
        If ``backend`` is not "pandas" or "polars".
    """
    if backend == "pandas":
        # pandas exposes datetime components as attributes on the .dt accessor
        partitioning_mapping = {
            "year": lambda df: df[time_var].dt.year,
            "month": lambda df: df[time_var].dt.month,
            "day": lambda df: df[time_var].dt.day,
            "doy": lambda df: df[time_var].dt.dayofyear,
            "dow": lambda df: df[time_var].dt.dayofweek,
            # week TODO
            "hh": lambda df: df[time_var].dt.hour,
            "mm": lambda df: df[time_var].dt.minute,
            "ss": lambda df: df[time_var].dt.second,
        }
    elif backend == "polars":
        # polars exposes datetime components as methods on the .dt namespace
        partitioning_mapping = {
            "year": lambda df: df[time_var].dt.year(),
            "month": lambda df: df[time_var].dt.month(),
            "day": lambda df: df[time_var].dt.day(),
            "doy": lambda df: df[time_var].dt.ordinal_day(),
            "dow": lambda df: df[time_var].dt.weekday(),
            # 'week': lambda df: df[time_var].dt.week(),
            "hh": lambda df: df[time_var].dt.hour(),
            "mm": lambda df: df[time_var].dt.minute(),
            "ss": lambda df: df[time_var].dt.second(),
        }
    else:
        raise NotImplementedError(f"Backend {backend}")
    # TODO: add quarter, daysinmonth, month_name and relevant checks
    # TODO: partitioning_str: (YYYY/MM/DD) or (YYYY/DOY/HH). Or list ?
    # TODO: provide proxy for year(YYYY) and month (MM) ? But month conflicts with minutes ?

    # TODO: for polars
    return partitioning_mapping


def get_valid_partitions():
    """Get valid partitioning components."""
    # The mapping keys are the valid components; the time variable name
    # is irrelevant here, so any placeholder works.
    mapping = get_partitioning_mapping_dict(time_var="dummy")
    return [component for component in mapping]


def check_partitions(partitioning_str):
    """Check the partitioning components of a partitioning string.

    Return the list of (lower-cased) partitioning components, or ``None``
    when ``partitioning_str`` is ``None``.
    """
    if partitioning_str is None:
        return None

    valid_partitions = get_valid_partitions()

    # Validate and normalize each "/"-separated component
    partitions = []
    for component in partitioning_str.split("/"):
        normalized = component.lower()
        if normalized not in valid_partitions:
            raise ValueError(f"Invalid partitioning component '{component}'")
        partitions.append(normalized)

    # 'doy' (day of year) is mutually exclusive with 'month' and 'day'
    if "doy" in partitions:
        if "month" in partitions:
            raise ValueError("Either specify 'month' or 'doy' (day of year).")
        if "day" in partitions:
            raise ValueError("Either specify 'day' or 'doy' (day of year).")

    return partitions


def check_partitioning(partitioning, ts_variables):
    """Check to_tstore partitioning values.

    Parameters
    ----------
    partitioning : dict, str or None
        Partitioning string (e.g. "year/month"), or a dict mapping each
        ts_variable to its partitioning string, or None for no partitioning.
    ts_variables : list
        Time series variable names.

    Returns
    -------
    dict
        Mapping of ts_variable to its normalized (lower-case) partitioning
        string, or None.

    Raises
    ------
    TypeError
        If ``partitioning`` is not a dict, str or None.
    ValueError
        If a partitioning string contains invalid components.
    """
    if not isinstance(partitioning, (dict, str, type(None))):
        # An explicit message instead of the previous empty TypeError("")
        raise TypeError("'partitioning' must be a dict, a string or None.")
    if isinstance(partitioning, str) or partitioning is None:
        # Apply the same partitioning to every ts_variable
        partitioning = {ts_variable: partitioning for ts_variable in ts_variables}
    for ts_variable, partitioning_str in partitioning.items():
        try:
            partitions = check_partitions(partitioning_str)
            if partitions is not None:
                # Normalize to the canonical lower-case "comp1/comp2/..." form
                partitioning[ts_variable] = "/".join(partitions)
        except Exception as e:
            # Chain the original exception for easier debugging
            raise ValueError(f"Invalid partitioning for {ts_variable}: {e}") from e
    return partitioning


def add_partitioning_columns(df, partitioning_str, time_var, backend):
    """Add partitioning columns to the dataframe based on the partitioning string."""
    # No partitioning requested: return the dataframe untouched
    if partitioning_str is None:
        return df, None

    partitions = check_partitions(partitioning_str)
    partitioning_mapping = get_partitioning_mapping_dict(time_var=time_var, backend=backend)
    for component in partitions:
        extractor = partitioning_mapping[component]
        if backend == "pandas":
            df[component] = extractor(df)
        elif backend == "polars":
            df = df.with_columns(extractor(df).alias(component))
        else:
            raise NotImplementedError
    return df, partitions
2 changes: 1 addition & 1 deletion tstore/archive/ts/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Created on Mon Jun 12 22:22:27 2023.
Created on Mon Jun 12 22:23:06 2023.
@author: ghiggi
"""
13 changes: 0 additions & 13 deletions tstore/archive/ts/partitioning.py

This file was deleted.

6 changes: 6 additions & 0 deletions tstore/archive/ts/readers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python3
"""
Created on Mon Jun 12 22:23:06 2023.
@author: ghiggi
"""
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import dask.dataframe as dd
import pandas as pd

from tstore.archive.ts.filtering import get_time_filters
from tstore.archive.ts.partitioning import get_dataset_partitioning_columns
from tstore.archive.ts.utility import get_time_filters


def open_ts(
fpath,
partitions,
columns=None,
start_time=None,
end_time=None,
Expand Down Expand Up @@ -60,7 +60,6 @@ def open_ts(
)

# Drop partitioning columns
partitioning_columns = get_dataset_partitioning_columns(fpath)
df = df.drop(columns=partitioning_columns)
df = df.drop(columns=partitions)

return df
Loading

0 comments on commit 718714f

Please sign in to comment.