Skip to content

Commit

Permalink
Merge pull request #11304 from rtibbles/stale_chunks
Browse files Browse the repository at this point in the history
Add LRU eviction mechanisms for streamed file chunks
  • Loading branch information
rtibbles authored Oct 13, 2023
2 parents 350266b + 108a0f5 commit ef9a11b
Show file tree
Hide file tree
Showing 7 changed files with 428 additions and 51 deletions.
22 changes: 20 additions & 2 deletions kolibri/core/content/utils/content_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
from kolibri.core.discovery.utils.network.connections import capture_connection_state
from kolibri.core.discovery.utils.network.errors import NetworkLocationResponseFailure
from kolibri.core.utils.urls import reverse_path
from kolibri.utils.conf import OPTIONS
from kolibri.utils.data import bytes_for_humans
from kolibri.utils.file_transfer import ChunkedFileDirectoryManager


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -663,6 +665,7 @@ def _process_content_requests(incomplete_downloads):
has_processed_sync_removals = False
has_processed_user_removals = False
has_processed_user_downloads = False
has_freed_space_in_stream_cache = False
qs = incomplete_downloads_with_metadata.all()

# loop while we have pending downloads
Expand Down Expand Up @@ -710,6 +713,16 @@ def _process_content_requests(incomplete_downloads):
has_processed_user_downloads = True
process_user_downloads_for_removal()
continue
if not has_freed_space_in_stream_cache:
# try to clear space, then repeat
has_freed_space_in_stream_cache = True
chunked_file_manager = ChunkedFileDirectoryManager(
OPTIONS["Paths"]["CONTENT_DIR"]
)
chunked_file_manager.evict_files(
calc.get_additional_free_space_needed()
)
continue
raise InsufficientStorage(
"Content download requests need {} of free space".format(
bytes_for_humans(_total_size(incomplete_downloads_with_metadata))
Expand Down Expand Up @@ -970,12 +983,13 @@ def __init__(self, incomplete_downloads_queryset):
total_size=_total_size_annotation(available=True),
)
self.free_space = 0
self.incomplete_downloads_size = 0

def _calculate_space_available(self):
self.incomplete_downloads_size = _total_size(self.incomplete_downloads)
free_space = get_free_space_for_downloads(
completed_size=_total_size(completed_downloads_queryset())
)
free_space -= _total_size(self.incomplete_downloads)
free_space += _total_size(self.incomplete_sync_removals)
free_space += _total_size(self.incomplete_user_removals)
free_space += _total_size(self.complete_user_downloads)
Expand All @@ -984,4 +998,8 @@ def _calculate_space_available(self):

def is_space_sufficient(self):
self._calculate_space_available()
return self.free_space > _total_size(self.incomplete_downloads)
return self.free_space > self.incomplete_downloads_size

def get_additional_free_space_needed(self):
self._calculate_space_available()
return self.incomplete_downloads_size - self.free_space
21 changes: 21 additions & 0 deletions kolibri/core/deviceadmin/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from kolibri.core.tasks.decorators import register_task
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.utils.lock import db_lock
from kolibri.utils.conf import OPTIONS
from kolibri.utils.file_transfer import ChunkedFileDirectoryManager
from kolibri.utils.time_utils import local_now


Expand Down Expand Up @@ -69,3 +71,22 @@ def schedule_vacuum():
perform_vacuum.enqueue_at(vacuum_time, repeat=None, interval=24 * 60 * 60)
except JobRunning:
pass


# Constant job id for streamed cache cleanup task
STREAMED_CACHE_CLEANUP_JOB_ID = "streamed_cache_cleanup"


@register_task(job_id=STREAMED_CACHE_CLEANUP_JOB_ID)
def streamed_cache_cleanup():
    """Trim the streamed-file chunk cache down to its configured maximum size."""
    cache_root = OPTIONS["Paths"]["CONTENT_DIR"]
    size_limit = OPTIONS["Cache"]["STREAMED_FILE_CACHE_SIZE"]
    ChunkedFileDirectoryManager(cache_root).limit_files(size_limit)


def schedule_streamed_cache_cleanup():
    """Enqueue the recurring streamed-cache cleanup job (hourly, starting in an hour)."""
    delay = timedelta(hours=1)
    one_hour_in_seconds = 60 * 60
    try:
        streamed_cache_cleanup.enqueue_in(
            delay, repeat=None, interval=one_hour_in_seconds
        )
    except JobRunning:
        # The job is already queued or executing; nothing more to schedule.
        pass
78 changes: 77 additions & 1 deletion kolibri/utils/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,89 @@ class ChunkedFileDoesNotExist(Exception):
pass


# Suffix identifying a directory that holds the chunks of one streamed file.
CHUNK_SUFFIX = ".chunks"


class ChunkedFileDirectoryManager(object):
    """
    A class to manage all chunked files in a directory and all its descendant directories.
    Its main purpose is to allow for the deletion of chunked files based on a least recently used
    metric, as indicated by last access time on any of the files in the chunked file directory.
    """

    def __init__(self, chunked_file_dir):
        # Root directory under which *.chunks directories are searched for.
        self.chunked_file_dir = chunked_file_dir

    def _get_chunked_file_dirs(self):
        """
        Returns a generator of all chunked file directories in the chunked file directory.

        Matching directories are pruned from the walk so we do not descend into them.
        """
        for root, dirs, _ in os.walk(self.chunked_file_dir):
            # Collect matches first: the previous implementation called
            # dirs.remove() while iterating over dirs, which skips the
            # element following each removal, so adjacent sibling
            # *.chunks directories were silently missed.
            matches = [d for d in dirs if d.endswith(CHUNK_SUFFIX)]
            for match in matches:
                yield os.path.join(root, match)
            # Prune in place so os.walk doesn't continue down into the
            # chunked file directories themselves.
            dirs[:] = [d for d in dirs if not d.endswith(CHUNK_SUFFIX)]

    def _get_chunked_file_stats(self):
        """
        Return a dict mapping each chunked file directory to its stats:
        "last_access_time" - the most recent atime of any file within it
        (0 if it contains no files), and "size" - total bytes of all files.
        """
        stats = {}
        for chunked_file_dir in self._get_chunked_file_dirs():
            file_stats = {"last_access_time": 0, "size": 0}
            for dirpath, _, filenames in os.walk(chunked_file_dir):
                for filename in filenames:
                    file_path = os.path.join(dirpath, filename)
                    file_stats["last_access_time"] = max(
                        file_stats["last_access_time"], os.path.getatime(file_path)
                    )
                    file_stats["size"] += os.path.getsize(file_path)
            stats[chunked_file_dir] = file_stats
        return stats

    def _do_file_eviction(self, chunked_file_stats, file_size):
        """
        Delete whole chunked file directories, least recently accessed first,
        until at least file_size bytes have been evicted. Returns the number
        of bytes actually evicted (0 when file_size <= 0).
        """
        chunked_file_dirs = sorted(
            chunked_file_stats.keys(),
            key=lambda x: chunked_file_stats[x]["last_access_time"],
        )
        evicted_file_size = 0
        for chunked_file_dir in chunked_file_dirs:
            # Do the check here to catch the edge case where file_size is <= 0
            if file_size <= evicted_file_size:
                break
            file_stats = chunked_file_stats[chunked_file_dir]
            evicted_file_size += file_stats["size"]
            shutil.rmtree(chunked_file_dir)
        return evicted_file_size

    def evict_files(self, file_size):
        """
        Attempt to clean up file_size bytes of space in the chunked file directory.
        Iterate through all chunked file directories, and delete the oldest chunked files
        until the target file size is reached.
        """
        chunked_file_stats = self._get_chunked_file_stats()
        return self._do_file_eviction(chunked_file_stats, file_size)

    def limit_files(self, max_size):
        """
        Limits the total size used to a certain number of bytes.
        If the total size of all chunked files exceeds max_size, the oldest files are evicted.
        """
        chunked_file_stats = self._get_chunked_file_stats()

        total_size = sum(
            file_stats["size"] for file_stats in chunked_file_stats.values()
        )

        # A non-positive target is a no-op inside _do_file_eviction.
        return self._do_file_eviction(chunked_file_stats, total_size - max_size)


class ChunkedFile(BufferedIOBase):
# Set chunk size to 128KB
chunk_size = 128 * 1024

def __init__(self, filepath):
self.filepath = filepath
self.chunk_dir = filepath + ".chunks"
self.chunk_dir = filepath + CHUNK_SUFFIX
mkdirp(self.chunk_dir, exist_ok=True)
self.cache_dir = os.path.join(self.chunk_dir, ".cache")
self.position = 0
Expand Down
10 changes: 10 additions & 0 deletions kolibri/utils/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,16 @@ def lazy_import_callback_list(value):
"default": "",
"description": "Eviction policy to use when using Redis for caching, Redis only.",
},
"STREAMED_FILE_CACHE_SIZE": {
"type": "bytes",
"default": "500MB",
"description": """
Disk space to be used for caching streamed files. This is used for caching files that are
being streamed from remote libraries, if these files are later imported, these should be cleaned up,
and will no longer count to this cache size.
Value can either be a number suffixed with a unit (e.g. MB, GB, TB) or an integer number of bytes.
""",
},
},
"Database": {
"DATABASE_ENGINE": {
Expand Down
23 changes: 17 additions & 6 deletions kolibri/utils/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,22 +248,30 @@ def START(self):
START.priority = 75


class ServicesPlugin(SimplePlugin):
def __init__(self, bus):
self.bus = bus
self.worker = None

class DefaultScheduledTasksPlugin(SimplePlugin):
    """
    Bus plugin that registers Kolibri's recurring maintenance jobs
    (analytics ping, database vacuum, streamed-cache cleanup) on startup.
    """

    def START(self):
        # Imported lazily so task machinery is only touched once the bus
        # actually starts. The unused import of initialize_workers has
        # been removed - worker initialization belongs to ServicesPlugin.
        from kolibri.core.analytics.tasks import schedule_ping
        from kolibri.core.deviceadmin.tasks import schedule_vacuum
        from kolibri.core.deviceadmin.tasks import schedule_streamed_cache_cleanup

        # schedule the pingback job if not already scheduled
        schedule_ping()

        # schedule the vacuum job if not already scheduled
        schedule_vacuum()

        # schedule the streamed cache cleanup job if not already scheduled
        schedule_streamed_cache_cleanup()


class ServicesPlugin(SimplePlugin):
def __init__(self, bus):
self.bus = bus
self.worker = None

def START(self):
from kolibri.core.tasks.main import initialize_workers

# Initialize the iceqube engine to handle queued tasks
self.worker = initialize_workers()

Expand Down Expand Up @@ -734,6 +742,9 @@ def __init__(
reload_plugin = ProcessControlPlugin(self)
reload_plugin.subscribe()

default_scheduled_tasks_plugin = DefaultScheduledTasksPlugin(self)
default_scheduled_tasks_plugin.subscribe()

def run(self):
self.graceful()
self.block()
Expand Down
Loading

0 comments on commit ef9a11b

Please sign in to comment.