Skip to content

Commit

Permalink
test dm
Browse files Browse the repository at this point in the history
  • Loading branch information
JoinTyang committed Jan 18, 2025
1 parent 31558c9 commit 8b18bb5
Show file tree
Hide file tree
Showing 12 changed files with 446 additions and 260 deletions.
4 changes: 2 additions & 2 deletions content_scanner/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class ContentScanRecord(Base):
__tablename__ = 'ContentScanRecord'
__tablename__ = 'CONTENTSCANRECORD'

id = mapped_column(Integer, primary_key=True, autoincrement=True)
repo_id = mapped_column(String(length=36), nullable=False, index=True)
Expand All @@ -21,7 +21,7 @@ def __init__(self, repo_id, commit_id, timestamp):


class ContentScanResult(Base):
__tablename__ = 'ContentScanResult'
__tablename__ = 'CONTENTSCANRESULT'

id = mapped_column(Integer, primary_key=True, autoincrement=True)
repo_id = mapped_column(String(length=36), nullable=False, index=True)
Expand Down
20 changes: 19 additions & 1 deletion db.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ def create_engine_from_conf(config, db='seafevent'):
db_url = "oracle://%s:%s@%s:%s/%s" % (username, quote_plus(passwd),
host, port, service_name)

elif backend == 'dm':
if config.has_option(db_sec, 'host'):
host = config.get(db_sec, 'host').lower()
else:
host = 'localhost'

if config.has_option(db_sec, 'port'):
port = config.getint(db_sec, 'port')
else:
port = 5236
username = config.get(db_sec, user)
passwd = config.get(db_sec, 'password')
service_name = config.get(db_sec, db_name)
if db == 'seafile':
service_name = config.get(db_sec, user)

db_url = "dm+dmPython://%s:%s@%s:%s/?schema=%s" % (username, quote_plus(passwd), host, port, service_name)
else:
logger.error("Unknown database backend: %s" % backend)
raise RuntimeError("Unknown database backend: %s" % backend)
Expand All @@ -104,7 +121,7 @@ def init_db_session_class(config, db='seafevent'):
try:
engine = create_engine_from_conf(config, db)
except (configparser.NoOptionError, configparser.NoSectionError) as e:
logger.error(e)
logger.exception(e)
raise RuntimeError("create db engine error: %s" % e)

Session = sessionmaker(bind=engine)
Expand Down Expand Up @@ -134,6 +151,7 @@ def prepare_db_tables(seafile_config):
logger.error(e)
raise RuntimeError("create db engine error: %s" % e)

# SeafBase.prepare(autoload_with=engine, schema='SYSDBA')
SeafBase.prepare(autoload_with=engine)


Expand Down
78 changes: 55 additions & 23 deletions events/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _get_user_activities(session, username, start, limit):
if limit <= 0:
logger.error('limit must be positive')
raise RuntimeError('limit must be positive')

