Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 311 additions & 0 deletions tests/test_inventory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from euci import EUci
from unittest.mock import patch, mock_open, MagicMock
import subprocess

from nethsec import inventory

Expand Down Expand Up @@ -967,3 +969,312 @@ def test_fact_adblock(tmp_path):
u = _setup_db(tmp_path)
result = inventory.fact_adblock(u)
assert result == {"enabled": True, "community": 5, "enterprise": 2}

# Tests for info_* helper functions

def test_info_kernel_version():
"""Test info_kernel_version reads from /proc/version"""
u = EUci()

# Test successful read
mock_version = "Linux version 5.10.176 (builder@nethsec) (gcc version 11.2.0) #0 SMP Mon Jan 1 00:00:00 2024\n"
with patch('builtins.open', mock_open(read_data=mock_version)):
result = inventory.info_kernel_version(u)
assert result == "5.10.176"

# Test exception handling (file not found)
with patch('builtins.open', side_effect=FileNotFoundError):
result = inventory.info_kernel_version(u)
assert result == ''

def test_info_uptime_seconds():
"""Test info_uptime_seconds reads from /proc/uptime"""
u = EUci()

# Test successful read
mock_uptime = "12345.67 98765.43\n"
with patch('builtins.open', mock_open(read_data=mock_uptime)):
result = inventory.info_uptime_seconds(u)
assert result == 12345

# Test exception handling
with patch('builtins.open', side_effect=FileNotFoundError):
result = inventory.info_uptime_seconds(u)
assert result == 0

def test_info_fqdn(tmp_path):
"""Test info_fqdn retrieves hostname from UCI"""
# Setup system config
system_db = """
config system
option hostname 'myhost'
option domain 'example.com'
"""
with tmp_path.joinpath('system').open('w') as fp:
fp.write(system_db)
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")

u = EUci(confdir=tmp_path.as_posix())
result = inventory.info_fqdn(u)
# Should be anonymized since no subscription
assert result.startswith('anon-')

def test_info_default_ipv4_dig_method(tmp_path):
"""Test info_default_ipv4 with dig method"""
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")
u = EUci(confdir=tmp_path.as_posix())

# Mock successful dig command
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '"192.0.2.1"\n'

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_default_ipv4(u)
assert result.startswith('anon-') # Should be anonymized

def test_info_default_ipv4_curl_fallback(tmp_path):
"""Test info_default_ipv4 falls back to curl when dig fails"""
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")
u = EUci(confdir=tmp_path.as_posix())

# Mock dig failure and curl success
def mock_subprocess_run(cmd, **kwargs):
result = MagicMock()
if 'dig' in cmd:
result.returncode = 1
result.stdout = ''
elif 'curl' in cmd:
result.returncode = 0
result.stdout = '203.0.113.1\n'
else:
result.returncode = 1
result.stdout = ''
return result

with patch('subprocess.run', side_effect=mock_subprocess_run):
result = inventory.info_default_ipv4(u)
assert result.startswith('anon-')

def test_info_default_ipv4_ip_command_fallback(tmp_path):
"""Test info_default_ipv4 falls back to ip command when dig and curl fail"""
with tmp_path.joinpath('network').open('w') as fp:
fp.write(network_db)
with tmp_path.joinpath('firewall').open('w') as fp:
fp.write(firewall_db)
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")
u = EUci(confdir=tmp_path.as_posix())

# Mock dig and curl failure, ip command success
def mock_subprocess_run(cmd, **kwargs):
result = MagicMock()
if isinstance(cmd, list) and 'ip' in cmd:
result.returncode = 0
result.stdout = '[{"addr_info": [{"family": "inet", "local": "198.51.100.1"}]}]'
else:
result.returncode = 1
result.stdout = ''
return result

with patch('subprocess.run', side_effect=mock_subprocess_run):
with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']):
result = inventory.info_default_ipv4(u)
assert result.startswith('anon-')

def test_info_default_ipv4_all_methods_fail(tmp_path):
"""Test info_default_ipv4 returns empty string when all methods fail"""
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")
u = EUci(confdir=tmp_path.as_posix())

# Mock all methods failing
mock_result = MagicMock()
mock_result.returncode = 1
mock_result.stdout = ''

with patch('subprocess.run', return_value=mock_result):
with patch('nethsec.utils.get_all_wan_devices', return_value=[]):
result = inventory.info_default_ipv4(u)
assert result == ''

def test_info_default_ipv6_no_wan_ipv6(tmp_path):
"""Test info_default_ipv6 returns empty when no IPv6 on WAN"""
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")
u = EUci(confdir=tmp_path.as_posix())

# Mock no WAN devices or no IPv6
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '[]'

with patch('subprocess.run', return_value=mock_result):
with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']):
result = inventory.info_default_ipv6(u)
assert result == ''

