diff --git a/app/app.py b/app/app.py index d6a6eae4..dcb6795d 100644 --- a/app/app.py +++ b/app/app.py @@ -11,7 +11,7 @@ from seafevents.app.config import ENABLE_METADATA_MANAGEMENT, ENABLE_METRIC from seafevents.seasearch.index_task.filename_index_updater import RepoFilenameIndexUpdater from seafevents.seasearch.index_task.wiki_index_updater import SeasearchWikiIndexUpdater -from seafevents.events.metrics import MetricRedisRecord, MetricHandler +from seafevents.events.metrics import MetricRedisRecorder, MetricHandler class App(object): @@ -40,7 +40,7 @@ def __init__(self, config, seafile_config, self._repo_old_file_auto_del_scanner = RepoOldFileAutoDelScanner(config) self._deleted_files_count_cleaner = DeletedFilesCountCleaner(config) if ENABLE_METRIC: - self._metric_collect = MetricRedisRecord() + self._metric_redis_recorder = MetricRedisRecorder() self._metric_handle = MetricHandler(self, config) if ENABLE_METADATA_MANAGEMENT: @@ -76,7 +76,7 @@ def serve_forever(self): self._slow_task_handler.start() self._face_cluster.start() if ENABLE_METRIC: - self._metric_collect.start() + self._metric_redis_recorder.start() self._metric_handle.start() self._repo_filename_index_updater.start() self._seasearch_wiki_index_updater.start() diff --git a/app/event_redis.py b/app/event_redis.py index 30521144..41581c74 100644 --- a/app/event_redis.py +++ b/app/event_redis.py @@ -78,16 +78,7 @@ def set(self, key, value, timeout=None): def delete(self, key): return self.connection.delete(key) - def lrange(self, key, start, end): - return self.connection.lrange(key, start, end) - - def lpush(self, key, value): - return self.connection.lpush(key, value) - - def lrem(self, key, count): - return self.connection.lrem(key, count) - - def publisher(self, channel, message): + def publish(self, channel, message): return self.connection.publish(channel, message) @@ -111,15 +102,6 @@ def set(self, key, value, timeout=None): def delete(self, key): return self._redis_client.delete(key) - def lrange(self, key, start, end): - return self._redis_client.lrange(key, start, end) - - def lpush(self, key, value): - return self._redis_client.lpush(key, value) - - def lrem(self, key, count): - return self._redis_client.lrem(key, count) - def acquire_lock(self): lock_value = str(uuid.uuid4()) # 创建一个唯一的锁标识 if self._redis_client.setnx(LOCK_NAME, lock_value): # 获取锁 @@ -144,8 +126,8 @@ def create_or_update(self, key, value): finally: self.release_lock() - def publisher(self, channel, message): - self._redis_client.publisher(channel, message) + def publish(self, channel, message): + self._redis_client.publish(channel, message) redis_cache = RedisCache() diff --git a/events/metrics.py b/events/metrics.py index 4034e521..f8551aa5 100644 --- a/events/metrics.py +++ b/events/metrics.py @@ -10,7 +10,7 @@ local_metric = {'metrics': {}} -node_name = os.environ.get('NODE_NAME', 'default') +NODE_NAME = os.environ.get('NODE_NAME', 'default') ### metrics decorator @@ -20,18 +20,18 @@ def wrapper(*args, **kwargs): publish_metric = { "metric_name": "seasearch_index_timing", "instance_name": "seafevents", - "node_name": node_name, + "node_name": NODE_NAME, "details": { - "collected_at": datetime.datetime.utcnow().isoformat() + "collected_at": datetime.datetime.now().isoformat() } } start_time = time.time() func(*args, **kwargs) end_time = time.time() duration_seconds = end_time - start_time - publish_metric["details"]['metric_value'] = round(duration_seconds, 3) + publish_metric['metric_value'] = round(duration_seconds, 3) if ENABLE_METRIC: - redis_client.publisher("metric-channel", json.dumps(publish_metric)) + redis_client.publish("metric-channel", json.dumps(publish_metric)) return wrapper @@ -48,9 +48,9 @@ def format_metrics(cache): for label_name, label_value in metric_detail.items(): label = label_name + '="' + str(label_value) + '",' label = label[:-1] - metric_info = metric_name + '{' + label + '} ' + str(metric_value) +'\n' + metric_info += metric_name + '{' + label + '} ' + str(metric_value) +'\n' else: - metric_info = metric_name + str(metric_value) + '\n' + metric_info += metric_name + str(metric_value) + '\n' cache.delete("metrics") return metric_info.encode() @@ -84,9 +84,10 @@ def run(self): metric_data = json.loads(message['data']) try: key_name = metric_data.get('instance_name') + ':' + metric_data.get('node_name') + ':' + metric_data.get('metric_name') - metric_value = metric_data.get('details') + metric_details = metric_data.get('details') + metric_details['metric_value'] = metric_data.get('metric_value') # global - local_metric['metrics'][key_name] = metric_value + local_metric['metrics'][key_name] = metric_details except Exception as e: logging.error('Handle metrics failed: %s' % e) else: @@ -96,7 +97,7 @@ def run(self): subscriber = self._redis_client.get_subscriber('metric-channel') -class MetricRedisRecord(object): +class MetricRedisRecorder(object): def __init__(self): self._interval = 15 diff --git a/seafevent_server/request_handler.py b/seafevent_server/request_handler.py index eaca52bd..20f8a8f5 100644 --- a/seafevent_server/request_handler.py +++ b/seafevent_server/request_handler.py @@ -1,7 +1,7 @@ import jwt import logging import json -import time +import datetime from flask import Flask, request, make_response from seafevents.app.config import SEAHUB_SECRET_KEY, ENABLE_METRIC @@ -11,7 +11,7 @@ from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI from seafevents.repo_metadata.utils import add_file_details from seafevents.app.event_redis import redis_cache -from seafevents.events.metrics import node_name +from seafevents.events.metrics import NODE_NAME @@ -19,6 +19,20 @@ logger = logging.getLogger(__name__) +def publish_io_qsize_metric(qsize): + publish_metric = { + "metric_name": "io_task_qsize", + "instance_name": "seafevents", + "node_name": NODE_NAME, + "metric_value": qsize, + "details": { + "collected_at": datetime.datetime.now().isoformat() + } + } + if ENABLE_METRIC: + redis_cache.publish('metric-channel', json.dumps(publish_metric)) + + def check_auth_token(req): auth = req.headers.get('Authorization', '').split() if not auth or auth[0].lower() != 'token' or len(auth) != 2: @@ -77,17 +91,7 @@ def get_sys_logs_task(): log_type = request.args.get('log_type') try: task_id = event_export_task_manager.add_export_logs_task(start_time, end_time, log_type) - publish_metric = { - "metric_name": "io_task_qsize", - "instance_name": "seafevents", - "node_name": node_name, - "details": { - "metric_value": event_export_task_manager.tasks_queue.qsize(), - "collected_at": time.time() - } - } - if ENABLE_METRIC: - redis_cache.publisher('metric-channel', json.dumps(publish_metric)) + publish_io_qsize_metric(event_export_task_manager.tasks_queue.qsize()) except Exception as e: logger.error(e) return make_response((e, 500)) @@ -112,17 +116,7 @@ def get_org_logs_task(): org_id = request.args.get('org_id') try: task_id = event_export_task_manager.add_org_export_logs_task(start_time, end_time, log_type, org_id) - publish_metric = { - "metric_name": "io_task_qsize", - "instance_name": "seafevents", - "node_name": node_name, - "details": { - "metric_value": event_export_task_manager.tasks_queue.qsize(), - "collected_at": time.time() - } - } - if ENABLE_METRIC: - redis_cache.publisher('metric-channel', json.dumps(publish_metric)) + publish_io_qsize_metric(event_export_task_manager.tasks_queue.qsize()) except Exception as e: logger.error(e) return make_response((e, 500)) @@ -291,17 +285,7 @@ def add_convert_wiki_task(): try: task_id = event_export_task_manager.add_convert_wiki_task(old_repo_id, new_repo_id, username) - publish_metric = { - "metric_name": "io_task_qsize", - "instance_name": "seafevents", - "node_name": node_name, - "details": { - "metric_value": event_export_task_manager.tasks_queue.qsize(), - "collected_at": time.time() - } - } - if ENABLE_METRIC: - redis_cache.publisher('metric-channel', json.dumps(publish_metric)) + publish_io_qsize_metric(event_export_task_manager.tasks_queue.qsize()) except Exception as e: logger.error(e) return make_response((e, 500))