sub_query = (
select(UserActivity.activity_id)
.where(UserActivity.username == username)
Expand All @@ -85,7 +85,7 @@ def _get_user_activities_by_op_user(session, username, op_user, start, limit):
if limit <= 0:
logger.error('limit must be positive')
raise RuntimeError('limit must be positive')

sub_query = (
select(UserActivity.activity_id)
.where(UserActivity.username == username)
Expand Down Expand Up @@ -199,9 +199,10 @@ def get_file_history_by_day(session, repo_id, path, start, limit, to_tz, history
repo_id_path_md5 = hashlib.md5((repo_id + path).encode('utf8')).hexdigest()
current_item = session.scalars(select(FileHistory).where(FileHistory.repo_id_path_md5 == repo_id_path_md5).
order_by(desc(FileHistory.id)).limit(1)).first()

new_events = []
if current_item:
        # TODO: convert_tz needs to be changed, but this endpoint belongs to the
        # sdoc API — test it after sdoc is set up and configured.
query_stmt = select(
FileHistory.id, FileHistory.op_type, FileHistory.op_user, FileHistory.timestamp, FileHistory.repo_id, FileHistory.commit_id,
FileHistory.file_id, FileHistory.file_uuid, FileHistory.path, FileHistory.repo_id_path_md5, FileHistory.size, FileHistory.old_path,
Expand Down Expand Up @@ -244,8 +245,9 @@ def get_file_daily_history_detail(session, repo_id, path, start_time, end_time,
repo_id_path_md5 = hashlib.md5((repo_id + path).encode('utf8')).hexdigest()
current_item = session.scalars(select(FileHistory).where(FileHistory.repo_id_path_md5 == repo_id_path_md5).
order_by(desc(FileHistory.id)).limit(1)).first()

details = list()
    # TODO: convert_tz needs to be changed, but this endpoint belongs to the
    # sdoc API — test it after sdoc is set up and configured.
try:
q = select(FileHistory.id, FileHistory.op_type, FileHistory.op_user, FileHistory.timestamp, FileHistory.repo_id, FileHistory.commit_id,
FileHistory.file_id, FileHistory.file_uuid, FileHistory.path, FileHistory.repo_id_path_md5, FileHistory.size, FileHistory.old_path,
Expand All @@ -271,8 +273,20 @@ def save_user_activity(session, record):
session.commit()

def save_repo_trash(session, record):
    """Insert a deletion record into the FileTrash table.

    :param session: SQLAlchemy session bound to the events database.
    :param record: dict with required keys 'op_user', 'obj_type',
        'obj_name', 'timestamp', 'repo_id', 'path' and optional keys
        'obj_id', 'commit_id', 'size'.
    """
    # Use a parameterized statement instead of f-string interpolation:
    # file names and paths are user-controlled, so building the SQL by
    # string formatting allows SQL injection and breaks on quotes in
    # names.  "user" and "size" stay double-quoted because they are
    # reserved words on some backends (e.g. DM / Oracle).
    sql = text(
        'INSERT INTO FileTrash ("user", obj_type, obj_id, obj_name, '
        'delete_time, repo_id, commit_id, path, "size") '
        'VALUES (:user, :obj_type, :obj_id, :obj_name, :delete_time, '
        ':repo_id, :commit_id, :path, :size)'
    )
    session.execute(sql, {
        'user': record['op_user'],
        'obj_type': record['obj_type'],
        'obj_id': record.get('obj_id', ''),
        'obj_name': record['obj_name'],
        'delete_time': record['timestamp'],
        # A missing commit_id is stored as SQL NULL; the previous
        # f-string version wrote the literal string 'None'.
        'commit_id': record.get('commit_id', None),
        'path': record['path'],
        'size': record.get('size', 0),
    })
    session.commit()

def restore_repo_trash(session, record):
Expand All @@ -291,7 +305,7 @@ def clean_up_repo_trash(session, repo_id, keep_days):
stmt = delete(FileTrash).where(FileTrash.repo_id == repo_id, FileTrash.delete_time < _timestamp)
session.execute(stmt)
session.commit()

def clean_up_all_repo_trash(session, keep_days):
if keep_days == 0:
stmt = delete(FileTrash)
Expand All @@ -314,10 +328,12 @@ def update_user_activity_timestamp(session, activity_id, record):
session.commit()

def update_file_history_record(session, history_id, record):
    """Refresh an existing FileHistory row with the latest event data.

    :param session: SQLAlchemy session bound to the events database.
    :param history_id: primary key of the FileHistory row to update.
    :param record: dict with keys 'timestamp', 'obj_id', 'commit_id',
        'size' (all required).
    """
    # Bound parameters instead of f-string interpolation: the values
    # originate from repository events and must not be spliced into the
    # SQL text (injection / quoting bugs).  "size" stays double-quoted
    # for reserved-word compatibility (e.g. DM / Oracle).
    sql = text(
        'UPDATE FileHistory SET timestamp=:timestamp, file_id=:file_id, '
        'commit_id=:commit_id, "size"=:size WHERE id=:history_id'
    )
    session.execute(sql, {
        'timestamp': record['timestamp'],
        'file_id': record['obj_id'],
        'commit_id': record['commit_id'],
        'size': record['size'],
        'history_id': history_id,
    })
    session.commit()

def query_prev_record(session, record):
Expand Down Expand Up @@ -366,8 +382,22 @@ def save_filehistory(session, fh_threshold, record):
file_uuid = uuid.uuid4().__str__()
record['file_uuid'] = file_uuid

filehistory = FileHistory(record)
session.add(filehistory)
op_type = record['op_type']
op_user = record['op_user']
timestamp = record['timestamp']
repo_id = record['repo_id']
commit_id = record.get('commit_id', '')
file_id = record.get('obj_id')
file_uuid = record.get('file_uuid')
path = record['path']
repo_id_path_md5 = hashlib.md5((repo_id + path).encode('utf8')).hexdigest()
size = record.get('size')
old_path = record.get('old_path', '')

sql = f"""INSERT INTO FileHistory (op_type, op_user, timestamp, repo_id, commit_id, file_id, file_uuid, path, repo_id_path_md5, "size", old_path) VALUES
('{op_type}', '{op_user}', '{timestamp}', '{repo_id}', '{commit_id}', '{file_id}', '{file_uuid}', '{path}', '{repo_id_path_md5}', {size}, '{old_path}')"""

session.execute(text(sql))
session.commit()


Expand All @@ -376,10 +406,12 @@ def save_file_update_event(session, timestamp, user, org_id, repo_id,
if timestamp is None:
timestamp = datetime.datetime.utcnow()

event = FileUpdate(timestamp, user, org_id, repo_id, commit_id, file_oper)
session.add(event)
sql = f"""INSERT INTO FileUpdate (timestamp, "user", org_id, repo_id, commit_id, file_oper) VALUES
('{timestamp}', '{user}', {org_id}, '{repo_id}', '{commit_id}', '{file_oper}')"""
session.execute(text(sql))
session.commit()


def get_events(session, obj, username, org_id, repo_id, file_path, start, limit):
if start < 0:
logger.error('start must be non-negative')
Expand Down Expand Up @@ -428,21 +460,21 @@ def save_file_audit_event(session, timestamp, etype, user, ip, device,
if timestamp is None:
timestamp = datetime.datetime.utcnow()

file_audit = FileAudit(timestamp, etype, user, ip, device, org_id,
repo_id, file_path)

session.add(file_audit)
sql = f"""INSERT INTO FileAudit (timestamp, etype, "user", ip, device, org_id,
repo_id, file_path) VALUES
('{timestamp}', '{etype}', '{user}', '{ip}', '{device}', {org_id}, '{repo_id}', '{file_path}')"""
session.execute(text(sql))
session.commit()

def save_perm_audit_event(session, timestamp, etype, from_user, to,
                          org_id, repo_id, file_path, perm):
    """Insert a permission-change audit event into the PermAudit table.

    :param session: SQLAlchemy session bound to the events database.
    :param timestamp: event time; defaults to the current UTC time when
        None.
    :param etype: event type string.
    :param from_user: user who changed the permission.
    :param to: user/group the permission was granted to or revoked from.
    :param org_id: organization id (integer).
    :param repo_id: library id.
    :param file_path: affected path within the library.
    :param perm: the permission value, stored in the 'permission' column.
    """
    if timestamp is None:
        timestamp = datetime.datetime.utcnow()

    # Parameterized statement: usernames and paths are user-controlled,
    # so f-string SQL is an injection risk and breaks on quotes.  "to"
    # stays double-quoted because it is a reserved word on some
    # backends (e.g. DM / Oracle).
    sql = text(
        'INSERT INTO PermAudit (timestamp, etype, from_user, "to", org_id, '
        'repo_id, file_path, permission) '
        'VALUES (:timestamp, :etype, :from_user, :to, :org_id, :repo_id, '
        ':file_path, :perm)'
    )
    session.execute(sql, {
        'timestamp': timestamp,
        'etype': etype,
        'from_user': from_user,
        'to': to,
        'org_id': org_id,
        'repo_id': repo_id,
        'file_path': file_path,
        'perm': perm,
    })
    session.commit()

def get_perm_audit_events(session, from_user, org_id, repo_id, start, limit):
Expand Down
14 changes: 7 additions & 7 deletions events/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class Activity(Base):
"""
"""
__tablename__ = 'Activity'
__tablename__ = 'ACTIVITY'

id = mapped_column(BigInteger, primary_key=True, autoincrement=True)
op_type = mapped_column(String(length=128), nullable=False)
Expand Down Expand Up @@ -51,7 +51,7 @@ def __str__(self):
class UserActivity(Base):
"""
"""
__tablename__ = 'UserActivity'
__tablename__ = 'USERACTIVITY'

id = mapped_column(BigInteger, primary_key=True, autoincrement=True)
username = mapped_column(String(length=255), nullable=False)
Expand All @@ -73,7 +73,7 @@ def __str__(self):


class FileHistory(Base):
__tablename__ = 'FileHistory'
__tablename__ = 'FILEHISTORY'

id = mapped_column(BigInteger, primary_key=True, autoincrement=True)
op_type = mapped_column(String(length=128), nullable=False)
Expand Down Expand Up @@ -105,7 +105,7 @@ def __init__(self, record):


class FileAudit(Base):
__tablename__ = 'FileAudit'
__tablename__ = 'FILEAUDIT'

eid = mapped_column(BigInteger, primary_key=True, autoincrement=True)
timestamp = mapped_column(DateTime, nullable=False, index=True)
Expand Down Expand Up @@ -149,7 +149,7 @@ def __str__(self):


class FileUpdate(Base):
__tablename__ = 'FileUpdate'
__tablename__ = 'FILEUPDATE'

eid = mapped_column(BigInteger, primary_key=True, autoincrement=True)
timestamp = mapped_column(DateTime, nullable=False, index=True)
Expand Down Expand Up @@ -186,7 +186,7 @@ def __str__(self):


class PermAudit(Base):
__tablename__ = 'PermAudit'
__tablename__ = 'PERMAUDIT'

eid = mapped_column(BigInteger, primary_key=True, autoincrement=True)
timestamp = mapped_column(DateTime, nullable=False)
Expand Down Expand Up @@ -246,7 +246,7 @@ def __init__(self, login_date, username, login_ip, login_success):
self.login_success = login_success

class FileTrash(Base):
__tablename__ = 'FileTrash'
__tablename__ = 'FILETRASH'

id = mapped_column(Integer, primary_key=True, autoincrement=True)
user = mapped_column(String(length=255), nullable=False)
Expand Down
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def main(background_tasks_only=False):
seafile_config = get_config(seafile_conf_path)
config = get_config(args.config_file)
try:
create_db_tables(config)
# create_db_tables(config)
prepare_db_tables(seafile_config)
except Exception as e:
logging.error('Failed create tables, error: %s' % e)
Expand Down
18 changes: 16 additions & 2 deletions repo_data/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ def create_engine_from_conf(config_file):
db_url = "mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8" % \
(db_username, quote_plus(db_passwd),
db_server, db_port, db_name)

elif backend == 'dm':
db_server = 'localhost'
db_port = 5236

if seaf_conf.has_option('database', 'host'):
db_server = seaf_conf.get('database', 'host')
if seaf_conf.has_option('database', 'port'):
db_port = seaf_conf.getint('database', 'port')
db_username = seaf_conf.get('database', 'user')
db_passwd = seaf_conf.get('database', 'password')
db_name = seaf_conf.get('database', 'user')
db_url = "dm+dmPython://%s:%s@%s:%s?schema=%s" % (db_username, quote_plus(db_passwd),
db_server, db_port, db_name)
else:
logger.critical("Unknown Database backend: %s" % backend)
raise RuntimeError("Unknown Database backend: %s" % backend)
Expand All @@ -57,7 +71,7 @@ def init_db_session_class(config_file):
except (configparser.NoOptionError, configparser.NoSectionError) as e:
logger.error(e)
raise RuntimeError("invalid config file %s", config_file)

Session = sessionmaker(bind=engine)
return Session

Expand All @@ -74,6 +88,6 @@ def ping_connection(dbapi_connection, connection_record, connection_proxy): # py
except:
logger.info('fail to ping database server, disposing all cached connections')
connection_proxy._pool.dispose() # pylint: disable=protected-access

# Raise DisconnectionError so the pool would create a new connection
raise DisconnectionError()
Loading

0 comments on commit 8b18bb5

Please sign in to comment.