-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_monitor.py
More file actions
executable file
·226 lines (179 loc) · 7.14 KB
/
file_monitor.py
File metadata and controls
executable file
·226 lines (179 loc) · 7.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python3
"""
File Monitor for Linux Antivirus Scanner
Monitors directories for new files and automatically scans them
"""
import os
import sys
import time
import argparse
import configparser
from pathlib import Path
from typing import List, Set
import subprocess
try:
import pyinotify
except ImportError:
print("Error: pyinotify not installed")
print("Install with: pip3 install pyinotify")
sys.exit(1)
class FileMonitor:
    """Watch directories with inotify and hand new files to the scanner script.

    Each candidate file is scanned by running ``linux_av.py`` (looked up next
    to this module) in a subprocess. A path is scanned again only when its
    on-disk identity (device, inode, size, mtime) differs from the one
    recorded at its previous scan, so a file that is rewritten or replaced
    under the same name is not silently skipped.
    """

    # Seconds to wait after an inotify event before scanning the file.
    SETTLE_DELAY_SECONDS = 0.5

    def __init__(self, config_file: str = None, watch_dirs: List[str] = None,
                 extensions: List[str] = None):
        """Initialize file monitor.

        Args:
            config_file: Path of an INI config file, or None. It is loaded
                only if it exists.
            watch_dirs: Directories to monitor. The list is copied, so the
                caller's list is never modified.
            extensions: Extension filter (e.g. ``[".deb", "appimage"]``).
                Entries are lower-cased and dot-prefixed; an empty filter
                means every file type is scanned.
        """
        self.config_file = config_file
        # Copy: load_config() appends to this list and must not leak that
        # change into the list object the caller passed in.
        self.watch_dirs = list(watch_dirs or [])
        self.extensions = self._normalize_extensions(extensions or [])
        self.scanned_files: Set[str] = set()
        # path -> identity tuple recorded when the path was last scanned
        self._scan_signatures = {}
        self.scanner_script = os.path.join(os.path.dirname(__file__), "linux_av.py")

        # Load config if provided
        if config_file and os.path.exists(config_file):
            self.load_config()

    @staticmethod
    def _normalize_extensions(extensions):
        """Return extensions lower-cased, dot-prefixed and de-duplicated, in order."""
        normalized = []
        for raw in extensions:
            ext = raw.strip().lower()
            # Blank entries (e.g. from a trailing comma) would match every
            # file name, so they are dropped instead of disabling the filter.
            if ext in ("", "."):
                continue
            if not ext.startswith("."):
                ext = f".{ext}"
            if ext not in normalized:
                normalized.append(ext)
        return normalized

    @staticmethod
    def _file_signature(filepath):
        """Return an identity tuple for a regular file, or None if it is not one."""
        if not os.path.isfile(filepath):
            return None
        try:
            info = os.stat(filepath)
        except OSError:
            # The file vanished between the two calls.
            return None
        return (info.st_dev, info.st_ino, info.st_size, info.st_mtime_ns)

    def load_config(self):
        """Load configuration from file.

        Reads the ``[Scanning]`` section of ``self.config_file``:
        ``download_dir`` (default ``~/Downloads``) is added to the watch
        list, and ``extensions`` (comma separated) is used only when no
        extension filter was supplied already. Errors are reported on stdout
        and leave the current settings untouched.
        """
        parser = configparser.ConfigParser()
        default_dir = os.path.join(os.path.expanduser("~"), "Downloads")
        try:
            parser.read(self.config_file)
            if "Scanning" not in parser:
                return
            download_dir = parser.get("Scanning", "download_dir", fallback=default_dir)
            extensions_str = parser.get("Scanning", "extensions", fallback="")
        except (configparser.Error, OSError, ValueError) as e:
            print(f"Error loading config: {e}")
            return

        # Config files are not shell-expanded, so resolve a leading "~" here.
        download_dir = os.path.expanduser(download_dir.strip())
        if download_dir and download_dir not in self.watch_dirs:
            self.watch_dirs.append(download_dir)

        if not self.extensions and extensions_str:
            self.extensions = self._normalize_extensions(extensions_str.split(","))

    def should_scan_file(self, filepath: str) -> bool:
        """Check if file should be scanned.

        Returns:
            False when the path is not an existing regular file, when it was
            already scanned and has not changed since, or when an extension
            filter is set and the file name does not end with one of the
            filtered extensions (case-insensitive). True otherwise.
        """
        signature = self._file_signature(filepath)
        if signature is None:
            return False

        if filepath in self.scanned_files:
            # A path that is marked as scanned but has no recorded signature
            # is treated as unchanged.
            if self._scan_signatures.get(filepath, signature) == signature:
                return False

        # Normalize again in case self.extensions was reassigned after init.
        allowed = self._normalize_extensions(self.extensions)
        if allowed:
            # Whole-suffix comparison: ".c" must not match ".doc", while
            # compound suffixes such as ".tar.gz" still work.
            file_name = os.path.basename(filepath).lower()
            if not file_name.endswith(tuple(allowed)):
                return False

        return True

    def scan_file(self, filepath: str):
        """Scan a file using the main scanner.

        Runs ``self.scanner_script`` with ``--file <filepath>`` (plus
        ``--config`` when a config file is set) using the current Python
        interpreter. Failures are reported on stdout and never raised.
        """
        if not self.should_scan_file(filepath):
            return

        print(f"\n🔍 New file detected: {filepath}")
        self.scanned_files.add(filepath)
        # Remember what was scanned so a later rewrite triggers a new scan.
        self._scan_signatures[filepath] = self._file_signature(filepath)

        # Build scanner command
        cmd = [sys.executable, self.scanner_script, "--file", filepath]
        if self.config_file:
            cmd.extend(["--config", self.config_file])

        # Run scanner; its output goes straight to this process's stdout/stderr
        try:
            result = subprocess.run(cmd, check=False)
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"Error running scanner: {e}")
            return

        if result.returncode != 0:
            print(f"Scanner error for {filepath}")

    def start_monitoring(self):
        """Start monitoring directories.

        Watches every existing directory in ``self.watch_dirs``
        (non-recursively) for files that are closed after writing or moved
        in, and scans each one. Blocks until the notifier loop ends; returns
        immediately after printing an error when there is nothing to watch.
        """
        if not self.watch_dirs:
            print("Error: No directories to monitor")
            return

        # Verify directories exist
        valid_dirs = []
        for directory in self.watch_dirs:
            if os.path.isdir(directory):
                valid_dirs.append(directory)
            else:
                print(f"Warning: Directory not found: {directory}")

        if not valid_dirs:
            print("Error: No valid directories to monitor")
            return

        print("=" * 60)
        print("Linux Antivirus File Monitor")
        print("=" * 60)
        print("Monitoring directories:")
        for directory in valid_dirs:
            print(f"  - {directory}")

        if self.extensions:
            print(f"\nFile extensions filter: {', '.join(self.extensions)}")
        else:
            print("\nMonitoring all file types")

        print("\nPress Ctrl+C to stop monitoring\n")

        # Set up inotify
        wm = pyinotify.WatchManager()
        settle_delay = self.SETTLE_DELAY_SECONDS

        class EventHandler(pyinotify.ProcessEvent):
            """Forward file events to the owning monitor."""

            def __init__(self, monitor):
                self.monitor = monitor
                super().__init__()

            def _scan_event(self, event):
                """Scan the file behind *event*; directory events are ignored."""
                if not event.dir:
                    # Wait a moment to ensure file is complete
                    time.sleep(settle_delay)
                    self.monitor.scan_file(event.pathname)

            def process_IN_CLOSE_WRITE(self, event):
                """File was written and closed"""
                self._scan_event(event)

            def process_IN_MOVED_TO(self, event):
                """File was moved into monitored directory"""
                self._scan_event(event)

        handler = EventHandler(self)
        notifier = pyinotify.Notifier(wm, handler)

        # Add watches
        mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
        for directory in valid_dirs:
            wm.add_watch(directory, mask, rec=False)

        # Start monitoring
        try:
            notifier.loop()
        except KeyboardInterrupt:
            print("\n\nStopping file monitor...")
            notifier.stop()
