== Overview
CacheManager is a Python library that manages a cache of data that is too expensive to compute. It provides functionality to store, retrieve, and manage cached objects efficiently, ensuring that the cost of storing an object is less than the cost of re-computing it.
CacheManager itself is decoupled from both the metadata storage and the object storage. It currently ships with an example metadata store backed by an SQLite database and an example object store backed by the file system (plus a mock storage used by the unit tests). Users can provide their own implementations of these interfaces, which will be covered in future documentation.
To install CacheManager, you can use pip:
pip install git+https://github.com/adamryczkowski/CacheManager.git
Alternatively, you can install it using poetry:
poetry add git+https://github.com/adamryczkowski/CacheManager.git
Using a cache requires four elements:
- A cache instance that provides the functionality to store, retrieve, and manage cached objects.
- A pure function object that will compute the object if it is not found in the cache. (It makes no sense to cache a function that has side effects.)
- A serializable object holding the result of the computation.
- A set of arguments to that function object that are hashable, so they can be used as a key to identify an item in the cache.
from CacheManager import generate_file_cache, StorageKeyGenerator_Path, ModelCacheManagerConfig
from pathlib import Path
# Initialize the cache
storage_path_folder = Path("/home/user/.cache/MyApplicationCache")
metadata_db = Path("/home/user/.config/MyApplication/metadata.sqlite")
storage_file_naming_settings = StorageKeyGenerator_Path(
    file_prefix="model_", file_extension="bin"
)
initial_config = ModelCacheManagerConfig()
initial_config.reserved_free_space = 1024 * 1024 * 1024  # 1 GB set aside
initial_config.cost_of_minute_compute_rel_to_cost_of_1GB = 120  # 1 GB of storage costs as much as 2 hours (120 minutes) of compute time
cache = generate_file_cache(
    cached_dir=storage_path_folder,
    initial_config=initial_config,
    storage_key_generator=storage_file_naming_settings,
    db_filename=metadata_db,
)
CacheManager is designed to be very flexible in how result objects are serialized, and it ships with pre-defined serializers for pickle and JSON.
The computation function returns the result wrapped with the `pickle_wrap_promise` function. `pickle_wrap_promise` returns an object implementing the `I_ItemProducer` interface, which contains methods that use pickle for serialization and deserialization.
import time
import datetime as dt
from CacheManager import I_ItemProducer, pickle_wrap_promise, calc_hash
import numpy as np

class SomeHeavyResult:  # Some pickle-serializable class
    _data: bytes

    def __init__(self, object_size: int):
        # Array of random bytes of size 'object_size'
        self._data = np.random.bytes(object_size)
        assert isinstance(self._data, bytes)

def some_heavy_computation(
    arg1_important_arg: str, arg2_compute_time: dt.timedelta, arg3_result_size: int
) -> I_ItemProducer:
    def compute_function(
        arg1_important_arg: str, arg2_compute_time: dt.timedelta, arg3_result_size: int
    ) -> SomeHeavyResult:
        """A user-supplied pure function that computes the result."""
        time.sleep(arg2_compute_time.total_seconds())
        return SomeHeavyResult(arg3_result_size)

    kwargs = {  # Arguments are gathered into a calc_hash-hashable object to produce a key.
        "arg1_important_arg": arg1_important_arg,  # Every argument is part of the result's identity.
        "arg2_compute_time": arg2_compute_time,
        "arg3_result_size": arg3_result_size,
    }
    item_key = calc_hash(kwargs)
    # noinspection PyTypeChecker
    return pickle_wrap_promise(item_key, compute_function, **kwargs)
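For context, this is how such a promise is typically consumed. A minimal sketch, assuming the `cache` object built with `generate_file_cache` in the section above (`cache.get_object` is demonstrated in more detail later):

promise = some_heavy_computation(
    arg1_important_arg="demo",
    arg2_compute_time=dt.timedelta(seconds=2),
    arg3_result_size=1024,
)
result = cache.get_object(promise)  # computed and stored on the first call, loaded from the cache afterwards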
`item_key` does not need to be produced by the `calc_hash` function, but it must be a hash object wrapped by the `EntityHash` wrapper. `EntityHash` can be built from an integer or from binary data.
The cache cannot simply store a black-box result object: it needs to know how to serialize and deserialize it. The `I_ItemProducer` interface provides methods for that (and also bundles the object with its key).
JSON serialization is similar, except that the user must also pass the class type that will be serialized to and deserialized from JSON. This is because a string is an insufficient means of naming a class in Python.
For the actual serialization, `json_wrap_promise` uses the `pydantic` library, so the result class must be a subclass of `pydantic.BaseModel`. The next section shows how to build a custom-serialized result class.
from pydantic import BaseModel, ConfigDict
import numpy as np
import datetime as dt
import time
from CacheManager import I_ItemProducer, json_wrap_promise, calc_hash

class SomeHeavyResult(BaseModel):  # json_wrap_promise requires a pydantic model
    data: bytes
    model_config = ConfigDict(  # An example of an optional configuration for the model
        ser_json_bytes="base64", val_json_bytes="base64"
    )

    def __init__(self, object_size: int):
        assert isinstance(object_size, int)
        data = np.random.bytes(object_size)
        super().__init__(data=data)

def some_heavy_computation(
    arg1_important_arg: str, arg2_compute_time: dt.timedelta, arg3_result_size: int
) -> SomeHeavyResult:
    time.sleep(arg2_compute_time.total_seconds())
    return SomeHeavyResult(arg3_result_size)
def wrapped_heavy_computation(
    arg1_important_arg: str, arg2_compute_time: dt.timedelta, arg3_result_size: int
) -> I_ItemProducer:
    kwargs = {
        "arg1_important_arg": arg1_important_arg,
        "arg2_compute_time": arg2_compute_time,
        "arg3_result_size": arg3_result_size,
    }
    item_key = calc_hash(kwargs)
    return json_wrap_promise(
        item_key, SomeHeavyResult, _producer=some_heavy_computation, **kwargs
    )
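To see why the base64 configuration matters: raw bytes are not valid JSON, so pydantic must encode them as text. A standalone illustration of this pydantic v2 behaviour (not a CacheManager API):

obj = SomeHeavyResult(4)
print(obj.model_dump_json())  # {"data": "<base64 text>"}
# Without ser_json_bytes="base64", arbitrary binary data could not be
# represented as a JSON string.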
If the user wants to use a custom serialization method, they can implement their own `I_ItemProducer` object. The object must implement the `serialize_item` and `instantiate_item` methods.
from __future__ import annotations
import datetime as dt
import pickle
import time
import zlib
from pathlib import Path
from typing import Any, Optional
from CacheManager import I_ItemProducer, calc_hash, EntityHash, I_AbstractItemID
class SomeHeavyComputation(I_ItemProducer):
    """Encapsulates the computation of a heavy object together with its
    serialization functions and the means to generate its ID (item_key)
    used to identify it in the cache.

    Note that the class itself is not going to be serialized; only the
    result of `compute_item` is.
    """

    compute_arguments: dict

    def __init__(self, **kwargs):
        self.compute_arguments = kwargs

    # @overrides
    def get_item_key(self) -> EntityHash:
        return calc_hash(self.compute_arguments)

    # @overrides
    def compute_item(self) -> Any:
        return self.some_heavy_computation(**self.compute_arguments)

    @staticmethod
    def some_heavy_computation(
        arg1_important_arg: str, arg2_compute_time: dt.timedelta, arg3_result_size: int
    ) -> SomeHeavyComputation:
        # A staticmethod just for illustration; it could also be implemented
        # directly inside `compute_item`.
        time.sleep(arg2_compute_time.total_seconds())
        return SomeHeavyComputation(arg3_result_size=arg3_result_size)

    # @overrides
    def instantiate_item(self, data: bytes) -> Any:
        uncompressed_data = zlib.decompress(data)
        item = pickle.loads(uncompressed_data)
        return item

    # @overrides
    def serialize_item(self, item: Any) -> bytes:
        raw_bytes = pickle.dumps(item)  # avoid shadowing the built-in `bytes`
        compressed_bytes = zlib.compress(raw_bytes)
        return compressed_bytes

    # @overrides
    def propose_item_storage_key(self) -> Optional[Path | I_AbstractItemID]:
        # Allows an alternative way to compute the storage key for the item.
        # If None is returned, the default storage key generator is used.
        return None

# We will use the `SomeHeavyComputation` class constructor directly in place
# of the `json_wrap_promise` or `pickle_wrap_promise` functions.
Tip: the code above has `# @overrides` commented out. I personally recommend adding `from overrides import overrides` to your project and having the method overrides verified at runtime by this excellent library!
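In practice the tip amounts to this (a sketch assuming the `overrides` package from PyPI):

from overrides import overrides

class VerifiedComputation(I_ItemProducer):
    @overrides  # fails fast at class-definition time if this method
                # does not actually override a method of I_ItemProducer
    def get_item_key(self) -> EntityHash:
        ...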
CacheManager requires a unique key for each object stored in the cache. The key is used to identify the object in the cache. It may be generated by the provided `calc_hash` function, or it can be provided by the user in the form of either an integer or binary data wrapped by the `EntityHash` wrapper.
`EntityHash` is provided by a separate installable library, EntityHash, on which CacheManager depends.
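As a sketch of supplying your own key from binary data (`EntityHash.from_bytes` is a hypothetical constructor name used for illustration; consult the EntityHash library's documentation for the actual API):

import hashlib
from CacheManager import EntityHash

digest = hashlib.sha256(b"my-own-key-material").digest()
item_key = EntityHash.from_bytes(digest)  # hypothetical constructor -- see the EntityHash docs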
from CacheManager import ObjectCache, CacheItem
import datetime as dt

def test1(cache: ObjectCache):
    object_promise = SomeHeavyComputation(  # `json_wrap_promise`, `pickle_wrap_promise`, or any other way to produce an I_ItemProducer object
        arg1_important_arg="test1",
        arg2_compute_time=dt.timedelta(seconds=5),
        arg3_result_size=128,
    )
    result = cache.get_object(object_promise)  # Computes the object if it is not found in the cache.
    cache_item: CacheItem = cache.get_object_info(object_promise.get_item_key())  # Returns the cache item info, used to query the item's metadata.
You can query the cache for metadata about an object by its key. If the object has been seen by the cache, you get back a `CacheItem` object exposing the object's storage key, size, computation time, utility, and more. If the object has not been seen by the cache (i.e. it has never been requested before), you get None:
from CacheManager import CacheItem, EntityHash, I_AbstractItemID
from pathlib import Path
from typing import Union
from humanize import naturalsize, naturaldelta
import datetime as dt

ItemID = Union[Path, I_AbstractItemID]

def cache_item_props_demo(item: CacheItem[ItemID]):
    print(f"Here's the metadata of the item {item.pretty_description}:")
    item_key: EntityHash = item.item_key
    print(f"Item key used to retrieve it from cache: {item_key.as_hex}")
    item_storage_key: ItemID = item.item_storage_key
    print(f"Storage key used to identify the item in the storage: {item_storage_key} (e.g. it can be a Path)")
    if item.exists:
        print("Item is currently stored in the cache.")
    else:
        print("Item has been seen by the cache, but it is not currently stored.")
    print(f"Size of the storage taken by the item: {naturalsize(item.filesize)}")
    print(f"Computation time for the item: {naturaldelta(item.computation_time)}")
    print(f"Net utility of keeping the item (if it is already in the cache) or adding it (if it is not): {item.utility:.2f}. A negative value means the item is not worth storing.")
    access_list: list[dt.datetime] = item.get_history_of_accesses()
The pruning algorithm iterates over all the elements currently stored in the cache and removes the ones that are not worth storing. The algorithm is based on the utility of each object, which weighs the cost of storing the object against the cost of recomputing it.
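For intuition only (this illustrates the trade-off the utility encodes, not CacheManager's exact internal formula): with the configuration above, storing 1 GB costs as much as 120 minutes of compute, so:

# Illustrative arithmetic only -- not the library's internal formula.
storage_cost_per_gb = 120.0    # in "compute-minutes", per the config above
item_size_gb = 0.5
recompute_minutes = 10.0

cost_to_keep = item_size_gb * storage_cost_per_gb  # 60 compute-minutes
utility = recompute_minutes - cost_to_keep         # 10 - 60 = -50 -> a pruning candidate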
The interface is a straightforward, single-threaded method:
from CacheManager import ObjectCache

def prune_cache(cache: ObjectCache):
    cache.prune_cache(remove_history=False, verbose=True)
If you choose to remove the history (of accesses), the cache will forget about objects that were accessed in the past. This is useful if you want to keep the cache small and do not want to store the access history; otherwise it is better to keep it, so the cache can make better decisions about which objects are not worth storing.
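For completeness, the same call with history removal enabled (the same `prune_cache` method as above; only the flag differs):

def prune_cache_and_forget(cache: ObjectCache):
    # Also drops the recorded access history: the metadata shrinks,
    # but future pruning decisions have less data to work with.
    cache.prune_cache(remove_history=True, verbose=True)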