Skip to content

Commit 8c3f8e3

Browse files
committed
feat: add final build
1 parent c872684 commit 8c3f8e3

File tree

6 files changed

+145
-37
lines changed

6 files changed

+145
-37
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,4 @@ ruff_cache/
114114
output.*
115115
tmp*
116116
test_state
117+
test_file.*

src/scheduler/manager.py

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,61 +16,58 @@
1616

1717

1818
class Scheduler:
19-
"""Scheduler class."""
20-
21-
__slots__ = (
22-
"_completed_tasks",
23-
"_config",
24-
"_context_manager",
25-
"_failed_tasks",
26-
"_lock",
27-
"_state_manager",
28-
"_tasks",
29-
)
19+
"""Task scheduler."""
3020

3121
def __init__(self, context_manager: ContextManagerProtocol, state_manager: StateManagerProtocol) -> None:
3222
self._context_manager = context_manager
3323
self._state_manager = state_manager
3424
self._config = settings.scheduler
3525
self._lock = RLock()
36-
self._tasks: deque[BaseTask] = deque()
26+
self._tasks: deque[BaseTask] = deque(maxlen=self._config.max_concurrent_tasks)
3727
self._completed_tasks: set[UUID] = set()
3828
self._failed_tasks: set[UUID] = set()
3929

4030
def add_task(self, task: BaseTask) -> None:
4131
"""Add task to the scheduler queue."""
4232
self._tasks.append(task)
33+
context = next(self._context_manager.create_context())
34+
next(self._context_manager.associate_task(task.task_id, context.id))
4335

4436
def _can_execute(self, task: BaseTask) -> bool:
45-
"""Check if task can be executed."""
46-
if task.config.start_time and task.config.start_time > get_current_timestamp():
47-
return False
37+
"""Checks if all dependencies are completed and if start time is in the future."""
4838
for dependency in task.dependencies:
4939
if dependency in self._failed_tasks:
5040
task.set_state(TaskState.FAILED)
41+
self._failed_tasks.add(task.task_id)
5142
return False
5243
if dependency not in self._completed_tasks:
5344
return False
54-
return True
45+
return not (task.config.start_time and task.config.start_time > get_current_timestamp())
5546

5647
def _process_task(self, task: BaseTask) -> Generator[None, None, None]:
57-
"""Process task execution."""
58-
yield None
48+
"""Process task execution. Moves task to completed or failed states."""
49+
context = self._context_manager.get_context(task.task_id)
50+
yield from task.execute(context)
51+
match task.state:
52+
case TaskState.COMPLETED:
53+
self._completed_tasks.add(task.task_id)
54+
case TaskState.FAILED:
55+
self._failed_tasks.add(task.task_id)
5956

6057
def run(self) -> Generator[None, None, None]:
6158
"""Run event loop."""
6259
while self._tasks:
6360
task = self._tasks.popleft()
6461
if self._can_execute(task):
6562
yield from self._process_task(task)
66-
else:
63+
elif task.state != TaskState.FAILED:
6764
self._tasks.append(task)
6865

6966
def __enter__(self) -> "Scheduler":
7067
"""Logic when the scheduler starts."""
7168
logger.info("Starting scheduler.")
7269
with self._lock:
73-
self._state_manager.load()
70+
next(self._state_manager.load())
7471
logger.info("Loaded state from storage.")
7572
return self
7673

@@ -79,7 +76,7 @@ def __exit__(
7976
) -> None:
8077
"""Logic when the scheduler exits."""
8178
with self._lock:
82-
self._state_manager.save()
79+
next(self._state_manager.save())
8380
logger.info("Saved state to storage.")
8481
logger.info("Scheduler stopped")
8582
if exc_val:

src/schemas/task.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pydantic import BaseModel, ConfigDict, Field, model_validator
55

66
from src.helpers import get_current_timestamp
7-
from src.schemas.enums import FileOperation, TaskPriority, TaskType
7+
from src.schemas.enums import FileOperation, HttpOperation, TaskPriority, TaskType
88
from src.schemas.mixins import CreatedAtMixin, UpdatedAtMixin, UUIDMixin
99

1010

@@ -48,19 +48,12 @@ class FileTaskConfig(TaskConfig):
4848
file_path: str = Field(..., min_length=1, description="Target file path")
4949
content: str | None = Field(None, description="Content for write operations")
5050

51-
@model_validator(mode="after")
52-
def validate_content_required(self) -> "FileTaskConfig":
53-
"""Validates content presence for write operations."""
54-
if self.operation in [FileOperation.WRITE, FileOperation.APPEND] and self.content is None:
55-
raise ValueError("Content is required for write and append operations")
56-
return self
57-
5851

5952
class HttpTaskConfig(TaskConfig):
6053
"""Configuration for HTTP request tasks."""
6154

6255
task_type: TaskType = Field(TaskType.HTTP, frozen=True)
6356
url: str = Field(..., description="Target URL for HTTP request")
64-
method: str = Field(default="GET", description="HTTP method")
57+
method: HttpOperation = Field(default=HttpOperation.GET, description="HTTP method")
6558
headers: dict[str, str] = Field(default_factory=dict, description="HTTP headers")
6659
timeout: float = Field(default=30.0, ge=0, description="HTTP request timeout")

tests/test_context_manager.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
from idlelib.configdialog import changes
2-
from uuid import UUID, uuid4
1+
from uuid import uuid4
32

43
import pytest
54

@@ -9,7 +8,7 @@
98
from src.context import ContextManager
109
from src.core.exceptions import ContextNotFoundError, ContextVersionError
1110
from src.helpers import get_current_timestamp
12-
from src.schemas import Context, TaskConfig
11+
from src.schemas import Context
1312

1413

1514
class BaseFactory:

