Skip to content

Commit e5fb1dc

Browse files
authored
Fix mypy errors: Observer, Scheduler, PriorityQueue, SchedulerItem, AsyncSubject (#411)
* fix PriorityQueue: correct items type hint * fix ScheduledItem: relax __eq__ type hints * fix Observer: remove monkey patch of _xxx_core functions by wrapping the given handler functions in __init__ * fix AsyncSubject: add type hints + set the list of observers to an empty list on dispose * Observer: use default on_xxx function exposed in rx.core.internal instead
1 parent 06f4bba commit e5fb1dc

File tree

4 files changed

+29
-22
lines changed

4 files changed

+29
-22
lines changed

Diff for: rx/core/observer/observer.py

+16-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Any, Callable, Optional
22

33
from .. import typing
4+
from rx.internal import noop, default_error
45

56

67
class Observer(typing.Observer, typing.Disposable):
@@ -15,20 +16,20 @@ def __init__(self,
1516
on_completed: Optional[typing.OnCompleted] = None
1617
) -> None:
1718
self.is_stopped = False
18-
if on_next is not None:
19-
self._on_next_core = on_next
20-
if on_error is not None:
21-
self._on_error_core = on_error
22-
if on_completed is not None:
23-
self._on_completed_core = on_completed
19+
self._handler_on_next = on_next or noop
20+
self._handler_on_error = on_error or default_error
21+
self._handler_on_completed = on_completed or noop
2422

2523
def on_next(self, value: Any) -> None:
2624
"""Notify the observer of a new element in the sequence."""
2725
if not self.is_stopped:
2826
self._on_next_core(value)
2927

3028
def _on_next_core(self, value: Any) -> None:
31-
pass
29+
"""For Subclassing purpose. This method is called by `on_next()`
30+
method until the observer is stopped.
31+
"""
32+
self._handler_on_next(value)
3233

3334
def on_error(self, error: Exception) -> None:
3435
"""Notify the observer that an exception has occurred.
@@ -42,10 +43,10 @@ def on_error(self, error: Exception) -> None:
4243
self._on_error_core(error)
4344

4445
def _on_error_core(self, error: Exception) -> None:
45-
if isinstance(error, BaseException):
46-
raise error
47-
else:
48-
raise Exception(error)
46+
"""For Subclassing purpose. This method is called by `on_error()`
47+
method until the observer is stopped.
48+
"""
49+
self._handler_on_error(error)
4950

5051
def on_completed(self) -> None:
5152
"""Notifies the observer of the end of the sequence."""
@@ -55,7 +56,10 @@ def on_completed(self) -> None:
5556
self._on_completed_core()
5657

5758
def _on_completed_core(self) -> None:
58-
pass
59+
"""For Subclassing purpose. This method is called by `on_completed()`
60+
method until the observer is stopped.
61+
"""
62+
self._handler_on_completed()
5963

6064
def dispose(self) -> None:
6165
"""Disposes the observer, causing it to transition to the

Diff for: rx/internal/priorityqueue.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import heapq
22
from sys import maxsize
3-
from typing import Generic, List
3+
from typing import Generic, List, Tuple
44

55
from rx.core.typing import T1
66

@@ -11,7 +11,7 @@ class PriorityQueue(Generic[T1]):
1111
MIN_COUNT = ~maxsize
1212

1313
def __init__(self) -> None:
14-
self.items: List[T1] = []
14+
self.items: List[Tuple[T1, int]] = []
1515
self.count = PriorityQueue.MIN_COUNT # Monotonic increasing for sort stability
1616

1717
def __len__(self):

Diff for: rx/scheduler/scheduleditem.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Generic, Optional
2+
from typing import Generic, Optional, Any
33

44
from rx.core import typing
55
from rx.disposable import SingleAssignmentDisposable
@@ -40,5 +40,8 @@ def __lt__(self, other: 'ScheduledItem') -> bool:
4040
def __gt__(self, other: 'ScheduledItem') -> bool:
4141
return self.duetime > other.duetime
4242

43-
def __eq__(self, other: 'ScheduledItem') -> bool:
44-
return self.duetime == other.duetime
43+
def __eq__(self, other: Any) -> bool:
44+
try:
45+
return self.duetime == other.duetime
46+
except AttributeError:
47+
return NotImplemented

Diff for: rx/subjects/asyncsubject.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import threading
2-
from typing import Any
2+
from typing import Any, List, Optional
33

44
from rx.disposable import Disposable
55
from rx.core import Observable
@@ -24,16 +24,16 @@ def __init__(self) -> None:
2424
self.is_stopped = False
2525
self.value = None
2626
self.has_value = False
27-
self.observers = []
28-
self.exception = None
27+
self.observers: List[Observer] = []
28+
self.exception: Optional[Exception] = None
2929

3030
self.lock = threading.RLock()
3131

3232
def check_disposed(self) -> None:
3333
if self.is_disposed:
3434
raise DisposedException()
3535

36-
def _subscribe_core(self, observer, scheduler=None):
36+
def _subscribe_core(self, observer: Observer, scheduler=None):
3737
with self.lock:
3838
self.check_disposed()
3939
if not self.is_stopped:
@@ -103,6 +103,6 @@ def on_next(self, value: Any) -> None:
103103
def dispose(self) -> None:
104104
with self.lock:
105105
self.is_disposed = True
106-
self.observers = None
106+
self.observers = []
107107
self.exception = None
108108
self.value = None

0 commit comments

Comments
 (0)