-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
387 lines (317 loc) · 15.7 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
from datetime import datetime
import csv
import time
from rich.live import Live
from sim_viewer import RenderScreen
from sim_viewer import RenderStats
from pcb import PCB, SysClock
class Scheduler:
'''
Handles the life cycle of the CPU scheduling simulation
The CPU Scheduler can run on either a
FCFS first come first serve basis
a Round Robin approach
or a priority based approach
see schedule() for more info on scheduling algorithms
The simulated CPU has 2 cores by default but can be changed.
the number of cores is the allowed number of processes in the running queue at once.
The simulated runtime environment only has 2 'io_devices' by default but can also be changed.,
this handles input/output between calculations between CPU bursts
'''
def __init__(self, cores:int=2, io_devices:int=2):
self.clock = SysClock()
self.cores = cores
self.io_devices = io_devices
self.num_processes = 0
self.process_turnover_time = 0
self.pcb_arrivals = {}
self.messages = []
self.time_slice_tracker = {}
self.highest_priority = 10000
self.new = []
self.ready = []
self.running = []
self.waiting = []
self.IO = []
self.exited = []
def readData(self, datfile):
'''
handles all .dat files produced for simulation.
loads 'pcb_arrivals' queue with new process control blocks (PCB)
using the format found in processes.dat and explained in the code below
pcb_arrivals: format --> {0: [<PID obj-1>], 1: [<PID obj-2>, <PID obj-3>], 3: [<PID obj-4>]}
'''
try:
self.datfile = datfile
with open(datfile) as f:
for process in f.read().split("\n"):
if len(process) > 0:
self.num_processes += 1
parts = process.split(' ')
arrival, pid = int(parts[0]), parts[1]
priority, bursts = int(parts[2][1]), [int(i) for i in parts[3:]]
if arrival in self.pcb_arrivals:
self.pcb_arrivals[arrival].append(PCB(pid, priority, bursts, arrival))
else:
self.pcb_arrivals[arrival] = [PCB(pid, priority, bursts, arrival)]
except FileNotFoundError as err:
print(f"Simulation ERR: File could not be found... ")
def saveRunInfo(self, csv_file:str=f"saved_run"):
'''
saves the run info in CSV format for viewance later.
'''
with open(f"{csv_file}.csv", "w", newline="") as f:
csv_f = csv.writer(f)
# headers
csv_f.writerow(["pid", "arrivalTime", "priority",
"processTime", "timeExited", "readyQueueTime",
"waitQueueTime", "runningQueueTime", "IOQueueTime"])
# write the process results
for process in self.exited:
csv_f.writerow([process.pid, process.arrivalTime, process.priority,
process.processTime, process.timeExited, process.readyQueueTime,
process.waitingQueueTime, process.runningQueueTime, process.ioQueueTime])
def get_total_new_processes(self):
'''
helper func
grabs number of total processes in self.pcb_arrivals for scheduling type to process.
'''
return len([each for x in self.pcb_arrivals for each in self.pcb_arrivals[x]])
def update_messages(self, message:str, tick:int, style="green"):
'''
helper func
pass a message to add to messages
pass the Live object from Rich app so the update can occur.
'''
self.messages.insert(0, {"message": message, "style": style})
def new_to_ready(self):
'''
helper func
'''
for _ in range(len(self.new)):
self.ready.append(self.new.pop(0))
def find_highest_priority(self, queue):
'''
helper func
return index of PCB inside of queue with the highest priority
'''
highest_priority = (self.highest_priority, -1)
for count, p in enumerate(queue):
if p.priority < highest_priority[0]:
highest_priority = [p.priority, count]
return highest_priority[1]
def check_slice_tracker(self, clock_tick:int, time_slice:int):
'''
for Round Robin scheduling.
handles all Round Robin functionality in schedule()
checks all active slices created by load_waiting() and load_ready()
removes PCBs from 'running' and 'io' if they have been in the queue
for the given round robin time slice.
'''
to_remove = []
for process in self.time_slice_tracker:
# time entered and the queue name the pcb entered into. Either
t_entered = self.time_slice_tracker[process][0]
queue = self.time_slice_tracker[process][1]
# time to switch PCB receiving resources
if t_entered + time_slice <= clock_tick:
temp = -1
if queue == "io":
# find the process PCB object in IO.
for each in self.IO:
if each.pid == process:
temp = each
if temp == -1: # the process was removed mid IO burst
break # break out, the process has already been handled.
# remove the process from IO and add to waiting
self.IO = [p for p in self.IO if p.pid != temp.pid]
self.waiting.append(temp)
else: # running
# find the process PCB object in running.
for each in self.running:
if each.pid == process:
temp = each
if temp == -1: # the process was removed mid CPU burst
break
# remove the PCB from running and add to ready.
self.running = [p for p in self.running if p.pid != temp.pid]
self.ready.append(temp)
# mark the process for removal from time_slice_tracker
to_remove.append(process)
# remove any non active slice tracking objects.
if to_remove:
for p in to_remove:
del self.time_slice_tracker[p]
def load_new(self, clock_tick:int):
'''
load the new queue with PCBs according to the current clock tick.
'''
try:
self.new = [pcb for pcb in self.pcb_arrivals[clock_tick]
if clock_tick in self.pcb_arrivals]
for new in self.new: # display the messages associated with the loading of the PCBs
self.update_messages(
f"{clock_tick}, job {new.pid} entered the 'new' queue", clock_tick
)
except KeyError:
pass
def load_ready(self, clock_tick:int, mode:str="FCFS"):
'''
## loads PCBs from 'new' into 'ready' immediately.
## loads PCBs from 'ready' into 'running' based off the number of 'cores'
chooses PCBs to send to 'running' off of a first come first serve approach by default.
can run in 'priority based' 'PB' mode as well for choosing which PCB's load into 'running' from 'ready'.
## if there exists processes in 'ready', and there are open slots in 'running'
## fill the 'running' queue up to the number of available 'cores' in the CPU.
'''
# load the PCB from 'new' into 'ready'
self.new_to_ready()
# determine the difference between lengths of 'running' and the available 'cores'
diff = self.cores - len(self.running)
# if there exists available cpu 'cores', assign the ready processes immediately to 'running.
while diff > 0 and self.ready:
if len(self.ready) >= 1:
if mode == "FCFS":
process = self.ready.pop(0)
elif mode == "PB":
highest = self.find_highest_priority(self.ready)
if highest != -1:
process = self.ready.pop(highest)
elif mode == "RR":
process = self.ready.pop(0)
self.time_slice_tracker[process.pid] = (clock_tick, "running")
self.update_messages(
f"{clock_tick}, job {process.pid}, priority {process.priority} began running {process.cpuBurst} CPU bursts, burst: {process.currBurstIndex}/{process.totalBursts}",
clock_tick, style="cyan"
)
self.running.append(process)
diff -= 1
# increment the time in ready by one if they are staying in ready.
for process in self.ready:
process.incrementReadyTime()
def load_waiting(self, clock_tick:int, mode:str="FCFS"):
'''
# loads PCBs from 'waiting' into 'IO' based off the number of 'io_devices'
# increments the waiting time of each of the processes inside of waiting.
'''
diff = self.io_devices - len(self.IO)
while diff > 0 and self.waiting: # if there is a difference
if len(self.waiting) >= 1: # if there are any waiting, assign them immediately based off 'mode'
if mode == "FCFS":
process = self.waiting.pop(0)
elif mode == "PB":
process = self.waiting.pop(self.find_highest_priority(self.waiting))
elif mode == "RR":
process = self.waiting.pop(0)
self.time_slice_tracker[process.pid] = (clock_tick, "io")
# update the message section
self.update_messages(
f"{clock_tick}, job {process.pid} priority {process.priority} began running {process.ioBurst} IO bursts, {process.currBurstIndex}/{process.totalBursts}",
clock_tick, style="bright_magenta"
)
self.IO.append(process) # add the process to the IO queue
diff -= 1
for process in self.waiting:
process.incrementWaitingTime()
def IO_tick(self):
'''
# decrements the ioBurst of the PCBs in 'IO' by 1
sends PCBs to 'ready' whenever they are done processing, can be changed
to allow for slicing or even priority based loading from 'waiting'
decrement 1 from the waiting queue and see if any are done.
If they are done move from 'waiting' to 'ready'
'''
to_remove = []
removed = False
for process in self.IO:
process.decrementIoBurst()
if process.ioBurst == 0:
process.incrementBurstIndex()
process.cpuBurst = process.getCurrentBurstTime()
self.ready.append(process)
to_remove.append(process.pid)
removed=True
if removed: # rearrange the queues
# remove any processes that were marked for removal.
self.IO = [x for x in self.IO if x.pid not in to_remove]
def CPU_tick(self, clock_tick:int):
'''
### decrements the cpuBurst of the PCBs in 'running' by 1
### A single clock cycle for the CPU component.
Determines if processes need to be placed in waiting for their next ioBurst,
or if they need
to be placed in exited because they are completely done processing.
'''
removed = []
for process in self.running:
process.decrementCpuBurst()
if process.cpuBurst == 0:
process.incrementBurstIndex()
# if its not the last burst
if process.currBurstIndex != process.totalBursts:
process.ioBurst = process.getCurrentBurstTime()
self.waiting.append(process)
else: # if it is the last burst
process.processTime = clock_tick - process.arrivalTime
process.queueTime = (process.readyQueueTime + process.runningQueueTime \
+ process.waitingQueueTime + process.ioQueueTime) - 1
self.update_messages(
f"{clock_tick}, job {process.pid} exited. ST --> {process.arrivalTime} TAT --> {process.processTime}",
clock_tick, style="red"
)
process.timeExited = clock_tick
self.exited.append(process)
self.total_processed += 1
removed.append(process.pid)
if removed: # rearrange the queues
# remove any processes from 'running' that were marked for removal.
self.running = [x for x in self.running if x.pid not in removed]
def schedule(self, mode:str="FCFS", time_slice:int=10, speed:float=0.1):
'''
Given a mode and loaded PCBs from readData use one of the available
scheduling algorithms to process the PCBs to completion.
Available scheduling algorithms --> \n
# 'FCFS' - First come first serve
# 'RR' - Round Robin, you can also change the 'time_slice'
# 'PB' - Priority Based, scheduled based off of highest priority.
will also show the simulation of the running processes in the various
queues using the various methods in __name__ == '__main__'.
'''
total_processes = self.get_total_new_processes()
self.total_processed = 0
start = self.clock.time()
# Run the scheduling algorithm while showing a visual representation with 'rich'.
with Live(RenderScreen(self.new, self.ready, self.running, self.waiting, self.IO,
self.exited, 0, self.messages), refresh_per_second=10) as live:
# run until all of the processes are all in exited.
while self.total_processed < total_processes:
# load the PCB's into new queue based off of the clock time
tick = self.clock.time()
self.load_new(tick)
if mode == "FCFS":
self.load_ready(tick)
self.load_waiting(tick)
elif mode == "PB":
self.load_ready(tick, mode="PB")
self.load_waiting(tick, mode="PB")
elif mode == "RR":
self.check_slice_tracker(tick, time_slice)
self.load_ready(tick, mode="RR")
self.load_waiting(tick, mode="RR")
self.CPU_tick(tick)
self.IO_tick()
time.sleep(speed)
self.clock.tick()
# update the simulation.
live.update(RenderScreen(self.new, self.ready, self.running,
self.waiting, self.IO, self.exited, tick, self.messages))
end = self.clock.time()
RenderStats(self.exited, ((end - start) - 1), self.cores, self.io_devices, mode)
if __name__=='__main__':
pass
# Examples use cases.
# scheduler = Scheduler(cores=1, io_devices=1)
# scheduler.readData("processes.dat")
# scheduler.schedule(mode="FCFS")
# scheduler.schedule(mode="PB", speed=0.00001) # very fast
# scheduler.schedule(mode="RR", time_slice=4)