Commit 7dd35b2

Merge pull request #46 from python-thread/perf/memory-optimisation
Perf: memory optimisation for ParallelProcessing
2 parents 339a964 + 3ba1811 · commit 7dd35b2

File tree

3 files changed: +72 -16

  src/thread/thread.py
  src/thread/utils/algorithm.py
  tests/test_algorithm.py

src/thread/thread.py

Lines changed: 18 additions & 6 deletions
@@ -32,7 +32,7 @@ class ParallelProcessing: ...
     HookFunction,
 )
 from typing_extensions import Generic, ParamSpec
-from typing import List, Callable, Optional, Union, Mapping, Sequence, Tuple
+from typing import List, Callable, Optional, Union, Mapping, Sequence, Tuple, Generator


 Threads: set['Thread'] = set()

@@ -387,15 +387,19 @@ def _wrap_function(self, function: TargetFunction) -> TargetFunction:
         @wraps(function)
         def wrapper(
             index: int,
-            data_chunk: Sequence[_Dataset_T],
+            length: int,
+            data_chunk: Generator[_Dataset_T, None, None],
             *args: _Target_P.args,
             **kwargs: _Target_P.kwargs,
         ) -> List[_Target_T]:
             computed: List[Data_Out] = []
-            for i, data_entry in enumerate(data_chunk):
+
+            i = 0
+            for data_entry in data_chunk:
                 v = function(data_entry, *args, **kwargs)
                 computed.append(v)
-                self._threads[index].progress = round((i + 1) / len(data_chunk), 5)
+                self._threads[index].progress = round((i + 1) / length, 5)
+                i += 1

             self._completed += 1
             if self._completed == len(self._threads):

@@ -507,15 +511,23 @@ def start(self) -> None:
             i: v for i, v in self.overflow_kwargs.items() if i != 'name' and i != 'args'
         }

-        for i, data_chunk in enumerate(chunk_split(self.dataset, max_threads)):
+        i = 0
+        for chunkStart, chunkEnd in chunk_split(len(self.dataset), max_threads):
             chunk_thread = Thread(
                 target=self.function,
-                args=[i, data_chunk, *parsed_args, *self.overflow_args],
+                args=[
+                    i,
+                    chunkEnd - chunkStart,
+                    (self.dataset[x] for x in range(chunkStart, chunkEnd)),
+                    *parsed_args,
+                    *self.overflow_args,
+                ],
                 name=name_format and name_format % i or None,
                 **self.overflow_kwargs,
             )
             self._threads.append(_ThreadWorker(chunk_thread, 0))
             chunk_thread.start()
+            i += 1


         # Handle abrupt exit
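
Net effect: start() no longer materialises a sliced copy of the dataset for every worker. chunk_split now hands back (start, end) index pairs, and each thread receives its chunk length plus a generator that reads items from the shared dataset on demand. A minimal sketch of the allocation difference (the dataset and bounds below are illustrative, not taken from the library):

import sys

dataset = list(range(1_000_000))
start, end = 0, 500_000

# Before: slicing builds a second list holding 500_000 references for as long
# as the worker runs.
eager_chunk = dataset[start:end]

# After: a generator expression keeps only its frame and loop state, yielding
# dataset[x] one item at a time as the worker iterates.
lazy_chunk = (dataset[x] for x in range(start, end))

print(sys.getsizeof(eager_chunk))  # roughly 4 MB of pointer storage
print(sys.getsizeof(lazy_chunk))   # around 100-200 bytes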

src/thread/utils/algorithm.py

Lines changed: 9 additions & 10 deletions
@@ -8,10 +8,10 @@
   |_ b.py
 """

-from typing import List, Sequence, Any
+from typing import List, Tuple


-def chunk_split(dataset: Sequence[Any], number_of_chunks: int) -> List[List[Any]]:
+def chunk_split(dataset_length: int, number_of_chunks: int) -> List[Tuple[int, int]]:
     """
     Splits a dataset into balanced chunks

@@ -21,33 +21,32 @@ def chunk_split(dataset: Sequence[Any], number_of_chunks: int) -> List[List[Any]

     Parameters
     ----------
-    :param dataset: This should be the dataset you want to split into chunks
+    :param dataset_length: This should be the length of the dataset you want to split into chunks
     :param number_of_chunks: The should be the number of chunks it will attempt to split into


     Returns
     -------
-    :returns list[list[Any]]: The split dataset
+    :returns list[tuple[int, int]]: The chunked dataset slices

     Raises
     ------
     AssertionError: The number of chunks specified is larger than the dataset size
     """
-    length = len(dataset)
     assert (
-        length >= number_of_chunks
+        dataset_length >= number_of_chunks
     ), 'The number of chunks specified is larger than the dataset size'

-    chunk_count = length // number_of_chunks
-    overflow = length % number_of_chunks
+    chunk_count = dataset_length // number_of_chunks
+    overflow = dataset_length % number_of_chunks

     i = 0
     split = []
-    while i < length:
+    while i < dataset_length:
         chunk_length = chunk_count + int(overflow > 0)
         b = i + chunk_length

-        split.append(dataset[i:b])
+        split.append((i, b))
         overflow -= 1
         i = b
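
The splitting arithmetic is unchanged by the signature swap: the remainder (overflow) is spread across the leading chunks, so every chunk covers either dataset_length // number_of_chunks items or one more. A condensed, standalone restatement of the committed function (docstring and assertion omitted), runnable outside the package:

from typing import List, Tuple


def chunk_split(dataset_length: int, number_of_chunks: int) -> List[Tuple[int, int]]:
    # Base chunk size; the first `overflow` chunks each take one extra item.
    chunk_count = dataset_length // number_of_chunks
    overflow = dataset_length % number_of_chunks

    i = 0
    split: List[Tuple[int, int]] = []
    while i < dataset_length:
        b = i + chunk_count + int(overflow > 0)
        split.append((i, b))
        overflow -= 1
        i = b
    return split


# 10 items over 3 chunks: the single leftover item pads the first chunk.
assert chunk_split(10, 3) == [(0, 4), (4, 7), (7, 10)]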

tests/test_algorithm.py

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+import random
+from src.thread.utils import algorithm
+
+
+def test_chunking_1():
+    assert algorithm.chunk_split(5, 1) == [(0, 5)]
+
+
+def test_chunking_2():
+    assert algorithm.chunk_split(5, 2) == [(0, 3), (3, 5)]
+
+
+def test_chunking_3():
+    assert algorithm.chunk_split(100, 8) == [
+        (0, 13),
+        (13, 26),
+        (26, 39),
+        (39, 52),
+        (52, 64),
+        (64, 76),
+        (76, 88),
+        (88, 100),
+    ]
+
+
+def test_chunking_dynamic():
+    dataset_length = random.randint(400, int(10e6))
+    thread_count = random.randint(2, 100)
+
+    expected_chunk_length_low = dataset_length // thread_count
+    expected_chunk_high = dataset_length % thread_count
+
+    i = 0
+    heap = []
+    while i < dataset_length:
+        chunk_length = expected_chunk_length_low + int(expected_chunk_high > 0)
+        b = i + chunk_length
+
+        heap.append((i, b))
+        expected_chunk_high -= 1
+        i = b
+
+    assert (
+        algorithm.chunk_split(dataset_length, thread_count) == heap
+    ), f'\nLength: {dataset_length}\nThreads: {thread_count}\nExpected: {heap}\nActual: {algorithm.chunk_split(dataset_length, thread_count)}'
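
test_chunking_dynamic re-implements the splitting loop and compares the output lists verbatim. The same guarantees can also be stated as direct invariants; a sketch (not part of the committed tests, assuming the repository layout used above):

from src.thread.utils import algorithm


def check_invariants(dataset_length: int, number_of_chunks: int) -> None:
    chunks = algorithm.chunk_split(dataset_length, number_of_chunks)

    assert len(chunks) == number_of_chunks                        # one chunk per thread
    assert chunks[0][0] == 0 and chunks[-1][1] == dataset_length  # full coverage
    assert all(a[1] == b[0] for a, b in zip(chunks, chunks[1:]))  # contiguous, no gaps

    sizes = [end - start for start, end in chunks]
    assert max(sizes) - min(sizes) <= 1                           # balanced within one item


check_invariants(100, 8)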
