Skip to content

Commit a972186

Browse files
fred-yu-2013stiartsly
authored andcommitted
CU-394m5b1 - Refine backup server.
1 parent 918c9ff commit a972186

File tree

5 files changed

+101
-94
lines changed

5 files changed

+101
-94
lines changed

config/.env.sample

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
## private key to Hive node service DID
99
# SERVICE_DID_PRIVATE_KEY = YOUR-SERVICE-DID-BASE58-STRING
1010
# PASSPHRASE = secret
11-
# PASSWRD = password
11+
# PASSWORD = password
1212

1313
## Credential issued by User DID to service DID
1414
# NODE_CREDENTIAL = YOUR-CREDNETIAL-BASE58-STRING

src/modules/backup/backup.py

+29-2
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
from src.modules.database.mongodb_client import MongodbClient, Dotdict
44
from src.utils_v1.constants import VAULT_BACKUP_SERVICE_USING, \
5-
VAULT_BACKUP_SERVICE_MAX_STORAGE, VAULT_BACKUP_SERVICE_START_TIME, VAULT_BACKUP_SERVICE_END_TIME
5+
VAULT_BACKUP_SERVICE_MAX_STORAGE, VAULT_BACKUP_SERVICE_START_TIME, VAULT_BACKUP_SERVICE_END_TIME, VAULT_BACKUP_SERVICE_USE_STORAGE
66
from src.utils.consts import COL_IPFS_BACKUP_SERVER, USR_DID
77
from src.utils.http_exception import BackupNotFoundException
88
from src.utils_v1.payment.payment_config import PaymentConfig
99

1010

1111
class Backup(Dotdict):
12-
""" Represent a backup service which can be used to save backup data. """
12+
""" Represent a backup service which can be used to save backup data on the backup node side. """
1313

1414
def __init__(self, *args, **kwargs):
1515
super().__init__(*args, **kwargs)
@@ -28,6 +28,8 @@ def get_end_time(self):
2828

2929

3030
class BackupManager:
31+
""" Run on the backup node side. """
32+
3133
def __init__(self):
3234
self.mcli = MongodbClient()
3335

@@ -56,6 +58,26 @@ def __only_get_backup(self, user_did):
5658
raise BackupNotFoundException()
5759
return Backup(**doc)
5860

61+
def create_backup(self, user_did, price_plan):
62+
now = int(datetime.now().timestamp())
63+
end_time = -1 if price_plan['serviceDays'] == -1 else now + price_plan['serviceDays'] * 24 * 60 * 60
64+
65+
filter_ = {USR_DID: user_did}
66+
update = {"$setOnInsert": {
67+
VAULT_BACKUP_SERVICE_USING: price_plan['name'],
68+
VAULT_BACKUP_SERVICE_MAX_STORAGE: price_plan["maxStorage"] * 1024 * 1024,
69+
VAULT_BACKUP_SERVICE_USE_STORAGE: 0,
70+
VAULT_BACKUP_SERVICE_START_TIME: now,
71+
VAULT_BACKUP_SERVICE_END_TIME: int(end_time)
72+
}}
73+
74+
self.mcli.get_management_collection(COL_IPFS_BACKUP_SERVER).update_one(filter_, update, contains_extra=True)
75+
return self.get_backup(user_did)
76+
77+
def update_backup(self, user_did, update):
78+
filter_ = {USR_DID: user_did}
79+
self.mcli.get_management_collection(COL_IPFS_BACKUP_SERVER).update_one(filter_, update, contains_extra=True)
80+
5981
def upgrade(self, user_did, plan: dict, backup=None):
6082
if not backup:
6183
backup = self.get_backup(user_did)
@@ -83,3 +105,8 @@ def __try_to_downgrade_to_free(self, user_did, backup: Backup) -> Backup:
83105
# downgrade now
84106
self.upgrade(user_did, PaymentConfig.get_free_backup_plan(), backup=backup)
85107
return self.__only_get_backup(user_did)
108+
109+
def remove_backup(self, user_did):
110+
filter_ = {USR_DID: user_did}
111+
112+
self.mcli.get_management_collection(COL_IPFS_BACKUP_SERVER).delete_one(filter_)

src/modules/backup/backup_client.py

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747

4848

4949
class BackupClient:
50+
""" Represents the backup client on the vault node side. """
51+
5052
def __init__(self):
5153
self.auth = Auth()
5254
self.http = HttpClient()

