Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import logging

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_head_http_raw

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,13 +22,6 @@ class WheelAbsenceAnalyzer(BaseHeuristicAnalyzer):
heuristic fails.
"""

WHEEL: str = "bdist_wheel"
# as per https://github.com/pypi/inspector/blob/main/inspector/main.py line 125
INSPECTOR_TEMPLATE = (
"{inspector_url_scheme}://{inspector_url_netloc}/project/"
"{name}/{version}/packages/{first}/{second}/{rest}/{filename}"
)

def __init__(self) -> None:
super().__init__(
name="wheel_absence_analyzer",
Expand All @@ -53,83 +45,17 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
Raises
------
HeuristicAnalyzerValueError
If there is no release information, or has other missing package information.
If there is missing package information.
"""
releases = pypi_package_json.get_releases()
if releases is None: # no release information
error_msg = "There is no information for any release of this package."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

version = pypi_package_json.component_version
if version is None: # check latest release version
version = pypi_package_json.get_latest_version()

if version is None:
error_msg = "There is no latest version of this package."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

# Contains a boolean field identifying if the link is reachable by this Macaron instance or not.
inspector_links: dict[str, JsonType] = {}
wheel_present: bool = False

release_distributions = json_extract(releases, [version], list)
if release_distributions is None:
error_msg = f"The version {version} is not available as a release."
if not pypi_package_json.get_inspector_links():
error_msg = "Unable to retrieve PyPI inspector information about package"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

for distribution in release_distributions:
# validate data
package_type = json_extract(distribution, ["packagetype"], str)
if package_type is None:
error_msg = f"The version {version} has no 'package type' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

name = json_extract(pypi_package_json.package_json, ["info", "name"], str)
if name is None:
error_msg = f"The version {version} has no 'name' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

blake2b_256 = json_extract(distribution, ["digests", "blake2b_256"], str)
if blake2b_256 is None:
error_msg = f"The version {version} has no 'blake2b_256' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

filename = json_extract(distribution, ["filename"], str)
if filename is None:
error_msg = f"The version {version} has no 'filename' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if package_type == self.WHEEL:
wheel_present = True

inspector_link = self.INSPECTOR_TEMPLATE.format(
inspector_url_scheme=pypi_package_json.pypi_registry.inspector_url_scheme,
inspector_url_netloc=pypi_package_json.pypi_registry.inspector_url_netloc,
name=name,
version=version,
first=blake2b_256[0:2],
second=blake2b_256[2:4],
rest=blake2b_256[4:],
filename=filename,
)

# use a head request because we don't care about the response contents
inspector_links[inspector_link] = False
if send_head_http_raw(inspector_link):
inspector_links[inspector_link] = True # link was reachable

detail_info: dict[str, JsonType] = {
"inspector_links": inspector_links,
}
detail_info: dict = {"inspector_links": pypi_package_json.inspector_asset.package_link_reachability}

if wheel_present:
# At least one wheel file exists
if len(pypi_package_json.inspector_asset.package_whl_links) > 0:
return HeuristicResult.PASS, detail_info

