Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 44 additions & 35 deletions test/case/syslog/hostname_filter/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"""

import infamy
import time
import infamy.iface as iface
from infamy import until

TEST_MESSAGES = [
("router1", "Message from router1"),
Expand All @@ -16,6 +17,27 @@
("other", "Message from a different host"),
]

def verify_log_content(ssh, logfile, expected_hosts, test):
"""Verify log file contains only messages from expected hosts."""
def check_log():
rc = ssh.runsh(f"cat /var/log/{logfile} 2>/dev/null")
log_content = rc.stdout if rc.returncode == 0 else ""

expected_messages = [msg for host, msg in TEST_MESSAGES if host in expected_hosts]
missing = [msg for msg in expected_messages if msg not in log_content]
if missing:
return False

if expected_hosts: # Only check for unwanted if filtering by hostname
unwanted_messages = [msg for host, msg in TEST_MESSAGES if host not in expected_hosts]
found = [msg for msg in unwanted_messages if msg in log_content]
if found:
test.fail(f"Found unwanted messages in /var/log/{logfile}: {found}")

return True

until(check_log, attempts=20)

with infamy.Test() as test:
with test.step("Set up topology and attach to DUTs"):
env = infamy.Env()
Expand Down Expand Up @@ -91,6 +113,15 @@
}
})

with test.step("Wait for server interface to be operational"):
until(lambda: iface.get_param(server, server_link, "oper-status") == "up", attempts=20)

with test.step("Verify server IP address is configured"):
until(lambda: iface.address_exist(server, server_link, "10.0.0.1", proto="static"), attempts=20)

with test.step("Verify syslog server is listening on UDP port 514"):
until(lambda: "10.0.0.1:514" in serverssh.runsh("ss -ulnp 2>/dev/null | grep :514 || true").stdout, attempts=20)

with test.step("Configure client to forward logs to server"):
_, client_link = env.ltop.xlate("client", "link")

Expand Down Expand Up @@ -131,49 +162,27 @@
}
})

time.sleep(2)
with test.step("Wait for client interface to be operational"):
until(lambda: iface.get_param(client, client_link, "oper-status") == "up", attempts=20)

with test.step("Verify client IP address is configured"):
until(lambda: iface.address_exist(client, client_link, "10.0.0.2", proto="static"), attempts=20)

with test.step("Verify network connectivity between client and server"):
until(lambda: clientssh.runsh("ping -c 1 -W 1 10.0.0.1 >/dev/null 2>&1").returncode == 0, attempts=10)

with test.step("Send log messages with different hostnames"):
for hostname, message in TEST_MESSAGES:
clientssh.runsh(f"logger -t test -p daemon.info -H {hostname} -h 10.0.0.1 '{message}'")
time.sleep(2)

with test.step("Verify router1 log contains only router1 messages"):
rc = serverssh.runsh("cat /var/log/router1 2>/dev/null")
log_content = rc.stdout if rc.returncode == 0 else ""

router1_messages = [msg for host, msg in TEST_MESSAGES if host == "router1"]
missing = [msg for msg in router1_messages if msg not in log_content]
if missing:
test.fail(f"Missing router1 messages in /var/log/router1: {missing}")

unwanted_messages = [msg for host, msg in TEST_MESSAGES if host != "router1"]
found = [msg for msg in unwanted_messages if msg in log_content]
if found:
test.fail(f"Found unwanted messages in /var/log/router1: {found}")
verify_log_content(serverssh, "router1", ["router1"], test)

with test.step("Verify router2 log contains only router2 messages"):
rc = serverssh.runsh("cat /var/log/router2 2>/dev/null")
log_content = rc.stdout if rc.returncode == 0 else ""

router2_messages = [msg for host, msg in TEST_MESSAGES if host == "router2"]
missing = [msg for msg in router2_messages if msg not in log_content]
if missing:
test.fail(f"Missing router2 messages in /var/log/router2: {missing}")

unwanted_messages = [msg for host, msg in TEST_MESSAGES if host != "router2"]
found = [msg for msg in unwanted_messages if msg in log_content]
if found:
test.fail(f"Found unwanted messages in /var/log/router2: {found}")
verify_log_content(serverssh, "router2", ["router2"], test)

with test.step("Verify all-hosts log contains all messages"):
rc = serverssh.runsh("cat /var/log/all-hosts 2>/dev/null")
log_content = rc.stdout if rc.returncode == 0 else ""

expected_messages = [msg for _, msg in TEST_MESSAGES]
missing = [msg for msg in expected_messages if msg not in log_content]

if missing:
test.fail(f"Missing messages in /var/log/all-hosts: {missing}")
all_hosts = [host for host, _ in TEST_MESSAGES]
verify_log_content(serverssh, "all-hosts", all_hosts, test)

test.succeed()