Skip to content

Cache with sql #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions cs3api4lab/api/cs3_file_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import cs3.gateway.v1beta1.gateway_api_pb2_grpc as cs3gw_grpc
import cs3.rpc.v1beta1.code_pb2 as cs3code
import cs3.storage.provider.v1beta1.provider_api_pb2 as cs3sp

from cs3api4lab.exception.exceptions import ResourceNotFoundError

from cs3api4lab.utils.file_utils import FileUtils
Expand All @@ -23,6 +24,7 @@
from cs3api4lab.auth.channel_connector import ChannelConnector
from cs3api4lab.config.config_manager import Cs3ConfigManager
from cs3api4lab.api.lock_manager import LockManager
from cs3api4lab.utils.sqlquerycache import SqlQueryCache


class Cs3FileApi:
Expand All @@ -31,6 +33,7 @@ class Cs3FileApi:
auth = None
config = None
lock_manager = None
sql_cache = None

def __init__(self, log):
self.log = log
Expand All @@ -41,9 +44,8 @@ def __init__(self, log):
intercept_channel = grpc.intercept_channel(channel, auth_interceptor)
self.cs3_api = cs3gw_grpc.GatewayAPIStub(intercept_channel)
self.storage_api = StorageApi(log)

self.lock_manager = LockManager(log)

self.sql_cache = SqlQueryCache(config=self.config)
return

def mount_point(self):
Expand All @@ -64,27 +66,53 @@ def stat_info(self, file_path, endpoint='/'):
"""
time_start = time.time()
stat = self.storage_api.stat(file_path, endpoint)
if stat.status.code == cs3code.CODE_OK:
time_end = time.time()
self.log.info('msg="Invoked stat" fileid="%s" elapsedTimems="%.1f"' % (file_path, (time_end - time_start) * 1000))
return {
'inode': {'storage_id': stat.info.id.storage_id,
'opaque_id': stat.info.id.opaque_id},
'filepath': stat.info.path,
'userid': stat.info.owner.opaque_id,
'size': stat.info.size,
'mtime': stat.info.mtime.seconds,
'type': stat.info.type,
'mime_type': stat.info.mime_type,
'idp': stat.info.owner.idp,
'permissions': stat.info.permission_set
}
elif stat.status.code == cs3code.CODE_NOT_FOUND:
time_end = time.time()
self.log.info(
'msg="Invoked stat" fileid="%s" elapsedTimems="%.1f"' % (file_path, (time_end - time_start) * 1000))

if stat.status.code == cs3code.CODE_NOT_FOUND:
self.log.info('msg="Failed stat" fileid="%s" reason="%s"' % (file_path, stat.status.message))
raise FileNotFoundError(stat.status.message + ", file " + file_path)
else:
self._handle_error(stat)

# Cache only successful responses, keyed by the resource's own
# (storage_id, opaque_id) pair. stat_info_by_resource looks entries up by
# exactly that pair, so reusing storage_id for both parts of the key (as
# this call previously did) made the stored entry unreachable.
if stat.status.code == cs3code.CODE_OK:
    self.sql_cache.save_item(
        storage_id=stat.info.id.storage_id,
        opaque_id=stat.info.id.opaque_id,
        stored_value=stat
    )
return self._stat_output(stat)

def stat_info_by_resource(self, opaque_id, storage_id):
    """
    Stat a resource addressed by its resource id instead of by path.

    The SQL cache is consulted first; on a miss the storage provider is
    queried through ``storage_api.stat_by_resource`` and a successful
    response is stored in the cache for later calls.

    :param opaque_id: opaque id of the resource
    :param storage_id: id of the storage that holds the resource
    :return: stat dictionary as built by ``_stat_output``
    :raises FileNotFoundError: when the storage provider answers CODE_NOT_FOUND
    :raises Exception: for any other non-OK status (raised via ``_stat_output``)
    """
    time_start = time.time()

    if self.sql_cache.item_exists(storage_id=storage_id, opaque_id=opaque_id):
        stat = self.sql_cache.get_stored_value(
            storage_id=storage_id,
            opaque_id=opaque_id,
            message=cs3sp.StatResponse()
        )
    else:
        stat = self.storage_api.stat_by_resource(opaque_id, storage_id)
        if stat.status.code == cs3code.CODE_NOT_FOUND:
            self.log.info(
                'msg="Failed stat" fileid="%s" storageid="%s" reason="%s"' %
                (opaque_id, storage_id, stat.status.message)
            )
            # A not-found response presumably carries no resource info, so
            # fall back to the requested id to keep the message meaningful.
            resource = stat.info.path or opaque_id
            raise FileNotFoundError('%s, file %s' % (stat.status.message, resource))
        # Only successful responses are cached: storing an error response
        # would replay that failure on every later lookup of this resource
        # for as long as the entry stays in the cache.
        if stat.status.code == cs3code.CODE_OK:
            self.sql_cache.save_item(storage_id=storage_id, opaque_id=opaque_id, stored_value=stat)

    time_end = time.time()
    self.log.info(
        'msg="Invoked stat" fileid="%s" storageid="%s" elapsedTimems="%.1f"' % (
            opaque_id, storage_id, (time_end - time_start) * 1000))

    return self._stat_output(stat)

def read_file(self, file_path, endpoint=None):
"""
Read a file using the given userid as access token.
Expand Down Expand Up @@ -254,6 +282,23 @@ def create_directory(self, path, endpoint=None):
def get_home_dir(self):
return self.config.home_dir if self.config.home_dir else ""

