Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions src/nethsec/firewall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,17 @@ def add_zone(uci, name: str, input: str, forward: str, traffic_to_wan: bool = Fa
uci.set('firewall', zone_config_name, 'input', input)
uci.set('firewall', zone_config_name, 'forward', forward)
uci.set('firewall', zone_config_name, 'output', 'ACCEPT')
uci.set('firewall', zone_config_name, 'log', '1' if log else '0')
if log:
uci.set('firewall', zone_config_name, 'log', '1')
log_opts = get_default_logging_options(uci)
uci.set('firewall', zone_config_name, 'log_limit', log_opts['zone_log_limit'])
else:
uci.set('firewall', zone_config_name, 'log', '0')
try:
uci.delete('firewall', zone_config_name, 'log_limit')
except:
Comment thread
gsanchietti marked this conversation as resolved.
pass


forwardings_added = set()

Expand Down Expand Up @@ -846,6 +856,18 @@ def edit_zone(uci, name: str, input: str, forward: str, traffic_to_wan: bool = F
uci.set('firewall', zone_config_name, 'input', input)
uci.set('firewall', zone_config_name, 'forward', forward)
uci.set('firewall', zone_config_name, 'output', 'ACCEPT')
if log:
uci.set('firewall', zone_config_name, 'log', '1')
if uci.get('firewall', zone_config_name, 'log_limit', default=None) is None:
# set default log limit, do not overwrite existing value
log_opts = get_default_logging_options(uci)
uci.set('firewall', zone_config_name, 'log_limit', log_opts['zone_log_limit'])
else:
uci.set('firewall', zone_config_name, 'log', '0')
try:
uci.delete('firewall', zone_config_name, 'log_limit')
except:
pass
uci.set('firewall', zone_config_name, 'log', '1' if log else '0')

Comment thread
gsanchietti marked this conversation as resolved.
# delete old forwardings
Expand Down Expand Up @@ -1542,7 +1564,18 @@ def setup_rule(uci, id: str, name: str, src: str, src_ip: list[str], dest: str,
pass

uci.set('firewall', id, 'enabled', '1' if enabled else '0')
uci.set('firewall', id, 'log', '1' if log else '0')
if log:
uci.set('firewall', id, 'log', '1')
if uci.get('firewall', id, 'log_limit', default=None) is None:
# set default log limit, do not overwrite existing value
log_opts = get_default_logging_options(uci)
uci.set('firewall', id, 'log_limit', log_opts['rule_log_limit'])
else:
uci.set('firewall', id, 'log', '0')
try:
uci.delete('firewall', id, 'log_limit')
except:
Comment thread
gsanchietti marked this conversation as resolved.
pass
uci.set('firewall', id, 'ns_tag', tag)
if ns_src:
uci.set('firewall', id, 'ns_src', ns_src)
Expand Down Expand Up @@ -2047,3 +2080,49 @@ def list_object_suggestions(uci, expand = False):
a list of all objects, each object is a dict with keys value, label, type
"""
return objects.list_all_objects(uci, expand)

def get_default_logging_options(uci):
"""
Get default logging options

Args:
uci: EUci pointer

Returns:
a dict with keys rule_log_limit, zone_log_limit, redirect_log_limit
"""

ret = {}
defaults = uci.get('firewall', 'ns_defaults', 'rule_log_limit', default=None)
if defaults is None:
# Set defaults, if needed
uci.set('firewall', 'ns_defaults', 'defaults')
uci.set('firewall', 'ns_defaults', 'rule_log_limit', '1/s')
uci.set('firewall', 'ns_defaults', 'zone_log_limit', '5/s')
uci.set('firewall', 'ns_defaults', 'redirect_log_limit', '1/s')
uci.save('firewall')

for key in ['rule_log_limit', 'zone_log_limit', 'redirect_log_limit']:
ret[key] = uci.get('firewall', 'ns_defaults', key)

return ret

def apply_default_logging_options(uci):
"""
Walk all rules, zones and redirect rules and set logging options to default

Args:
uci: EUci pointer

Returns:
None
"""
log_opts = get_default_logging_options(uci)
for section in uci.get_all("firewall"):
if uci.get('firewall', section) == 'rule' and uci.get('firewall', section, 'log', default='0') == '1':
uci.set('firewall', section, 'log_limit', log_opts['rule_log_limit'])
elif uci.get('firewall', section) == 'zone' and uci.get('firewall', section, 'log', default='0') == '1':
uci.set('firewall', section, 'log_limit', log_opts['zone_log_limit'])
elif uci.get('firewall', section) == 'redirect' and uci.get('firewall', section, 'log', default='0') == '1':
uci.set('firewall', section, 'log_limit', log_opts['redirect_log_limit'])
uci.save('firewall')
72 changes: 70 additions & 2 deletions tests/test_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
option masq '1'
option mtu_fix '1'

config zone with_log
option name 'with_log'
option input 'DROP'
option output 'DROP'
option forward 'DROP'
option log '1'

config forwarding fw1
option src 'lan'
option dest 'wan'
Expand Down Expand Up @@ -149,6 +156,19 @@
config rule 'r4'
option ns_dst ''
option ns_src ''

config redirect 'ns_59eea167'
option src 'wan'
option target 'DNAT'
option dest_ip '192.168.1.22'
option dest_port '4466'
option enabled '1'
option log '1'
option name 'pf1'
list proto 'tcp'
list proto 'udp'
option reflection '0'
option src_dport '4455'
"""

network_db = """
Expand Down Expand Up @@ -764,7 +784,7 @@ def test_add_zone(u):
assert u.get("firewall", "ns_guest2new_zone", "dest") == "new_zone"
assert firewall.add_zone(u, "new_zone_with_log", "REJECT", "DROP", True, ["lan"], ["lan", "guest"], True)
assert u.get("firewall", "ns_new_zone_with_log", "log") == "1"

assert u.get("firewall", "ns_new_zone_with_log", "log_limit") == "5/s"

def test_edit_zone(u):
assert firewall.edit_zone(u, "new_zone", "DROP", "ACCEPT", False, ["lan"], ["lan", "guest"]) == (
Expand Down Expand Up @@ -907,6 +927,7 @@ def test_add_rule(u, mocker):
assert u.get("firewall", rid, "ns_service") == "ssh"
assert u.get("firewall", rid, "enabled") == "1"
assert u.get("firewall", rid, "log") == "1"
assert u.get("firewall", rid, "log_limit") == "1/s"
assert u.get_all("firewall", rid, "ns_tag") == ("tag1",)
assert u.get("firewall", rid, "ns_link", default="notpresent") == "notpresent"

Expand Down Expand Up @@ -1160,4 +1181,51 @@ def test_edit_rule_remove_object(u):
u.get("firewall", idf1, "src_ip")
assert u.get('firewall', idf1, 'ns_src', default='NONE') == 'NONE'
firewall.delete_rule(u, idf1)
objects.delete_host_set(u, host1)
objects.delete_host_set(u, host1)

def test_get_default_logging_options(u):
logging_options = firewall.get_default_logging_options(u)
assert logging_options['rule_log_limit'] == '1/s'
assert logging_options['zone_log_limit'] == '5/s'
assert logging_options['redirect_log_limit'] == '1/s'

def test_rule_default_log_limit(u):
rid = firewall.add_rule(u, 'myrule3', 'lan', ['192.168.1.22'], 'wan', ['1.2.3.4'], ['tcp', 'udp'], '443', 'ACCEPT', "", log=True)
assert u.get("firewall", rid, "log_limit") == "1/s"

def test_change_rule_default_log_limit(u):
u.set('firewall', 'ns_defaults', 'rule_log_limit', '2/s')
u.save('firewall')
rid = firewall.add_rule(u, 'myrule4', 'lan', ['192.168.1.22'], 'wan', ['1.2.3.4'], ['tcp', 'udp'], '443', 'ACCEPT', "", log=True)
assert u.get("firewall", rid, "log_limit") == "2/s"

def test_assert_custom_log_limit(u):
# Check rule
rid = firewall.add_rule(u, 'myrule5', 'lan', ['192.168.1.22'], 'wan', ['1.2.3.4'], ['tcp', 'udp'], '443', 'ACCEPT', "", log=True)
u.set('firewall', rid, 'log_limit', '123/s')
u.save('firewall')
rid = firewall.edit_rule(u, rid, 'myrule5', 'lan', ['192.168.1.22'], 'wan', ['1.2.3.4'], ['tcp', 'udp'], '443', 'ACCEPT', "", log=True)
assert u.get("firewall", rid, "log_limit") == "123/s"
# Check zone
(zid, forwardings) = firewall.add_zone(u, "ztwl", "REJECT", "DROP", log=True)
print(zid)
u.set('firewall', zid, 'log_limit', '123/s')
u.save('firewall')
firewall.edit_zone(u, zid, "DROP", "ACCEPT", log=True)
assert u.get('firewall', zid, 'log_limit') == '123/s'

def test_change_zone_default_log_limit(u):
u.set("firewall", "ns_defaults", "zone_log_limit", "20/s")
u.save('firewall')
(zid, forwardings) = firewall.add_zone(u, "new_zone3", "REJECT", "DROP", True, ["lan"], ["lan", "guest"], True)
assert u.get("firewall", zid, "log_limit") == "20/s"

def test_apply_default_logging_options(u):
firewall.apply_default_logging_options(u)
assert u.get("firewall", "ns_59eea167", "log_limit") == "1/s"
assert u.get("firewall", "f1", "log_limit") == "2/s"
assert u.get("firewall", "with_log", "log_limit") == "20/s"
assert u.get("firewall", "grey", "log_limit", default=None) == None
assert u.get("firewall", "o1", "log_limit", default=None) == None
assert u.get("firewall", "redirect3", "log_limit", default=None) == None

2 changes: 1 addition & 1 deletion tests/test_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ def test_fact_firewall_stats(tmp_path):
assert result['firewall']['nat']['accept'] == 2
assert result['firewall']['netmap']['source'] == 3
assert result['firewall']['netmap']['destination'] == 2
assert result['firewall']['rules']['forward'] == 12
assert result['firewall']['rules']['forward'] == 15
assert result['firewall']['rules']['input'] == 7
assert result['firewall']['rules']['output'] == 2

Expand Down