Skip to content

Commit 4b61b91

Browse files
committed
refactor: use Schema.from_dict in task transforms
1 parent 8cbe491 commit 4b61b91

File tree

1 file changed

+185
-147
lines changed

1 file changed

+185
-147
lines changed

src/taskgraph/transforms/task.py

Lines changed: 185 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,13 @@ def run_task_suffix():
5050
return hash_path(RUN_TASK)[0:20]
5151

5252

53-
class WorkerSchema(Schema, forbid_unknown_fields=False, kw_only=True):
54-
# The worker implementation type.
55-
implementation: str
53+
WORKER_SCHEMA = Schema.from_dict(
54+
{
55+
# The worker implementation type.
56+
"implementation": str,
57+
},
58+
forbid_unknown_fields=False,
59+
)
5660

5761

5862
#: Schema for the task transforms
@@ -134,7 +138,7 @@ class TaskDescriptionSchema(Schema):
134138
# the release promotion phase that this task belongs to.
135139
shipping_phase: Optional[Literal["build", "promote", "push", "ship"]] = None
136140
# Information specific to the worker implementation that will run this task.
137-
worker: Optional[WorkerSchema] = None
141+
worker: Optional[WORKER_SCHEMA] = None
138142

139143
def __post_init__(self):
140144
if self.dependencies:
@@ -241,58 +245,66 @@ def verify_index(config, index):
241245
DockerImage = Union[str, dict[str, str]]
242246

243247

244-
class DockerWorkerCacheEntry(Schema):
245-
# only one type is supported by any of the workers right now
246-
type: Literal["persistent"] = "persistent"
247-
# name of the cache, allowing reuse by subsequent tasks naming the same cache
248-
name: Optional[str] = None
249-
# location in the task image where the cache will be mounted
250-
mount_point: Optional[str] = None
251-
# Whether the cache is not used in untrusted environments (like the Try repo).
252-
skip_untrusted: Optional[bool] = None
253-
254-
255-
class DockerWorkerArtifact(Schema):
256-
# type of artifact -- simple file, or recursive directory, or a volume mounted directory.
257-
type: Optional[Literal["file", "directory", "volume"]] = None
258-
# task image path from which to read artifact
259-
path: Optional[str] = None
260-
# name of the produced artifact (root of the names for type=directory)
261-
name: Optional[str] = None
262-
263-
264-
class DockerWorkerPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True):
265-
implementation: Literal["docker-worker"]
266-
os: Literal["linux"]
267-
# For tasks that will run in docker-worker, this is the name of the docker
268-
# image or in-tree docker image to run the task in.
269-
docker_image: DockerImage
270-
# worker features that should be enabled
271-
relengapi_proxy: bool
272-
chain_of_trust: bool
273-
taskcluster_proxy: bool
274-
allow_ptrace: bool
275-
loopback_video: bool
276-
# environment variables
277-
env: dict[str, taskref_or_string_msgspec]
278-
# the maximum time to run, in seconds
279-
max_run_time: int
280-
# Paths to Docker volumes.
281-
volumes: Optional[list[str]] = None
282-
# caches to set up for the task
283-
caches: Optional[list[DockerWorkerCacheEntry]] = None
284-
# artifacts to extract from the task image after completion
285-
artifacts: Optional[list[DockerWorkerArtifact]] = None
286-
# the command to run; if not given, docker-worker will default to the
287-
# command in the docker image
288-
command: Optional[list[taskref_or_string_msgspec]] = None
289-
# the exit status code(s) that indicates the task should be retried
290-
retry_exit_status: Optional[list[int]] = None
291-
# the exit status code(s) that indicates the caches used by the task
292-
# should be purged
293-
purge_caches_exit_status: Optional[list[int]] = None
294-
# Whether any artifacts are assigned to this worker
295-
skip_artifacts: Optional[bool] = None
248+
DOCKER_WORKER_CACHE_ENTRY = Schema.from_dict(
249+
{
250+
# only one type is supported by any of the workers right now
251+
"type": (Literal["persistent"], "persistent"),
252+
# name of the cache, allowing reuse by subsequent tasks naming the same cache
253+
"name": (str, None),
254+
# location in the task image where the cache will be mounted
255+
"mount-point": (str, None),
256+
# Whether the cache is not used in untrusted environments (like the Try repo).
257+
"skip-untrusted": (bool, None),
258+
},
259+
)
260+
261+
DOCKER_WORKER_ARTIFACT = Schema.from_dict(
262+
{
263+
# type of artifact -- simple file, or recursive directory, or a volume mounted directory.
264+
"type": (Optional[Literal["file", "directory", "volume"]], None),
265+
# task image path from which to read artifact
266+
"path": (str, None),
267+
# name of the produced artifact (root of the names for type=directory)
268+
"name": (str, None),
269+
},
270+
)
271+
272+
DockerWorkerPayloadSchema = Schema.from_dict(
273+
{
274+
"implementation": Literal["docker-worker"],
275+
"os": Literal["linux"],
276+
# For tasks that will run in docker-worker, this is the name of the docker
277+
# image or in-tree docker image to run the task in.
278+
"docker-image": DockerImage,
279+
# worker features that should be enabled
280+
"relengapi-proxy": bool,
281+
"chain-of-trust": bool,
282+
"taskcluster-proxy": bool,
283+
"allow-ptrace": bool,
284+
"loopback-video": bool,
285+
# environment variables
286+
"env": dict[str, taskref_or_string_msgspec],
287+
# the maximum time to run, in seconds
288+
"max-run-time": int,
289+
# Paths to Docker volumes.
290+
"volumes": (list[str], None),
291+
# caches to set up for the task
292+
"caches": (list[DOCKER_WORKER_CACHE_ENTRY], None),
293+
# artifacts to extract from the task image after completion
294+
"artifacts": (list[DOCKER_WORKER_ARTIFACT], None),
295+
# the command to run; if not given, docker-worker will default to the
296+
# command in the docker image
297+
"command": (list[taskref_or_string_msgspec], None),
298+
# the exit status code(s) that indicates the task should be retried
299+
"retry-exit-status": (list[int], None),
300+
# the exit status code(s) that indicates the caches used by the task
301+
# should be purged
302+
"purge-caches-exit-status": (list[int], None),
303+
# Whether any artifacts are assigned to this worker
304+
"skip-artifacts": (bool, None),
305+
},
306+
forbid_unknown_fields=False,
307+
)
296308

