Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ def _load_kinds(self, graph_config, target_kind=None):
yield FakeKind(kind_name, "/fake", config, graph_config)


class FakeGraphConfig(GraphConfig):
    """A ``GraphConfig`` whose ``register`` hook is a no-op, for tests."""

    def register(self):
        """Skip registration; tests need none of the real side effects."""


def fake_load_graph_config(root_dir):
graph_config = GraphConfig(
graph_config = FakeGraphConfig(
{
"trust-domain": "test-domain",
"taskgraph": {
Expand Down Expand Up @@ -103,7 +108,6 @@ def fake_load_graph_config(root_dir):
},
root_dir,
)
graph_config.__dict__["register"] = lambda: None
return graph_config


Expand Down
155 changes: 124 additions & 31 deletions src/taskgraph/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
import copy
import logging
import os
import platform
from concurrent.futures import (
FIRST_COMPLETED,
ProcessPoolExecutor,
wait,
)
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Union

Expand Down Expand Up @@ -46,16 +52,20 @@ def _get_loader(self) -> Callable:
assert callable(loader)
return loader

def load_tasks(self, parameters, loaded_tasks, write_artifacts):
def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts):
logger.debug(f"Loading tasks for kind {self.name}")

parameters = Parameters(**parameters)
loader = self._get_loader()
config = copy.deepcopy(self.config)

kind_dependencies = config.get("kind-dependencies", [])
kind_dependencies_tasks = {
task.label: task for task in loaded_tasks if task.kind in kind_dependencies
}

inputs = loader(self.name, self.path, config, parameters, loaded_tasks)
inputs = loader(
self.name,
self.path,
config,
parameters,
list(kind_dependencies_tasks.values()),
)

transforms = TransformSequence()
for xform_path in config["transforms"]:
Expand Down Expand Up @@ -89,6 +99,7 @@ def load_tasks(self, parameters, loaded_tasks, write_artifacts):
)
for task_dict in transforms(trans_config, inputs)
]
logger.info(f"Generated {len(tasks)} tasks for kind {self.name}")
return tasks

@classmethod
Expand Down Expand Up @@ -253,6 +264,101 @@ def _load_kinds(self, graph_config, target_kinds=None):
except KindNotFound:
continue

def _load_tasks_serial(self, kinds, kind_graph, parameters):
    """Load the tasks of every kind, one kind at a time, in dependency order.

    Args:
        kinds (dict): mapping of kind name -> Kind.
        kind_graph (Graph): dependency graph of kinds; visited postorder so
            each kind sees its dependencies' tasks before it is loaded.
        parameters (Parameters): parameters for this graph generation.

    Returns:
        dict: mapping of task label -> task, across all kinds.

    Raises:
        Exception: if a kind named by the graph is missing from ``kinds``,
            if two tasks share a label, or re-raised from ``load_tasks``.
    """
    tasks_by_label = {}
    for name in kind_graph.visit_postorder():
        logger.debug(f"Loading tasks for kind {name}")

        if name not in kinds:
            parts = [f'Could not find the kind "{name}"\nAvailable kinds:\n']
            parts.extend(f' - "{k}"\n' for k in sorted(kinds))
            raise Exception("".join(parts))
        current = kinds[name]

        # Only tasks belonging to this kind's declared dependencies are
        # handed to its loader.
        dependency_kinds = current.config.get("kind-dependencies", [])
        dependency_tasks = {
            label: task
            for label, task in tasks_by_label.items()
            if task.kind in dependency_kinds
        }
        try:
            loaded = current.load_tasks(
                parameters, dependency_tasks, self._write_artifacts
            )
        except Exception:
            logger.exception(f"Error loading tasks for kind {name}:")
            raise

        for task in loaded:
            if task.label in tasks_by_label:
                raise Exception("duplicate tasks with label " + task.label)
            tasks_by_label[task.label] = task

    return tasks_by_label