return HeuristicResult.FAIL, detail_info
10 changes: 8 additions & 2 deletions src/macaron/repo_finder/repo_finder_pypi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from macaron.repo_finder.repo_finder_enums import RepoFinderInfo
from macaron.repo_finder.repo_validator import find_valid_repository_url
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, PyPIRegistry
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, find_or_create_pypi_asset
from macaron.slsa_analyzer.package_registry.pypi_registry import (
PyPIInspectorAsset,
PyPIPackageJsonAsset,
find_or_create_pypi_asset,
)
from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,7 +62,9 @@ def find_repo(
pypi_registry = next((registry for registry in PACKAGE_REGISTRIES if isinstance(registry, PyPIRegistry)), None)
if not pypi_registry:
return "", RepoFinderInfo.PYPI_NO_REGISTRY
pypi_asset = PyPIPackageJsonAsset(purl.name, purl.version, False, pypi_registry, {}, "")
pypi_asset = PyPIPackageJsonAsset(
purl.name, purl.version, False, pypi_registry, {}, "", PyPIInspectorAsset("", [], {})
)

if not pypi_asset:
# This should be unreachable, as the pypi_registry has already been confirmed to be of type PyPIRegistry.
Expand Down
126 changes: 124 additions & 2 deletions src/macaron/slsa_analyzer/package_registry/pypi_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
from macaron.json_tools import json_extract
from macaron.malware_analyzer.datetime_parser import parse_datetime
from macaron.slsa_analyzer.package_registry.package_registry import PackageRegistry
from macaron.util import download_file_with_size_limit, send_get_http_raw, stream_file_with_size_limit
from macaron.util import (
download_file_with_size_limit,
send_get_http_raw,
send_head_http_raw,
stream_file_with_size_limit,
)

if TYPE_CHECKING:
from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo
Expand Down Expand Up @@ -443,6 +448,33 @@ def extract_attestation(attestation_data: dict) -> dict | None:
return attestations[0]


# URL pattern for a single distribution file on the PyPI inspector service, following the route used in
# https://github.com/pypi/inspector/blob/main/inspector/main.py (line 125 when this was written).
# The {first}/{second}/{rest} placeholders are filled from slices of the file's blake2b_256 digest.
INSPECTOR_TEMPLATE = (
    "{inspector_url_scheme}://{inspector_url_netloc}"
    "/project/{name}/{version}"
    "/packages/{first}/{second}/{rest}/{filename}"
)


@dataclass
class PyPIInspectorAsset:
    """The package PyPI inspector information."""

    #: the pypi inspector link to the tarball
    package_sdist_link: str

    #: the pypi inspector link(s) to the wheel(s)
    package_whl_links: list[str]

    #: a mapping of inspector links to whether they are reachable
    package_link_reachability: dict[str, bool]

    def __bool__(self) -> bool:
        """Determine if this inspector object is populated.

        Returns
        -------
        bool
            True if at least one link (sdist or wheel) is stored and the reachability
            mapping is non-empty, False otherwise.
        """
        has_link = bool(self.package_sdist_link or self.package_whl_links)
        return has_link and bool(self.package_link_reachability)


@dataclass
class PyPIPackageJsonAsset:
"""The package JSON hosted on the PyPI registry."""
Expand All @@ -465,6 +497,9 @@ class PyPIPackageJsonAsset:
#: the source code temporary location name
package_sourcecode_path: str

#: the pypi inspector information about this package
inspector_asset: PyPIInspectorAsset

#: The size of the asset (in bytes). This attribute is added to match the AssetLocator
#: protocol and is not used because pypi API registry does not provide it.
@property
Expand Down Expand Up @@ -718,6 +753,91 @@ def get_sha256(self) -> str | None:
logger.debug("Found sha256 hash: %s", artifact_hash)
return artifact_hash

def get_inspector_links(self) -> bool:
    """Generate PyPI inspector links for this package version's distributions and fill in the inspector asset.

    If the inspector asset is already populated it is reused and no work is done. Otherwise the package
    metadata is downloaded if needed, and an inspector link is built for each sdist and wheel distribution
    of the analyzed version (or the latest version when none is set). A HEAD request is sent to every
    generated link to record whether it is reachable. Distributions that lack a required field are
    skipped with a warning.

    Returns
    -------
    bool
        True if the inspector asset is populated after this call, False otherwise.
    """
    if self.inspector_asset:
        # Links were already generated by an earlier call.
        return True

    if not self.package_json and not self.download(""):
        logger.warning("No package metadata available, cannot get links")
        return False

    releases = self.get_releases()
    if releases is None:
        logger.warning("Package has no releases, cannot create inspector links.")
        return False

    version = self.component_version
    if version is None:
        version = self.get_latest_version()

    if version is None:
        logger.warning("No version set, and no latest version exists. cannot create inspector links.")
        return False

    distributions = json_extract(releases, [version], list)
    if not distributions:
        logger.warning("Package has no distributions for release version %s. Cannot create inspector links.", version)
        return False

    # The name is read from the package-level metadata, not from a distribution, so it is the same for
    # every distribution: look it up once and give up early if it is missing.
    name = json_extract(self.package_json, ["info", "name"], str)
    if name is None:
        logger.warning("The package metadata has no 'name' field. Cannot create inspector links.")
        return False

    for distribution in distributions:
        package_type = json_extract(distribution, ["packagetype"], str)
        if package_type is None:
            logger.warning("The version %s has no 'package type' field in a distribution", version)
            continue

        if package_type not in ("sdist", "bdist_wheel"):
            # Only sdists and wheels are tracked in the inspector asset. Other package types (presumably
            # legacy formats such as 'bdist_egg') are skipped here, before any network request is made.
            logger.debug("Unknown package distribution type: %s", package_type)
            continue

        blake2b_256 = json_extract(distribution, ["digests", "blake2b_256"], str)
        if blake2b_256 is None:
            logger.warning("The version %s has no 'blake2b_256' field in a distribution", version)
            continue

        filename = json_extract(distribution, ["filename"], str)
        if filename is None:
            logger.warning("The version %s has no 'filename' field in a distribution", version)
            continue

        link = INSPECTOR_TEMPLATE.format(
            inspector_url_scheme=self.pypi_registry.inspector_url_scheme,
            inspector_url_netloc=self.pypi_registry.inspector_url_netloc,
            name=name,
            version=version,
            first=blake2b_256[0:2],
            second=blake2b_256[2:4],
            rest=blake2b_256[4:],
            filename=filename,
        )

        # use a head request because we don't care about the response contents
        reachable = bool(send_head_http_raw(link))

        if package_type == "sdist":
            self.inspector_asset.package_sdist_link = link
        else:
            self.inspector_asset.package_whl_links.append(link)
        self.inspector_asset.package_link_reachability[link] = reachable

    # False if every distribution was invalid and went along a 'continue' path.
    return bool(self.inspector_asset)


def find_or_create_pypi_asset(
asset_name: str, asset_version: str | None, pypi_registry_info: PackageRegistryInfo
Expand Down Expand Up @@ -755,6 +875,8 @@ def find_or_create_pypi_asset(
logger.debug("Failed to create PyPIPackageJson asset.")
return None

asset = PyPIPackageJsonAsset(asset_name, asset_version, False, package_registry, {}, "")
asset = PyPIPackageJsonAsset(
asset_name, asset_version, False, package_registry, {}, "", PyPIInspectorAsset("", [], {})
)
pypi_registry_info.metadata.append(asset)
return asset
5 changes: 3 additions & 2 deletions tests/malware_analyzer/pypi/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains test configurations for malware analyzer."""
Expand All @@ -8,7 +8,7 @@
import pytest

from macaron.database.table_definitions import Analysis, Component, RepoFinderMetadata
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIInspectorAsset, PyPIPackageJsonAsset, PyPIRegistry


@pytest.fixture(autouse=True)
Expand All @@ -26,4 +26,5 @@ def pypi_package_json() -> MagicMock:
pypi_package.component = Component(
purl="pkg:pypi/package", analysis=Analysis(), repository=None, repo_finder_metadata=RepoFinderMetadata()
)
pypi_package.inspector_asset = MagicMock(spec=PyPIInspectorAsset)
return pypi_package
Loading
Loading