forked from python/typeshed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.pyi
83 lines (71 loc) · 3.64 KB
/
connection.pyi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import socket
import sys
from _typeshed import Incomplete, ReadableBuffer
from collections.abc import Iterable
from types import TracebackType
from typing import Any, Generic, SupportsIndex, TypeVar
from typing_extensions import Self, TypeAlias
__all__ = ["Client", "Listener", "Pipe", "wait"]
# https://docs.python.org/3/library/multiprocessing.html#address-formats
_Address: TypeAlias = str | tuple[str, int]
# Defaulting to Any to avoid forcing generics on a lot of pre-existing code
_SendT_contra = TypeVar("_SendT_contra", contravariant=True, default=Any)
_RecvT_co = TypeVar("_RecvT_co", covariant=True, default=Any)
class _ConnectionBase(Generic[_SendT_contra, _RecvT_co]):
def __init__(self, handle: SupportsIndex, readable: bool = True, writable: bool = True) -> None: ...
@property
def closed(self) -> bool: ... # undocumented
@property
def readable(self) -> bool: ... # undocumented
@property
def writable(self) -> bool: ... # undocumented
def fileno(self) -> int: ...
def close(self) -> None: ...
def send_bytes(self, buf: ReadableBuffer, offset: int = 0, size: int | None = None) -> None: ...
def send(self, obj: _SendT_contra) -> None: ...
def recv_bytes(self, maxlength: int | None = None) -> bytes: ...
def recv_bytes_into(self, buf: Any, offset: int = 0) -> int: ...
def recv(self) -> _RecvT_co: ...
def poll(self, timeout: float | None = 0.0) -> bool: ...
def __enter__(self) -> Self: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: TracebackType | None
) -> None: ...
def __del__(self) -> None: ...
class Connection(_ConnectionBase[_SendT_contra, _RecvT_co]): ...
if sys.platform == "win32":
class PipeConnection(_ConnectionBase[_SendT_contra, _RecvT_co]): ...
class Listener:
def __init__(
self, address: _Address | None = None, family: str | None = None, backlog: int = 1, authkey: bytes | None = None
) -> None: ...
def accept(self) -> Connection[Incomplete, Incomplete]: ...
def close(self) -> None: ...
@property
def address(self) -> _Address: ...
@property
def last_accepted(self) -> _Address | None: ...
def __enter__(self) -> Self: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: TracebackType | None
) -> None: ...
# Any: send and recv methods unused
if sys.version_info >= (3, 12):
def deliver_challenge(connection: Connection[Any, Any], authkey: bytes, digest_name: str = "sha256") -> None: ...
else:
def deliver_challenge(connection: Connection[Any, Any], authkey: bytes) -> None: ...
def answer_challenge(connection: Connection[Any, Any], authkey: bytes) -> None: ...
def wait(
object_list: Iterable[Connection[_SendT_contra, _RecvT_co] | socket.socket | int], timeout: float | None = None
) -> list[Connection[_SendT_contra, _RecvT_co] | socket.socket | int]: ...
def Client(address: _Address, family: str | None = None, authkey: bytes | None = None) -> Connection[Any, Any]: ...
# N.B. Keep this in sync with multiprocessing.context.BaseContext.Pipe.
# _ConnectionBase is the common base class of Connection and PipeConnection
# and can be used in cross-platform code.
#
# The two connections should have the same generic types but inverted (Connection[_T1, _T2], Connection[_T2, _T1]).
# However, TypeVars scoped entirely within a return annotation is unspecified in the spec.
if sys.platform != "win32":
def Pipe(duplex: bool = True) -> tuple[Connection[Any, Any], Connection[Any, Any]]: ...
else:
def Pipe(duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]: ...