Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fb419de
Add dual host ffmpeg tests for various video formats and configurations
Falron98 Aug 11, 2025
ef780d0
Fix dual basic tests
Falron98 Aug 12, 2025
1e1cab7
Fix dual ffmpeg tests;
Falron98 Aug 12, 2025
9bac7a5
Fix linter issues
Falron98 Aug 12, 2025
442b439
Add: netsniff-ng capturing capabilities based on already-created tcpd…
MateuszGrabuszynski Aug 13, 2025
57dffb1
Doc: Modifying comments about packet capture
MateuszGrabuszynski Aug 13, 2025
bda5c5a
Add: netsniff-ng can be executed in all tests where tcpdump capture w…
MateuszGrabuszynski Aug 13, 2025
7a31926
Lint: applying isort recommendations
MateuszGrabuszynski Aug 13, 2025
d6fcab1
Fix: improper order in if-else oneliner
MateuszGrabuszynski Aug 13, 2025
0258dfe
Merge branch 'ffmpeg-dual' into netsniff-autom
MateuszGrabuszynski Aug 18, 2025
565f4cb
Test: Fixing the netsniff-ng preparation
MateuszGrabuszynski Aug 18, 2025
ccb3361
Test: Add netsniff-ng to dual-node tests
MateuszGrabuszynski Aug 18, 2025
4f6754f
Removing unnecessary ands from comments; Removing unnecessarily added…
MateuszGrabuszynski Aug 18, 2025
f4631c0
filter -> self.filter so no -f None is present in netsniff-ng's command
MateuszGrabuszynski Aug 19, 2025
e964bd3
Adding missing quotation marks around filter, removing unnecessary de…
MateuszGrabuszynski Aug 19, 2025
46ddf17
Add filtering to netsniff-ng, so only the test streams are captured, …
MateuszGrabuszynski Aug 19, 2025
1d2b2f3
Adding datetime into generated filename
MateuszGrabuszynski Aug 19, 2025
76545e2
Fixing lack of space in netsniff-ng filter
MateuszGrabuszynski Aug 20, 2025
01bad5b
filter -> capture_filter to avoid shadowing over Python's filter func…
MateuszGrabuszynski Aug 20, 2025
a77c7bd
Documentation for netsniff installation
MateuszGrabuszynski Aug 20, 2025
48bec15
Adding usage docs
MateuszGrabuszynski Aug 22, 2025
2b05644
Merge branch 'main' into netsniff-autom
MateuszGrabuszynski Aug 22, 2025
914a2db
Changing ramdisk fixture so it is universal
MateuszGrabuszynski Aug 22, 2025
59f464f
Changing netsniff.capture() to netsniff.start() so the capture is non…
MateuszGrabuszynski Aug 22, 2025
ebacb84
Compliance uploads and checks added (fixes required)
MateuszGrabuszynski Aug 29, 2025
37c13fc
Merge branch 'main' into netsniff-autom
MateuszGrabuszynski Aug 29, 2025
efcadb7
Re-adding netsniff packet capture after wrongly removing it during me…
MateuszGrabuszynski Sep 1, 2025
ae6c12f
Adding pcap_capture fixture stub
MateuszGrabuszynski Sep 1, 2025
da56832
Moving the capturing process into a fixture
MateuszGrabuszynski Sep 2, 2025
c69405a
Working EBU LIST compliance check upload
MateuszGrabuszynski Sep 2, 2025
8ed9547
Fixing linter issues
MateuszGrabuszynski Sep 2, 2025
48d5ea9
Further linter-raised fixes
MateuszGrabuszynski Sep 3, 2025
9b08329
Further linter changes in pcap_compliance.py
MateuszGrabuszynski Sep 3, 2025
60ec24e
Further linter changes in pcap_compliance.py
MateuszGrabuszynski Sep 3, 2025
a78a592
Merge branch 'main' into netsniff-autom
MateuszGrabuszynski Sep 3, 2025
f7f8abd
Removing netsniff and tcpdump references in GstreamerApp.py, as they …
MateuszGrabuszynski Sep 3, 2025
eeeee84
Removing unused import in GstreamerApp
MateuszGrabuszynski Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
37 changes: 37 additions & 0 deletions tests/validation/compliance/check_compliance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os
import sys

from csv_report import TestCSVReport
from pcap_compliance import PcapComplianceClient

# Usage:
# python3 check_compliance.py <uuid> <test_name> [csv_path]
# <uuid> : UUID of the uploaded PCAP file (from upload step)
# <test_name> : Name for the test (used for report directory)
# [csv_path] : (Optional) Path to CSV file for appending compliance results and path to logs

if len(sys.argv) not in (3, 4):
print("Usage: python3 check_compliance.py <uuid> <test_name> [csv_path]")
sys.exit(1)

