Commit eb50f4b
Add shared memory threshold size, default to 1e7
Shared memory is slower than using queues for Numpy arrays with fewer than roughly 1e7 elements. To optimise performance, shared memory should only be used for large arrays, so a new shared memory threshold has been added that determines whether an array is transferred using shared memory, based on its element count (obj.size, not its memory usage). A more elegant approach may be needed in future. Note: these measurements were made with regular 64-bit arrays; other array types may need improved logic.
1 parent cee7e25 commit eb50f4b
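
To make the threshold's effect concrete, here is a minimal sketch (not code from this commit; the helper name is hypothetical). It mirrors the `obj.size > threshold` check added to `SharedMemoryObject.attach` in the diff below: the comparison is against the array's element count, not its byte size.

```python
# Illustrative sketch only -- mirrors the size check added in this commit,
# it is not a verbatim excerpt from scheduler/Scheduler.py.
import numpy as np

SHARED_MEMORY_THRESHOLD = int(1e7)  # the new default introduced by this commit


def should_use_shared_memory(obj) -> bool:
    # obj.size counts elements, so the threshold is about element count, not bytes;
    # a 1e7-element float64 array occupies roughly 80 MB.
    return isinstance(obj, np.ndarray) and obj.size > SHARED_MEMORY_THRESHOLD


print(should_use_shared_memory(np.zeros(1_000)))        # False -> sent through the queue
print(should_use_shared_memory(np.zeros(20_000_000)))   # True  -> transferred via shared memory
```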

File tree

1 file changed (+15, -5 lines)


scheduler/Scheduler.py

Lines changed: 15 additions & 5 deletions
```diff
@@ -54,6 +54,7 @@ def __init__(
         cpu_threshold: float = 95,
         cpu_update_interval: float = 5,
         shared_memory: bool = True,
+        shared_memory_threshold:int = 1e7,
     ):
         """
         :param progress_callback: a function taking the number of finished tasks and the total number of tasks, which is
@@ -64,6 +65,7 @@ def __init__(
         to be below the threshold, the number of simultaneous tasks will be increased
         :param cpu_update_interval: the time, in seconds, between consecutive CPU usage checks when `dynamic` is enabled
         :param shared_memory: whether to use shared memory if possible
+        :param shared_memory_threshold: the minimum size of a Numpy array which will cause it to be transferred using shared memory if possible
         """
         self.dynamic = dynamic
         self.update_interval = update_interval
@@ -78,6 +80,8 @@ def __init__(
         else:
             self.mgr = None
 
+        self.shared_memory_threshold = shared_memory_threshold
+
         self.tasks: List[Task] = []
         self.output: List[Tuple] = []
 
@@ -148,7 +152,7 @@ def add(
 
         queue = queue_type()
 
-        _args = (queue, self.mgr) + args
+        _args = (queue, self.mgr, self.shared_memory_threshold) + args
         _wrapper = functools.partial(wrapper, target)
 
         process = process_type(target=_wrapper, args=_args)
@@ -455,7 +459,11 @@ def _tasks_to_run(self) -> List[Task]:
 
 
 def wrapper(
-    function: Callable, queue: Queue, manager: Optional["SharedMemoryManager"], *args
+    function: Callable,
+    queue: Queue,
+    manager: Optional["SharedMemoryManager"],
+    threshold: int,
+    *args: Any
 ) -> None:
     """
     Wrapper which calls a function with its specified arguments and puts the output in a queue.
@@ -472,7 +480,7 @@ def wrapper(
 
     if manager:
         for item in result:
-            out.append(SharedMemoryObject.attach(manager, item))
+            out.append(SharedMemoryObject.attach(manager, item, threshold))
     else:
         out = result
 
@@ -502,8 +510,10 @@ def __init__(self, name: str, shape, dtype):
         self.dtype = dtype
 
     @staticmethod
-    def attach(manager, obj) -> Union["SharedMemoryObject", Any]:
-        if SharedMemoryObject.is_ndarray(obj) and obj.size > 2:
+    def attach(
+        manager: "SharedMemoryManager", obj: Any, threshold: int
+    ) -> Union["SharedMemoryObject", Any]:
+        if SharedMemoryObject.is_ndarray(obj) and obj.size > threshold:
             from numpy import ndarray
 
             arr = obj
```
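
For context on why the default sits around 1e7 elements, the following rough, standalone timing sketch (not part of this commit; every name in it is illustrative) compares sending a float64 array through a multiprocessing.Queue against copying it into a multiprocessing.shared_memory.SharedMemory block and sending only its (name, shape, dtype) metadata. The crossover point varies by machine and Python version, which is one reason the threshold is exposed as a parameter rather than hard-coded.

```python
# Rough benchmark sketch, assuming Python 3.8+ and NumPy; results are machine-dependent.
import time
from multiprocessing import Process, Queue
from multiprocessing.shared_memory import SharedMemory

import numpy as np


def consume_queue(queue: Queue) -> None:
    # Receiving from the queue unpickles (copies) the whole array in the child.
    arr = queue.get()
    arr.sum()  # touch the data so the transfer cannot be skipped


def consume_shared(queue: Queue) -> None:
    # Only metadata travels through the queue; the data stays in shared memory.
    name, shape, dtype = queue.get()
    shm = SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    arr.sum()
    shm.close()


def time_transfer(n: int) -> None:
    data = np.random.rand(n)  # regular 64-bit array, as in the commit notes

    # 1) plain queue transfer (pickle + copy through a pipe)
    q = Queue()
    p = Process(target=consume_queue, args=(q,))
    start = time.perf_counter()
    p.start()
    q.put(data)
    p.join()
    queue_time = time.perf_counter() - start

    # 2) shared-memory transfer (one explicit copy into the block, metadata on the queue)
    q = Queue()
    p = Process(target=consume_shared, args=(q,))
    start = time.perf_counter()
    shm = SharedMemory(create=True, size=data.nbytes)
    view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    view[:] = data
    p.start()
    q.put((shm.name, data.shape, data.dtype))
    p.join()
    shared_time = time.perf_counter() - start
    shm.close()
    shm.unlink()  # some Python versions may still print a spurious resource_tracker warning

    print(f"n={n:>11,}  queue={queue_time:.3f}s  shared={shared_time:.3f}s")


if __name__ == "__main__":
    for n in (10**5, 10**6, 10**7, 2 * 10**7):
        time_transfer(n)
```

Under these assumptions the queue path tends to win for small arrays, because the shared-memory path pays for block creation plus an explicit copy, while for arrays well above the threshold the metadata-only transfer pulls ahead, which matches the behaviour described in the commit message.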

0 commit comments