Skip to content

Commit 73ef5fe

Browse files
Make functions internal, update docstrings
1 parent dca7677 commit 73ef5fe

File tree

2 files changed

+99
-70
lines changed

2 files changed

+99
-70
lines changed

scheduler/Scheduler.py

Lines changed: 98 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,34 @@ class Scheduler:
3939
concurrently and an 8-core, 16-thread CPU will run 17 processes
4040
concurrently.
4141
42-
If CPU usage is found to be below the threshold, the number of
43-
simultaneous processes will be increased. CPU usage is checked
44-
every 5 seconds.
42+
If `dynamic` is enabled, Scheduler will also check CPU usage and increase
43+
the number of processes if it is below the threshold.
4544
"""
4645

4746
def __init__(
48-
self,
49-
progress_callback: Callable[[int, int], None] = None,
50-
delay_seconds: float = 0.05,
47+
self,
48+
progress_callback: Callable[[int, int], None] = None,
49+
update_interval: float = 0.05,
50+
dynamic: bool = True,
51+
cpu_threshold: float = 95,
52+
cpu_update_interval: float = 5,
5153
):
54+
"""
55+
:param progress_callback: a function taking the number of finished tasks and the total number of tasks, which is
56+
called when a task finishes
57+
58+
:param update_interval: the time, in seconds, between consecutive updates (when tasks are checked and new tasks are scheduled)
59+
60+
:param dynamic: whether to dynamically increase the number of processes based on the CPU usage
61+
62+
:param cpu_threshold: the minimum target CPU usage, in percent; if `dynamic` is enabled and CPU usage is found
63+
to be below the threshold, the number of simultaneous tasks will be increased
64+
65+
:param cpu_update_interval: the time, in seconds, between consecutive CPU usage checks when `dynamic` is enabled
66+
"""
67+
self.dynamic = dynamic
68+
self.update_interval = update_interval
69+
5270
self.tasks: List[Task] = []
5371
self.output: List[tuple] = []
5472

@@ -61,29 +79,24 @@ def __init__(
6179
# List of currently running tasks.
6280
self.running_tasks: List[Task] = []
6381

64-
# The delay between each consecutive check for finished tasks.
65-
self.delay_seconds = delay_seconds
66-
self.delay_millis = int(delay_seconds * 1000)
67-
6882
self.time_start: float = 0
6983
self.started = False
70-
self.stopped = False
7184
self.terminated = False
7285

7386
# Most recent time at which CPU utilisation was checked.
7487
self.time_cpu_checked: float = 0
7588

7689
# Number of seconds between CPU utilisation checks.
77-
self.time_between_cpu_checks: float = 5
90+
self.time_between_cpu_checks: float = cpu_update_interval
7891

7992
# If the CPU utilisation in percent is below the threshold, more tasks will be run.
80-
self.cpu_threshold = 95
93+
self.cpu_threshold = cpu_threshold
8194

8295
self.total_task_count: int = 0
8396
self.tasks_completed: int = 0
8497

8598
# Callback which allows the Scheduler to report its progress;
86-
# the 1st input is the number of completed tasks, while the 2nd is
99+
# the 1st input is the number of completed tasks, and the 2nd is
87100
# the total number of tasks.
88101
self.progress_callback: Callable[[int, int], None] = progress_callback
89102

@@ -92,37 +105,70 @@ def add_task(self, task: Task) -> None:
92105
Adds a task to the Scheduler.
93106
"""
94107
if self.started:
95-
raise SchedulerException("Do not add tasks to an running Scheduler.")
108+
raise SchedulerException("add_task() cannot be called on a running Scheduler.")
96109

97110
self.tasks.append(task)
98111

112+
def add_tasks(self, *args: Task) -> None:
113+
"""
114+
Adds multiple tasks to the Scheduler.
115+
"""
116+
if self.started:
117+
raise SchedulerException("add_tasks() cannot be called on a running Scheduler.")
118+
119+
self.tasks.extend(args)
120+
99121
async def run(self) -> List[tuple]:
100122
"""
101-
Runs the tasks with coroutines. Returns a list containing
102-
the output of each task, after all tasks are complete.
123+
Runs the tasks with coroutines.
103124
104-
Important: the list is not ordered. Each task should return some
105-
form of identifier in its results; for example, the name of the
106-
signal.
125+
:returns: an ordered list containing the output of each task
107126
"""
108-
# Initialize `self.output` so that it can be indexed into.
109-
self.output = [() for _ in self.tasks]
110-
self.start()
127+
self._initialize_output()
128+
self._start()
111129

112-
while not self.stopped and not self.all_tasks_finished():
113-
await asyncio.sleep(self.delay_seconds)
114-
self.update()
130+
while not self.terminated and not self._all_tasks_finished():
131+
await asyncio.sleep(self.update_interval)
132+
self._update()
115133

116134
return self.output
117135

118-
def update(self):
119-
should_update_tasks = False
136+
def run_blocking(self) -> List[tuple]:
137+
"""
138+
Runs the tasks. Will block the current thread until all tasks are complete.
139+
140+
:returns: an ordered list containing the output of each task
141+
"""
142+
self._initialize_output()
143+
self._start()
144+
145+
while not self.terminated and not self._all_tasks_finished():
146+
time.sleep(self.update_interval)
147+
self._update()
148+
149+
return self.output
150+
151+
def terminate(self) -> None:
152+
"""Terminates all running tasks by killing their processes."""
153+
if not self.terminated:
154+
[t.terminate() for t in self.tasks]
155+
self.terminated = True
156+
157+
def _initialize_output(self) -> None:
158+
# Initialize `self.output` so that it can be indexed into.
159+
self.output = [None for _ in self.tasks]
160+
161+
def _update(self) -> None:
162+
"""
163+
Checks whether tasks have finished, and schedules new tasks if applicable.
164+
"""
165+
schedule_new_tasks = False
120166