uuid = sys.argv[1]
test_name = sys.argv[2]
csv_path = sys.argv[3] if len(sys.argv) == 4 else None

# Step 1: Run compliance check and generate report/logs using PcapComplianceClient
checker = PcapComplianceClient()
checker.authenticate()
checker.pcap_id = uuid
report_path = checker.download_report(test_name)
is_compliant = checker.check_compliance(report_path)
print(f"Report and logs saved in: {os.path.abspath(checker.report_dir)}")

# Step 2 (optional): If csv_path is provided, append compliance result to CSV
if csv_path:
report = TestCSVReport(csv_path)
compliance_result = "PASSED" if is_compliant else "FAILED"
report.add_result(test_name, compliance_result, checker.report_dir)
print(f"Compliance result added to CSV: {csv_path}")

# Step 3 Delete the PCAP analysis from the EBU server
checker.delete_pcap(uuid)
30 changes: 30 additions & 0 deletions tests/validation/compliance/csv_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import csv
import os


class TestCSVReport:
def __init__(self, csv_path):
"""
Initialize the CSV report.
If the file does not exist, create it and write the header row.
"""
self.csv_path = csv_path
# Write header if file does not exist
if not os.path.isfile(self.csv_path):
with open(self.csv_path, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(
[
"test_name", # Name of the test
"compliance_result", # Compliance check result (e.g., PASSED/FAILED)
"logs_path", # Final result (e.g., PASSED/FAILED)
]
)

def add_result(self, test_name, compliance_result, logs_path):
"""
Append a new row with the test results to the CSV file.
"""
with open(self.csv_path, mode="a", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow([test_name, compliance_result, logs_path])
13 changes: 13 additions & 0 deletions tests/validation/compliance/load_ebu_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import yaml


def load_ebu_config(config_path: str):
with open(config_path, "r") as f:
config = yaml.safe_load(f)
instance = config["instances"]
config_response = {
"ebu_ip": instance.get("name", ""),
"username": instance.get("username", ""),
"password": instance.get("password", ""),
}
return config_response
150 changes: 150 additions & 0 deletions tests/validation/compliance/pcap_compliance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import datetime
import json
import os

import requests
import yaml


class PcapComplianceClient:
def __init__(
self,
pcap_file=None,
config_path="ebu_list.yaml",
proxies={"http": "", "https": "", "ftp": ""},
):
"""
Initialize the client with optional PCAP file and config path.
Loads EBU server IP and credentials from the YAML config.
"""
self.pcap_file = pcap_file
self.token = None
self.ebu_ip = None
self.user = None
self.password = None
self.pcap_id = None
self.report_dir = None
self.proxies = proxies
self.session = requests.Session()
self.session.trust_env = False # Do not use system proxy settings

# Load EBU IP and credentials from YAML config
if config_path:
with open(config_path, "r") as f:
config = yaml.safe_load(f)
instance = config["instances"]
self.ebu_ip = instance.get("name", "")
self.user = instance.get("username", "")
self.password = instance.get("password", "")

def authenticate(self):
"""
Authenticate with the EBU server and store the access token.
"""
url = f"http://{self.ebu_ip}/auth/login"
headers = {"Content-Type": "application/json"}
data = {"username": self.user, "password": self.password}
response = self.session.post(
url, headers=headers, json=data, verify=False, proxies=self.proxies
)
response.raise_for_status()
self.token = response.json().get("content", {}).get("token")
if not self.token:
raise Exception("Authentication failed: No token received.")

def upload_pcap(self):
"""
Upload the PCAP file to the EBU server and store the returned UUID.
Returns the UUID of the uploaded PCAP.
"""
url = f"http://{self.ebu_ip}/api/pcap"
headers = {"Authorization": f"Bearer {self.token}"}
if self.pcap_file:
with open(self.pcap_file, "rb") as f:
files = {
"pcap": (
os.path.basename(self.pcap_file),
f,
"application/vnd.tcpdump.pcap",
)
}
response = self.session.put(
url,
headers=headers,
files=files,
verify=False,
proxies=self.proxies,
)
response.raise_for_status()
self.pcap_id = response.json().get("uuid")
if not self.pcap_id:
raise Exception("Upload failed: No UUID received.")
print(f"Extracted UUID: >>>{self.pcap_id}<<<")
return self.pcap_id

def download_report(self, test_name):
"""
Download the compliance report for the uploaded PCAP file.
Saves the report as a JSON file in a timestamped directory.
Returns the path to the saved report.
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self.report_dir = os.path.join("reports", test_name, timestamp)
os.makedirs(self.report_dir, exist_ok=True)
url = f"http://{self.ebu_ip}/api/pcap/{self.pcap_id}/report?type=json"
headers = {"Authorization": f"Bearer {self.token}"}
response = self.session.get(
url, headers=headers, verify=False, proxies=self.proxies
)
response.raise_for_status()
report_path = os.path.join(self.report_dir, f"{self.pcap_id}.json")
with open(report_path, "w") as f:
json.dump(response.json(), f, indent=2)
return report_path

def check_compliance(self, report_path):
"""
Check the compliance result from the downloaded report.
Prints the result, writes PASSED/FAILED files, and lists error IDs if failed.
Returns True if compliant, False otherwise.
"""
with open(report_path, "r") as f:
report = json.load(f)
is_compliant = report.get("not_compliant_streams", 1) == 0
result_file = "PASSED" if is_compliant else "FAILED"
print(f"Result: {result_file}")
with open(os.path.join(self.report_dir, result_file), "w") as f_result:
if is_compliant:
f_result.write("")
else:
error_ids = []
for err in report.get("summary", {}).get("error_list", []):
error_id = err.get("value", {}).get("id")
print(error_id)
error_ids.append(str(error_id))
for error_id in error_ids:
f_result.write(f"{error_id}\n")
print(f"Json file saved: file://{os.path.abspath(report_path)}")
return is_compliant

def delete_pcap(self, pcap_id=None):
"""
Delete the PCAP file and its report from the EBU server.
If pcap_id is not provided, uses self.pcap_id.
"""
if pcap_id is None:
pcap_id = self.pcap_id
if not pcap_id:
raise ValueError("No PCAP ID provided for deletion.")
url = f"http://{self.ebu_ip}/api/pcap/{pcap_id}"
headers = {"Authorization": f"Bearer {self.token}"}
response = self.session.delete(
url, headers=headers, verify=False, proxies=self.proxies
)
if response.status_code == 200:
print(f"PCAP {pcap_id} deleted successfully from EBU server.")
else:
print(
f"Failed to delete PCAP {pcap_id}: {response.status_code} {response.text}"
)
response.raise_for_status()
69 changes: 69 additions & 0 deletions tests/validation/compliance/upload_pcap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import argparse
import os

if __name__ == "__main__":
from pcap_compliance import PcapComplianceClient
else:
from .pcap_compliance import PcapComplianceClient


def parse_args():
parser = argparse.ArgumentParser(
description="Upload a PCAP file to the EBU server."
)
parser.add_argument(
"--pcap_file_path",
type=str,
required=True,
help="Full path to the PCAP file to upload.",
)
parser.add_argument(
"--ip", type=str, required=True, help="IP address to the EBU LIST service."
)
parser.add_argument(
"--login", type=str, required=True, help="Login to the EBU LIST service."
)
parser.add_argument(
"--password", type=str, required=True, help="Password to the EBU LIST service."
)
parser.add_argument(
"--proxy",
type=str,
required=False,
help="Proxy used to upload to the EBU LIST service.",
)
return parser.parse_args()


def upload_pcap(file_path, ip, login, password, proxies):
# Check for login and password
if not ip or not login or not password:
raise Exception("IP address, login and password are required.")

# Check if the file exists before proceeding
if not os.path.isfile(file_path):
raise Exception(f"File not found: {file_path}")

# Create the uploader object and upload the PCAP file
# Empty config_path to avoid loading from non-existing YAML file
uploader = PcapComplianceClient(
pcap_file=file_path, config_path="", proxies=proxies
)
uploader.ebu_ip = ip
uploader.user = login
uploader.password = password
uploader.authenticate() # Authenticate with the EBU server
uuid = uploader.upload_pcap() # Upload the PCAP file and get the UUID

return uuid


if __name__ == "__main__":
args = parse_args()
# TODO: Handle proxies better, so each type can have a proper value
proxies = None
if args.proxy is not None:
proxies = {"http": args.proxy, "https": args.proxy, "ftp": args.proxy}
uuid = upload_pcap(args.pcap_file_path, args.ip, args.login, args.password, proxies)
# Print extractable UUID
print(f">>>UUID>>>{uuid}<<<UUID<<<")
8 changes: 8 additions & 0 deletions tests/validation/configs/ebu_list.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
metadata:
version: '1.0'
instances:
name: {{ ebu_ip }}
username: {{ ebu_login }}
password: {{ ebu_password }}
verify: true
token: json_token
1 change: 1 addition & 0 deletions tests/validation/configs/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mtl_path: MTL_PATH_PLACEHOLDER
media_path: /mnt/media
capture_cfg:
enable: false
tool: netsniff-ng
test_name: test_name
pcap_dir: /mnt/ramdisk/pcap
capture_time: 5
Expand Down
Loading
Loading