Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
孙永强 committed Jan 6, 2025
1 parent 4dd8432 commit e6ddc54
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 69 deletions.
6 changes: 3 additions & 3 deletions app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from seafevents.app.config import ENABLE_METADATA_MANAGEMENT, ENABLE_METRIC
from seafevents.seasearch.index_task.filename_index_updater import RepoFilenameIndexUpdater
from seafevents.seasearch.index_task.wiki_index_updater import SeasearchWikiIndexUpdater
from seafevents.events.metrics import MetricRedisRecord, MetricHandler
from seafevents.events.metrics import MetricRedisRecorder, MetricHandler


class App(object):
Expand Down Expand Up @@ -40,7 +40,7 @@ def __init__(self, config, seafile_config,
self._repo_old_file_auto_del_scanner = RepoOldFileAutoDelScanner(config)
self._deleted_files_count_cleaner = DeletedFilesCountCleaner(config)
if ENABLE_METRIC:
self._metric_collect = MetricRedisRecord()
self._metric_redis_recorder = MetricRedisRecorder()
self._metric_handle = MetricHandler(self, config)

if ENABLE_METADATA_MANAGEMENT:
Expand Down Expand Up @@ -76,7 +76,7 @@ def serve_forever(self):
self._slow_task_handler.start()
self._face_cluster.start()
if ENABLE_METRIC:
self._metric_collect.start()
self._metric_redis_recorder.start()
self._metric_handle.start()
self._repo_filename_index_updater.start()
self._seasearch_wiki_index_updater.start()
Expand Down
24 changes: 3 additions & 21 deletions app/event_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,7 @@ def set(self, key, value, timeout=None):
def delete(self, key):
    # Thin passthrough to redis-py: removes *key* and returns the
    # number of keys that were actually deleted (0 if absent).
    return self.connection.delete(key)

def lrange(self, key, start, end):
return self.connection.lrange(key, start, end)

def lpush(self, key, value):
return self.connection.lpush(key, value)

def lrem(self, key, count):
return self.connection.lrem(key, count)

def publisher(self, channel, message):
def publish(self, channel, message):
    # Publish *message* on the given pub/sub *channel*; returns the
    # number of subscribers that received it (redis-py PUBLISH result).
    # Renamed from the misspelled `publisher` to match the Redis command.
    return self.connection.publish(channel, message)


Expand All @@ -111,15 +102,6 @@ def set(self, key, value, timeout=None):
def delete(self, key):
    # Delegate deletion to the wrapped Redis client; returns the
    # number of keys deleted.
    return self._redis_client.delete(key)

def lrange(self, key, start, end):
return self._redis_client.lrange(key, start, end)

def lpush(self, key, value):
return self._redis_client.lpush(key, value)

def lrem(self, key, count):
return self._redis_client.lrem(key, count)

def acquire_lock(self):
lock_value = str(uuid.uuid4()) # 创建一个唯一的锁标识
if self._redis_client.setnx(LOCK_NAME, lock_value): # 获取锁
Expand All @@ -144,8 +126,8 @@ def create_or_update(self, key, value):
finally:
self.release_lock()

def publisher(self, channel, message):
self._redis_client.publisher(channel, message)
def publish(self, channel, message):
    # Forward to the underlying client's publish (fixed here: the old
    # code called the removed `publisher` alias). NOTE(review): unlike
    # the client method, this swallows the subscriber-count return value.
    self._redis_client.publish(channel, message)


redis_cache = RedisCache()
21 changes: 11 additions & 10 deletions events/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

local_metric = {'metrics': {}}

node_name = os.environ.get('NODE_NAME', 'default')
NODE_NAME = os.environ.get('NODE_NAME', 'default')


### metrics decorator
Expand All @@ -20,18 +20,18 @@ def wrapper(*args, **kwargs):
publish_metric = {
"metric_name": "seasearch_index_timing",
"instance_name": "seafevents",
"node_name": node_name,
"node_name": NODE_NAME,
"details": {
"collected_at": datetime.datetime.utcnow().isoformat()
"collected_at": datetime.datetime.now().isoformat()
}
}
start_time = time.time()
func(*args, **kwargs)
end_time = time.time()
duration_seconds = end_time - start_time
publish_metric["details"]['metric_value'] = round(duration_seconds, 3)
publish_metric['metric_value'] = round(duration_seconds, 3)
if ENABLE_METRIC:
redis_client.publisher("metric-channel", json.dumps(publish_metric))
redis_client.publish("metric-channel", json.dumps(publish_metric))
return wrapper


Expand All @@ -48,9 +48,9 @@ def format_metrics(cache):
for label_name, label_value in metric_detail.items():
label = label_name + '="' + str(label_value) + '",'
label = label[:-1]
metric_info = metric_name + '{' + label + '} ' + str(metric_value) +'\n'
metric_info += metric_name + '{' + label + '} ' + str(metric_value) +'\n'
else:
metric_info = metric_name + str(metric_value) + '\n'
metric_info += metric_name + str(metric_value) + '\n'

cache.delete("metrics")
return metric_info.encode()
Expand Down Expand Up @@ -84,9 +84,10 @@ def run(self):
metric_data = json.loads(message['data'])
try:
key_name = metric_data.get('instance_name') + ':' + metric_data.get('node_name') + ':' + metric_data.get('metric_name')
metric_value = metric_data.get('details')
metric_details = metric_data.get('details')
metric_details['metric_value'] = metric_data.get('metric_value')
# global
local_metric['metrics'][key_name] = metric_value
local_metric['metrics'][key_name] = metric_details
except Exception as e:
logging.error('Handle metrics failed: %s' % e)
else:
Expand All @@ -96,7 +97,7 @@ def run(self):
subscriber = self._redis_client.get_subscriber('metric-channel')


class MetricRedisRecord(object):
class MetricRedisRecorder(object):

def __init__(self):
self._interval = 15
Expand Down
54 changes: 19 additions & 35 deletions seafevent_server/request_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import jwt
import logging
import json
import time
import datetime

from flask import Flask, request, make_response
from seafevents.app.config import SEAHUB_SECRET_KEY, ENABLE_METRIC
Expand All @@ -11,14 +11,28 @@
from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
from seafevents.repo_metadata.utils import add_file_details
from seafevents.app.event_redis import redis_cache
from seafevents.events.metrics import node_name
from seafevents.events.metrics import NODE_NAME



app = Flask(__name__)
logger = logging.getLogger(__name__)


def publish_io_qsize_metric(qsize):
    """Publish the I/O export-task queue size on the metric channel.

    Args:
        qsize: number of tasks currently waiting in the export task queue.

    Emits a JSON payload on the Redis ``metric-channel`` so the metric
    handler can aggregate it per node. No-op when ENABLE_METRIC is off.
    """
    if not ENABLE_METRIC:
        # Hoisted guard: avoid building and serializing the payload
        # on every request when metrics are disabled.
        return
    publish_metric = {
        "metric_name": "io_task_qsize",
        "instance_name": "seafevents",
        "node_name": NODE_NAME,
        "metric_value": qsize,
        "details": {
            # NOTE(review): now() is naive server-local time; if metrics
            # are compared across nodes/timezones, consider
            # datetime.datetime.now(datetime.timezone.utc) — confirm
            # what the metric consumer expects before changing.
            "collected_at": datetime.datetime.now().isoformat()
        }
    }
    redis_cache.publish('metric-channel', json.dumps(publish_metric))


def check_auth_token(req):
auth = req.headers.get('Authorization', '').split()
if not auth or auth[0].lower() != 'token' or len(auth) != 2:
Expand Down Expand Up @@ -77,17 +91,7 @@ def get_sys_logs_task():
log_type = request.args.get('log_type')
try:
task_id = event_export_task_manager.add_export_logs_task(start_time, end_time, log_type)
publish_metric = {
"metric_name": "io_task_qsize",
"instance_name": "seafevents",
"node_name": node_name,
"details": {
"metric_value": event_export_task_manager.tasks_queue.qsize(),
"collected_at": time.time()
}
}
if ENABLE_METRIC:
redis_cache.publisher('metric-channel', json.dumps(publish_metric))
publish_io_qsize_metric(event_export_task_manager.tasks_queue.qsize())
except Exception as e:
logger.error(e)
return make_response((e, 500))
Expand All @@ -112,17 +116,7 @@ def get_org_logs_task():
org_id = request.args.get('org_id')
try:
task_id = event_export_task_manager.add_org_export_logs_task(start_time, end_time, log_type, org_id)
publish_metric = {
"metric_name": "io_task_qsize",
"instance_name": "seafevents",
"node_name": node_name,
"details": {
"metric_value": event_export_task_manager.tasks_queue.qsize(),
"collected_at": time.time()
}
}
if ENABLE_METRIC:
redis_cache.publisher('metric-channel', json.dumps(publish_metric))
publish_io_qsize_metric(event_export_task_manager.tasks_queue.qsize())
except Exception as e:
logger.error(e)
return make_response((e, 500))
Expand Down Expand Up @@ -291,17 +285,7 @@ def add_convert_wiki_task():

try:
task_id = event_export_task_manager.add_convert_wiki_task(old_repo_id, new_repo_id, username)
publish_metric = {
"metric_name": "io_task_qsize",
"instance_name": "seafevents",
"node_name": node_name,
"details": {
"metric_value": event_export_task_manager.tasks_queue.qsize(),
"collected_at": time.time()
}
}
if ENABLE_METRIC:
redis_cache.publisher('metric-channel', json.dumps(publish_metric))
publish_io_qsize_metric(event_export_task_manager.tasks_queue.qsize())
except Exception as e:
logger.error(e)
return make_response((e, 500))
Expand Down

0 comments on commit e6ddc54

Please sign in to comment.