Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions analysis/benchmark_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ class ProgressMetricsSummary:
failure_rate_p75: Optional[float]


@dataclass
class RelativeScoreSummary:
fuzzer: str
relscore: Optional[float]
relcov: Optional[float]


def parse_optional_float(value: str | None) -> Optional[float]:
if value is None:
return None
Expand Down Expand Up @@ -272,6 +279,66 @@ def load_progress_metrics_summary(path: Path) -> Dict[str, ProgressMetricsSummar
return rows


def _find_column(fieldnames: List[str], candidates: List[str]) -> Optional[str]:
normalized = {name.strip().lower().replace("_", ""): name for name in fieldnames}
for candidate in candidates:
found = normalized.get(candidate.strip().lower().replace("_", ""))
if found is not None:
return found
return None


def load_relative_scores(path: Path) -> Dict[str, RelativeScoreSummary]:
if not path.exists():
return {}
rows: Dict[str, RelativeScoreSummary] = {}
with path.open("r", newline="", encoding="utf-8") as handle:
reader = csv.DictReader(handle)
fieldnames = reader.fieldnames or []
fuzzer_col = _find_column(
fieldnames,
["fuzzer", "fuzzer_label", "approach", "tool", "name"],
)
relscore_col = _find_column(fieldnames, ["relscore", "relscores", "relative_score"])
relcov_col = _find_column(
fieldnames,
["relcov", "relative_coverage", "coverage_score", "covered_edges"],
)
if fuzzer_col is None:
die(f"missing fuzzer column in relative score CSV {path}")
if relscore_col is None and relcov_col is None:
die(
f"missing relscore/relscores or relcov/covered_edges column in "
f"relative score CSV {path}"
)

raw_rows = [row for row in reader]
coverage_values = [
value
for value in (
parse_optional_float(row.get(relcov_col)) if relcov_col else None
for row in raw_rows
)
if value is not None and math.isfinite(value)
]
max_coverage_value = max(coverage_values, default=0.0)
relcov_from_covered_edges = relcov_col == _find_column(fieldnames, ["covered_edges"])

for row in raw_rows:
fuzzer = str(row.get(fuzzer_col, "")).strip()
if not fuzzer:
continue
relcov = parse_optional_float(row.get(relcov_col)) if relcov_col else None
if relcov_from_covered_edges:
relcov = _relative_to_best(relcov, max_coverage_value)
rows[fuzzer] = RelativeScoreSummary(
fuzzer=fuzzer,
relscore=parse_optional_float(row.get(relscore_col)) if relscore_col else None,
relcov=relcov,
)
return rows


def load_metric_samples_csv(path: Path, value_columns: List[str]) -> pd.DataFrame:
if not path.exists():
cols = ["fuzzer", "series_id", "time_hours", *value_columns]
Expand Down Expand Up @@ -447,6 +514,185 @@ def append_progress_metrics_section(
lines.append("")


def _relative_to_best(value: Optional[float], best: float) -> Optional[float]:
if value is None or not math.isfinite(value) or best <= 0:
return None
return value / best


def build_relative_score_summaries(
*,
metrics: List[FuzzerMetrics],
progress_metrics_by_fuzzer: Dict[str, ProgressMetricsSummary],
relative_scores_by_fuzzer: Dict[str, RelativeScoreSummary] | None = None,
) -> Dict[str, RelativeScoreSummary]:
summaries: Dict[str, RelativeScoreSummary] = {
name: RelativeScoreSummary(
fuzzer=score.fuzzer,
relscore=score.relscore,
relcov=score.relcov,
)
for name, score in (relative_scores_by_fuzzer or {}).items()
}

best_final = max((float(metric.final_p50) for metric in metrics), default=0.0)
best_cov = max(
(
float(row.coverage_p50)
for row in progress_metrics_by_fuzzer.values()
if row.coverage_p50 is not None and math.isfinite(row.coverage_p50)
),
default=0.0,
)

for metric in metrics:
current = summaries.get(metric.fuzzer)
relscore = current.relscore if current else None
relcov = current.relcov if current else None
if relscore is None:
relscore = _relative_to_best(float(metric.final_p50), best_final)
if relcov is None:
progress = progress_metrics_by_fuzzer.get(metric.fuzzer)
relcov = _relative_to_best(progress.coverage_p50 if progress else None, best_cov)
summaries[metric.fuzzer] = RelativeScoreSummary(
fuzzer=metric.fuzzer,
relscore=relscore,
relcov=relcov,
)

return summaries


def fmt_relative_score(value: Optional[float]) -> str:
if value is None or not math.isfinite(value):
return "n/a"
return f"{value:.3f}"


def append_relative_scoreboard(
lines: List[str],
relative_scores_by_fuzzer: Dict[str, RelativeScoreSummary],
fuzzer_order: List[str],
) -> None:
if not relative_scores_by_fuzzer:
return

def sort_key(score: RelativeScoreSummary) -> Tuple[float, float, str]:
relscore = (
score.relscore
if score.relscore is not None and math.isfinite(score.relscore)
else -1.0
)
relcov = (
score.relcov
if score.relcov is not None and math.isfinite(score.relcov)
else -1.0
)
return (relscore, relcov, score.fuzzer)

ordered = sorted(relative_scores_by_fuzzer.values(), key=sort_key, reverse=True)
best_relscore = max(
(
score.relscore
for score in ordered
if score.relscore is not None and math.isfinite(score.relscore)
),
default=None,
)
best_relcov = max(
(
score.relcov
for score in ordered
if score.relcov is not None and math.isfinite(score.relcov)
),
default=None,
)
relscore_leaders = [
score.fuzzer
for score in ordered
if best_relscore is not None
and score.relscore is not None
and math.isclose(score.relscore, best_relscore)
]
relcov_leaders = [
score.fuzzer
for score in ordered
if best_relcov is not None
and score.relcov is not None
and math.isclose(score.relcov, best_relcov)
]

lines.append("## Fuzzer scoreboard (higher is better)")
lines.append(
"`relscore` orders approaches by coverage value: edges that fewer approaches reach "
"are worth more, and an approach gets more credit when it reaches those edges "
"consistently across non-empty trials. `relcov` is directional coverage overlap: "
"`relcov(A, B)` is the share of B's total reached coverage that A reaches. In this "
"scoreboard, higher values are better within each column; a normalized `relcov` of "
"1.000 means the largest observed coverage total for the selected report input."
)
if relscore_leaders:
lines.append(
f"- Best relative score: **{', '.join(relscore_leaders)}** "
f"({fmt_relative_score(best_relscore)} relscore)"
)
if relcov_leaders:
lines.append(
f"- Best coverage score: **{', '.join(relcov_leaders)}** "
f"({fmt_relative_score(best_relcov)} relcov)"
)
lines.append("")

order_index = {fuzzer: idx for idx, fuzzer in enumerate(fuzzer_order)}
ordered = sorted(
ordered,
key=lambda score: (
-(
score.relscore
if score.relscore is not None and math.isfinite(score.relscore)
else -1.0
),
-(
score.relcov
if score.relcov is not None and math.isfinite(score.relcov)
else -1.0
),
order_index.get(score.fuzzer, len(order_index)),
score.fuzzer,
),
)
header = ["Rank", "Fuzzer", "relscore", "relcov", "Readout"]
lines.append("| " + " | ".join(header) + " |")
lines.append("|" + "|".join(["---"] * len(header)) + "|")
for rank, score in enumerate(ordered, start=1):
if score.relscore is not None and score.relcov is not None:
readout = (
"best overall"
if score.fuzzer in relscore_leaders and score.fuzzer in relcov_leaders
else "compare both scores"
)
elif score.relscore is not None:
readout = "relative score only"
elif score.relcov is not None:
readout = "coverage score only"
else:
readout = "no relative scores"
lines.append(
"| "
+ " | ".join(
[
str(rank),
score.fuzzer,
fmt_relative_score(score.relscore),
fmt_relative_score(score.relcov),
readout,
]
)
+ " |"
)
lines.append("")


def nan_percentile_rows(arr: np.ndarray, percentile_value: float) -> np.ndarray:
out = np.full(arr.shape[0], np.nan, dtype=float)
for idx, row in enumerate(arr):
Expand Down Expand Up @@ -1075,6 +1321,7 @@ def write_report(
outpath: Path,
throughput_by_fuzzer: Dict[str, ThroughputSummary] | None = None,
progress_metrics_by_fuzzer: Dict[str, ProgressMetricsSummary] | None = None,
relative_scores_by_fuzzer: Dict[str, RelativeScoreSummary] | None = None,
stat_results: Optional[List[PairwiseResult]] = None,
stat_warnings: Optional[List[str]] = None,
alpha: float = 0.05,
Expand All @@ -1095,6 +1342,16 @@ def write_report(
)
lines.append("")

append_relative_scoreboard(
lines,
build_relative_score_summaries(
metrics=metrics,
progress_metrics_by_fuzzer=progress_metrics_by_fuzzer or {},
relative_scores_by_fuzzer=relative_scores_by_fuzzer or {},
),
fuzzer_order=[metric.fuzzer for metric in metrics],
)

lines.append("## Bugs found at fixed time budgets (median [IQR])")
header = ["Fuzzer", "Runs"] + [f"{t:g}h" for t in checkpoints]
lines.append("| " + " | ".join(header) + " |")
Expand Down Expand Up @@ -1288,6 +1545,12 @@ def main() -> int:
default=None,
help="Optional progress metrics samples CSV for time-series charts.",
)
parser.add_argument(
"--relative-scores-csv",
type=Path,
default=None,
help="Optional CSV with fuzzer plus relscore/relscores and relcov columns for the report scoreboard.",
)
parser.add_argument(
"--additional-metrics-summary-csv",
dest="progress_metrics_summary_csv",
Expand Down Expand Up @@ -1316,6 +1579,11 @@ def main() -> int:
if args.progress_metrics_summary_csv is not None
else {}
)
relative_scores_by_fuzzer = (
load_relative_scores(args.relative_scores_csv)
if args.relative_scores_csv is not None
else {}
)
throughput_samples_df = (
load_metric_samples_csv(args.throughput_samples_csv, THROUGHPUT_SAMPLE_VALUE_COLS)
if args.throughput_samples_csv is not None
Expand Down Expand Up @@ -1441,6 +1709,7 @@ def main() -> int:
outpath=report_outdir / "REPORT.md",
throughput_by_fuzzer=throughput_by_fuzzer,
progress_metrics_by_fuzzer=progress_metrics_by_fuzzer,
relative_scores_by_fuzzer=relative_scores_by_fuzzer,
stat_results=stat_results,
stat_warnings=stat_warnings,
)
Expand Down
Loading