def _stat_output(self, stat):
    """
    Convert a stat response into the dictionary returned by the stat methods.

    Any status other than CODE_OK is handed to ``_handle_error``.
    """
    if stat.status.code != cs3code.CODE_OK:
        self._handle_error(stat)
        return None

    info = stat.info
    resource_id = info.id
    owner = info.owner
    return {
        'inode': {
            'storage_id': resource_id.storage_id,
            'opaque_id': resource_id.opaque_id,
        },
        'filepath': info.path,
        'userid': owner.opaque_id,
        'size': info.size,
        'mtime': info.mtime.seconds,
        'type': info.type,
        'mime_type': info.mime_type,
        'idp': owner.idp,
        'permissions': info.permission_set,
    }

def _handle_error(self, response):
self.log.error(response)
raise Exception("Incorrect server response: " +
Expand Down
18 changes: 14 additions & 4 deletions cs3api4lab/api/share_api_facade.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import urllib.parse
import time

import cs3.ocm.provider.v1beta1.provider_api_pb2_grpc as ocm_provider_api_grpc
import cs3.storage.provider.v1beta1.resources_pb2 as Resources
Expand All @@ -19,6 +20,7 @@
from cs3api4lab.api.storage_api import StorageApi
from cs3api4lab.exception.exceptions import OCMDisabledError


class ShareAPIFacade:
def __init__(self, log):
self.log = log
Expand All @@ -34,6 +36,7 @@ def __init__(self, log):
self.ocm_share_api = Cs3OcmShareApi(log)

self.storage_api = StorageApi(log)

return

def create(self, endpoint, file_path, opaque_id, idp, role=Role.EDITOR, grantee_type=Grantee.USER, reshare=True):
Expand Down Expand Up @@ -102,12 +105,18 @@ def list_shares(self):
:return: created shares and OCM shares combined and mapped to Jupyter model
:rtype: dict
"""
time_start = time.time()
share_list = self.share_api.list()
if self.config.enable_ocm:
ocm_share_list = self.ocm_share_api.list()
else:
ocm_share_list = None
return self.map_shares(share_list, ocm_share_list)

mapped_shares = self.map_shares(share_list, ocm_share_list)
time_end = time.time()
# Report the timing through the instance logger instead of a bare print so
# it lands in the application log like the other elapsed-time messages.
self.log.info('msg="Listed shares" elapsedTimems="%.1f"' % ((time_end - time_start) * 1000))

return mapped_shares

def list_received(self, status=None):
"""
Expand Down Expand Up @@ -205,9 +214,9 @@ def map_shares_to_model(self, list_response, received=False):
share = share.share
try:
user = self.user_api.get_user_info(share.owner.idp, share.owner.opaque_id)
stat = self.file_api.stat_info(urllib.parse.unquote(share.resource_id.opaque_id),
share.resource_id.storage_id) # todo remove this and use storage_logic
# stat = self.storage_logic.stat_info(urllib.parse.unquote(share.resource_id.opaque_id), share.resource_id.storage_id)
# if not self.stat_cache.item_exists(share.resource_id.storage_id, share.resource_id.opaque_id):
stat = self.file_api.stat_info_by_resource(urllib.parse.unquote(share.resource_id.opaque_id),
share.resource_id.storage_id)

if stat['type'] == Resources.RESOURCE_TYPE_FILE:
if hasattr(share.permissions.permissions,
Expand All @@ -226,6 +235,7 @@ def map_shares_to_model(self, list_response, received=False):
model['writable'] = True if ShareUtils.map_permissions_to_role(
share.permissions.permissions) == 'editor' else False
except Exception as e:
# The failure is reported once through the logger; the debug print that
# duplicated it on stdout is dropped.
self.log.error("Unable to map share " + share.resource_id.opaque_id + ", " + e.__str__())
continue

Expand Down
4 changes: 4 additions & 0 deletions cs3api4lab/api/storage_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def stat(self, file_path, endpoint='/'):
ref = FileUtils.get_reference(file_path, endpoint)
return self._stat_internal(ref)

def stat_by_resource(self, opaque_id, storage_id):
    """Stat a resource addressed by its (storage_id, opaque_id) pair."""
    return self._stat_internal(
        FileUtils.get_reference_by_resource(opaque_id=opaque_id, storage_id=storage_id)
    )

def _stat_internal(self, ref):
    """Send a Stat request for ``ref`` to the gateway, passing the access token as metadata."""
    stat_request = cs3sp.StatRequest(ref=ref)
    token_metadata = [('x-access-token', self.auth.authenticate())]
    return self.cs3_api.Stat(request=stat_request, metadata=token_metadata)
Expand Down
24 changes: 23 additions & 1 deletion cs3api4lab/config/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ class Config(LoggingConfigurable):
oauth_token = Unicode(
config=True, allow_none=True, help="""OAuth token"""
)
# Switch for the stat cache; declared default is off.
# NOTE(review): _stat_cache_enabled_default below supplies a dynamic default
# for this trait - confirm which of the two defaults takes effect.
stat_cache_enabled = Bool(
config=True, default_value=False, help="""Stat caching is enabled"""
)
# Path of the database file used by the cache (per the help text).
# NOTE(review): _stat_cache_file_default returns whatever _get_config_value
# yields for this key - verify the "./tmp_cache_file.db" default still applies
# when the key is not configured.
stat_cache_file = Unicode(
config=True, default_value="./tmp_cache_file.db", allow_none=True, help="""Path to db file"""
)
# Time-to-live of cached entries, in seconds (per the help text).
# NOTE(review): same question as above for the declared default of 180.
stat_cache_time = CInt(
config=True, default_value=180, allow_none=True, help="""Cache ttl in seconds"""
)

@default("reva_host")
def _reva_host_default(self):
Expand Down Expand Up @@ -180,6 +189,18 @@ def _oauth_file_default(self):
def _oauth_token_default(self):
return self._get_config_value("oauth_token")

@default("stat_cache_enabled")
def _stat_cache_enabled_default(self):
    """Return True when the configured value is the string "true" or the boolean True."""
    configured = self._get_config_value("stat_cache_enabled")
    return configured in ("true", True)

@default("stat_cache_file")
def _stat_cache_file_default(self):
    """Return the configured value for "stat_cache_file" as resolved by _get_config_value."""
    # NOTE(review): presumably None when the key is not configured - confirm
    # against _get_config_value, since the trait declares a non-None default.
    cache_file = self._get_config_value("stat_cache_file")
    return cache_file

@default("stat_cache_time")
def _stat_cache_time_default(self):
    """Return the configured value for "stat_cache_time" as resolved by _get_config_value."""
    # NOTE(review): presumably None when the key is not configured - confirm
    # against _get_config_value, since the trait declares a default of 180.
    cache_ttl = self._get_config_value("stat_cache_time")
    return cache_ttl

def _get_config_value(self, key):
env = os.getenv("CS3_" + key.upper())
if env:
Expand Down Expand Up @@ -235,7 +256,8 @@ def _file_config(self, key):
"eos_file": None,
"eos_token": None,
"oauth_file": None,
"oauth_token": None
"oauth_token": None,
"stat_cache_enabled": False,
}


Expand Down
3 changes: 3 additions & 0 deletions cs3api4lab/tests/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import cs3.sharing.ocm.v1beta1.ocm_api_pb2_grpc as ocm_api_grpc
import cs3.gateway.v1beta1.gateway_api_pb2_grpc as grpc_gateway

from cs3api4lab.utils.sqlquerycache import SqlQueryCache


class ExtStorageApi(StorageApi):
def __init__(self, log, config):
Expand Down Expand Up @@ -76,6 +78,7 @@ def __init__(self, log, config) -> None:
self.cs3_api = cs3gw_grpc.GatewayAPIStub(intercept_channel)
self.lock_manager = ExtLockManager(log, config)
self.storage_api = ExtStorageApi(log, config)
self.sql_cache = SqlQueryCache(config=self.config)


class ExtCs3ShareApi(Cs3ShareApi):
Expand Down
3 changes: 2 additions & 1 deletion cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"locks_expiration_time": 10,
"tus_enabled": false,
"enable_ocm": false,
"shared_folder": "MyShares"
"shared_folder": "MyShares",
"stat_cache_enabled": false
}
}
22 changes: 12 additions & 10 deletions cs3api4lab/tests/share_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import cs3.rpc.v1beta1.code_pb2 as cs3code
from collections import namedtuple


