24
24
25
25
import ast
26
26
import sys
27
+ import errno
27
28
import socket
28
29
import logging
29
30
import subprocess
30
31
31
- import ipaddress
32
+ from os import rename
32
33
33
34
from ctypes import *
34
- from fcntl import ioctl , flock , LOCK_EX , LOCK_UN
35
+ from fcntl import ioctl , flock , LOCK_EX , LOCK_NB , LOCK_UN
35
36
36
37
from time import sleep , time
37
38
from json import loads
38
39
from yaml import safe_load
39
40
from redis import StrictRedis
41
+
40
42
from threading import Thread , Event
41
43
from service import find_syslog , Service
42
44
from logging .handlers import SysLogHandler
43
45
46
+
44
47
CONFIG_LOCATION = "/etc/pfui_firewall.yml"
45
48
46
49
# Constants
@@ -220,9 +223,11 @@ def db_push(logger, log: bool, db, table: str, data: list):
220
223
for ip , ttl in data :
221
224
key = "{}{}{}" .format (table , "^" , ip )
222
225
if ttl < now : # Real TTL
226
+ if ttl < 3600 : # Always allow for min 1 hour
227
+ ttl = 3600
223
228
pipe .hmset (key , {'epoch' : now , 'ttl' : ttl })
224
229
else : # Cached entry TTL = Future Expiry Epoch
225
- pipe .hmset (key , {'epoch' : now }) # TODO: Risk of records with no ttl..
230
+ pipe .hmset (key , {'epoch' : now , 'expires' : ttl })
226
231
pipe .execute ()
227
232
return True
228
233
except Exception as e :
@@ -244,42 +249,55 @@ def db_pop(logger, log: bool, db, table: str, ip: str):
244
249
def file_push (logger , log : bool , file : str , ip_list : list ):
245
250
if log :
246
251
logger .info ("PFUIFW: Installing {} into file {}" .format (ip_list , file ))
247
- inserts = []
252
+ unique = []
248
253
try :
249
- with open (file , "r+" ) as f :
250
- for ip in ip_list :
251
- found = next ((l for l in f if "{}\n " .format (ip ) == l ), False )
252
- if ip and not found :
253
- inserts .append (ip )
254
- inserts .append ("\n " )
254
+ with open (file , "r+" ) as f : # Open for reading and writing with pointer at beginning
255
+ while True :
256
+ try :
257
+ flock (f , LOCK_EX | LOCK_NB )
258
+ break
259
+ except IOError as e :
260
+ if e .errno != errno .EAGAIN :
261
+ raise # raise other file access issues
262
+ else :
263
+ if log :
264
+ logger .info ("PFUIFW: File {} Locked." .format (file ))
265
+ sleep (0.001 ) # 1ms
266
+ lines = f .readlines ()
267
+ unique = ["{}\n " .format (ip ) for ip in ip_list if "{}\n " .format (ip ) not in lines ] # Check not exists
255
268
try :
256
- flock (f , LOCK_EX )
257
- f .write ("" .join (inserts )) # append missing
269
+ f .write ("" .join (unique )) # append new
258
270
except Exception as e :
259
271
logger .error ("PFUIFW: f.write error {}" .format (e ))
260
- finally :
261
- flock (f , LOCK_UN )
272
+ flock (f , LOCK_UN )
262
273
return True
263
274
except Exception as e :
264
- logger .error ("PFUIFW: Failed to install {} to file {}. {}" .format (inserts , file , e ))
275
+ logger .error ("PFUIFW: Failed to install {} to file {}. {}" .format (unique , file , e ))
265
276
return False
266
277
267
278
268
279
def file_pop (logger , log : bool , file : str , ip : str ):
280
+ # TODO: Implement multiple parallel deletes to reduce disk IO (requires rework of Scanner)
269
281
if log :
270
282
logger .info ("PFUIFW: Clearing {} from file {}" .format (ip , file ))
271
283
try :
272
- with open (file , "r" ) as f :
273
- lines = f .readlines ()
274
- nlines = [l for l in lines if l not in [ip , "\n " , "" ]]
275
- with open (file , "w" ) as f :
276
- try :
277
- flock (f , LOCK_EX )
278
- f .writelines (nlines )
279
- except Exception as e :
280
- logger .error ("PFUIFW: f.writelines error {}" .format (e ))
281
- finally :
282
- flock (f , LOCK_UN )
284
+ with open (file , "r" ) as f , open (file + "~" , "w" ) as tmp :
285
+ lines = [l for l in f if l not in ["{}\n " .format (ip ), "" ]]
286
+ lines = list (dict .fromkeys (lines )) # Strip dups
287
+ tmp .writelines (lines )
288
+ while True : # Set lock - blocking
289
+ try :
290
+ flock (f , LOCK_EX | LOCK_NB )
291
+ break
292
+ except IOError as e :
293
+ if e .errno != errno .EAGAIN :
294
+ raise # raise other file access issues
295
+ else :
296
+ if log :
297
+ logger .info ("PFUIFW: File {} Locked." .format (file ))
298
+ sleep (0.001 ) # 1ms
299
+ rename (file + "~" , file )
300
+ flock (f , LOCK_UN )
283
301
except Exception as e :
284
302
logger .error ("PFUIFW: Failed to delete IP {} from {}. {}" .format (ip , file , e ))
285
303
return False
@@ -349,19 +367,31 @@ def scan_redis_db(self):
349
367
""" Expire IPs with last update epoch/timestamp older than (TTL * TTL_MULTIPLIER). """
350
368
if self .cfg ['LOGGING' ]:
351
369
self .logger .info ("PFUIFW: Scan DB({}) for expiring {} IPs." .format (self .cfg ['REDIS_DB' ], self .table ))
370
+ now = int (time ())
352
371
try :
353
372
keys = self .db .keys ("{}*" .format (self .table ))
354
373
except Exception as e :
355
- self .logger .error ("PFUIFW: Failed to read keys from Redis. {}" .format (e ))
356
- now = int ( time ())
374
+ self .logger .error ("PFUIFW: Failed to get keys from Redis. {}" .format (e ))
375
+ return
357
376
for k in keys :
377
+ db_last , db_ttl , db_expires = 0 , 0 , 0
358
378
try :
359
379
v = self .db .hgetall (k )
360
- ttl = int (v [b'ttl' ].decode ('utf-8' ))
361
- db_epoch = int (v [b'epoch' ].decode ('utf-8' ))
362
- except :
363
- db_epoch , ttl = now , 0
364
- if db_epoch <= now - (ttl * self .cfg ['TTL_MULTIPLIER' ]):
380
+ db_last = int (v [b'epoch' ].decode ('utf-8' ))
381
+ db_ttl = int (v [b'ttl' ].decode ('utf-8' ))
382
+ except KeyError as e :
383
+ self .logger .error ("PFUIFW: Metadata not found! Trying 'expires' timestamp. {}" .format (e ))
384
+ try :
385
+ db_expires = int (v [b'expires' ].decode ('utf-8' ))
386
+ except KeyError as e :
387
+ self .logger .error ("PFUIFW: No 'expires' found either! {}" .format (e ))
388
+ continue
389
+ if db_expires is None or db_expires <= now :
390
+ db_last , db_ttl = now , 0
391
+ except Exception as e :
392
+ self .logger .error ("PFUIFW: Exception getting key '{}' values. {}" .format (k , e ))
393
+ continue
394
+ if db_last <= now - (db_ttl * self .cfg ['TTL_MULTIPLIER' ]):
365
395
ip = k .decode ('utf-8' ).split ("^" )[1 ]
366
396
if self .cfg ['LOGGING' ]:
367
397
self .logger .info ("PFUIFW: TTL Expired for IP {}" .format (ip ))
@@ -509,7 +539,7 @@ def run(self):
509
539
if self .cfg ['SOCKET_PROTO' ] == "UDP" :
510
540
self .soc = socket .socket (socket .AF_INET , socket .SOCK_DGRAM ) # UDP Datagram Socket
511
541
self .soc .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , True )
512
- self .soc .setsockopt (socket .SOL_SOCKET , socket .SO_SNDBUF , 36 )
542
+ self .soc .setsockopt (socket .SOL_SOCKET , socket .SO_SNDBUF , 36 ) # 'ACK' = 36bytes
513
543
self .soc .settimeout (self .cfg ['SOCKET_TIMEOUT' ]) # accept() & recv() blocking timeouts
514
544
self .soc .bind ((self .cfg ['SOCKET_LISTEN' ], self .cfg ['SOCKET_PORT' ]))
515
545
while not self .got_sigterm (): # Watch Socket until Signal
@@ -615,8 +645,7 @@ def disconnect(proto, soc, conn):
615
645
ntime = time ()
616
646
self .logger .info ("PFUIFW: Received {} from {}:{} ({})" .format (data , ip , port , proto ))
617
647
618
- # Get Valid Data
619
- af4_data , af6_data = [], []
648
+ # Guard Statements
620
649
if isinstance (data , dict ):
621
650
try :
622
651
af4_data = [(rr ['ip' ], rr ['ttl' ]) for rr in data ['AF4' ] if is_ipv4 (rr ['ip' ]) and rr ['ttl' ]]
0 commit comments