297309

298310
@payload_builder("docker-worker", schema=DockerWorkerPayloadSchema)
@@ -497,69 +509,78 @@ def build_docker_worker_payload(config, task, task_def):
497509
payload["capabilities"] = capabilities
498510

499511

500-
class GenericWorkerArtifact(Schema):
501-
# type of artifact -- simple file, or recursive directory
502-
type: Literal["file", "directory"]
503-
# filesystem path from which to read artifact
504-
path: str
505-
# if not specified, path is used for artifact name
506-
name: Optional[str] = None
507-
508-
509-
class MountContentSchema(Schema):
510-
# Artifact name that contains the content.
511-
artifact: Optional[str] = None
512-
# Task ID that has the artifact that contains the content.
513-
task_id: Optional[taskref_or_string_msgspec] = None
514-
# URL that supplies the content in response to an unauthenticated GET request.
515-
url: Optional[str] = None
516-
517-
518-
class MountSchema(Schema):
519-
# A unique name for the cache volume.
520-
cache_name: Optional[str] = None
521-
# Optional content for pre-loading cache, or mandatory content for
522-
# read-only file or directory.
523-
content: Optional[MountContentSchema] = None
524-
# If mounting a cache or read-only directory.
525-
directory: Optional[str] = None
526-
# If mounting a file.
527-
file: Optional[str] = None
528-
# Archive format of the content.
529-
format: Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]] = None
530-
531-
532-
class GenericWorkerPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True):
533-
implementation: Literal["generic-worker"]
534-
os: Literal["windows", "macosx", "linux", "linux-bitbar"]
535-
# command is a list of commands to run, sequentially
536-
# On Windows, each command is a string; on Linux/OS X, each command is a string array
537-
command: list
538-
# environment variables
539-
env: dict[str, taskref_or_string_msgspec]
540-
# the maximum time to run, in seconds
541-
max_run_time: int
542-
# optional features
543-
chain_of_trust: bool
544-
# artifacts to extract from the task image after completion
545-
artifacts: Optional[list[GenericWorkerArtifact]] = None
546-
# Directories and/or files to be mounted.
547-
mounts: Optional[list[MountSchema]] = None
548-
# the exit status code(s) that indicates the task should be retried
549-
retry_exit_status: Optional[list[int]] = None
550-
# the exit status code(s) that indicates the caches used by the task
551-
# should be purged
552-
purge_caches_exit_status: Optional[list[int]] = None
553-
# os user groups for test task workers
554-
os_groups: Optional[list[str]] = None
555-
# feature for test task to run as administrator
556-
run_as_administrator: Optional[bool] = None
557-
# feature for task to run as current OS user
558-
run_task_as_current_user: Optional[bool] = None
559-
taskcluster_proxy: Optional[bool] = None
560-
hide_cmd_window: Optional[bool] = None
561-
# Whether any artifacts are assigned to this worker
562-
skip_artifacts: Optional[bool] = None
512+
GENERIC_WORKER_ARTIFACT = Schema.from_dict(
513+
{
514+
# type of artifact -- simple file, or recursive directory
515+
"type": Literal["file", "directory"],
516+
# filesystem path from which to read artifact
517+
"path": str,
518+
# if not specified, path is used for artifact name
519+
"name": (str, None),
520+
},
521+
)
522+
523+
MOUNT_SCHEMA = Schema.from_dict(
524+
{
525+
# A unique name for the cache volume.
526+
"cache-name": (str, None),
527+
# Optional content for pre-loading cache, or mandatory content for
528+
# read-only file or directory.
529+
"content": Schema.from_dict(
530+
{
531+
# Artifact name that contains the content.
532+
"artifact": (str, None),
533+
# Task ID that has the artifact that contains the content.
534+
"task-id": (taskref_or_string_msgspec, None),
535+
# URL that supplies the content in response to an unauthenticated GET request.
536+
"url": (str, None),
537+
},
538+
optional=True,
539+
),
540+
# If mounting a cache or read-only directory.
541+
"directory": (str, None),
542+
# If mounting a file.
543+
"file": (str, None),
544+
# Archive format of the content.
545+
"format": (Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]], None),
546+
},
547+
)
548+
549+
GenericWorkerPayloadSchema = Schema.from_dict(
550+
{
551+
"implementation": Literal["generic-worker"],
552+
"os": Literal["windows", "macosx", "linux", "linux-bitbar"],
553+
# command is a list of commands to run, sequentially
554+
# On Windows, each command is a string; on Linux/OS X, each command is a string array
555+
"command": list,
556+
# environment variables
557+
"env": dict[str, taskref_or_string_msgspec],
558+
# the maximum time to run, in seconds
559+
"max-run-time": int,
560+
# optional features
561+
"chain-of-trust": bool,
562+
# artifacts to extract from the task image after completion
563+
"artifacts": (list[GENERIC_WORKER_ARTIFACT], None),
564+
# Directories and/or files to be mounted.
565+
"mounts": (list[MOUNT_SCHEMA], None),
566+
# the exit status code(s) that indicates the task should be retried
567+
"retry-exit-status": (list[int], None),
568+
# the exit status code(s) that indicates the caches used by the task
569+
# should be purged
570+
"purge-caches-exit-status": (list[int], None),
571+
# os user groups for test task workers
572+
"os-groups": (list[str], None),
573+
# feature for test task to run as administrator
574+
"run-as-administrator": (bool, None),
575+
# feature for task to run as current OS user
576+
"run-task-as-current-user": (bool, None),
577+
"taskcluster-proxy": (bool, None),
578+
"hide-cmd-window": (bool, None),
579+
# Whether any artifacts are assigned to this worker
580+
"skip-artifacts": (bool, None),
581+
},
582+
forbid_unknown_fields=False,
583+
)
563584