src/modules/backup/backup_server.py

+67-91
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
11
# -*- coding: utf-8 -*-
22
import logging
3-
from datetime import datetime
43

54
from flask import g
65

7-
from src.modules.auth.auth import Auth
8-
from src.modules.auth.user import UserManager
9-
from src.modules.database.mongodb_client import MongodbClient
10-
from src.modules.backup.backup_client import BackupClient
11-
from src.modules.backup.backup_executor import ExecutorBase, BackupServerExecutor
12-
from src.modules.subscription.subscription import VaultSubscription
13-
from src.modules.subscription.vault import VaultManager
146
from src.utils.consts import BKSERVER_REQ_STATE, BACKUP_REQUEST_STATE_PROCESS, BKSERVER_REQ_ACTION, \
157
BACKUP_REQUEST_ACTION_BACKUP, BKSERVER_REQ_CID, BKSERVER_REQ_SHA256, BKSERVER_REQ_SIZE, \
168
BKSERVER_REQ_STATE_MSG, BACKUP_REQUEST_STATE_FAILED, COL_IPFS_BACKUP_SERVER, USR_DID, BACKUP_REQUEST_STATE_SUCCESS
17-
from src.utils.db_client import cli
18-
from src.utils.file_manager import fm
199
from src.utils.http_exception import BackupNotFoundException, AlreadyExistsException, BadRequestException, \
2010
InsufficientStorageException, NotImplementedException, VaultNotFoundException
21-
from src.utils_v1.constants import DID_INFO_DB_NAME, \
22-
VAULT_BACKUP_SERVICE_MAX_STORAGE, VAULT_BACKUP_SERVICE_START_TIME, VAULT_BACKUP_SERVICE_END_TIME, \
11+
from src.utils_v1.constants import VAULT_BACKUP_SERVICE_MAX_STORAGE, VAULT_BACKUP_SERVICE_START_TIME, VAULT_BACKUP_SERVICE_END_TIME, \
2312
VAULT_BACKUP_SERVICE_USING, VAULT_BACKUP_SERVICE_USE_STORAGE, VAULT_SERVICE_MAX_STORAGE
2413
from src.utils_v1.payment.payment_config import PaymentConfig
14+
from src.modules.auth.auth import Auth
15+
from src.modules.auth.user import UserManager
16+
from src.modules.backup.backup import BackupManager
17+
from src.modules.database.mongodb_client import MongodbClient
18+
from src.modules.backup.backup_client import BackupClient
19+
from src.modules.backup.backup_executor import ExecutorBase, BackupServerExecutor
20+
from src.modules.files.ipfs_client import IpfsClient
21+
from src.modules.subscription.subscription import VaultSubscription
22+
from src.modules.subscription.vault import VaultManager
2523

2624

2725
class BackupServer:
@@ -32,6 +30,8 @@ def __init__(self):
3230
self.mcli = MongodbClient()
3331
self.user_manager = UserManager()
3432
self.vault_manager = VaultManager()
33+
self.backup_manager = BackupManager()
34+
self.ipfs_client = IpfsClient()
3535

3636
def promotion(self):
3737
""" This processing is just like restore the vault:
@@ -42,7 +42,7 @@ def promotion(self):
4242
4. increase the reference count of the file cid.
4343
5. restore all user databases.
4444
"""
45-
doc = self.find_backup_request(g.usr_did, throw_exception=True)
45+
backup = self.backup_manager.get_backup(g.usr_did)
4646

4747
try:
4848
self.vault_manager.get_vault(g.usr_did)
@@ -51,7 +51,7 @@ def promotion(self):
5151
pass
5252

