From 4dd84322773b1084c45da959672a4f37b47f75a2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=AD=99=E6=B0=B8=E5=BC=BA?= <11704063+s-yongqiang@user.noreply.gitee.com>
Date: Sat, 4 Jan 2025 23:03:24 +0800
Subject: [PATCH] add metrics

---
 app/app.py                                    |  10 +-
 app/config.py                                 |   1 +
 app/event_redis.py                            | 114 ++++++++++++++-
 db.py                                         |   6 +
 events/metrics.py                             | 130 ++++++++++++++++++
 main.py                                       |   3 +-
 seafevent_server/request_handler.py           |  40 +++++-
 seafevents_api.py                             |   4 +-
 .../index_task/filename_index_updater.py      |  15 +-
 9 files changed, 313 insertions(+), 10 deletions(-)
 create mode 100644 events/metrics.py

diff --git a/app/app.py b/app/app.py
index 5f6326a4..d6a6eae4 100644
--- a/app/app.py
+++ b/app/app.py
@@ -8,9 +8,10 @@
 from seafevents.repo_metadata.index_worker import RepoMetadataIndexWorker
 from seafevents.repo_metadata.slow_task_handler import SlowTaskHandler
 from seafevents.seafevent_server.seafevent_server import SeafEventServer
-from seafevents.app.config import ENABLE_METADATA_MANAGEMENT
+from seafevents.app.config import ENABLE_METADATA_MANAGEMENT, ENABLE_METRIC
 from seafevents.seasearch.index_task.filename_index_updater import RepoFilenameIndexUpdater
 from seafevents.seasearch.index_task.wiki_index_updater import SeasearchWikiIndexUpdater
+from seafevents.events.metrics import MetricRedisRecord, MetricHandler
 
 
 class App(object):
@@ -38,6 +39,10 @@ def __init__(self, config, seafile_config,
         self._file_updates_sender = FileUpdatesSender()
         self._repo_old_file_auto_del_scanner = RepoOldFileAutoDelScanner(config)
         self._deleted_files_count_cleaner = DeletedFilesCountCleaner(config)
+        if ENABLE_METRIC:
+            self._metric_collect = MetricRedisRecord()
+            self._metric_handle = MetricHandler(self, config)
+
         if ENABLE_METADATA_MANAGEMENT:
             self._index_master = RepoMetadataIndexMaster(config)
             self._index_worker = RepoMetadataIndexWorker(config)
@@ -70,6 +75,9 @@ def serve_forever(self):
         self._index_worker.start()
         self._slow_task_handler.start()
         self._face_cluster.start()
+        if ENABLE_METRIC:
+            self._metric_collect.start()
+            self._metric_handle.start()
         self._repo_filename_index_updater.start()
         self._seasearch_wiki_index_updater.start()
         self._es_wiki_index_updater.start()
diff --git a/app/config.py b/app/config.py
index ed833b92..b132a15e 100644
--- a/app/config.py
+++ b/app/config.py
@@ -26,6 +26,7 @@
     METADATA_FILE_TYPES = getattr(seahub_settings, 'METADATA_FILE_TYPES', {})
     DOWNLOAD_LIMIT_WHEN_THROTTLE = getattr(seahub_settings, 'DOWNLOAD_LIMIT_WHEN_THROTTLE', '1k')
     ENABLED_ROLE_PERMISSIONS = getattr(seahub_settings, 'ENABLED_ROLE_PERMISSIONS', {})
+    ENABLE_METRIC = getattr(seahub_settings, 'ENABLE_METRIC', False)
 except ImportError:
     logger.critical("Can not import seahub settings.")
 
diff --git a/app/event_redis.py b/app/event_redis.py
index 5109f0f7..30521144 100644
--- a/app/event_redis.py
+++ b/app/event_redis.py
@@ -1,8 +1,17 @@
 # -*- coding: utf-8 -*-
+import copy
+import json
 import logging
+import uuid
+
+import redis
+import time
 
 logger = logging.getLogger(__name__)
 
+REDIS_METRIC_KEY = "metric"
+LOCK_NAME = "metric_lock"
+
 
 class RedisClient(object):
 
@@ -32,8 +41,111 @@ def _parse_config(self, config, socket_connect_timeout, socket_timeout):
         By default, each Redis instance created will in turn create its own connection pool.
         Every caller using the redis client will have its own pool with the config the caller passed.
""" - import redis + self.connection = redis.Redis( host=self._host, port=self._port, password=self._password, decode_responses=True, socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout, ) + + def get_subscriber(self, channel_name): + while True: + try: + subscriber = self.connection.pubsub(ignore_subscribe_messages=True) + subscriber.subscribe(channel_name) + except redis.AuthenticationError as e: + logger.critical('connect to redis auth error: %s', e) + raise e + except Exception as e: + logger.error('redis pubsub failed. {} retry after 10s'.format(e)) + time.sleep(10) + else: + return subscriber + + def setnx(self, key, value): + return self.connection.setnx(key, value) + + def expire(self, name, timeout): + return self.connection.expire(name, timeout) + + def get(self, key): + return self.connection.get(key) + + def set(self, key, value, timeout=None): + if not timeout: + return self.connection.set(key, value) + else: + return self.connection.settex(key, timeout, value) + def delete(self, key): + return self.connection.delete(key) + + def lrange(self, key, start, end): + return self.connection.lrange(key, start, end) + + def lpush(self, key, value): + return self.connection.lpush(key, value) + + def lrem(self, key, count): + return self.connection.lrem(key, count) + + def publisher(self, channel, message): + return self.connection.publish(channel, message) + + +class RedisCache(object): + def __init__(self): + self._redis_client = None + + + def init_redis(self, config): + self._redis_client = RedisClient(config) + + + def get(self, key): + return self._redis_client.get(key) + + + def set(self, key, value, timeout=None): + return self._redis_client.set(key, value, timeout=timeout) + + + def delete(self, key): + return self._redis_client.delete(key) + + def lrange(self, key, start, end): + return self._redis_client.lrange(key, start, end) + + def lpush(self, key, value): + return self._redis_client.lpush(key, value) + + def lrem(self, key, count): + return self._redis_client.lrem(key, count) + + def acquire_lock(self): + lock_value = str(uuid.uuid4()) # 创建一个唯一的锁标识 + if self._redis_client.setnx(LOCK_NAME, lock_value): # 获取锁 + self._redis_client.expire(LOCK_NAME, timeout=10) # 设置锁的过期时间,避免死锁 + return lock_value + return None + + def release_lock(self): + self._redis_client.delete(LOCK_NAME) + + def create_or_update(self, key, value): + lock_value = self.acquire_lock() + if lock_value: + try: + current_value = self._redis_client.get(key) + if current_value: + current_value_dict_copy = copy.deepcopy(json.loads(current_value)) + current_value_dict_copy.update(value) + self._redis_client.set(key, json.dumps(current_value_dict_copy)) + else: + self._redis_client.set(key, json.dumps(value)) + finally: + self.release_lock() + + def publisher(self, channel, message): + self._redis_client.publisher(channel, message) + + +redis_cache = RedisCache() diff --git a/db.py b/db.py index 2e42ae52..1527a2f0 100644 --- a/db.py +++ b/db.py @@ -110,6 +110,12 @@ def init_db_session_class(config, db='seafevent'): Session = sessionmaker(bind=engine) return Session +def init_redis_cache(config): + from seafevents.app.event_redis import redis_cache + redis_cache.init_redis(config) + + return redis_cache + def create_db_tables(config): # create seafevents tables if not exists. 
diff --git a/events/metrics.py b/events/metrics.py
new file mode 100644
index 00000000..4034e521
--- /dev/null
+++ b/events/metrics.py
@@ -0,0 +1,130 @@
+import os
+import json
+import time
+import datetime
+import logging
+from threading import Thread, Event
+from seafevents.app.config import ENABLE_METRIC
+from seafevents.app.event_redis import redis_cache, RedisClient
+
+
+local_metric = {'metrics': {}}
+
+node_name = os.environ.get('NODE_NAME', 'default')
+
+
+### metrics decorator
+def seasearch_index_timing_decorator(func):
+    def wrapper(*args, **kwargs):
+        # the decorated function takes the RedisClient as its fifth positional argument
+        redis_client = args[4]
+        publish_metric = {
+            "metric_name": "seasearch_index_timing",
+            "instance_name": "seafevents",
+            "node_name": node_name,
+            "details": {
+                "collected_at": datetime.datetime.utcnow().isoformat()
+            }
+        }
+        start_time = time.time()
+        result = func(*args, **kwargs)
+        duration_seconds = time.time() - start_time
+        publish_metric["details"]['metric_value'] = round(duration_seconds, 3)
+        if ENABLE_METRIC:
+            redis_client.publisher("metric-channel", json.dumps(publish_metric))
+        return result
+    return wrapper
+
+
+def format_metrics(cache):
+    metrics = cache.get('metrics')
+    if not metrics:
+        return ''
+    metrics = json.loads(metrics)
+
+    metric_info = ''
+    for metric_name, metric_detail in metrics.items():
+        metric_value = metric_detail.pop('metric_value')
+        if metric_detail:
+            labels = ','.join(
+                '%s="%s"' % (label_name, label_value)
+                for label_name, label_value in metric_detail.items()
+            )
+            metric_info += metric_name + '{' + labels + '} ' + str(metric_value) + '\n'
+        else:
+            metric_info += metric_name + ' ' + str(metric_value) + '\n'
+
+    cache.delete("metrics")
+    return metric_info.encode()
+
+
+class MetricHandler(object):
+    def __init__(self, app, config):
+        self.app = app
+        self.config = config
+
+    def start(self):
+        MetricTask(self.app, self.config).start()
+
+
+class MetricTask(Thread):
+    def __init__(self, app, config):
+        Thread.__init__(self)
+        self._finished = Event()
+        self._redis_client = RedisClient(config)
+        self.app = app
+
+    def run(self):
+        logging.info('Start handling redis metric channel')
+        subscriber = self._redis_client.get_subscriber('metric-channel')
+
+        while not self._finished.is_set():
+            try:
+                message = subscriber.get_message()
+                if message is not None:
+                    metric_data = json.loads(message['data'])
+                    try:
+                        key_name = metric_data.get('instance_name') + ':' + metric_data.get('node_name') + ':' + metric_data.get('metric_name')
+                        metric_value = metric_data.get('details')
+                        # accumulate into the module-level metric buffer
+                        local_metric['metrics'][key_name] = metric_value
+                    except Exception as e:
+                        logging.error('Handle metrics failed: %s' % e)
+                else:
+                    time.sleep(0.5)
+            except Exception as e:
+                logging.error('Failed to handle metrics: %s' % e)
+                subscriber = self._redis_client.get_subscriber('metric-channel')
+
+
+class MetricRedisRecord(object):
+
+    def __init__(self):
+        self._interval = 15
+
+    def start(self):
+        logging.info('Start metric collect, interval = %s sec', self._interval)
+        MetricRedisCollect(self._interval).start()
+
+
+class MetricRedisCollect(Thread):
+
+    def __init__(self, interval):
+        Thread.__init__(self)
+        self._interval = interval
+        self.finished = Event()
+
+    def run(self):
+        while not self.finished.is_set():
+            self.finished.wait(self._interval)
+            if not self.finished.is_set():
+                try:
+                    if local_metric.get('metrics'):
+                        redis_cache.create_or_update('metrics', local_metric.get('metrics'))
+                        local_metric['metrics'].clear()
+                except Exception as e:
+                    logging.exception('metric collect error: %s', e)
+
+    def cancel(self):
+        self.finished.set()
+
diff --git a/main.py b/main.py
index 40ecd073..a042ecd4 100644
--- a/main.py
+++ b/main.py
@@ -8,7 +8,7 @@
 from seafevents.app.log import LogConfigurator
 from seafevents.app.app import App
 from seafevents.app.config import get_config, is_cluster_enabled, is_syslog_enabled
-
+from seafevents.app.event_redis import redis_cache
 
 def main(background_tasks_only=False):
     parser = argparse.ArgumentParser(description='seafevents main program')
@@ -36,6 +36,7 @@ def main(background_tasks_only=False):
 
     seafile_config = get_config(seafile_conf_path)
     config = get_config(args.config_file)
+    redis_cache.init_redis(config)
     try:
         create_db_tables(config)
         prepare_db_tables(seafile_config)
diff --git a/seafevent_server/request_handler.py b/seafevent_server/request_handler.py
index 9ddb4d64..eaca52bd 100644
--- a/seafevent_server/request_handler.py
+++ b/seafevent_server/request_handler.py
@@ -1,14 +1,19 @@
 import jwt
 import logging
 import json
+import time
 
 from flask import Flask, request, make_response
-from seafevents.app.config import SEAHUB_SECRET_KEY
+from seafevents.app.config import SEAHUB_SECRET_KEY, ENABLE_METRIC
 from seafevents.seafevent_server.task_manager import task_manager
 from seafevents.seafevent_server.export_task_manager import event_export_task_manager
 from seafevents.seasearch.index_task.index_task_manager import index_task_manager
 from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
 from seafevents.repo_metadata.utils import add_file_details
+from seafevents.app.event_redis import redis_cache
+from seafevents.events.metrics import node_name
+
+
 
 app = Flask(__name__)
 logger = logging.getLogger(__name__)
@@ -72,6 +77,17 @@ def get_sys_logs_task():
     log_type = request.args.get('log_type')
     try:
         task_id = event_export_task_manager.add_export_logs_task(start_time, end_time, log_type)
+        publish_metric = {
+            "metric_name": "io_task_qsize",
+            "instance_name": "seafevents",
+            "node_name": node_name,
+            "details": {
+                "metric_value": event_export_task_manager.tasks_queue.qsize(),
+                "collected_at": time.time()
+            }
+        }
+        if ENABLE_METRIC:
+            redis_cache.publisher('metric-channel', json.dumps(publish_metric))
     except Exception as e:
         logger.error(e)
         return make_response((e, 500))
@@ -96,6 +112,17 @@ def get_org_logs_task():
     org_id = request.args.get('org_id')
     try:
         task_id = event_export_task_manager.add_org_export_logs_task(start_time, end_time, log_type, org_id)
+        publish_metric = {
+            "metric_name": "io_task_qsize",
+            "instance_name": "seafevents",
+            "node_name": node_name,
+            "details": {
+                "metric_value": event_export_task_manager.tasks_queue.qsize(),
+                "collected_at": time.time()
+            }
+        }
+        if ENABLE_METRIC:
+            redis_cache.publisher('metric-channel', json.dumps(publish_metric))
     except Exception as e:
         logger.error(e)
         return make_response((e, 500))
@@ -264,6 +291,17 @@ def add_convert_wiki_task():
 
     try:
         task_id = event_export_task_manager.add_convert_wiki_task(old_repo_id, new_repo_id, username)
+        publish_metric = {
+            "metric_name": "io_task_qsize",
+            "instance_name": "seafevents",
+            "node_name": node_name,
+            "details": {
+                "metric_value": event_export_task_manager.tasks_queue.qsize(),
+                "collected_at": time.time()
+            }
+        }
+        if ENABLE_METRIC:
+            redis_cache.publisher('metric-channel', json.dumps(publish_metric))
     except Exception as e:
         logger.error(e)
         return make_response((e, 500))
diff --git a/seafevents_api.py b/seafevents_api.py
index e6da7c20..d52af1ae 100644
--- a/seafevents_api.py
+++ b/seafevents_api.py
@@ -1,4 +1,4 @@
-from .db import init_db_session_class
+from .db import init_db_session_class, init_redis_cache
 from .statistics.db import *
 from .events.db import *
 from .events.handlers import get_delete_records
@@ -6,6 +6,8 @@
 from .virus_scanner.db_oper import *
 from .app.config import is_repo_auto_del_enabled, is_search_enabled, is_audit_enabled, \
     is_seasearch_enabled
+from .events.metrics import format_metrics
+
 
 
 def is_pro():
diff --git a/seasearch/index_task/filename_index_updater.py b/seasearch/index_task/filename_index_updater.py
index 0528fb84..045f26df 100644
--- a/seasearch/index_task/filename_index_updater.py
+++ b/seasearch/index_task/filename_index_updater.py
@@ -11,6 +11,8 @@
 from seafevents.seasearch.utils.seasearch_api import SeaSearchAPI
 from seafevents.repo_data import repo_data
 from seafevents.utils import parse_bool, get_opt_from_conf_or_env, parse_interval
+from seafevents.events.metrics import seasearch_index_timing_decorator
+from seafevents.app.event_redis import RedisClient
 
 logger = logging.getLogger(__name__)
 
@@ -26,6 +28,7 @@ def __init__(self, config):
         self._repo_filename_index = None
         self._index_manager = None
         self._parse_config(config)
+        self.redis_client = RedisClient(config)
 
     def _parse_config(self, config):
         """Parse filename index update related parts of events.conf"""
@@ -87,7 +90,8 @@ def start(self):
             self._repo_filename_index,
             self._index_manager,
             self._repo_data,
-            self._interval
+            self._interval,
+            self.redis_client
         ).start()
 
 
@@ -105,8 +109,8 @@ def clear_deleted_repo(repo_status_filename_index, repo_filename_index, index_ma
             logger.info('Repo %s has been deleted from filename index.' % repo_id)
 
     logger.info("filename index deleted repo has been cleared")
 
-
-def update_repo_file_name_indexes(repo_status_filename_index, repo_filename_index, index_manager, repo_data):
+@seasearch_index_timing_decorator
+def update_repo_file_name_indexes(repo_status_filename_index, repo_filename_index, index_manager, repo_data, redis_client):
     start, count = 0, 1000
     all_repos = []
 
@@ -139,20 +143,21 @@
 
 class RepoFilenameIndexUpdaterTimer(Thread):
-    def __init__(self, repo_status_filename_index, repo_filename_index, index_manager, repo_data, interval):
+    def __init__(self, repo_status_filename_index, repo_filename_index, index_manager, repo_data, interval, redis_client):
         super(RepoFilenameIndexUpdaterTimer, self).__init__()
         self.repo_status_filename_index = repo_status_filename_index
         self.repo_filename_index = repo_filename_index
         self.index_manager = index_manager
        self.repo_data = repo_data
         self.interval = interval
+        self.redis_client = redis_client
 
     def run(self):
         sched = GeventScheduler()
         logging.info('Start to update filename index...')
         try:
             sched.add_job(update_repo_file_name_indexes, IntervalTrigger(seconds=self.interval),
-                          args=(self.repo_status_filename_index, self.repo_filename_index, self.index_manager, self.repo_data))
+                          args=(self.repo_status_filename_index, self.repo_filename_index, self.index_manager, self.repo_data, self.redis_client))
         except Exception as e:
             logging.exception('periodical update filename index error: %s', e)
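
--
For reference, the flow this patch implements: the request handlers and the seasearch decorator publish JSON metrics to the Redis channel 'metric-channel'; MetricTask subscribes and buffers them in local_metric; MetricRedisCollect flushes that buffer into the Redis key 'metrics' every 15 seconds via create_or_update; and format_metrics renders that key in Prometheus text exposition format for scraping. The sketch below reproduces the round trip with a plain dict standing in for Redis, so it runs without a server; FakeCache and the sample values are illustrative and not part of the patch:

    import json

    class FakeCache(object):
        """Stand-in for RedisCache: just enough get/delete/create_or_update."""
        def __init__(self):
            self.store = {}

        def get(self, key):
            return self.store.get(key)

        def delete(self, key):
            self.store.pop(key, None)

        def create_or_update(self, key, value):
            current = self.store.get(key)
            merged = dict(json.loads(current), **value) if current else dict(value)
            self.store[key] = json.dumps(merged)

    # 1. What MetricTask buffers after a 'metric-channel' message arrives:
    #    key is 'instance_name:node_name:metric_name', value is the 'details' dict.
    local_metric = {'metrics': {
        'seafevents:default:seasearch_index_timing': {
            'metric_value': 0.742,
            'collected_at': '2025-01-04T15:03:24',
        },
    }}

    # 2. What MetricRedisCollect does on each 15-second tick.
    cache = FakeCache()
    cache.create_or_update('metrics', local_metric['metrics'])
    local_metric['metrics'].clear()

    # 3. What format_metrics renders for a scrape (Prometheus text format).
    metrics = json.loads(cache.get('metrics'))
    output = ''
    for name, detail in metrics.items():
        value = detail.pop('metric_value')
        labels = ','.join('%s="%s"' % (k, v) for k, v in detail.items())
        output += '%s{%s} %s\n' % (name, labels, value) if labels else '%s %s\n' % (name, value)
    cache.delete('metrics')
    print(output)
    # -> seafevents:default:seasearch_index_timing{collected_at="2025-01-04T15:03:24"} 0.742

Note that format_metrics is destructive: it deletes the 'metrics' key after rendering, so each scrape only sees metrics collected since the previous one.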