Skip to content

Commit

Permalink
add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
孙永强 committed Jan 4, 2025
1 parent 0292c24 commit 4dd8432
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 10 deletions.
10 changes: 9 additions & 1 deletion app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from seafevents.repo_metadata.index_worker import RepoMetadataIndexWorker
from seafevents.repo_metadata.slow_task_handler import SlowTaskHandler
from seafevents.seafevent_server.seafevent_server import SeafEventServer
from seafevents.app.config import ENABLE_METADATA_MANAGEMENT
from seafevents.app.config import ENABLE_METADATA_MANAGEMENT, ENABLE_METRIC
from seafevents.seasearch.index_task.filename_index_updater import RepoFilenameIndexUpdater
from seafevents.seasearch.index_task.wiki_index_updater import SeasearchWikiIndexUpdater
from seafevents.events.metrics import MetricRedisRecord, MetricHandler


class App(object):
Expand Down Expand Up @@ -38,6 +39,10 @@ def __init__(self, config, seafile_config,
self._file_updates_sender = FileUpdatesSender()
self._repo_old_file_auto_del_scanner = RepoOldFileAutoDelScanner(config)
self._deleted_files_count_cleaner = DeletedFilesCountCleaner(config)
if ENABLE_METRIC:
self._metric_collect = MetricRedisRecord()
self._metric_handle = MetricHandler(self, config)

if ENABLE_METADATA_MANAGEMENT:
self._index_master = RepoMetadataIndexMaster(config)
self._index_worker = RepoMetadataIndexWorker(config)
Expand Down Expand Up @@ -70,6 +75,9 @@ def serve_forever(self):
self._index_worker.start()
self._slow_task_handler.start()
self._face_cluster.start()
if ENABLE_METRIC:
self._metric_collect.start()
self._metric_handle.start()
self._repo_filename_index_updater.start()
self._seasearch_wiki_index_updater.start()
self._es_wiki_index_updater.start()
1 change: 1 addition & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
METADATA_FILE_TYPES = getattr(seahub_settings, 'METADATA_FILE_TYPES', {})
DOWNLOAD_LIMIT_WHEN_THROTTLE = getattr(seahub_settings, 'DOWNLOAD_LIMIT_WHEN_THROTTLE', '1k')
ENABLED_ROLE_PERMISSIONS = getattr(seahub_settings, 'ENABLED_ROLE_PERMISSIONS', {})
ENABLE_METRIC = getattr(seahub_settings, 'ENABLE_METRIC', False)

except ImportError:
logger.critical("Can not import seahub settings.")
Expand Down
114 changes: 113 additions & 1 deletion app/event_redis.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
# -*- coding: utf-8 -*-
import copy
import json
import logging
import uuid

import redis
import time

logger = logging.getLogger(__name__)

REDIS_METRIC_KEY = "metric"
LOCK_NAME = "metric_lock"


class RedisClient(object):

Expand Down Expand Up @@ -32,8 +41,111 @@ def _parse_config(self, config, socket_connect_timeout, socket_timeout):
By default, each Redis instance created will in turn create its own connection pool.
Every caller using the redis client will have its own pool with the config the caller passed.
"""
import redis

self.connection = redis.Redis(
host=self._host, port=self._port, password=self._password, decode_responses=True,
socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout,
)

def get_subscriber(self, channel_name):
while True:
try:
subscriber = self.connection.pubsub(ignore_subscribe_messages=True)
subscriber.subscribe(channel_name)
except redis.AuthenticationError as e:
logger.critical('connect to redis auth error: %s', e)
raise e
except Exception as e:
logger.error('redis pubsub failed. {} retry after 10s'.format(e))
time.sleep(10)
else:
return subscriber

def setnx(self, key, value):
return self.connection.setnx(key, value)

def expire(self, name, timeout):
return self.connection.expire(name, timeout)

def get(self, key):
return self.connection.get(key)

def set(self, key, value, timeout=None):
if not timeout:
return self.connection.set(key, value)
else:
return self.connection.settex(key, timeout, value)
def delete(self, key):
return self.connection.delete(key)

def lrange(self, key, start, end):
return self.connection.lrange(key, start, end)

def lpush(self, key, value):
return self.connection.lpush(key, value)

def lrem(self, key, count):
return self.connection.lrem(key, count)

def publisher(self, channel, message):
return self.connection.publish(channel, message)


class RedisCache(object):
    """Thin module-level cache facade over RedisClient.

    ``init_redis()`` must be called once at startup before any other method
    is used; until then ``_redis_client`` is None and calls will fail.
    """

    def __init__(self):
        # Populated by init_redis().
        self._redis_client = None

    def init_redis(self, config):
        self._redis_client = RedisClient(config)

    def get(self, key):
        return self._redis_client.get(key)

    def set(self, key, value, timeout=None):
        return self._redis_client.set(key, value, timeout=timeout)

    def delete(self, key):
        return self._redis_client.delete(key)

    def lrange(self, key, start, end):
        return self._redis_client.lrange(key, start, end)

    def lpush(self, key, value):
        return self._redis_client.lpush(key, value)

    def lrem(self, key, count, value=None):
        # Redis LREM needs (name, count, value); forward all three so the
        # call no longer fails with TypeError inside the client wrapper.
        return self._redis_client.lrem(key, count, value)

    def acquire_lock(self):
        """Try to take the metric lock; return the lock token on success, else None."""
        lock_value = str(uuid.uuid4())  # unique token identifying this lock holder
        if self._redis_client.setnx(LOCK_NAME, lock_value):
            # Expire the lock so a crashed holder cannot deadlock other workers.
            # NOTE(review): setnx + expire is not atomic — a crash between the
            # two calls still leaves a lock with no TTL; confirm acceptable.
            self._redis_client.expire(LOCK_NAME, timeout=10)
            return lock_value
        return None

    def release_lock(self):
        # NOTE(review): deletes unconditionally, even if the TTL expired and
        # another worker now holds the lock — confirm this is acceptable.
        self._redis_client.delete(LOCK_NAME)

    def create_or_update(self, key, value):
        """Merge dict *value* into the JSON dict stored at *key*, under the metric lock.

        Silently does nothing when the lock cannot be acquired.
        """
        lock_value = self.acquire_lock()
        if not lock_value:
            return
        try:
            current_value = self._redis_client.get(key)
            if current_value:
                # json.loads already returns a fresh object, so the original
                # deepcopy was redundant and has been dropped.
                merged = json.loads(current_value)
                merged.update(value)
                self._redis_client.set(key, json.dumps(merged))
            else:
                self._redis_client.set(key, json.dumps(value))
        finally:
            self.release_lock()

    def publisher(self, channel, message):
        self._redis_client.publisher(channel, message)


redis_cache = RedisCache()
6 changes: 6 additions & 0 deletions db.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ def init_db_session_class(config, db='seafevent'):
Session = sessionmaker(bind=engine)
return Session

def init_redis_cache(config):
    """Initialise the shared redis_cache singleton from *config* and return it.

    The import is done inside the function — presumably to avoid an import
    cycle at module load time; confirm before hoisting it.
    """
    from seafevents.app.event_redis import redis_cache

    redis_cache.init_redis(config)
    return redis_cache


def create_db_tables(config):
# create seafevents tables if not exists.
Expand Down
130 changes: 130 additions & 0 deletions events/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import os
import json
import time
import datetime
import logging
from threading import Thread, Event
from seafevents.app.config import ENABLE_METRIC
from seafevents.app.event_redis import redis_cache, RedisClient


# Per-process buffer of metrics received from the redis channel; flushed to
# redis (and cleared) by MetricRedisCollect on each interval.
local_metric = {'metrics': {}}

# Identifies this host in published metrics; falls back to 'default' when the
# NODE_NAME environment variable is unset.
node_name = os.environ.get('NODE_NAME', 'default')


### metrics decorator
def seasearch_index_timing_decorator(func):
    """Time *func* and publish the duration as a 'seasearch_index_timing' metric.

    The metric is published on the 'metric-channel' redis channel only when
    ENABLE_METRIC is on.

    NOTE(review): assumes the decorated callable receives the redis client as
    its fifth positional argument (args[4]) — confirm against call sites.
    """
    def wrapper(*args, **kwargs):
        redis_client = args[4]
        publish_metric = {
            "metric_name": "seasearch_index_timing",
            "instance_name": "seafevents",
            "node_name": node_name,
            "details": {
                "collected_at": datetime.datetime.utcnow().isoformat()
            }
        }
        start_time = time.time()
        # Bug fix: capture and propagate the wrapped function's return value
        # (the original wrapper silently discarded it).
        result = func(*args, **kwargs)
        duration_seconds = time.time() - start_time
        publish_metric["details"]['metric_value'] = round(duration_seconds, 3)
        if ENABLE_METRIC:
            redis_client.publisher("metric-channel", json.dumps(publish_metric))
        return result
    return wrapper


def format_metrics(cache):
    """Render the cached metrics dict into Prometheus text exposition format.

    Reads the JSON blob stored under 'metrics' in *cache*, emits one
    ``name{label="value",...} value`` line per metric, deletes the cached
    key, and returns the rendered text as bytes ('' when nothing is cached).

    Bug fixes vs. the original: labels and metric lines were overwritten on
    each loop iteration instead of accumulated (only the last label of the
    last metric survived), and the no-label branch lacked the space between
    metric name and value.
    """
    metrics = cache.get('metrics')
    if not metrics:
        # Preserve the historical empty-string return for "nothing collected".
        return ''
    metrics = json.loads(metrics)

    lines = []
    for metric_name, metric_detail in metrics.items():
        metric_value = metric_detail.pop('metric_value')
        if metric_detail:
            # All remaining keys are labels.
            labels = ','.join('{}="{}"'.format(name, value)
                              for name, value in metric_detail.items())
            lines.append('{}{{{}}} {}'.format(metric_name, labels, metric_value))
        else:
            lines.append('{} {}'.format(metric_name, metric_value))

    cache.delete("metrics")
    return ('\n'.join(lines) + '\n').encode()


class MetricHandler(object):
    """Facade that launches the metric-consuming background task."""

    def __init__(self, app, config):
        self.app = app
        self.config = config

    def start(self):
        task = MetricTask(self.app, self.config)
        task.start()


class MetricTask(Thread):
    """Background thread consuming metric messages from the redis pub/sub channel.

    Each message is a JSON dict carrying instance_name / node_name /
    metric_name / details; it is buffered into the module-level
    ``local_metric`` dict under the key ``instance:node:metric``.
    """

    def __init__(self, app, config):
        Thread.__init__(self)
        self._finished = Event()
        self._redis_client = RedisClient(config)
        self.app = app

    def run(self):
        logging.info('Starting handle redis channel')
        subscriber = self._redis_client.get_subscriber('metric-channel')

        while not self._finished.is_set():
            try:
                message = subscriber.get_message()
                if message is not None:
                    metric_data = json.loads(message['data'])
                    try:
                        key_name = metric_data.get('instance_name') + ':' + metric_data.get('node_name') + ':' + metric_data.get('metric_name')
                        metric_value = metric_data.get('details')
                        # Buffer in the process-global dict; MetricRedisCollect
                        # flushes it to redis periodically.
                        local_metric['metrics'][key_name] = metric_value
                    except Exception as e:
                        logging.error('Handle metrics failed: %s' % e)
                else:
                    # Nothing pending; avoid busy-polling the connection.
                    time.sleep(0.5)
            except Exception as e:
                logging.error('Failed handle metrics: %s' % e)
                # The subscriber connection is likely broken; re-subscribe.
                subscriber = self._redis_client.get_subscriber('metric-channel')

    def cancel(self):
        # Consistency fix: mirror MetricRedisCollect.cancel() so the loop can
        # actually be stopped (the _finished Event was previously never set).
        self._finished.set()


class MetricRedisRecord(object):
    """Schedules periodic flushing of buffered metrics into redis."""

    def __init__(self):
        # Flush period in seconds.
        self._interval = 15

    def start(self):
        logging.info('Start metric collect, interval = %s sec', self._interval)
        collector = MetricRedisCollect(self._interval)
        collector.start()


class MetricRedisCollect(Thread):
    """Worker thread that flushes ``local_metric`` into redis every *interval* seconds."""

    def __init__(self, interval):
        Thread.__init__(self)
        # Seconds between flush attempts.
        self._interval = interval
        self.finished = Event()

    def run(self):
        while not self.finished.is_set():
            # wait() doubles as an interruptible sleep: cancel() wakes it early.
            self.finished.wait(self._interval)
            if self.finished.is_set():
                continue
            try:
                pending = local_metric.get('metrics')
                if pending:
                    redis_cache.create_or_update('metrics', pending)
                    local_metric['metrics'].clear()
            except Exception as e:
                logging.exception('metric collect error: %s', e)

    def cancel(self):
        """Signal the thread to stop after the current wait."""
        self.finished.set()

3 changes: 2 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from seafevents.app.log import LogConfigurator
from seafevents.app.app import App
from seafevents.app.config import get_config, is_cluster_enabled, is_syslog_enabled

from seafevents.app.event_redis import redis_cache

def main(background_tasks_only=False):
parser = argparse.ArgumentParser(description='seafevents main program')
Expand Down Expand Up @@ -36,6 +36,7 @@ def main(background_tasks_only=False):

seafile_config = get_config(seafile_conf_path)
config = get_config(args.config_file)
redis_cache.init_redis(config)
try:
create_db_tables(config)
prepare_db_tables(seafile_config)
Expand Down
Loading

0 comments on commit 4dd8432

Please sign in to comment.