564585

565586
@payload_builder("generic-worker", schema=GenericWorkerPayloadSchema)
@@ -677,13 +698,16 @@ def build_generic_worker_payload(config, task, task_def):
677698
task_def["payload"]["features"] = features
678699

679700

680-
class ReleaseProperties(Schema):
681-
app_name: Optional[str] = None
682-
app_version: Optional[str] = None
683-
branch: Optional[str] = None
684-
build_id: Optional[str] = None
685-
hash_type: Optional[str] = None
686-
platform: Optional[str] = None
701+
RELEASE_PROPERTIES = Schema.from_dict(
702+
{
703+
"app-name": (str, None),
704+
"app-version": (str, None),
705+
"branch": (str, None),
706+
"build-id": (str, None),
707+
"hash-type": (str, None),
708+
"platform": (str, None),
709+
},
710+
)
687711

688712

689713
class UpstreamArtifact(Schema, rename="camel"):
@@ -697,18 +721,22 @@ class UpstreamArtifact(Schema, rename="camel"):
697721
locale: str
698722

699723

700-
class BeetmoverPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True):
701-
implementation: Literal["beetmover"]
702-
# the maximum time to run, in seconds
703-
max_run_time: int
704-
# release properties
705-
release_properties: ReleaseProperties
706-
# list of artifact URLs for the artifacts that should be beetmoved
707-
upstream_artifacts: list[UpstreamArtifact]
708-
# locale key, if this is a locale beetmover task
709-
locale: Optional[str] = None
710-
partner_public: Optional[bool] = None
711-
artifact_map: Optional[object] = None
724+
BeetmoverPayloadSchema = Schema.from_dict(
725+
{
726+
"implementation": Literal["beetmover"],
727+
# the maximum time to run, in seconds
728+
"max-run-time": int,
729+
# release properties
730+
"release-properties": RELEASE_PROPERTIES,
731+
# list of artifact URLs for the artifacts that should be beetmoved
732+
"upstream-artifacts": list[UpstreamArtifact],
733+
# locale key, if this is a locale beetmover task
734+
"locale": (str, None),
735+
"partner-public": (bool, None),
736+
"artifact-map": (object, None),
737+
},
738+
forbid_unknown_fields=False,
739+
)
712740

