Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(BA-820): Add force, noprune options to PurgeImages GQL API #3987

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions changes/3987.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `force`, `noprune` options to `PurgeImages` GQL API, and allow `PurgeImages` to be performed on multiple agents (breaking change).
23 changes: 20 additions & 3 deletions docs/manager/graphql-reference/schema.graphql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
schema {

Check failure on line 1 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Type 'PurgeImages' was removed

Type 'PurgeImages' was removed
query: Queries
mutation: Mutations
}
Expand Down Expand Up @@ -1984,7 +1984,7 @@
clear_images(registry: String): ClearImages

"""Added in 25.4.0"""
purge_images(agent_id: String!, images: [ImageRefType]!): PurgeImages
purge_images(keys: [PurgeImagesKey]!, options: PurgeImagesOptions = {force: false, noprune: false}): PurgeImagesPayload

Check failure on line 1987 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Field 'Mutations.purge_images' changed type from 'PurgeImages' to 'PurgeImagesPayload'

Field 'Mutations.purge_images' changed type from 'PurgeImages' to 'PurgeImagesPayload'

Check failure on line 1987 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

New arguments must include a description with a version number in the format "Added in XX.XX.X." or "Added in XX.X.X.", Argument 'keys: [PurgeImagesKey]!' added to field 'Mutations.purge_images'

New arguments must include a description with a version number in the format "Added in XX.XX.X." or "Added in XX.X.X."

Check failure on line 1987 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

New arguments must include a description with a version number in the format "Added in XX.XX.X." or "Added in XX.X.X.", Argument 'options: PurgeImagesOptions' (with default value) added to field 'Mutations.purge_images'

New arguments must include a description with a version number in the format "Added in XX.XX.X." or "Added in XX.X.X."

Check failure on line 1987 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Argument 'agent_id: String!' was removed from field 'Mutations.purge_images'

Removing a field argument is a breaking change because it will cause existing queries that use this argument to error.

Check failure on line 1987 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Argument 'images: [ImageRefType]!' was removed from field 'Mutations.purge_images'

Removing a field argument is a breaking change because it will cause existing queries that use this argument to error.

"""Added in 24.09.0."""
modify_compute_session(input: ModifyComputeSessionInput!): ModifyComputeSessionPayload
Expand Down Expand Up @@ -2584,17 +2584,34 @@
msg: String
}