121167
t = time.time()
122-
if t - self.time_cpu_checked > self.time_between_cpu_checks:
168+
if self.dynamic and t - self.time_cpu_checked > self.time_between_cpu_checks:
123169
self.time_cpu_checked = t
124170
total_remaining_tasks = sum(
125-
[t.total_tasks() for t in self.available_tasks()]
171+
[t.total_tasks() for t in self._available_tasks()]
126172
)
127173

128174
if total_remaining_tasks > self.concurrent_count:
@@ -135,91 +181,74 @@ def update(self):
135181
new_count += 1
136182

137183
self.concurrent_count = new_count
138-
should_update_tasks = True
184+
schedule_new_tasks = True
139185

140186
for t in self.running_tasks:
141187
t.update()
142188
if t.finished:
143189
index = self.tasks.index(t)
144190
self.output[index] = t.queue.get()
145191

146-
self.on_task_completed(t)
147-
should_update_tasks = True
192+
self._on_task_completed(t)
193+
schedule_new_tasks = True
148194

149-
if should_update_tasks:
150-
self.schedule_tasks()
195+
if schedule_new_tasks:
196+
self._schedule_tasks()
151197

152-
def start(self):
153-
"""Starts the scheduler running the assigned tasks."""
198+
def _start(self) -> None:
199+
"""
200+
Starts the scheduler running its tasks.
201+
"""
154202
self.started = True
155203
self.total_task_count = sum([t.total_tasks() for t in self.tasks])
156204

157205
self.time_start = time.time()
158206
self.time_cpu_checked = self.time_start
159-
self.report_progress(0)
207+
self._report_progress(0)
160208

161-
self.schedule_tasks()
209+
self._schedule_tasks()
162210

163-
def schedule_tasks(self):
211+
def _schedule_tasks(self) -> None:
164212
"""Updates the currently running tasks by starting new tasks if necessary."""
165-
tasks = self.tasks_to_run()
213+
tasks = self._tasks_to_run()
166214
self.running_tasks.extend(tasks)
167215
[t.start() for t in tasks]
168216

169-
def terminate(self):
170-
"""Terminates all running tasks by killing their processes."""
171-
if not (self.terminated or self.stopped):
172-
[t.terminate() for t in self.tasks]
173-
self.terminated = True
174-
self.stop_timer()
175-
176-
def stop_timer(self):
177-
"""
178-
Stops the timer from checking whether tasks have finished.
179-
This should be called when all tasks have been completed.
180-
"""
181-
if not self.stopped:
182-
self.stopped = True
183-
184-
def on_task_completed(self, task):
217+
def _on_task_completed(self, task) -> None:
185218
"""Called when a task finishes."""
186-
self.report_progress(task.total_tasks())
219+
self._report_progress(task.total_tasks())
187220
self.running_tasks.remove(task)
188221

189-
def on_all_tasks_completed(self):
190-
"""Called when all assigned tasks have been completed."""
191-
self.stop_timer()
192-
193-
def report_progress(self, tasks_just_finished: int):
222+
def _report_progress(self, tasks_just_finished: int) -> None:
194223
self.tasks_completed += tasks_just_finished
195224
if self.progress_callback:
196225
self.progress_callback(self.tasks_completed, self.total_task_count)
197226

198-
def available_tasks(self) -> List[Task]:
227+
def _available_tasks(self) -> List[Task]:
199228
"""Gets all tasks which are available to run."""
200229
return [t for t in self.tasks if not (t.running or t.finished)]
201230

202-
def all_tasks_finished(self) -> bool:
231+
def _all_tasks_finished(self) -> bool:
203232
"""Returns whether all tasks have been finished."""
204233
return all([t.finished for t in self.tasks])
205234

206-
def total_running_tasks(self) -> int:
235+
def _total_running_tasks(self) -> int:
207236
"""Returns the total number of running tasks, including sub-tasks."""
208237
running = self.running_tasks
209238
return sum([t.total_tasks() for t in running])
210239

211-
def tasks_to_run(self) -> List[Task]:
240+
def _tasks_to_run(self) -> List[Task]:
212241
"""
213242
Gets the tasks that should be run, based on the core count
214243
and the current number of running tasks.
215244
"""
216245
# Tasks which have not yet been started or finished.
217-
available = self.available_tasks()
246+
available = self._available_tasks()
218247

219248
# The total number of tasks (including sub-tasks) for each available task.
220249
task_counts = [t.total_tasks() for t in available]
221250

222-
running_count = self.total_running_tasks()
251+
running_count = self._total_running_tasks()
223252

224253
# Number of tasks that can be started without reducing efficiency.
225254
num_to_run = self.concurrent_count - running_count

scheduler/Task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
class Task:
2929
"""
3030
A simple class containing a process and an associated queue. The queue should have been
31-
passed to the process, the process should put its output in the queue.
31+
passed to the process, so that the process can put its output in the queue.
3232
"""
3333

3434
def __init__(self, process: Process, queue: Queue, subtasks=0):

0 commit comments

Comments
 (0)