class ShareTestBase:
storage_id = '123e4567-e89b-12d3-a456-426655440000'
receiver_role = 'editor'
Expand Down Expand Up @@ -34,10 +35,11 @@ def setUp(self):
"authenticator_class": "cs3api4lab.auth.RevaPassword",
"client_id": "marie",
"client_secret": "radioactivity",
"locks_expiration_time": 10,
"tus_enabled": True,
"enable_ocm": False
}
"locks_expiration_time": 10,
"tus_enabled": True,
"enable_ocm": False,
"stat_cache_enabled": False
}
marie_ext_config = namedtuple('MarieConfig', marie_ext_config)(**marie_ext_config)

richard_local_config = {
Expand All @@ -54,9 +56,10 @@ def setUp(self):
"authenticator_class": "cs3api4lab.auth.RevaPassword",
"client_id": "richard",
"client_secret": "superfluidity",
"locks_expiration_time": 10,
"tus_enabled": True,
"enable_ocm": False
"locks_expiration_time": 10,
"tus_enabled": True,
"enable_ocm": False,
"stat_cache_enabled": False,
}
richard_local_config = namedtuple('richardConfig', richard_local_config)(**richard_local_config)

Expand Down Expand Up @@ -152,7 +155,6 @@ def clear_locks_on_file(self, file, endpoint='/'):
for lock in list(metadata.keys()):
self.storage_api.set_metadata({lock: "{}"}, file, endpoint)


