-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
69 lines (56 loc) · 2.16 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import time
import pandas as pd
import os
import subprocess
# File that save_logs() appends to (relative to the working directory).
LOG_FILE = 'logs'
# Relative path datasets/runtime/capture.csv, built with the OS separator.
RUNTIME_CAPTURE = os.path.join('datasets', 'runtime', 'capture.csv')


def _current_user() -> str:
    """Return the current login name, or an empty string if it is unknown.

    ``os.getlogin()`` raises ``OSError`` when the process has no controlling
    terminal (services, cron jobs, containers, CI runners). Because the result
    is needed while this module is being imported, that error would make the
    whole module unimportable, so fall back to the usual environment variables.
    """
    try:
        return os.getlogin()
    except OSError:
        for variable in ("LOGNAME", "USER", "USERNAME"):
            name = os.environ.get(variable)
            if name:
                return name
        return ""


# Per-user Windows profile directory; looked up once instead of per entry.
_USER_HOME = f"C:\\Users\\{_current_user()}"

# Directories listed as common software installation locations.
OS_INSTALLATION_PATHS = [
    # Common Unix/Linux directories
    "/bin",
    "/sbin",
    "/usr/bin",
    "/usr/sbin",
    "/opt",
    "/usr/local/bin",
    "/usr/local/sbin",
    "/snap/bin",
    "/var/lib/flatpak/app",
    # Common Windows installation directories
    "C:\\Program Files",
    "C:\\Program Files (x86)",
    f"{_USER_HOME}\\AppData\\Local\\Programs",
    f"{_USER_HOME}\\AppData\\Roaming",
    f"{_USER_HOME}\\AppData\\Local",
    f"{_USER_HOME}\\AppData\\Roaming\\Microsoft\\Windows\\Start Menu\\Programs",
    f"{_USER_HOME}\\Desktop",
    f"{_USER_HOME}\\AppData\\Local\\Microsoft\\WinGet\\Packages",
    # Backslashes are escaped here: "\L" and "\T" are invalid escape sequences.
    f"{_USER_HOME}\\AppData\\Local\\Temp",
    "C:\\Program Files\\WindowsApps",
]
# Utility function for time measurement
def current_ms() -> int:
    """Return the current wall-clock time in milliseconds since the epoch.

    The floating-point seconds reported by ``time.time()`` are scaled to
    milliseconds and rounded to the nearest integer.
    """
    seconds_since_epoch = time.time()
    return round(seconds_since_epoch * 1000)
def run_command(command):
    """Run *command* as a child process and discard its output.

    The command string is split on whitespace into an argument list and
    executed without a shell. The child's stdout and stderr are sent to
    ``subprocess.DEVNULL`` and its exit status is ignored.

    Args:
        command: Program name followed by whitespace-separated arguments.
            Quoting is not interpreted, so a single argument cannot contain
            whitespace.

    Raises:
        ValueError: If *command* is empty or contains only whitespace.
    """
    # split() with no separator collapses runs of whitespace. split(" ") would
    # emit empty-string arguments for doubled, leading or trailing spaces and
    # hand those bogus arguments to the child process.
    argv = command.split()
    if not argv:
        raise ValueError("run_command() requires a non-empty command")
    print("RUNNING COMMAND " + command)
    subprocess.call(
        argv,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        shell=False,
    )
def preprocess_dataset(df: pd.DataFrame):
    """
    Prepare a dataset frame for classification.

    Drops the request/response timestamp columns and the source/destination
    IP and port columns, casts the flag columns (SYN, ACK, FIN, RST, URG, PSH)
    to int and ``duration`` to float. When a ``label`` column is present
    (training data) it is cast to int as well.

    The frame is modified in place; the same object is returned.
    """
    # Dropped one group at a time, in this order.
    discarded_column_groups = (
        ['start_request_time', 'end_request_time'],
        ['start_response_time', 'end_response_time'],
        ['src_ip', 'dst_ip', 'src_port', 'dst_port'],
    )
    for column_group in discarded_column_groups:
        df.drop(columns=column_group, inplace=True)

    for flag_name in ('SYN', 'ACK', 'FIN', 'RST', 'URG', 'PSH'):
        df[flag_name] = df[flag_name].astype(int)

    df['duration'] = df['duration'].astype(float)

    # Frames without a label (e.g. data to be classified) are done here.
    if 'label' not in df.columns:
        return df

    df['label'] = df['label'].astype(int)
    return df
# Save logs to a file
def save_logs(log_message):
    """Append the items of *log_message* to the file named by ``LOG_FILE``.

    *log_message* is iterated and each item is written as-is, in order; no
    separator or newline is added between items. The file is opened in append
    mode, so existing content is kept.
    """
    with open(LOG_FILE, 'a') as sink:
        sink.writelines(log_message)