713741

714742
@payload_builder("beetmover", schema=BeetmoverPayloadSchema)
@@ -737,23 +765,33 @@ def build_beetmover_payload(config, task, task_def):
737765
task_def["payload"]["is_partner_repack_public"] = worker["partner-public"]
738766

739767

740-
class InvalidPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True):
741-
# an invalid task is one which should never actually be created; this is used in
742-
# release automation on branches where the task just doesn't make sense
743-
implementation: Literal["invalid"]
768+
InvalidPayloadSchema = Schema.from_dict(
769+
{
770+
# an invalid task is one which should never actually be created; this is used in
771+
# release automation on branches where the task just doesn't make sense
772+
"implementation": Literal["invalid"],
773+
},
774+
forbid_unknown_fields=False,
775+
)
744776

745777

746778
@payload_builder("invalid", schema=InvalidPayloadSchema)
747779
def build_invalid_payload(config, task, task_def):
748780
task_def["payload"] = "invalid task - should never be created"
749781

750782

751-
class AlwaysOptimizedPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True):
752-
implementation: Literal["always-optimized"]
753-
783+
AlwaysOptimizedPayloadSchema = Schema.from_dict(
784+
{
785+
"implementation": Literal["always-optimized"],
786+
},
787+
forbid_unknown_fields=False,
788+
)
754789

755-
class SucceedPayloadSchema(Schema):
756-
implementation: Literal["succeed"]
790+
SucceedPayloadSchema = Schema.from_dict(
791+
{
792+
"implementation": Literal["succeed"],
793+
},
794+
)
757795

758796

759797
@payload_builder("always-optimized", schema=AlwaysOptimizedPayloadSchema)

0 commit comments

Comments
 (0)