tests/test_scheduler.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
from collections.abc import Generator
2+
from datetime import timedelta
3+
from uuid import UUID, uuid4
4+
5+
import pytest
6+
7+
from polyfactory.factories.pydantic_factory import ModelFactory
8+
from pytest_mock import MockerFixture
9+
10+
from src.helpers import get_current_timestamp
11+
from src.scheduler import Scheduler
12+
from src.schemas import Context, FileTaskConfig, TaskState
13+
from src.task import BaseTask
14+
15+
16+
class BaseFactory:
17+
__random_seed__ = 1
18+
19+
20+
class BaseTaskFactory(BaseFactory, ModelFactory[FileTaskConfig]): ...
21+
22+
23+
@pytest.fixture
24+
def mock_context_manager(mocker: MockerFixture):
25+
return mocker.Mock()
26+
27+
28+
@pytest.fixture
29+
def mock_state_manager(mocker: MockerFixture):
30+
return mocker.Mock()
31+
32+
33+
@pytest.fixture
34+
def scheduler(mock_context_manager, mock_state_manager):
35+
return Scheduler(mock_context_manager, mock_state_manager)
36+
37+
38+
class FBaseTask(BaseTask):
39+
def _do_execute(self, context: Context) -> Generator[None, None, None]:
40+
"""Simulate a task that always fails."""
41+
yield None
42+
43+
44+
class TestScheduler:
45+
def test_add_task(self, scheduler, mocker: MockerFixture):
46+
config = BaseTaskFactory.build()
47+
task = FBaseTask(config)
48+
49+
mock_context = mocker.Mock()
50+
scheduler._context_manager.create_context.return_value = iter([mock_context])
51+
scheduler._context_manager.associate_task.return_value = iter([None])
52+
53+
scheduler.add_task(task)
54+
55+
assert len(scheduler._tasks) == 1, "Task was not added to the queue"
56+
assert scheduler._tasks[0] == task, "Added task is not at the front of the queue"
57+
58+
def test_can_execute_start_time_not_reached(self, scheduler, mocker: MockerFixture):
59+
future_time = get_current_timestamp() + timedelta(hours=1)
60+
config = BaseTaskFactory.build(start_time=future_time.timestamp())
61+
task = FBaseTask(config)
62+
63+
mocker.patch("src.helpers.get_current_timestamp", return_value=get_current_timestamp().timestamp())
64+
65+
assert not scheduler._can_execute(task), "Task should not be executable before start time"
66+
67+
def test_can_execute_dependencies_completed(self, scheduler):
68+
dependency = uuid4()
69+
config = BaseTaskFactory.build(dependencies=[dependency])
70+
task = FBaseTask(config)
71+
scheduler._completed_tasks.add(dependency)
72+
73+
assert scheduler._can_execute(task), "Task should be executable if all dependencies are completed"
74+
75+
def test_can_execute_dependencies_failed(self, scheduler, mocker: MockerFixture):
76+
from datetime import datetime, timedelta
77+
from zoneinfo import ZoneInfo
78+
79+
future_time = datetime.now(ZoneInfo("UTC")) + timedelta(hours=1)
80+
81+
dependency = uuid4()
82+
config = BaseTaskFactory.build(dependencies=[dependency], start_time=future_time)
83+
task = FBaseTask(config)
84+
scheduler._failed_tasks.add(dependency)
85+
86+
mock_context = mocker.Mock()
87+
scheduler._context_manager.create_context.return_value = iter([mock_context])
88+
scheduler._context_manager.associate_task.return_value = iter([None])
89+
90+
scheduler.add_task(task)
91+
list(scheduler.run())
92+
93+
assert task.task_id in scheduler._failed_tasks, "Task should be added to failed tasks"
94+
assert task not in scheduler._tasks, "Task should be removed from the queue"
95+
assert task.state == TaskState.FAILED, "Task state should be set to FAILED"
96+
97+
def test_run(self, scheduler, mocker: MockerFixture):
98+
config1 = BaseTaskFactory.build(start_time=None)
99+
task1 = FBaseTask(config1)
100+
config2 = BaseTaskFactory.build(start_time=None, dependencies=[task1.task_id])
101+
task2 = FBaseTask(config2)
102+
scheduler._tasks.extend([task1, task2])
103+
104+
mocker.patch.object(scheduler, "_can_execute", side_effect=[True, False, True])
105+
mocker.patch.object(scheduler, "_process_task", return_value=iter([None]))
106+
107+
list(scheduler.run())
108+
109+
assert len(scheduler._tasks) == 0, "Tasks should be removed from the queue"
110+
assert scheduler._process_task.call_count == 2, "Task should be processed twice"
111+
112+
def test_context_manager(self, scheduler, mocker: MockerFixture):
113+
mock_load = mocker.patch.object(scheduler._state_manager, "load", return_value=iter([None]))
114+
mock_save = mocker.patch.object(scheduler._state_manager, "save", return_value=iter([None]))
115+
116+
with scheduler:
117+
pass
118+
119+
mock_load.assert_called_once(), "State manager should be loaded"
120+
mock_save.assert_called_once(), "State manager should be saved"

tests/test_state_manager.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
import os
2-
31
from datetime import UTC, datetime, timedelta
42
from pathlib import Path
5-
from uuid import UUID, uuid4
3+
from uuid import uuid4
64

75
import pytest
86

97
from polyfactory.factories.pydantic_factory import ModelFactory
108
from pytest_mock import MockerFixture
119

12-
from src.core.exceptions import StateLoadError, StateLockError, StateNotFoundError, StateSaveError
10+
from src.core.exceptions import StateLoadError, StateLockError, StateNotFoundError
1311
from src.schemas import StateData, TaskState, TaskStateData, TaskStates
1412

1513

0 commit comments

Comments
 (0)