"""Added in 25.4.0."""
type PurgeImages {
"""Added in 25.5.0."""
type PurgeImagesPayload {

Check notice on line 2588 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Type 'PurgeImagesPayload' was added

Type 'PurgeImagesPayload' was added
task_id: String
}

"""Added in 25.5.0."""
input PurgeImagesKey {

Check notice on line 2593 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Type 'PurgeImagesKey' was added

Type 'PurgeImagesKey' was added
agent_id: String!
images: [ImageRefType]!
}

input ImageRefType {
name: String!
registry: String
architecture: String
}

"""Added in 25.5.0."""
input PurgeImagesOptions {

Check notice on line 2605 in docs/manager/graphql-reference/schema.graphql

View workflow job for this annotation

GitHub Actions / GraphQL Inspector

Type 'PurgeImagesOptions' was added

Type 'PurgeImagesOptions' was added
"""
Remove the images even if it is being used by stopped containers or has other tags, Added in 25.5.0.
"""
force: Boolean = false

"""Don't delete untagged parent images, Added in 25.5.0."""
noprune: Boolean = false
}

"""Added in 24.09.0."""
type ModifyComputeSessionPayload {
item: ComputeSessionNode
Expand Down
8 changes: 3 additions & 5 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
from ai.backend.common.config import model_definition_iv
from ai.backend.common.defs import REDIS_STATISTICS_DB, REDIS_STREAM_DB, RedisRole
from ai.backend.common.docker import MAX_KERNELSPEC, MIN_KERNELSPEC, ImageRef
from ai.backend.common.dto.agent.response import PurgeImageResponses
from ai.backend.common.dto.agent.response import PurgeImagesResp
from ai.backend.common.dto.manager.rpc_request import PurgeImagesReq
from ai.backend.common.events import (
AbstractEvent,
AgentErrorEvent,
Expand Down Expand Up @@ -1667,10 +1668,7 @@ async def pull_image(
"""

@abstractmethod
async def purge_images(
self,
images: list[str],
) -> PurgeImageResponses:
async def purge_images(self, request: PurgeImagesReq) -> PurgeImagesResp:
"""
Purge the given images from the agent.
"""
Expand Down
37 changes: 28 additions & 9 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import struct
import sys
from collections.abc import Iterable, Mapping, MutableMapping, Sequence
from dataclasses import dataclass
from decimal import Decimal
from functools import partial
from io import StringIO
Expand Down Expand Up @@ -49,7 +50,8 @@
from ai.backend.common import redis_helper
from ai.backend.common.cgroup import get_cgroup_mount_point
from ai.backend.common.docker import MAX_KERNELSPEC, MIN_KERNELSPEC, ImageRef
from ai.backend.common.dto.agent.response import PurgeImageResponse, PurgeImageResponses
from ai.backend.common.dto.agent.response import PurgeImageResp, PurgeImagesResp
from ai.backend.common.dto.manager.rpc_request import PurgeImagesReq
from ai.backend.common.events import EventProducer, KernelLifecycleEventReason
from ai.backend.common.exception import ImageNotAvailable, InvalidImageName, InvalidImageTag
from ai.backend.common.plugin.monitor import ErrorPluginContext, StatsPluginContext
Expand Down Expand Up @@ -182,6 +184,13 @@ def _DockerContainerError_reduce(self):
)


@dataclass
class DockerPurgeImageReq:
image: str
force: bool
noprune: bool


class DockerKernelCreationContext(AbstractKernelCreationContext[DockerKernel]):
scratch_dir: Path
tmp_dir: Path
Expand Down Expand Up @@ -1687,25 +1696,35 @@ async def pull_image(
elif error := result[-1].get("error"):
raise RuntimeError(f"Failed to pull image: {error}")

async def _purge_image(self, docker: Docker, image: str) -> PurgeImageResponse:
async def _purge_image(self, docker: Docker, request: DockerPurgeImageReq) -> PurgeImageResp:
try:
await docker.images.delete(image)
return PurgeImageResponse.success(image=image)
await docker.images.delete(request.image, force=request.force, noprune=request.noprune)
return PurgeImageResp.success(image=request.image)
except Exception as e:
log.error(f'Failed to purge image "{image}": {e}')
return PurgeImageResponse.failure(image=image, error=str(e))
log.error(f'Failed to purge image "{request.image}": {e}')
return PurgeImageResp.failure(image=request.image, error=str(e))

async def purge_images(self, images: list[str]) -> PurgeImageResponses:
async def purge_images(self, request: PurgeImagesReq) -> PurgeImagesResp:
async with closing_async(Docker()) as docker:
async with TaskGroup() as tg:
tasks = [tg.create_task(self._purge_image(docker, image)) for image in images]
tasks = [
tg.create_task(
self._purge_image(
docker,
DockerPurgeImageReq(
image=image, force=request.force, noprune=request.noprune
),
)
)
for image in request.images
]

results = []
for task in tasks:
deleted_info = task.result()
results.append(deleted_info)

return PurgeImageResponses(responses=results)
return PurgeImagesResp(responses=results)

async def check_image(
self, image_ref: ImageRef, image_id: str, auto_pull: AutoPullBehavior
Expand Down
10 changes: 4 additions & 6 deletions src/ai/backend/agent/dummy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

from ai.backend.common.config import read_from_file
from ai.backend.common.docker import ImageRef
from ai.backend.common.dto.agent.response import PurgeImageResponses
from ai.backend.common.dto.agent.response import PurgeImagesResp
from ai.backend.common.dto.manager.rpc_request import PurgeImagesReq
from ai.backend.common.events import EventProducer
from ai.backend.common.types import (
AgentId,
Expand Down Expand Up @@ -305,13 +306,10 @@ async def push_image(
delay = self.dummy_agent_cfg["delay"]["push-image"]
await asyncio.sleep(delay)

async def purge_images(
self,
images: list[str],
) -> PurgeImageResponses:
async def purge_images(self, request: PurgeImagesReq) -> PurgeImagesResp:
delay = self.dummy_agent_cfg["delay"]["purge-images"]
await asyncio.sleep(delay)
return PurgeImageResponses([])
return PurgeImagesResp([])

async def check_image(
self, image_ref: ImageRef, image_id: str, auto_pull: AutoPullBehavior
Expand Down
10 changes: 4 additions & 6 deletions src/ai/backend/agent/kubernetes/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@

from ai.backend.common.asyncio import current_loop
from ai.backend.common.docker import ImageRef
from ai.backend.common.dto.agent.response import PurgeImageResponses
from ai.backend.common.dto.agent.response import PurgeImagesResp
from ai.backend.common.dto.manager.rpc_request import PurgeImagesReq
from ai.backend.common.etcd import AsyncEtcd
from ai.backend.common.events import EventProducer
from ai.backend.common.plugin.monitor import ErrorPluginContext, StatsPluginContext
Expand Down Expand Up @@ -1025,12 +1026,9 @@ async def pull_image(
# TODO: Add support for appropriate image pulling mechanism on K8s
pass

async def purge_images(
self,
images: list[str],
) -> PurgeImageResponses:
async def purge_images(self, request: PurgeImagesReq) -> PurgeImagesResp:
# TODO: Add support for appropriate image purging mechanism on K8s
return PurgeImageResponses([])
return PurgeImagesResp([])

async def check_image(
self, image_ref: ImageRef, image_id: str, auto_pull: AutoPullBehavior
Expand Down
14 changes: 9 additions & 5 deletions src/ai/backend/agent/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
from ai.backend.common.auth import AgentAuthHandler, PublicKey, SecretKey
from ai.backend.common.bgtask import ProgressReporter
from ai.backend.common.docker import ImageRef
from ai.backend.common.dto.agent.response import AbstractAgentResponse, PurgeImageResponses
from ai.backend.common.dto.agent.response import AbstractAgentResp, PurgeImagesResp
from ai.backend.common.dto.manager.rpc_request import PurgeImagesReq
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
from ai.backend.common.events import (
ImagePullFailedEvent,
Expand Down Expand Up @@ -212,7 +213,7 @@ def __init__(self) -> None:

def __call__(
self,
meth: Callable[..., Coroutine[None, None, AbstractAgentResponse]],
meth: Callable[..., Coroutine[None, None, AbstractAgentResp]],
) -> Callable[[AgentRPCServer, RPCMessage], Coroutine[None, None, Any]]:
@functools.wraps(meth)
@_collect_metrics(self._metric_observer)
Expand Down Expand Up @@ -927,9 +928,12 @@ async def _push_image(reporter: ProgressReporter) -> None:

@rpc_function_v2
@collect_error
async def purge_images(self, images: list[str]) -> PurgeImageResponses:
log.info("rpc::purge_images(images:{0})", images)
return await self.agent.purge_images(images)
async def purge_images(
self, image_canonicals: list[str], force: bool, noprune: bool
) -> PurgeImagesResp:
request = PurgeImagesReq(images=image_canonicals, force=force, noprune=noprune)
log.info("rpc::purge_images(request:{0})", request)
return await self.agent.purge_images(request)

@rpc_function
@collect_error
Expand Down
8 changes: 4 additions & 4 deletions src/ai/backend/common/dto/agent/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@


@dataclass
class AbstractAgentResponse(ABC):
class AbstractAgentResp(ABC):
@abstractmethod
def as_dict(self) -> dict:
raise NotImplementedError


@dataclass
class PurgeImageResponse(AbstractAgentResponse):
class PurgeImageResp(AbstractAgentResp):
image: str
error: Optional[str] = None

Expand All @@ -31,8 +31,8 @@ def failure(cls, image: str, error: str) -> Self:


@dataclass
class PurgeImageResponses(AbstractAgentResponse):
responses: list[PurgeImageResponse]
class PurgeImagesResp(AbstractAgentResp):
responses: list[PurgeImageResp]

@override
def as_dict(self) -> dict:
Expand Down
17 changes: 17 additions & 0 deletions src/ai/backend/common/dto/manager/rpc_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from pydantic import Field

from ai.backend.common.api_handlers import BaseRequestModel


class PurgeImagesReq(BaseRequestModel):
images: list[str] = Field(
description="List of image canonical names to be purged",
)
force: bool = Field(
description="Remove the images even if it is being used by stopped containers or has other tags",
default=False,
)
noprune: bool = Field(
description="Don't delete untagged parent images",
default=False,
)
Loading
Loading