Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/crawlee/_utils/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from contextlib import suppress
from datetime import datetime, timezone
from logging import getLogger
from typing import Annotated
from typing import Annotated, Any

import psutil
from pydantic import BaseModel, ConfigDict, Field, PlainSerializer, PlainValidator
Expand Down Expand Up @@ -79,19 +79,36 @@ def get_memory_info() -> MemoryInfo:
logger.debug('Calling get_memory_info()...')
current_process = psutil.Process(os.getpid())

# Retrieve the Resident Set Size (RSS) of the current process. RSS is the portion of memory
# occupied by a process that is held in RAM.
current_size_bytes = int(current_process.memory_info().rss)
# Retrieve estimated memory usage of the current process.
current_size_bytes = int(_get_used_memory(current_process.memory_full_info()))

# Sum memory usage by all children processes, try to exclude shared memory from the sum if allowed by OS.
for child in current_process.children(recursive=True):
# Ignore any NoSuchProcess exception that might occur if a child process ends before we retrieve
# its memory usage.
with suppress(psutil.NoSuchProcess):
current_size_bytes += int(child.memory_info().rss)
current_size_bytes += _get_used_memory(child.memory_full_info())

total_size_bytes = psutil.virtual_memory().total

return MemoryInfo(
total_size=ByteSize(total_size_bytes),
current_size=ByteSize(current_size_bytes),
)


def _get_used_memory(memory_full_info: Any) -> int:
Copy link
Collaborator Author

@Pijukatel Pijukatel May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This internal type hint does not seem to be available. The actual type is dependent on the OS as well.

"""Get the most suitable available used memory metric.

`Proportional Set Size (PSS)`, is the amount of own memory and memory shared with other processes, accounted in a
way that the shared amount is divided evenly between the processes that share it. Available on Linux. Suitable for
avoiding overestimation by counting the same shared memory used by children processes multiple times.

`Resident Set Size (RSS)` is the non-swapped physical memory a process has used; it includes shared memory. It
should be available everywhere.
"""
try:
# Linux
return int(memory_full_info.pss)
except AttributeError:
return int(memory_full_info.rss)
109 changes: 109 additions & 0 deletions tests/unit/_utils/test_system.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from __future__ import annotations

import os
from multiprocessing import Barrier, Process, Value, synchronize
from multiprocessing.shared_memory import SharedMemory
from typing import Callable

import pytest

from crawlee._utils.byte_size import ByteSize
from crawlee._utils.system import get_cpu_info, get_memory_info

Expand All @@ -14,3 +21,105 @@ def test_get_memory_info_returns_valid_values() -> None:
def test_get_cpu_info_returns_valid_values() -> None:
cpu_info = get_cpu_info()
assert 0 <= cpu_info.used_ratio <= 1


@pytest.mark.skipif(os.name == 'nt', reason='Improved estimation not available on Windows')
def test_memory_estimation_does_not_overestimate_due_to_shared_memory() -> None:
"""Test that memory usage estimation is not overestimating memory usage by counting shared memory multiple times.

In this test, the parent process is started and its memory usage is measured in situations where it is running
child processes without additional memory, with shared additional memory and with own unshared additional memory.
Child process without additional memory are used to estimate baseline memory usage of any child process.
The following estimation is asserted by the test:
additional_memory_size_estimate_per_shared_memory_child * number_of_sharing_children_processes is approximately
equal to additional_memory_size_estimate_per_unshared_memory_child where the additional shared memory is exactly
the same as the unshared memory.
"""
estimated_memory_expectation = Value('b', False) # noqa: FBT003 # Common usage pattern for multiprocessing.Value

def parent_process() -> None:
extra_memory_size = 1024 * 1024 * 100 # 100 MB
children_count = 4
# Memory calculation is not exact, so allow for some tolerance.
test_tolerance = 0.1

def no_extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
ready.wait()
measured.wait()

def extra_memory_child(ready: synchronize.Barrier, measured: synchronize.Barrier) -> None:
memory = SharedMemory(size=extra_memory_size, create=True)
memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)])
ready.wait()
measured.wait()
memory.close()
memory.unlink()

def shared_extra_memory_child(
ready: synchronize.Barrier, measured: synchronize.Barrier, memory: SharedMemory
) -> None:
print(memory.buf[-1])
ready.wait()
measured.wait()

def get_additional_memory_estimation_while_running_processes(
*, target: Callable, count: int = 1, use_shared_memory: bool = False
) -> float:
processes = []
ready = Barrier(parties=count + 1)
measured = Barrier(parties=count + 1)
shared_memory: None | SharedMemory = None
memory_before = get_memory_info().current_size

if use_shared_memory:
shared_memory = SharedMemory(size=extra_memory_size, create=True)
shared_memory.buf[:] = bytearray([255 for _ in range(extra_memory_size)])
extra_args = [shared_memory]
else:
extra_args = []

for _ in range(count):
p = Process(target=target, args=[ready, measured, *extra_args])
p.start()
processes.append(p)

ready.wait()
memory_during = get_memory_info().current_size
measured.wait()

for p in processes:
p.join()

if shared_memory:
shared_memory.close()
shared_memory.unlink()

return (memory_during - memory_before).to_mb() / count

additional_memory_simple_child = get_additional_memory_estimation_while_running_processes(
target=no_extra_memory_child, count=children_count
)
additional_memory_extra_memory_child = (
get_additional_memory_estimation_while_running_processes(target=extra_memory_child, count=children_count)
- additional_memory_simple_child
)
additional_memory_shared_extra_memory_child = (
get_additional_memory_estimation_while_running_processes(
target=shared_extra_memory_child, count=children_count, use_shared_memory=True
)
- additional_memory_simple_child
)

estimated_memory_expectation.value = (
abs((additional_memory_shared_extra_memory_child * children_count) - additional_memory_extra_memory_child)
/ additional_memory_extra_memory_child
< test_tolerance
)

process = Process(target=parent_process)
process.start()
process.join()

assert estimated_memory_expectation.value, (
'Estimated memory usage for process with shared memory does not meet the expectation.'
)
Loading