def test_info_default_ipv6_with_wan_ipv6(tmp_path):
"""Test info_default_ipv6 with IPv6 on WAN"""
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")
u = EUci(confdir=tmp_path.as_posix())

# Mock WAN with IPv6
def mock_subprocess_run(cmd, **kwargs):
result = MagicMock()
if isinstance(cmd, list) and 'ip' in cmd and '-6' in cmd:
result.returncode = 0
result.stdout = '[{"addr_info": [{"family": "inet6", "local": "2001:db8::1", "scope": "global"}]}]'
elif isinstance(cmd, list) and 'dig' in cmd:
result.returncode = 0
result.stdout = '"2001:db8::2"\n'
else:
result.returncode = 1
result.stdout = ''
return result

with patch('subprocess.run', side_effect=mock_subprocess_run):
with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']):
result = inventory.info_default_ipv6(u)
assert result.startswith('anon-')

def test_info_default_ipv6_fallback_to_curl(tmp_path):
"""Test info_default_ipv6 falls back to curl when dig fails"""
with tmp_path.joinpath('ns-plug').open('w') as fp:
fp.write("config config\n\toption type 'no'\n")
u = EUci(confdir=tmp_path.as_posix())

# Mock WAN with IPv6, dig fails, curl succeeds
def mock_subprocess_run(cmd, **kwargs):
result = MagicMock()
if isinstance(cmd, list) and 'ip' in cmd and '-6' in cmd:
result.returncode = 0
result.stdout = '[{"addr_info": [{"family": "inet6", "local": "2001:db8::1", "scope": "global"}]}]'
elif isinstance(cmd, list) and 'curl' in cmd:
result.returncode = 0
result.stdout = '2001:db8::3\n'
else:
result.returncode = 1
result.stdout = ''
return result

with patch('subprocess.run', side_effect=mock_subprocess_run):
with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']):
result = inventory.info_default_ipv6(u)
assert result.startswith('anon-')

def test_info_package_updates_available_with_updates():
"""Test info_package_updates_available when updates are available"""
u = EUci()

mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '{"updates": ["package1", "package2"]}'

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_package_updates_available(u)
assert result is True

def test_info_package_updates_available_no_updates():
"""Test info_package_updates_available when no updates available"""
u = EUci()

mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '{"updates": []}'

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_package_updates_available(u)
assert result is False

def test_info_package_updates_available_command_fails():
"""Test info_package_updates_available when command fails"""
u = EUci()

mock_result = MagicMock()
mock_result.returncode = 1
mock_result.stdout = ''

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_package_updates_available(u)
assert result is False

def test_parse_version_with_prefix():
"""Test parse_version removes NethSecurity prefix"""
assert inventory.parse_version('NethSecurity 8.1.0-dev') == (8, 1, 0)
assert inventory.parse_version('NethSecurity 8.2.1') == (8, 2, 1)

def test_parse_version_without_prefix():
"""Test parse_version handles versions without prefix"""
assert inventory.parse_version('8.1.0-dev') == (8, 1, 0)
assert inventory.parse_version('8.2.1') == (8, 2, 1)

def test_parse_version_comparison():
"""Test parse_version version comparison logic"""
v1 = inventory.parse_version('8.1.0')
v2 = inventory.parse_version('8.2.0')
v3 = inventory.parse_version('8.1.1')

assert v2 > v1
assert v3 > v1
assert v2 > v3

def test_parse_version_invalid():
"""Test parse_version handles invalid input"""
result = inventory.parse_version('invalid')
assert result == ()

result = inventory.parse_version('')
assert result == ()

def test_info_image_updates_available_update_exists():
"""Test info_image_updates_available when newer version exists"""
u = EUci()

mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '{"currentVersion": "NethSecurity 8.1.0", "lastVersion": "NethSecurity 8.2.0"}'

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_image_updates_available(u)
assert result is True

def test_info_image_updates_available_no_update():
"""Test info_image_updates_available when current version is latest"""
u = EUci()

mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '{"currentVersion": "NethSecurity 8.2.0", "lastVersion": "NethSecurity 8.2.0"}'

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_image_updates_available(u)
assert result is False

def test_info_image_updates_available_current_newer():
"""Test info_image_updates_available when current version is newer than last"""
u = EUci()

mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '{"currentVersion": "NethSecurity 8.3.0", "lastVersion": "NethSecurity 8.2.0"}'

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_image_updates_available(u)
assert result is False

def test_info_image_updates_available_command_fails():
"""Test info_image_updates_available when command fails"""
u = EUci()

mock_result = MagicMock()
mock_result.returncode = 1
mock_result.stdout = ''

with patch('subprocess.run', return_value=mock_result):
result = inventory.info_image_updates_available(u)
assert result is False