Update apt_info.py #250

Open · wants to merge 1 commit into master
200 changes: 140 additions & 60 deletions apt_info.py
@@ -22,107 +22,187 @@
 Daniel Swarbrick <[email protected]>
 """
 
-import apt
-import apt_pkg
-import collections
 import os
-from prometheus_client import CollectorRegistry, Gauge, generate_latest
-
-_UpgradeInfo = collections.namedtuple("_UpgradeInfo", ["labels", "count"])
-
-
-def _convert_candidates_to_upgrade_infos(candidates):
-    changes_dict = collections.defaultdict(lambda: collections.defaultdict(int))
+import sys
+from typing import Dict, Iterable, List, NamedTuple
+# Import defaultdict for explicit type hinting
+from collections import defaultdict
+
+# Gracefully handle missing dependencies, which is a common operational issue.
+try:
+    import apt
+    import apt_pkg
+    from prometheus_client import CollectorRegistry, Gauge, generate_latest
+except ImportError as e:
+    sys.stderr.write(f"Error: Missing required library. {e}\n")
+    sys.stderr.write("Please install python3-apt and python3-prometheus-client.\n")
+    sys.exit(1)
+
+
+# FIX: Renamed 'count' to 'pkg_count' to avoid name collision with the
+# built-in tuple.count() method, which resolves the mypy error.
+class UpgradeInfo(NamedTuple):
+    labels: Dict[str, str]
+    pkg_count: int
+
+
+def _convert_candidates_to_upgrade_infos(
+    candidates: Iterable[apt.package.Package],
+) -> List[UpgradeInfo]:
+    """Groups package candidates by origin and architecture."""
+    changes_dict: defaultdict[str, defaultdict[str, int]] = collections.defaultdict(
+        lambda: collections.defaultdict(int)
+    )
 
     for candidate in candidates:
-        origins = sorted(
-            {f"{o.origin}:{o.codename}/{o.archive}" for o in candidate.origins}
-        )
-        changes_dict[",".join(origins)][candidate.architecture] += 1
+        # Gracefully handle packages with no origins.
+        if not candidate.origins:
+            origin_str = "unknown"
+        else:
+            origins = sorted(
+                {f"{o.origin}:{o.codename}/{o.archive}" for o in candidate.origins}
+            )
+            origin_str = ",".join(origins)
 
-    changes_list = list()
-    for origin in sorted(changes_dict.keys()):
-        for arch in sorted(changes_dict[origin].keys()):
+        changes_dict[origin_str][candidate.architecture] += 1
+
+    changes_list = []
+    for origin, arch_counts in sorted(changes_dict.items()):
+        for arch, count in sorted(arch_counts.items()):
             changes_list.append(
-                _UpgradeInfo(
+                UpgradeInfo(
                     labels=dict(origin=origin, arch=arch),
-                    count=changes_dict[origin][arch],
+                    # Use the new field name 'pkg_count'.
+                    pkg_count=count,
                 )
             )
 
     return changes_list
 
 
-def _write_pending_upgrades(registry, cache):
-    candidates = {
-        p.candidate for p in cache if p.is_upgradable
-    }
+def _write_pending_upgrades(
+    registry: CollectorRegistry, cache: apt.cache.Cache
+) -> None:
+    """Exposes metrics for all pending upgrades."""
+    candidates = {p.candidate for p in cache if p.is_upgradable}
     upgrade_list = _convert_candidates_to_upgrade_infos(candidates)
 
     if upgrade_list:
-        g = Gauge('apt_upgrades_pending', "Apt packages pending updates by origin",
-                  ['origin', 'arch'], registry=registry)
+        g = Gauge(
+            "apt_upgrades_pending",
+            "Apt packages pending updates by origin",
+            ["origin", "arch"],
+            registry=registry,
+        )
         for change in upgrade_list:
-            g.labels(change.labels['origin'], change.labels['arch']).set(change.count)
+            # Use the new field name 'pkg_count'.
+            g.labels(**change.labels).set(change.pkg_count)
 
 
-def _write_held_upgrades(registry, cache):
+def _write_held_upgrades(registry: CollectorRegistry, cache: apt.cache.Cache) -> None:
+    """Exposes metrics for held packages that could be upgraded."""
     held_candidates = {
-        p.candidate for p in cache
+        p.candidate
+        for p in cache
         if p.is_upgradable and p._pkg.selected_state == apt_pkg.SELSTATE_HOLD
     }
    upgrade_list = _convert_candidates_to_upgrade_infos(held_candidates)
 
     if upgrade_list:
-        g = Gauge('apt_upgrades_held', "Apt packages pending updates but held back.",
-                  ['origin', 'arch'], registry=registry)
+        g = Gauge(
+            "apt_upgrades_held",
+            "Apt packages pending updates but held back",
+            ["origin", "arch"],
+            registry=registry,
+        )
         for change in upgrade_list:
-            g.labels(change.labels['origin'], change.labels['arch']).set(change.count)
+            # Use the new field name 'pkg_count'.
+            g.labels(**change.labels).set(change.pkg_count)
 
 
-def _write_autoremove_pending(registry, cache):
+def _write_autoremove_pending(
+    registry: CollectorRegistry, cache: apt.cache.Cache
+) -> None:
+    """Exposes metrics for packages pending autoremoval."""
     autoremovable_packages = {p for p in cache if p.is_auto_removable}
-    g = Gauge('apt_autoremove_pending', "Apt packages pending autoremoval.",
-              registry=registry)
+    g = Gauge(
+        "apt_autoremove_pending",
+        "Apt packages pending autoremoval",
+        registry=registry,
+    )
     g.set(len(autoremovable_packages))
 
 
-def _write_cache_timestamps(registry):
-    g = Gauge('apt_package_cache_timestamp_seconds', "Apt update last run time.", registry=registry)
-    apt_pkg.init_config()
-    if (
-        apt_pkg.config.find_b("APT::Periodic::Update-Package-Lists") and
-        os.path.isfile("/var/lib/apt/periodic/update-success-stamp")
-    ):
-        # if we run updates automatically with APT::Periodic, we can
-        # check this timestamp file if it exists
-        stamp_file = "/var/lib/apt/periodic/update-success-stamp"
-    else:
-        # if not, let's just fallback on the partial file of the lists directory
-        stamp_file = '/var/lib/apt/lists/partial'
+def _write_cache_timestamps(registry: CollectorRegistry) -> None:
+    """Exposes the timestamp of the last successful apt update."""
+    g = Gauge(
+        "apt_package_cache_timestamp_seconds",
+        "Apt update last run time",
+        registry=registry,
+    )
+    stamp_file = None
     try:
-        g.set(os.stat(stamp_file).st_mtime)
-    except OSError:
-        pass
-
-
-def _write_reboot_required(registry):
-    g = Gauge('node_reboot_required', "Node reboot is required for software updates.",
-              registry=registry)
-    g.set(int(os.path.isfile('/run/reboot-required')))
+        apt_pkg.init_config()
+        # Prefer the official periodic update stamp file if it exists.
+        periodic_stamp = "/var/lib/apt/periodic/update-success-stamp"
+        if (
+            apt_pkg.config.find_b("APT::Periodic::Update-Package-Lists", "0") != "0"
+            and os.path.isfile(periodic_stamp)
+        ):
+            stamp_file = periodic_stamp
+        else:
+            # Fallback to the partial directory mtime as a less accurate indicator.
+            stamp_file = "/var/lib/apt/lists/partial"
 
 
-def _main():
-    cache = apt.cache.Cache()
+        g.set(os.stat(stamp_file).st_mtime)
+    except FileNotFoundError:
+        # This is a common case if apt update has never run; not an error.
+        sys.stderr.write(f"Warning: Timestamp file not found: {stamp_file}\n")
+    except OSError as e:
+        # This indicates a more serious issue, like a permissions error.
+        sys.stderr.write(f"Warning: Could not read timestamp file {stamp_file}: {e}\n")
+
+
+def _write_reboot_required(registry: CollectorRegistry) -> None:
+    """Exposes a metric indicating if a reboot is required."""
+    g = Gauge(
+        "node_reboot_required",
+        "Node reboot is required for software updates",
+        registry=registry,
+    )
+    g.set(float(os.path.isfile("/run/reboot-required")))
+
+
+def main() -> int:
+    """Main entry point for the script."""
+    try:
+        # Explicitly open the cache to catch errors early.
+        # The 'progress' argument is omitted for non-interactive use.
+        cache = apt.cache.Cache()
+        cache.open()
+    except apt.cache.LockingFailedException as e:
+        sys.stderr.write(
+            f"Error: Failed to lock apt cache. Is another apt process running? {e}\n"
+        )
+        return 1
+    except SystemError as e:
+        sys.stderr.write(
+            f"Error: Failed to initialize apt cache. Check permissions and configuration. {e}\n"
+        )
+        return 1
 
     registry = CollectorRegistry()
     _write_pending_upgrades(registry, cache)
     _write_held_upgrades(registry, cache)
     _write_autoremove_pending(registry, cache)
     _write_cache_timestamps(registry)
     _write_reboot_required(registry)
-    print(generate_latest(registry).decode(), end='')
+
+    # Print the metrics to standard output.
+    print(generate_latest(registry).decode("utf-8"), end="")
+    return 0
 
 
 if __name__ == "__main__":
-    _main()
+    sys.exit(main())
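
For reference, the aggregation that `_convert_candidates_to_upgrade_infos` performs can be exercised without python-apt installed. The following standalone sketch is not part of the pull request: it reimplements the same grouping using hypothetical FakeOrigin/FakeCandidate stand-ins for the python-apt objects, and it prints hand-formatted lines that only approximate the Prometheus exposition format (the script itself uses prometheus_client's generate_latest).

# Standalone sketch (not part of the PR): mimics the grouping done by
# _convert_candidates_to_upgrade_infos with fake stand-ins for python-apt
# objects, so the aggregation logic can be checked without an apt cache.
import collections
from typing import NamedTuple


class FakeOrigin(NamedTuple):
    # Hypothetical stand-in for the origin objects on an apt candidate.
    origin: str
    codename: str
    archive: str


class FakeCandidate(NamedTuple):
    # Hypothetical stand-in for an upgrade candidate.
    origins: list
    architecture: str


def group_by_origin_and_arch(candidates):
    """Count upgrade candidates per (origin string, architecture)."""
    counts = collections.defaultdict(lambda: collections.defaultdict(int))
    for candidate in candidates:
        if not candidate.origins:
            # Same fallback as the PR: packages without origins go to "unknown".
            origin_str = "unknown"
        else:
            origins = sorted(
                {f"{o.origin}:{o.codename}/{o.archive}" for o in candidate.origins}
            )
            origin_str = ",".join(origins)
        counts[origin_str][candidate.architecture] += 1
    return counts


if __name__ == "__main__":
    debian = FakeOrigin("Debian", "bookworm", "stable")
    security = FakeOrigin("Debian", "bookworm-security", "stable-security")
    candidates = [
        FakeCandidate([debian], "amd64"),
        FakeCandidate([debian], "amd64"),
        FakeCandidate([security], "all"),
        FakeCandidate([], "amd64"),  # no origins -> grouped under "unknown"
    ]
    for origin, arch_counts in sorted(group_by_origin_and_arch(candidates).items()):
        for arch, count in sorted(arch_counts.items()):
            # Hand-rolled approximation of the exposition format for illustration.
            print(f'apt_upgrades_pending{{origin="{origin}",arch="{arch}"}} {count}')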