def _load_tasks_parallel(self, kinds, kind_graph, parameters):
    """Load tasks for all kinds concurrently, one worker process per kind.

    A kind is submitted to the pool as soon as every kind it depends on has
    finished loading, so independent kinds run in parallel.

    Args:
        kinds (dict): mapping of kind name -> Kind. NOTE: consumed
            destructively — entries are deleted here as kinds complete.
        kind_graph (Graph): dependency graph of kinds; as consumed here, an
            edge's first element is blocked until its second element's
            tasks have loaded.
        parameters (Parameters): parameters for this graph generation.

    Returns:
        dict: mapping of task label -> task, across all kinds.

    Raises:
        Exception: on a missing kind or duplicate task label; worker
            exceptions are re-raised after cancelling outstanding futures.
    """
    all_tasks = {}
    # In-flight bookkeeping: future -> kind name, plus the raw future set
    # handed to wait().
    futures_to_kind = {}
    futures = set()
    # Remaining dependency edges; edges drop out as kinds complete.
    edges = set(kind_graph.edges)

    with ProcessPoolExecutor() as executor:

        def submit_ready_kinds():
            """Create the next batch of tasks for kinds without dependencies."""
            nonlocal kinds, edges, futures
            # Snapshot of everything loaded so far; filtered per kind below.
            loaded_tasks = all_tasks.copy()
            kinds_with_deps = {edge[0] for edge in edges}
            # Ready = known kinds, minus those still blocked by an edge,
            # minus those already in flight.
            ready_kinds = (
                set(kinds) - kinds_with_deps - set(futures_to_kind.values())
            )
            for name in ready_kinds:
                kind = kinds.get(name)
                if not kind:
                    message = (
                        f'Could not find the kind "{name}"\nAvailable kinds:\n'
                    )
                    for k in sorted(kinds):
                        message += f' - "{k}"\n'
                    raise Exception(message)

                # ProcessPoolExecutor pickles its arguments, hence the
                # plain dict(parameters); only tasks from this kind's
                # declared kind-dependencies are shipped to the worker.
                future = executor.submit(
                    kind.load_tasks,
                    dict(parameters),
                    {
                        k: t
                        for k, t in loaded_tasks.items()
                        if t.kind in kind.config.get("kind-dependencies", [])
                    },
                    self._write_artifacts,
                )
                futures.add(future)
                futures_to_kind[future] = name

        submit_ready_kinds()
        while futures:
            done, _ = wait(futures, return_when=FIRST_COMPLETED)
            for future in done:
                # Fail fast: abort outstanding work and surface the
                # worker's exception to the caller.
                if exc := future.exception():
                    executor.shutdown(wait=False, cancel_futures=True)
                    raise exc
                kind = futures_to_kind.pop(future)
                futures.remove(future)

                for task in future.result():
                    if task.label in all_tasks:
                        raise Exception("duplicate tasks with label " + task.label)
                    all_tasks[task.label] = task

                # Update state for next batch of futures: the finished kind
                # no longer blocks anything.
                del kinds[kind]
                edges = {e for e in edges if e[1] != kind}

                # Submit any newly unblocked kinds
                submit_ready_kinds()

    return all_tasks

def _run(self):
logger.info("Loading graph configuration.")
graph_config = load_graph_config(self.root_dir)
Expand Down Expand Up @@ -307,31 +413,18 @@ def _run(self):
)

logger.info("Generating full task set")
all_tasks = {}
for kind_name in kind_graph.visit_postorder():
logger.debug(f"Loading tasks for kind {kind_name}")

kind = kinds.get(kind_name)
if not kind:
message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n'
for k in sorted(kinds):
message += f' - "{k}"\n'
raise Exception(message)
# Current parallel generation relies on multiprocessing, and forking.
# This causes problems on Windows and macOS due to how new processes
# are created there, and how doing so reinitializes global variables
# that are modified earlier in graph generation, that doesn't get
# redone in the new processes. Ideally this would be fixed, or we
# would take another approach to parallel kind generation. In the
# meantime, it's not supported outside of Linux.
if platform.system() != "Linux":
all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters)
else:
all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters)

try:
new_tasks = kind.load_tasks(
parameters,
list(all_tasks.values()),
self._write_artifacts,
)
except Exception:
logger.exception(f"Error loading tasks for kind {kind_name}:")
raise
for task in new_tasks:
if task.label in all_tasks:
raise Exception("duplicate tasks with label " + task.label)
all_tasks[task.label] = task
logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}")
full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
yield self.verify("full_task_set", full_task_set, graph_config, parameters)

Expand Down
19 changes: 15 additions & 4 deletions test/test_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


from concurrent.futures import ProcessPoolExecutor

import pytest
from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config
from pytest_taskgraph import WithFakeKind, fake_load_graph_config

from taskgraph import generator, graph
from taskgraph.generator import Kind, load_tasks_for_kind, load_tasks_for_kinds
from taskgraph.loader.default import loader as default_loader


def test_kind_ordering(maketgg):
class FakePPE(ProcessPoolExecutor):
    """``ProcessPoolExecutor`` that records, in submission order, the
    ``name`` of the object owning each bound method submitted to it."""

    # Class-level on purpose: tests inspect it via the class after the
    # executor instance is gone.
    loaded_kinds = []

    def submit(self, kind_load_tasks, *args):
        # The submitted callable is a bound method (e.g. Kind.load_tasks);
        # its __self__ is the owning object.
        owner = kind_load_tasks.__self__
        self.loaded_kinds.append(owner.name)
        return super().submit(kind_load_tasks, *args)


def test_kind_ordering(mocker, maketgg):
"When task kinds depend on each other, they are loaded in postorder"
mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
tgg = maketgg(
kinds=[
("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
Expand All @@ -21,7 +32,7 @@ def test_kind_ordering(maketgg):
]
)
tgg._run_until("full_task_set")
assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]


def test_full_task_set(maketgg):
Expand Down Expand Up @@ -293,5 +304,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co
kind = Kind(
name="fake", path="foo/bar", config=kind_config, graph_config=graph_config
)
tasks = kind.load_tasks(parameters, [], False)
tasks = kind.load_tasks(parameters, {}, False)
assert tasks
Loading