70 changes: 68 additions & 2 deletions compile_kaitai_parsers.py
@@ -6,6 +6,13 @@
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple
import os
import glob
import yaml

# Copyleft licenses to exclude (matched as substrings against each format's meta/license)
EXCLUDE_LICENSES = ['AGPL', 'EUPL', 'GPL', 'LGPL', 'OSL', 'ODbL', 'Ms-RL', 'GFDL']


POLYFILE_DIR: Path = Path(__file__).absolute().parent
COMPILE_SCRIPT: Path = POLYFILE_DIR / "polyfile" / "kaitai" / "compiler.py"
@@ -14,6 +21,46 @@
MANIFEST_PATH: Path = KAITAI_PARSERS_DIR / "manifest.json"


def find_files_with_excluded_licenses(directory: Path, license_list: List[str]) -> List[str]:
"""
Recursively scans a directory for files and identifies any that contain
a license from the excluded list.

The check is performed as a substring match (e.g., 'GPL' in the list
will match a license named 'GPL-3.0-or-later').
"""
# Create the recursive search pattern
search_path = os.path.join(directory, '**', '*.ksy')
file_paths = glob.glob(search_path, recursive=True)

if not file_paths:
return []

flagged_files = []

for file_path in file_paths:
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if data and isinstance(data, dict):
license_val = data.get('meta', {}).get('license')
if not license_val:
continue

# Check if any part of the license name is in our exclude list
for excluded_license in license_list:
if excluded_license in license_val:
flagged_files.append(file_path)
break # Found a match, no need to check other excluded licenses for this file

except yaml.YAMLError as e:
print(f"❌ Error parsing YAML in file '{file_path}': {e}")
except Exception as e:
print(f"❌ An unexpected error occurred with file '{file_path}': {e}")

return flagged_files
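
A minimal usage sketch of the helper above (illustration only, not part of this diff), assuming the module-level KAITAI_FORMAT_LIBRARY and EXCLUDE_LICENSES constants defined earlier in the file:

flagged = find_files_with_excluded_licenses(KAITAI_FORMAT_LIBRARY, EXCLUDE_LICENSES)
# Substring matching means 'GPL' also catches 'LGPL-2.1' and 'AGPL-3.0+', while
# permissive identifiers such as 'MIT' or 'CC0-1.0' are never flagged.
print(f"{len(flagged)} copyleft-licensed .ksy definitions under {KAITAI_FORMAT_LIBRARY} would be excluded")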


# Make sure the kaitai_struct_formats submodule is cloned:
if not (KAITAI_FORMAT_LIBRARY / "README.md").exists():
subprocess.check_call(["git", "submodule", "init"], cwd=str(POLYFILE_DIR))
@@ -48,6 +95,14 @@ def mtime(path: Path) -> datetime:


def rebuild(force: bool = False):
# Get the list of copyleft-licensed files to exclude

excluded_files = find_files_with_excluded_licenses(
KAITAI_FORMAT_LIBRARY,
EXCLUDE_LICENSES
)
excluded_paths = {Path(f).absolute() for f in excluded_files}
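# find_files_with_excluded_licenses() returns plain strings from glob.glob();
# normalizing them to absolute Path objects lets them be compared directly
# against the pathlib globs used below.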

# Remove the manifest file to force a rebuild:
if force or not MANIFEST_PATH.exists():
if MANIFEST_PATH.exists():
@@ -57,6 +112,9 @@ def rebuild(force: bool = False):
# see if any of the files are out of date and need to be recompiled
newest_definition: Optional[datetime] = None
for definition in KAITAI_FORMAT_LIBRARY.glob("**/*.ksy"):
# Skip excluded files
if definition.absolute() in excluded_paths:
continue
modtime = mtime(definition)
if newest_definition is None or newest_definition < modtime:
newest_definition = modtime
@@ -71,7 +129,15 @@
sys.stderr.write("Error: You must have kaitai-struct-compiler installed\nSee https://kaitai.io/#download\n")
sys.exit(1)

num_files = sum(1 for _ in KAITAI_FORMAT_LIBRARY.glob("**/*.ksy"))
# Count non-excluded files
all_ksy_files = list(KAITAI_FORMAT_LIBRARY.glob("**/*.ksy"))
ksy_files_to_compile = [f for f in all_ksy_files if f.absolute() not in excluded_paths]
num_excluded = len(all_ksy_files) - len(ksy_files_to_compile)

if num_excluded > 0:
print(f"Excluding {num_excluded} copyleft-licensed KSY files from compilation")

num_files = len(ksy_files_to_compile)

try:
from tqdm import tqdm
@@ -99,7 +165,7 @@ def update(self, n: int):
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
futures_to_path: Dict[concurrent.futures.Future, Path] = {
executor.submit(compile_ksy, file): file
for file in KAITAI_FORMAT_LIBRARY.glob("**/*.ksy")
for file in ksy_files_to_compile
}
for future in concurrent.futures.as_completed(futures_to_path):
t.update(1)