-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: Simplify request queue implementation #653
Open
janbuchar
wants to merge
16
commits into
master
Choose a base branch
from
simplify-request-queue
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+162
−144
Open
Changes from 1 commit
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
a2a3b33
Simplify request queue
janbuchar c2468d0
Do that convoluted memory storage thingy
janbuchar db9212d
Merge remote-tracking branch 'origin/master' into simplify-request-queue
janbuchar 66b3d8d
Merge remote-tracking branch 'origin/master' into simplify-request-queue
janbuchar 87e08b9
WIP update is_finished()
janbuchar bf03e50
Merge remote-tracking branch 'origin/master' into simplify-request-queue
janbuchar cc2657a
Use the queue_has_locked_requests property
janbuchar 68a42ac
Align is_finished with its javascript counterpart
janbuchar 9bf0e56
Add queue_has_locked_requests flag to memory storage
janbuchar 3ae4def
Port over forefront handling logic
janbuchar cab27c4
Remove obsolete TODO
janbuchar 4ff3495
Misc fixes
janbuchar 9300f27
Merge remote-tracking branch 'origin/master' into simplify-request-queue
janbuchar 37e3db5
Merge remote-tracking branch 'origin/master' into simplify-request-queue
janbuchar 8603d8b
Pass pytest log level to crawler loggers
janbuchar b4a5593
Fix memory storage locking bug
janbuchar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
from contextlib import suppress | ||
from datetime import datetime, timedelta, timezone | ||
from logging import getLogger | ||
from typing import TYPE_CHECKING, Any, Generic, TypedDict, TypeVar | ||
from typing import TYPE_CHECKING, Any, TypedDict, TypeVar | ||
|
||
from typing_extensions import override | ||
|
||
|
@@ -31,30 +31,6 @@ | |
T = TypeVar('T') | ||
|
||
|
||
class BoundedSet(Generic[T]): | ||
"""A simple set datastructure that removes the least recently accessed item when it reaches `max_length`.""" | ||
|
||
def __init__(self, max_length: int) -> None: | ||
self._max_length = max_length | ||
self._data = OrderedDict[T, object]() | ||
|
||
def __contains__(self, item: T) -> bool: | ||
found = item in self._data | ||
if found: | ||
self._data.move_to_end(item, last=True) | ||
return found | ||
|
||
def add(self, item: T) -> None: | ||
self._data[item] = True | ||
self._data.move_to_end(item) | ||
|
||
if len(self._data) > self._max_length: | ||
self._data.popitem(last=False) | ||
|
||
def clear(self) -> None: | ||
self._data.clear() | ||
|
||
|
||
class CachedRequest(TypedDict): | ||
id: str | ||
was_already_handled: bool | ||
|
@@ -97,12 +73,6 @@ class RequestQueue(BaseStorage, RequestProvider): | |
_MAX_CACHED_REQUESTS = 1_000_000 | ||
"""Maximum number of requests that can be cached.""" | ||
|
||
_RECENTLY_HANDLED_CACHE_SIZE = 1000 | ||
"""Cache size for recently handled requests.""" | ||
|
||
_STORAGE_CONSISTENCY_DELAY = timedelta(seconds=3) | ||
"""Expected delay for storage to achieve consistency, guiding the timing of subsequent read operations.""" | ||
Comment on lines
-103
to
-107
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
def __init__( | ||
self, | ||
id: str, | ||
|
@@ -117,7 +87,6 @@ def __init__( | |
|
||
# Get resource clients from storage client | ||
self._resource_client = client.request_queue(self._id) | ||
self._resource_collection_client = client.request_queues() | ||
|
||
self._request_lock_time = timedelta(minutes=3) | ||
self._queue_paused_for_migration = False | ||
|
@@ -134,9 +103,7 @@ def __init__( | |
self._assumed_handled_count = 0 | ||
self._queue_head_dict: OrderedDict[str, str] = OrderedDict() | ||
self._list_head_and_lock_task: asyncio.Task | None = None | ||
self._in_progress: set[str] = set() | ||
self._last_activity = datetime.now(timezone.utc) | ||
self._recently_handled: BoundedSet[str] = BoundedSet(max_length=self._RECENTLY_HANDLED_CACHE_SIZE) | ||
self._requests_cache: LRUCache[CachedRequest] = LRUCache(max_length=self._MAX_CACHED_REQUESTS) | ||
|
||
@override | ||
|
@@ -209,15 +176,7 @@ async def add_request( | |
|
||
self._cache_request(cache_key, processed_request) | ||
|
||
request_id, was_already_present = processed_request.id, processed_request.was_already_present | ||
is_handled = request.handled_at is not None | ||
|
||
if ( | ||
not is_handled | ||
and not was_already_present | ||
and request_id not in self._in_progress | ||
and request_id not in self._recently_handled | ||
): | ||
if request.handled_at is None and not processed_request.was_already_present: | ||
self._assumed_total_count += 1 | ||
|
||
return processed_request | ||
|
@@ -300,27 +259,7 @@ async def fetch_next_request(self) -> Request | None: | |
return None | ||
|
||
next_request_id, _ = self._queue_head_dict.popitem(last=False) # ~removeFirst() | ||
|
||
# This should never happen, but... | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. haha, great we're getting rid of this 😄 |
||
if next_request_id in self._in_progress or next_request_id in self._recently_handled: | ||
logger.warning( | ||
'Queue head returned a request that is already in progress?!', | ||
extra={ | ||
'nextRequestId': next_request_id, | ||
'inProgress': next_request_id in self._in_progress, | ||
'recentlyHandled': next_request_id in self._recently_handled, | ||
}, | ||
) | ||
return None | ||
|
||
self._in_progress.add(next_request_id) | ||
|
||
try: | ||
request = await self._get_or_hydrate_request(next_request_id) | ||
except Exception: | ||
# On error, remove the request from in progress, otherwise it would be there forever | ||
self._in_progress.remove(next_request_id) | ||
raise | ||
request = await self._get_or_hydrate_request(next_request_id) | ||
|
||
# NOTE: It can happen that the queue head index is inconsistent with the main queue table. | ||
# This can occur in two situations: | ||
|
@@ -336,10 +275,6 @@ async def fetch_next_request(self) -> Request | None: | |
'Cannot find a request from the beginning of queue, will be retried later', | ||
extra={'nextRequestId': next_request_id}, | ||
) | ||
asyncio.get_running_loop().call_later( | ||
self._STORAGE_CONSISTENCY_DELAY.total_seconds(), | ||
lambda: self._in_progress.remove(next_request_id), | ||
) | ||
return None | ||
|
||
# 2) | ||
|
@@ -352,7 +287,6 @@ async def fetch_next_request(self) -> Request | None: | |
'Request fetched from the beginning of queue was already handled', | ||
extra={'nextRequestId': next_request_id}, | ||
) | ||
self._recently_handled.add(next_request_id) | ||
return None | ||
|
||
return request | ||
|
@@ -370,19 +304,12 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | | |
""" | ||
self._last_activity = datetime.now(timezone.utc) | ||
|
||
if request.id not in self._in_progress: | ||
logger.debug(f'Cannot mark request (ID: {request.id}) as handled, because it is not in progress!') | ||
return None | ||
|
||
if request.handled_at is None: | ||
request.handled_at = datetime.now(timezone.utc) | ||
|
||
processed_request = await self._resource_client.update_request(request) | ||
processed_request.unique_key = request.unique_key | ||
|
||
self._in_progress.remove(request.id) | ||
self._recently_handled.add(request.id) | ||
|
||
if not processed_request.was_already_handled: | ||
self._assumed_handled_count += 1 | ||
|
||
|
@@ -408,10 +335,6 @@ async def reclaim_request( | |
""" | ||
self._last_activity = datetime.now(timezone.utc) | ||
|
||
if request.id not in self._in_progress: | ||
logger.debug(f'Cannot reclaim request (ID: {request.id}), because it is not in progress!') | ||
return None | ||
|
||
# TODO: If request hasn't been changed since the last get_request(), we don't need to call update_request() | ||
# and thus improve performance. | ||
# https://github.com/apify/apify-sdk-python/issues/143 | ||
|
@@ -420,11 +343,6 @@ async def reclaim_request( | |
self._cache_request(unique_key_to_request_id(request.unique_key), processed_request) | ||
|
||
if processed_request: | ||
# Mark the request as no longer in progress, | ||
# as the moment we delete the lock, we could end up also re-fetching the request in a subsequent | ||
# _ensure_head_is_non_empty() which could potentially lock the request again | ||
self._in_progress.discard(request.id) | ||
|
||
# Try to delete the request lock if possible | ||
try: | ||
await self._resource_client.delete_request_lock(request.id, forefront=forefront) | ||
|
@@ -451,21 +369,6 @@ async def is_finished(self) -> bool: | |
Returns: | ||
bool: `True` if all requests were already handled and there are no more left. `False` otherwise. | ||
""" | ||
seconds_since_last_activity = datetime.now(timezone.utc) - self._last_activity | ||
if self._in_progress_count() > 0 and seconds_since_last_activity > self._internal_timeout: | ||
logger.warning( | ||
f'The request queue seems to be stuck for {self._internal_timeout.total_seconds()}s, ' | ||
'resetting internal state.', | ||
extra={ | ||
'queue_head_ids_pending': len(self._queue_head_dict), | ||
'in_progress': list(self._in_progress), | ||
}, | ||
) | ||
|
||
# We only need to reset these two variables, no need to reset all the other stats | ||
self._queue_head_dict.clear() | ||
self._in_progress.clear() | ||
|
||
if self._queue_head_dict: | ||
logger.debug( | ||
'There are still ids in the queue head that are pending processing', | ||
|
@@ -476,24 +379,14 @@ async def is_finished(self) -> bool: | |
|
||
return False | ||
|
||
if self._in_progress: | ||
logger.debug( | ||
'There are still requests in progress (or zombie)', | ||
extra={ | ||
'in_progress': list(self._in_progress), | ||
}, | ||
) | ||
|
||
return False | ||
|
||
current_head = await self._resource_client.list_head(limit=2) | ||
|
||
if current_head.items: | ||
logger.debug( | ||
'Queue head still returned requests that need to be processed (or that are locked by other clients)', | ||
) | ||
|
||
return not current_head.items and not self._in_progress | ||
return not current_head.items | ||
|
||
async def get_info(self) -> RequestQueueMetadata | None: | ||
"""Get an object containing general information about the request queue.""" | ||
|
@@ -534,19 +427,12 @@ async def _list_head_and_lock(self) -> None: | |
|
||
for request in response.items: | ||
# Queue head index might be behind the main table, so ensure we don't recycle requests | ||
if ( | ||
not request.id | ||
or not request.unique_key | ||
or request.id in self._in_progress | ||
or request.id in self._recently_handled | ||
): | ||
if not request.id or not request.unique_key: | ||
logger.debug( | ||
'Skipping request from queue head, already in progress or recently handled', | ||
extra={ | ||
'id': request.id, | ||
'unique_key': request.unique_key, | ||
'in_progress': request.id in self._in_progress, | ||
'recently_handled': request.id in self._recently_handled, | ||
}, | ||
) | ||
|
||
|
@@ -568,14 +454,9 @@ async def _list_head_and_lock(self) -> None: | |
), | ||
) | ||
|
||
def _in_progress_count(self) -> int: | ||
return len(self._in_progress) | ||
|
||
def _reset(self) -> None: | ||
self._queue_head_dict.clear() | ||
self._list_head_and_lock_task = None | ||
self._in_progress.clear() | ||
self._recently_handled.clear() | ||
self._assumed_total_count = 0 | ||
self._assumed_handled_count = 0 | ||
self._requests_cache.clear() | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