def remove_test_share(self, user, share_id):
if user == 'einstein':
self.share_api.remove(share_id)
Expand Down Expand Up @@ -228,9 +230,9 @@ def remove_share_and_file_by_path(self, user, file_path):
stat = storage.stat(file_path)
if stat.status.code == cs3code.CODE_NOT_FOUND or stat.status.code == cs3code.CODE_INTERNAL:
self.create_test_file(user, file_path)
#todo the code above won't be necessary after https://github.com/cs3org/reva/issues/2847 is fixed
# todo the code above won't be necessary after https://github.com/cs3org/reva/issues/2847 is fixed

shares = share_api.list_shares_for_filepath(file_path) #todo this won't work on CERNBOX
shares = share_api.list_shares_for_filepath(file_path) # todo this won't work on CERNBOX
if shares:
for share in shares:
share_api.remove(share['opaque_id'])
Expand Down
6 changes: 6 additions & 0 deletions cs3api4lab/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ def get_reference(file_id, endpoint=None):
# assume we have an opaque fileid
return storage_provider.Reference(resource_id=storage_provider.ResourceId(storage_id=endpoint, opaque_id=file_id))

@staticmethod
def get_reference_by_resource(opaque_id, storage_id):
    """Build a Reference that addresses a resource by its storage id and opaque id."""
    resource_id = storage_provider.ResourceId(storage_id=storage_id, opaque_id=opaque_id)
    return storage_provider.Reference(resource_id=resource_id)

@staticmethod
def check_and_transform_file_path(file_id):
config = Cs3ConfigManager().get_config() #note: can cause problems in tests because of the config, it should be passed as an argument
Expand Down
Loading