def main():
    """Parse command-line arguments and run the file monitor.

    Returns:
        Process exit status: 0 when at least one watch directory existed and
        monitoring was started, 1 when there was nothing valid to monitor.
    """
    default_config = os.path.expanduser("~/.linux-av.conf")

    parser = argparse.ArgumentParser(
        description="File Monitor for Linux Antivirus Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Monitor Downloads directory
  %(prog)s --directory ~/Downloads
  # Monitor multiple directories
  %(prog)s --directory ~/Downloads /tmp/downloads
  # Monitor with extension filter
  %(prog)s --directory ~/Downloads --extensions .deb .appimage
  # Use config file settings
  %(prog)s --config ~/.linux-av.conf
"""
    )
    parser.add_argument("--config", "-c",
                        default=default_config,
                        help="Configuration file path")
    parser.add_argument("--directory", "-d",
                        nargs="+",
                        help="Directories to monitor")
    parser.add_argument("--extensions", "-e",
                        nargs="+",
                        help="File extensions to scan (e.g., .deb .appimage)")
    args = parser.parse_args()

    # Prepare watch directories
    watch_dirs = [os.path.expanduser(d) for d in args.directory or []]

    # Prepare extensions: the monitor compares against the lower-cased file
    # extension, so the filter has to be lower-cased as well or an entry such
    # as ".AppImage" could never match.
    extensions = [
        (ext if ext.startswith(".") else f".{ext}").lower()
        for ext in args.extensions or []
    ]

    # A missing default config is normal; a missing explicitly requested one
    # is most likely a typo, so say so instead of ignoring it silently.
    config_file = os.path.expanduser(args.config)
    if not os.path.exists(config_file):
        if config_file != default_config:
            print(f"Warning: Config file not found: {config_file}", file=sys.stderr)
        config_file = None

    # Create and start monitor
    monitor = FileMonitor(
        config_file=config_file,
        watch_dirs=watch_dirs,
        extensions=extensions
    )

    # start_monitoring() prints its own error and returns at once when no
    # watch directory exists; mirror that outcome in the exit status.
    has_valid_dir = any(os.path.isdir(d) for d in monitor.watch_dirs)
    monitor.start_monitoring()
    return 0 if has_valid_dir else 1
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())