-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlinux_cpu.py
76 lines (63 loc) · 2.28 KB
/
linux_cpu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python3
import sys
class _LinuxCpu(object):
def _read_cpu_file(self, filename):
f = open(filename, "r");
cpus = set()
for cpu_range in filter(None, f.read().rstrip().split(",")):
if "-" in cpu_range:
l, r = cpu_range.split("-")
cpus |= set(range(int(l), int(r)+1))
else:
cpus |= {int(cpu_range)}
return cpus
def _set_cpu_online(self, cpu_id, online):
f = open("/sys/devices/system/cpu/cpu{}/online".format(cpu_id), "w")
f.write("1" if online else "0")
def _set_cpus_online(self, cpus, online):
for cpu in cpus:
self._set_cpu_online(cpu, online)
@property
def online_cpus(self):
"""Set of CPUs currently online. Setting this property enables and
disables CPUs as needed."""
return self._read_cpu_file("/sys/devices/system/cpu/online")
@online_cpus.setter
def online_cpus(self, cpus):
self._set_cpus_online(self.online_cpus - cpus, False)
self._set_cpus_online(self.offline_cpus & cpus, True)
@property
def offline_cpus(self):
"""Set of CPUs currently offline. Setting this property enables and
disables CPUs as needed."""
return self._read_cpu_file("/sys/devices/system/cpu/offline")
@offline_cpus.setter
def offline_cpus(self, cpus):
self._set_cpus_online(self.online_cpus & cpus, False)
self._set_cpus_online(self.offline_cpus - cpus, True)
@property
def online_count(self):
"""Number of CPUs currently online. Setting this property enables and
disables CPUs as needed."""
return len(self.online_cpus)
@online_count.setter
def online_count(self, count):
online_cpus = self.online_cpus
offline_cpus = self.offline_cpus
cpus_to_enable = count - len(online_cpus)
if cpus_to_enable > len(offline_cpus):
raise Exception("cannot not get {} CPUs online: only {} more are"
" available".format(count, len(offline_cpus)))
elif count < 1:
raise Exception("cannot get less than one CPU online")
elif cpus_to_enable < 0:
for cpu_id in sorted(list(online_cpus))[cpus_to_enable:]:
self._set_cpu_online(cpu_id, False)
else:
for cpu_id in sorted(list(offline_cpus))[:cpus_to_enable]:
self._set_cpu_online(cpu_id, True)
new_count = len(self.online_cpus)
if (new_count != count):
raise Exception("got {} CPUs online instead of {}"
.format(new_count, count))
sys.modules[__name__] = _LinuxCpu()