Merged
1 change: 1 addition & 0 deletions Addon.py
@@ -185,6 +185,7 @@ def __init__(
self.tags = set() # Just a cache, loaded from Metadata
self.remote_last_updated: Optional[datetime.datetime] = None
self.stats = AddonStats()
self.curated = True
self.score = 0

# In cases where there are multiple versions/branches/installations available for an addon,
15 changes: 14 additions & 1 deletion AddonCatalog.py
@@ -79,6 +79,8 @@ class AddonCatalogEntry:
branch_display_name: Optional[str] = None
metadata: Optional[CatalogEntryMetadata] = None # Generated by the cache system
last_update_time: str = "" # Generated by the cache system
curated: bool = True # Generated by the cache system
sparse_cache: bool = False # Generated by the cache system
relative_cache_path: str = "" # Generated by the cache system

def __init__(self, raw_data: Dict[str, str]) -> None:
@@ -133,7 +135,16 @@ def instantiate_addon(self, addon_id: str) -> Addon:
state = Addon.Status.UNCHECKED
else:
state = Addon.Status.NOT_INSTALLED
url = self.repository if self.repository else self.zip_url
if self.sparse_cache:
if self.zip_url:
url = self.zip_url
else:
# Technically, this should never happen, but just in case...
raise RuntimeError(f"Sparse cache entry {addon_id} has no zip_url")
Comment on lines 141 to 143
Copilot AI (Feb 8, 2026)

instantiate_addon() raises RuntimeError when sparse_cache is true but zip_url is missing. Since CacheWriter can set sparse_cache=True without ensuring zip_url exists, this can cause addons/branches to be skipped at runtime. Consider falling back to repository when zip_url is absent (or validate/enforce zip_url at cache-generation time).

Suggested change
else:
# TODO: Try to generate the expected URL form based on the repo location
raise RuntimeError(f"Sparse cache entry {addon_id} has no zip_url")
elif self.repository:
# Fallback: use the repository URL when a sparse cache entry has no explicit zip_url
url = self.repository
else:
raise RuntimeError(
f"Sparse cache entry {addon_id} has neither zip_url nor repository URL"
)

elif self.repository:
url = self.repository
else:
url = self.zip_url
Comment on lines 138 to 147
Copilot AI (Feb 8, 2026)

instantiate_addon() raises a RuntimeError if sparse_cache is true but zip_url is missing. Since zip_url is optional for git-based entries (and sparse_cache is set automatically during cache generation for some repos), this can cause Addon Manager to crash while building the addon list. Consider a safe fallback (e.g. use repository when zip_url is absent) or make cache generation guarantee zip_url for sparse entries.

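A minimal standalone sketch of the fallback both comments describe (helper name hypothetical; field names as in this diff):

def select_download_url(entry) -> str:
    """Prefer zip_url for sparse-cache entries, but fall back to the git
    repository instead of raising when zip_url was never populated."""
    if entry.sparse_cache and entry.zip_url:
        return entry.zip_url
    if entry.repository:
        return entry.repository
    if entry.zip_url:
        return entry.zip_url
    raise RuntimeError("Catalog entry has neither zip_url nor repository URL")
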
if self.git_ref:
addon = Addon(addon_id, url, state, branch=self.git_ref)
else:
@@ -179,6 +190,8 @@ def instantiate_addon(self, addon_id: str) -> Addon:
self.branch_display_name if self.branch_display_name else self.git_ref
)

addon.curated = self.curated

return addon

@staticmethod
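For context, a hypothetical consumer-side use of the propagated flag (not part of this PR; show_uncurated is an invented setting):

def visible_addons(catalog, show_uncurated: bool = False) -> list:
    """Instantiate every addon in the index, hiding uncurated entries by default."""
    addons = [catalog.instantiate_addon(addon_id) for addon_id in catalog.get_catalog()]
    return [addon for addon in addons if addon.curated or show_uncurated]
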
4 changes: 2 additions & 2 deletions AddonCatalog.schema.json
@@ -54,8 +54,8 @@
"branch_display_name": {
"type": "string"
},
"last_update_time": {
"type": "string"
"curated": {
"type": "boolean"
}
},
"anyOf": [
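An illustrative index entry that validates against the updated schema (addon name and URL invented; per the dataclass above, curated defaults to True when omitted):

example_entry = {
    "repository": "https://github.com/example/ExampleWB",  # hypothetical repo
    "git_ref": "main",
    "branch_display_name": "main",
    "curated": False,  # kept out of the legacy addon_catalog_cache.zip
}
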
224 changes: 183 additions & 41 deletions AddonCatalogCacheCreator.py
@@ -24,6 +24,8 @@
Addon Manager in each FreeCAD installation."""

import datetime
import shutil
import sys
from dataclasses import is_dataclass, fields
from typing import Any, List, Optional, Dict

@@ -36,6 +38,7 @@
import re
import requests
import subprocess
from typing import List
import xml.etree.ElementTree
import zipfile

@@ -44,14 +47,15 @@
import addonmanager_utilities as utils
import addonmanager_icon_utilities as icon_utils

ADDON_CATALOG_URL = (
"https://raw.githubusercontent.com/FreeCAD/FreeCAD-addons/master/AddonCatalog.json"
)
ADDON_CATALOG_URL = "https://raw.githubusercontent.com/FreeCAD/Addons/main/Data/Index.json"
BASE_DIRECTORY = "./CatalogCache"
MAX_COUNT = 10000 # Do at most this many repos (for testing purposes this can be made smaller)
CLONE_TIMEOUT = (
300 # Seconds: repos that take longer than this are assumed to be too large to index
)

# Repos that are too large, or that should for some reason not be cloned here
EXCLUDED_REPOS = ["parts_library", "offline-documentation", "FreeCAD-Documentation-html"]
# Repos that are too large, or that should for some reason not be fully cloned here
FORCE_SPARSE_CLONE = ["parts_library", "offline-documentation", "FreeCAD-Documentation-html"]


def recursive_serialize(obj: Any):
@@ -82,7 +86,7 @@ class GitRefType(enum.IntEnum):


class CatalogFetcher:
"""Fetches the addon catalog from the given URL and returns an AddonCatalog object. Separated
"""Fetches the addon index from the given URL and returns an AddonCatalog object. Separated
from the main class for easy mocking during tests. Note that every instantiation of this class
will run a new fetch of the catalog."""

@@ -94,9 +98,7 @@ def fetch_catalog(self) -> AddonCatalog.AddonCatalog:
"""Fetch the addon catalog from the given URL and return an AddonCatalog object."""
response = requests.get(self.addon_catalog_url, timeout=10.0)
if response.status_code != 200:
raise RuntimeError(
f"ERROR: Failed to fetch addon catalog from {self.addon_catalog_url}"
)
raise RuntimeError(f"ERROR: Failed to fetch addon index from {self.addon_catalog_url}")
return AddonCatalog.AddonCatalog(response.json())


@@ -106,8 +108,9 @@ class CacheWriter:
as a base64-encoded icon image. The cache is written to the current working directory."""

def __init__(self):
self.catalog: AddonCatalog = None
self.catalog: Optional[AddonCatalog.AddonCatalog] = None
self.icon_errors = {}
self.clone_errors = {}
if os.path.isabs(BASE_DIRECTORY):
self.cwd = BASE_DIRECTORY
else:
@@ -116,39 +119,80 @@ def __init__(self):
self._sanitize_counter = 0
self._directory_name_cache: Dict[str, str] = {}

def write(self):
def write(self, addon_id: Optional[str] = None) -> None:
original_working_directory = os.getcwd()
os.makedirs(self.cwd, exist_ok=True)
os.chdir(self.cwd)
self.create_local_copy_of_addons()

with zipfile.ZipFile(
os.path.join(self.cwd, "addon_catalog_cache.zip"), "w", zipfile.ZIP_DEFLATED
) as zipf:
zipf.writestr(
"addon_catalog_cache.json",
json.dumps(recursive_serialize(self.catalog.get_catalog()), indent=" "),
)

# Also generate the sha256 hash of the zip file and store it
with open("addon_catalog_cache.zip", "rb") as cache_file:
cache_file_content = cache_file.read()
sha256 = hashlib.sha256(cache_file_content).hexdigest()
with open("addon_catalog_cache.zip.sha256", "w", encoding="utf-8") as hash_file:
hash_file.write(sha256)
try:
fetcher = CatalogFetcher()
self.catalog = fetcher.catalog

Comment on lines 122 to 130
Copilot AI (Feb 8, 2026)

write() changes the process working directory with os.chdir(self.cwd) but does not restore it if an exception occurs (e.g., addon_id not in index, fetch failure, zip write error). Wrap the body in try/finally and always os.chdir(original_working_directory) to avoid leaking state to callers/tests.

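The shape of the fix this comment asks for, and which the final version of write() below adopts, as a minimal standalone sketch (helper name hypothetical):

import os

def run_in_directory(target_dir: str, task) -> None:
    """Chdir into target_dir, run task(), and always restore the caller's cwd."""
    original_working_directory = os.getcwd()
    os.chdir(target_dir)
    try:
        task()
    finally:
        # Runs even if task() raises, so no working-directory state leaks out.
        os.chdir(original_working_directory)
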
if addon_id is None:
self.create_local_copy_of_addons()
else:
catalog = self.catalog.get_catalog()
if addon_id not in catalog:
raise RuntimeError(f"ERROR: Addon {addon_id} not in index")
catalog_entries = catalog[addon_id]
self.create_local_copy_of_single_addon(addon_id, catalog_entries)

# Write the entire index for versions of the Addon Manager after 2026-01-24
with zipfile.ZipFile(
os.path.join(self.cwd, "addon_index_cache.zip"), "w", zipfile.ZIP_DEFLATED
) as zipf:
zipf.writestr(
"addon_index_cache.json",
json.dumps(recursive_serialize(self.catalog.get_catalog()), indent=" "),
)

# Also generate the sha256 hash of the zip file and store it
with open("addon_index_cache.zip", "rb") as cache_file:
cache_file_content = cache_file.read()
sha256 = hashlib.sha256(cache_file_content).hexdigest()
with open("addon_index_cache.zip.sha256", "w", encoding="utf-8") as hash_file:
hash_file.write(sha256)

# For pre-2026-01-24 write only curated addons into a separate catalog file so older
# versions of the Addon Manager don't accidentally install uncurated addons.
with zipfile.ZipFile(
os.path.join(self.cwd, "addon_catalog_cache.zip"), "w", zipfile.ZIP_DEFLATED
) as zipf:
catalog = self.catalog.get_catalog()
reduced_catalog = {}
for addon_id, catalog_entries in catalog.items():
approved_entries: List[AddonCatalog.AddonCatalogEntry] = []
for entry in catalog_entries:
if entry.curated:
approved_entries.append(entry)
if approved_entries:
reduced_catalog[addon_id] = approved_entries
zipf.writestr(
"addon_catalog_cache.json",
json.dumps(recursive_serialize(reduced_catalog), indent=" "),
)

# Also generate the sha256 hash of the zip file and store it
with open("addon_catalog_cache.zip", "rb") as cache_file:
cache_file_content = cache_file.read()
sha256 = hashlib.sha256(cache_file_content).hexdigest()
with open("addon_catalog_cache.zip.sha256", "w", encoding="utf-8") as hash_file:
hash_file.write(sha256)

with open(os.path.join(self.cwd, "icon_errors.json"), "w") as f:
json.dump(self.icon_errors, f, indent=" ")

with open(os.path.join(self.cwd, "icon_errors.json"), "w") as f:
json.dump(self.icon_errors, f, indent=" ")
with open(os.path.join(self.cwd, "clone_errors.json"), "w") as f:
json.dump(self.clone_errors, f, indent=" ")

os.chdir(original_working_directory)
print(f"Wrote cache to {os.path.join(self.cwd, 'addon_catalog_cache.zip')}")
print(f"Wrote index to {os.path.join(self.cwd, 'addon_index_cache.zip')}")
print(f"Wrote cache to {os.path.join(self.cwd, 'addon_catalog_cache.zip')}")
finally:
os.chdir(original_working_directory)

def create_local_copy_of_addons(self):
self.catalog = CatalogFetcher().catalog
counter = 0
for addon_id, catalog_entries in self.catalog.get_catalog().items():
if addon_id in EXCLUDED_REPOS:
continue
self.create_local_copy_of_single_addon(addon_id, catalog_entries)
counter += 1
if counter >= MAX_COUNT:
@@ -158,7 +202,22 @@ def create_local_copy_of_single_addon(
self, addon_id: str, catalog_entries: List[AddonCatalog.AddonCatalogEntry]
):
for index, catalog_entry in enumerate(catalog_entries):
if catalog_entry.repository is not None:
if addon_id in FORCE_SPARSE_CLONE:
if catalog_entry.repository is None:
print(
f"ERROR: Cannot use sparse clone for {addon_id} because it has no git repo."
)
continue
if catalog_entry.zip_url is None:
print(
f"ERROR: Cannot use sparse clone for {addon_id} because it has no zip URL."
)
continue
catalog_entry.sparse_cache = True
self.create_local_copy_of_single_addon_with_git_sparse(
addon_id, index, catalog_entry
)
Comment on lines +216 to +219
Copilot AI (Feb 8, 2026)

When addon_id is in FORCE_SPARSE_CLONE, the code unconditionally sets catalog_entry.sparse_cache = True but does not ensure catalog_entry.zip_url is present. Downstream, AddonCatalogEntry.instantiate_addon() raises if sparse_cache is true and zip_url is missing, so entries that only provide a git repo/ref will break. Either require/derive zip_url before setting sparse_cache, or fall back to the normal git path when zip_url is absent.

Suggested change
catalog_entry.sparse_cache = True
self.create_local_copy_of_single_addon_with_git_sparse(
addon_id, index, catalog_entry
)
if catalog_entry.zip_url is not None:
catalog_entry.sparse_cache = True
self.create_local_copy_of_single_addon_with_git_sparse(
addon_id, index, catalog_entry
)
else:
print(
f"WARNING: Cannot use sparse clone for {addon_id} because it has no zip_url; "
"falling back to normal git clone."
)
self.create_local_copy_of_single_addon_with_git(
addon_id, index, catalog_entry
)

elif catalog_entry.repository is not None:
self.create_local_copy_of_single_addon_with_git(addon_id, index, catalog_entry)
elif catalog_entry.zip_url is not None:
self.create_local_copy_of_single_addon_with_zip(addon_id, index, catalog_entry)
@@ -280,6 +339,23 @@ def create_local_copy_of_single_addon_with_git(
print(f"ERROR: Failed to clone or update {addon_id} from {catalog_entry.repository}.")
print(f"ERROR: {e}")

def create_local_copy_of_single_addon_with_git_sparse(
self, addon_id: str, index: int, catalog_entry: AddonCatalog.AddonCatalogEntry
):
expected_name = self.get_directory_name(addon_id, index, catalog_entry)
try:
files = ["package.xml", "requirements.txt", "metadata.txt"]
self.sparse_clone(expected_name, catalog_entry.repository, catalog_entry.git_ref, files)
if os.path.exists(os.path.join(self.cwd, expected_name, "package.xml")):
metadata = addonmanager_metadata.MetadataReader.from_file(
os.path.join(self.cwd, expected_name, "package.xml")
)
if metadata.icon:
self.add_to_sparse_clone(expected_name, [metadata.icon])
except RuntimeError as e:
print(f"ERROR: Failed to clone or update {addon_id} from {catalog_entry.repository}.")
print(f"ERROR: {e}")

def sanitize_directory_name(self, expected_name: str) -> str:
"""Take a string and return a sanitized version suitable for use as a directory name."""
if expected_name in self._directory_name_cache:
@@ -339,8 +415,7 @@ def create_local_copy_of_single_addon_with_zip(
catalog_entry.last_update_time = datetime.datetime(*latest).isoformat()
zip_file.extractall(path=extract_to_dir)

@staticmethod
def clone_or_update(name: str, url: str, branch: str) -> None:
def clone_or_update(self, name: str, url: str, branch: str) -> None:
"""If a directory called "name" exists, and it contains a subdirectory called .git,
then 'git fetch' is called; otherwise we use 'git clone' to make a bare, shallow
copy of the repo (in the normal case where minimal is True), or a normal clone,
@@ -359,8 +434,14 @@ def clone_or_update(name: str, url: str, branch: str) -> None:
url,
name,
]
completed_process = subprocess.run(command)
try:
completed_process = subprocess.run(command, timeout=CLONE_TIMEOUT)
except subprocess.TimeoutExpired:
self.clone_errors[name] = f"Timed out after {CLONE_TIMEOUT} seconds."
raise RuntimeError(f"Clone of {url} timed out.")
# TODO: Automatically fall back to a sparse clone
if completed_process.returncode != 0:
self.clone_errors[name] = f"Failed to clone {url}: {completed_process.returncode}"
raise RuntimeError(f"Clone failed for {url}")
else:
print(f"Updating {name}", flush=True)
@@ -391,9 +472,68 @@ def clone_or_update(name: str, url: str, branch: str) -> None:
print("Deleting and re-cloning the original repo")
os.chdir(old_dir)
utils.rmdir(os.path.join(old_dir, name))
CacheWriter.clone_or_update(name, url, branch)
self.clone_or_update(name, url, branch)
os.chdir(old_dir)

def sparse_clone(self, name: str, url: str, branch: str, files: List[str]) -> None:
"""Perform a sparse clone of a git repo, including only the specified files. Overwrite any
existing path."""

if not os.path.exists(os.path.join(os.getcwd(), name, ".git")):
print(f"Creating sparse clone {name}", flush=True)
cwd = os.getcwd()
clone_path = os.path.join(cwd, name)
if os.path.exists(clone_path):
try:
shutil.rmtree(clone_path)
except OSError as e:
self.clone_errors[name] = f"Failed to remove existing path {clone_path}: {e}"
print(f"ERROR: Failed to remove existing path {clone_path}: {e}")
return
os.makedirs(clone_path)
os.chdir(clone_path)
try:
subprocess.run(["git", "init", "--quiet"], check=True)
subprocess.run(["git", "remote", "add", "origin", url], check=True)
subprocess.run(["git", "config", "core.sparsecheckout", "true"], check=True)
with open(".git/info/sparse-checkout", "w") as f:
f.write("\n".join(files))
f.write("\n") # So we are safe appending later
subprocess.run(
["git", "fetch", "--depth=1", "origin", branch],
check=True,
timeout=CLONE_TIMEOUT,
)
subprocess.run(["git", "checkout", branch], check=True)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
self.clone_errors[name] = str(e)
print(f"ERROR: {e}")
os.chdir(cwd)
else:
print(f"Updating sparse clone {name}", flush=True)
cwd = os.getcwd()
os.chdir(os.path.join(cwd, name))
try:
subprocess.run(["git", "pull", "--depth=1"], check=True)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
self.clone_errors[name] = str(e)
print(f"ERROR: {e}")
os.chdir(cwd)

def add_to_sparse_clone(self, name: str, files: List[str]) -> None:
"""Clones additional files to an existing sparse clone."""
cwd = os.getcwd()
clone_path = os.path.join(cwd, name)
os.chdir(clone_path)
with open(".git/info/sparse-checkout", "a") as f:
f.write("\n".join(files))
try:
subprocess.run(["git", "pull", "--depth=1"], check=True)
except subprocess.CalledProcessError as e:
self.clone_errors[name] = str(e)
print(f"ERROR: {e}")
os.chdir(cwd)
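
For reference, a hypothetical call sequence for the two sparse-checkout helpers above (directory name, repository URL, and icon path all invented):

writer = CacheWriter()
# First pass: fetch only the metadata files the cache needs.
writer.sparse_clone(
    "ExampleWB",
    "https://github.com/example/ExampleWB",
    "main",
    ["package.xml", "requirements.txt", "metadata.txt"],
)
# Second pass: widen the sparse checkout to also pull in the icon file.
writer.add_to_sparse_clone("ExampleWB", ["Resources/icons/ExampleWB.svg"])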

def find_file(
self,
filename: str,
@@ -431,7 +571,7 @@ def get_icon_from_metadata(metadata: addonmanager_metadata.Metadata) -> Optional
return None

@staticmethod
def determine_git_ref_type(name: str, url: str, branch: str) -> GitRefType:
def determine_git_ref_type(name: str, _url: str, branch: str) -> GitRefType:
"""Determine if the given branch, tag, or hash is a tag, branch, or hash. Returns the type
if determinable, otherwise raises a RuntimeError."""
command = ["git", "show-ref", "--verify", f"refs/remotes/origin/{branch}"]
@@ -516,6 +656,8 @@ def create_zip_of_entry(


if __name__ == "__main__":
single_addon_id = None
if len(sys.argv) > 1:
single_addon_id = sys.argv[1]
writer = CacheWriter()

writer.write()
writer.write(single_addon_id)
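
With this change the script optionally takes a single addon ID as its first argument and rebuilds the cache for just that addon, e.g. (addon name invented):

python AddonCatalogCacheCreator.py ExampleWB

With no argument it processes the entire index as before.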