5353
vault = self.vault_manager.create_vault(g.usr_did, PaymentConfig.get_vault_plan('Free'), is_upgraded=True)
54-
request_metadata = self.get_server_request_metadata(g.usr_did, doc, is_promotion=True,
54+
request_metadata = self.get_server_request_metadata(g.usr_did, backup, is_promotion=True,
5555
vault_max_size=vault[VAULT_SERVICE_MAX_STORAGE])
5656

5757
# INFO: if free vault can not hold the backup data, then let it go
@@ -64,10 +64,10 @@ def promotion(self):
6464
ExecutorBase.update_vault_usage_by_metadata(g.usr_did, request_metadata)
6565

6666
def internal_backup(self, cid, sha256, size, is_force):
67-
doc = self.find_backup_request(g.usr_did, throw_exception=True)
68-
if not is_force and doc.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_PROCESS:
67+
backup = self.backup_manager.get_backup(g.usr_did)
68+
if not is_force and backup.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_PROCESS:
6969
raise BadRequestException('Failed because backup is in processing.')
70-
fm.ipfs_pin_cid(cid)
70+
self.ipfs_client.cid_pin(cid)
7171
update = {
7272
BKSERVER_REQ_ACTION: BACKUP_REQUEST_ACTION_BACKUP,
7373
BKSERVER_REQ_STATE: BACKUP_REQUEST_STATE_PROCESS,
@@ -76,125 +76,116 @@ def internal_backup(self, cid, sha256, size, is_force):
7676
BKSERVER_REQ_SHA256: sha256,
7777
BKSERVER_REQ_SIZE: size
7878
}
79-
self.update_backup_request(g.usr_did, update)
80-
BackupServerExecutor(g.usr_did, self, self.find_backup_request(g.usr_did, False)).start()
79+
self.backup_manager.update_backup(g.usr_did, update)
80+
BackupServerExecutor(g.usr_did, self, self.backup_manager.get_backup(g.usr_did)).start()
8181

8282
def internal_backup_state(self):
83-
doc = self.find_backup_request(g.usr_did, throw_exception=True)
83+
backup = self.backup_manager.get_backup(g.usr_did)
8484
return {
85-
'state': doc.get(BKSERVER_REQ_ACTION), # None or backup
86-
'result': doc.get(BKSERVER_REQ_STATE),
87-
'message': doc.get(BKSERVER_REQ_STATE_MSG)
85+
'state': backup.get(BKSERVER_REQ_ACTION), # None or backup
86+
'result': backup.get(BKSERVER_REQ_STATE),
87+
'message': backup.get(BKSERVER_REQ_STATE_MSG)
8888
}
8989

9090
def internal_restore(self):
91-
doc = self.find_backup_request(g.usr_did, throw_exception=True)
91+
backup = self.backup_manager.get_backup(g.usr_did)
9292

9393
# BKSERVER_REQ_ACTION: None, means not backup called; 'backup', backup called, and can be three states.
94-
if not doc.get(BKSERVER_REQ_ACTION):
94+
if not backup.get(BKSERVER_REQ_ACTION):
9595
raise BadRequestException('No backup data for restoring on backup node.')
96-
elif doc.get(BKSERVER_REQ_ACTION) != BACKUP_REQUEST_ACTION_BACKUP:
97-
raise BadRequestException(f'No backup data for restoring with invalid action "{doc.get(BKSERVER_REQ_ACTION)}" on backup node.')
96+
elif backup.get(BKSERVER_REQ_ACTION) != BACKUP_REQUEST_ACTION_BACKUP:
97+
raise BadRequestException(f'No backup data for restoring with invalid action "{backup.get(BKSERVER_REQ_ACTION)}" on backup node.')
9898

9999
# if BKSERVER_REQ_ACTION is not None, it can be three states
100-
if doc.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_PROCESS:
100+
if backup.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_PROCESS:
101101
raise BadRequestException('Failed because backup is in processing..')
102-
elif doc.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_FAILED:
102+
elif backup.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_FAILED:
103103
raise BadRequestException('Cannot execute restore because last backup is failed.')
104-
elif doc.get(BKSERVER_REQ_STATE) != BACKUP_REQUEST_STATE_SUCCESS:
105-
raise BadRequestException(f'Cannot execute restore because unknown state "{doc.get(BKSERVER_REQ_STATE)}".')
104+
elif backup.get(BKSERVER_REQ_STATE) != BACKUP_REQUEST_STATE_SUCCESS:
105+
raise BadRequestException(f'Cannot execute restore because unknown state "{backup.get(BKSERVER_REQ_STATE)}".')
106106

107-
if not doc.get(BKSERVER_REQ_CID):
108-
raise BadRequestException(f'Cannot execute restore because invalid data cid "{doc.get(BKSERVER_REQ_CID)}".')
107+
if not backup.get(BKSERVER_REQ_CID):
108+
raise BadRequestException(f'Cannot execute restore because invalid data cid "{backup.get(BKSERVER_REQ_CID)}".')
109109

110110
# backup data is valid, go on
111111
return {
112-
'cid': doc.get(BKSERVER_REQ_CID),
113-
'sha256': doc.get(BKSERVER_REQ_SHA256),
114-
'size': doc.get(BKSERVER_REQ_SIZE),
112+
'cid': backup.get(BKSERVER_REQ_CID),
113+
'sha256': backup.get(BKSERVER_REQ_SHA256),
114+
'size': backup.get(BKSERVER_REQ_SIZE),
115115
}
116116

117117
# the flowing is for the executors.
118118

119119
def update_request_state(self, user_did, state, msg=None):
120-
self.update_backup_request(user_did, {BKSERVER_REQ_STATE: state, BKSERVER_REQ_STATE_MSG: msg})
120+
self.backup_manager.update_backup(user_did, {BKSERVER_REQ_STATE: state, BKSERVER_REQ_STATE_MSG: msg})
121121

122122
def get_server_request_metadata(self, user_did, req, is_promotion=False, vault_max_size=0):
123123
""" Get the request metadata for promotion or backup.
124+
125+
:param user_did
126+
:param req
127+
:param is_promotion
124128
:param vault_max_size Only for promotion.
125129
"""
126-
request_metadata = self._get_verified_request_metadata(user_did, req)
130+
request_metadata = self.__get_verified_request_metadata(user_did, req)
127131
logging.info('[IpfsBackupServer] Success to get verified request metadata.')
128-
self._check_verified_request_metadata(request_metadata, req,
129-
is_promotion=is_promotion,
130-
vault_max_size=vault_max_size)
131-
logging.info('[IpfsBackupServer] Success to check the verified request metadata.')
132-
return request_metadata
133-
134-
def _get_verified_request_metadata(self, user_did, req):
135-
cid, sha256, size = req.get(BKSERVER_REQ_CID), req.get(BKSERVER_REQ_SHA256), req.get(BKSERVER_REQ_SIZE)
136-
return fm.ipfs_download_file_content(cid, is_proxy=True, sha256=sha256, size=size)
137132

138-
def _check_verified_request_metadata(self, request_metadata, req, is_promotion=False, vault_max_size=0):
139133
if is_promotion:
140134
if request_metadata['vault_size'] > vault_max_size:
141135
raise InsufficientStorageException('No enough space for promotion.')
142136
else:
143137
# for backup
144138
if request_metadata['backup_size'] > req[VAULT_BACKUP_SERVICE_MAX_STORAGE]:
145139
raise InsufficientStorageException('No enough space for backup on the backup node.')
140+
logging.info('[IpfsBackupServer] Success to check the verified request metadata.')
141+
142+
return request_metadata
143+
144+
def __get_verified_request_metadata(self, user_did, req):
145+
cid, sha256, size = req.get(BKSERVER_REQ_CID), req.get(BKSERVER_REQ_SHA256), req.get(BKSERVER_REQ_SIZE)
146+
return self.ipfs_client.download_file_json_content(cid, is_proxy=True, sha256=sha256, size=size)
146147

147148
# ipfs-subscription
148149

149150
def subscribe(self):
150-
doc = self.find_backup_request(g.usr_did, throw_exception=False)
151-
if doc:
152-
raise AlreadyExistsException('The backup service is already subscribed.')
153-
return self._get_backup_info(self._create_backup(g.usr_did, PaymentConfig.get_free_backup_plan()))
151+
try:
152+
backup = self.backup_manager.get_backup(g.usr_did)
153+
if backup:
154+
raise AlreadyExistsException('The backup service is already subscribed.')
155+
except BackupNotFoundException as e:
156+
pass
157+
158+
new_backup = self.backup_manager.create_backup(g.usr_did, PaymentConfig.get_free_backup_plan())
159+
return self.__get_backup_info(new_backup)
154160

155161
def unsubscribe(self):
156-
doc = self.find_backup_request(g.usr_did, throw_exception=True)
157-
if doc.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_PROCESS:
158-
raise BadRequestException(f"The '{doc.get(BKSERVER_REQ_ACTION)}' is in process.")
162+
backup = self.backup_manager.get_backup(g.usr_did)
163+
if backup.get(BKSERVER_REQ_STATE) == BACKUP_REQUEST_STATE_PROCESS:
164+
raise BadRequestException(f"The '{backup.get(BKSERVER_REQ_ACTION)}' is in process.")
159165

160166
# INFO: maybe use has a vault.
161167
# self.user_manager.remove_user(g.usr_did)
162-
self.remove_backup_by_did(g.usr_did, doc)
168+
self.remove_backup_by_did(g.usr_did, backup)
163169

164170
def remove_backup_by_did(self, user_did, doc):
165171
""" Remove all data belongs to the backup of the user. """
166172
logging.debug(f'start remove the backup of the user {user_did}, _id, {str(doc["_id"])}')
167173
if doc.get(BKSERVER_REQ_CID):
168-
request_metadata = self._get_verified_request_metadata(user_did, doc)
174+
request_metadata = self.__get_verified_request_metadata(user_did, doc)
169175
ExecutorBase.handle_cids_in_local_ipfs(request_metadata, root_cid=doc.get(BKSERVER_REQ_CID), is_unpin=True)
170176

171-
cli.delete_one_origin(DID_INFO_DB_NAME,
172-
COL_IPFS_BACKUP_SERVER,
173-
{USR_DID: user_did},
174-
is_check_exist=False)
177+
self.backup_manager.remove_backup(user_did)
175178

176179
def get_info(self):
177-
return self._get_backup_info(self.find_backup_request(g.usr_did, throw_exception=True))
180+
return self.__get_backup_info(self.backup_manager.get_backup(g.usr_did))
178181

179182
def activate(self):
180183
raise NotImplementedException()
181184

182185
def deactivate(self):
183186
raise NotImplementedException()
184187

185-
def _create_backup(self, user_did, price_plan):
186-
now = int(datetime.now().timestamp())
187-
end_time = -1 if price_plan['serviceDays'] == -1 else now + price_plan['serviceDays'] * 24 * 60 * 60
188-
doc = {USR_DID: user_did,
189-
VAULT_BACKUP_SERVICE_USING: price_plan['name'],
190-
VAULT_BACKUP_SERVICE_MAX_STORAGE: price_plan["maxStorage"] * 1024 * 1024,
191-
VAULT_BACKUP_SERVICE_USE_STORAGE: 0,
192-
VAULT_BACKUP_SERVICE_START_TIME: now,
193-
VAULT_BACKUP_SERVICE_END_TIME: int(end_time)}
194-
cli.insert_one_origin(DID_INFO_DB_NAME, COL_IPFS_BACKUP_SERVER, doc, create_on_absence=True, is_extra=True)
195-
return doc
196-
197-
def _get_backup_info(self, doc):
188+
def __get_backup_info(self, doc):
198189
return {
199190
'service_did': self.auth.get_did_string(),
200191
'pricing_plan': doc[VAULT_BACKUP_SERVICE_USING],
@@ -207,22 +198,7 @@ def _get_backup_info(self, doc):
207198
}
208199

209200
def update_storage_usage(self, user_did, size):
210-
self.update_backup_request(user_did, {VAULT_BACKUP_SERVICE_USE_STORAGE: size})
211-
212-
def update_backup_request(self, user_did, update):
213-
col_filter = {USR_DID: user_did}
214-
cli.update_one_origin(DID_INFO_DB_NAME, COL_IPFS_BACKUP_SERVER, col_filter, {'$set': update}, is_extra=True)
215-
216-
def find_backup_request(self, user_did, throw_exception=True):
217-
""" get the backup request information belonged to the user DID
218-
:param user_did: user DID
219-
:param throw_exception: throw BackupNotFoundException when True
220-
"""
221-
doc = cli.find_one_origin(DID_INFO_DB_NAME, COL_IPFS_BACKUP_SERVER, {USR_DID: user_did},
222-
create_on_absence=True, throw_exception=False)
223-
if throw_exception and not doc:
224-
raise BackupNotFoundException()
225-
return doc
201+
self.backup_manager.update_backup(user_did, {VAULT_BACKUP_SERVICE_USE_STORAGE: size})
226202

227203
def retry_backup_request(self):
228204
""" retry unfinished backup&restore action when node rebooted """

src/modules/files/ipfs_client.py

+2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ def download_file_json_content(self, cid, is_proxy=False, sha256=None, size=None
5252
return metadata
5353

5454
def cid_pin(self, cid):
55+
""" Pin file from ipfs proxy to the local node. """
56+
5557
# INFO: IPFS does not support that one node directly pin file from other node.
5658
logging.info(f'[fm.ipfs_pin_cid] Try to pin {cid} to the local IPFS node.')
5759

0 commit comments

Comments
 (0)