Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove repo metadata worker #455

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions repo_metadata/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,5 @@ def __init__(self):
FACES_TABLE = FacesTable('faces', '0001', '0004')

TAGS_TABLE = TagsTable('tags', '0002', '0003')

ZERO_OBJ_ID = '0000000000000000000000000000000000000000'
71 changes: 0 additions & 71 deletions repo_metadata/index_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@

from redis.exceptions import ConnectionError as NoMQAvailable, ResponseError, TimeoutError

from seafevents.repo_data import repo_data
from seafevents.mq import get_mq
from seafevents.utils import get_opt_from_conf_or_env
from seafevents.db import init_db_session_class
from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
from seafevents.repo_metadata.repo_metadata import RepoMetadata
from seafevents.repo_metadata.metadata_manager import MetadataManager
from seafevents.face_recognition.face_recognition_manager import FaceRecognitionManager


Expand Down Expand Up @@ -47,8 +44,6 @@ def __init__(self, config):
self._parse_config(config)

self.mq = get_mq(self.mq_server, self.mq_port, self.mq_password)
self.repo_metadata = RepoMetadata(self.metadata_server_api, self.mq)
self.metadata_manager = MetadataManager(self._db_session_class, self.repo_metadata)
self.face_recognition_manager = FaceRecognitionManager(config)
self.set_signal()

Expand Down Expand Up @@ -81,68 +76,10 @@ def tname(self):
return threading.current_thread().name

def start(self):
for i in range(int(self.worker_num)):
threading.Thread(target=self.worker_handler, name='subscribe_' + str(i), daemon=True).start()

for i in range(int(self.worker_num)):
threading.Thread(target=self.face_cluster_handler, name='face_cluster_' + str(i), daemon=True).start()
threading.Thread(target=self.refresh_lock, name='refresh_thread', daemon=True).start()

def worker_handler(self):
logger.info('%s starting update metadata work' % self.tname)
try:
while not self.should_stop.isSet():
try:
res = self.mq.brpop('metadata_task', timeout=30)
if res is not None:
key, value = res
msg = value.split('\t')
if len(msg) != 2:
logger.info('Bad message: %s' % str(msg))
else:
op_type, repo_id = msg[0], msg[1]
self.worker_task_handler(self.mq, repo_id, self.should_stop, op_type)
except (ResponseError, NoMQAvailable, TimeoutError) as e:
logger.error('The connection to the redis server failed: %s' % e)
except Exception as e:
logger.error('%s Handle Worker Task Error' % self.tname)
logger.error(e, exc_info=True)
# prevent case that redis break at program running.
time.sleep(0.3)

def worker_task_handler(self, mq, repo_id, should_stop, op_type):
# Python cannot kill threads, so stop it generate more locked key.
if not should_stop.isSet():
# set key-value if does not exist which will expire 30 minutes later
if mq.set(self._get_lock_key(repo_id), time.time(),
ex=self.LOCK_TIMEOUT, nx=True):
# get lock
logger.info('%s start updating repo %s' % (threading.currentThread().getName(), repo_id))
lock_key = self._get_lock_key(repo_id)
self.locked_keys.add(lock_key)
self.update_metadata(repo_id)
try:
self.locked_keys.remove(lock_key)
except KeyError:
logger.error("%s is already removed. SHOULD NOT HAPPEN!" % lock_key)
mq.delete(lock_key)
logger.info("%s Finish updating repo: %s, delete redis lock %s" %
(self.tname, repo_id, lock_key))
else:
# the repo is updated by other thread, push back to the queue
self.add_to_undo_task(mq, repo_id, op_type)

def update_metadata(self, repo_id):
commit_id = repo_data.get_repo_head_commit(repo_id)
if not commit_id:
# invalid repo without head commit id
logger.error("invalid repo : %s " % repo_id)
return
try:
self.metadata_manager.update_metadata(repo_id, commit_id)
except Exception as e:
logger.exception('update repo: %s metadata error: %s', repo_id, e)

def face_cluster_handler(self):
face_recognition_logger.info('%s starting face cluster' % self.tname)
try:
Expand Down Expand Up @@ -193,14 +130,6 @@ def update_face_cluster(self, repo_id, username):
except Exception as e:
face_recognition_logger.exception('update repo: %s metadata error: %s', repo_id, e)

def add_to_undo_task(self, mq, repo_id, op_type):
"""Push task back to the end of the queue.
"""
# avoid get the same task repeatedly
time.sleep(0.1)
mq.lpush('metadata_task', '\t'.join([op_type, repo_id]))
logger.debug('%s push back task (%s,) to the queue' % (self.tname, repo_id))

def refresh_lock(self):
logger.info('%s Starting refresh locks' % self.tname)
while True:
Expand Down
134 changes: 0 additions & 134 deletions repo_metadata/metadata_manager.py

This file was deleted.

Loading
Loading