33
33
)
34
34
from .frame_buffer import FrameBuffer
35
35
from .settings import Settings , SettingCodes
36
- from .stream import H2Stream
36
+ from .stream import H2Stream , StreamClosedBy
37
37
from .utilities import guard_increment_window
38
38
from .windows import WindowManager
39
39
@@ -378,10 +378,13 @@ def __init__(self, client_side=True, header_encoding='utf-8', config=None):
378
378
# Data that needs to be sent.
379
379
self ._data_to_send = b''
380
380
381
- # Keeps track of streams that have been forcefully reset by this peer .
381
+ # Keeps track of how streams are closed .
382
382
# Used to ensure that we don't blow up in the face of frames that were
383
- # in flight when a stream was reset.
384
- self ._reset_streams = set ()
383
+ # in flight when a RST_STREAM was sent.
384
+ # Also used to determine whether we should consider a frame received
385
+ # while a stream is closed as either a stream error or a connection
386
+ # error.
387
+ self ._closed_streams = {}
385
388
386
389
# The flow control window manager for the connection.
387
390
self ._inbound_flow_control_window_manager = WindowManager (
@@ -425,7 +428,8 @@ def _open_streams(self, remainder):
425
428
to_delete .append (stream_id )
426
429
427
430
for stream_id in to_delete :
428
- del self .streams [stream_id ]
431
+ stream = self .streams .pop (stream_id )
432
+ self ._closed_streams [stream_id ] = stream .closed_by
429
433
430
434
return count
431
435
@@ -1077,8 +1081,6 @@ def reset_stream(self, stream_id, error_code=0):
1077
1081
stream = self ._get_stream_by_id (stream_id )
1078
1082
frames = stream .reset_stream (error_code )
1079
1083
self ._prepare_for_sending (frames )
1080
- self ._reset_streams .add (stream_id )
1081
- del self .streams [stream_id ]
1082
1084
1083
1085
def close_connection (self , error_code = 0 , additional_data = None ,
1084
1086
last_stream_id = None ):
@@ -1542,27 +1544,37 @@ def _receive_frame(self, frame):
1542
1544
# I don't love using __class__ here, maybe reconsider it.
1543
1545
frames , events = self ._frame_dispatch_table [frame .__class__ ](frame )
1544
1546
except StreamClosedError as e :
1545
- # We need to send a RST_STREAM frame on behalf of the stream.
1546
- # The frame the stream wants to emit is already present in the
1547
- # exception.
1548
- # This does not require re-raising: it's an expected behaviour. The
1549
- # only time we don't do that is if this is a stream the user
1550
- # manually reset.
1551
- if frame .stream_id not in self ._reset_streams :
1547
+ # If the stream was closed by RST_STREAM, we just send a RST_STREAM
1548
+ # to the remote peer. Otherwise, this is a connection error, and so
1549
+ # we will re-raise to trigger one.
1550
+ if self ._stream_is_closed_by_reset (e .stream_id ):
1552
1551
f = RstStreamFrame (e .stream_id )
1553
1552
f .error_code = e .error_code
1554
1553
self ._prepare_for_sending ([f ])
1555
1554
events = e ._events
1556
1555
else :
1557
- events = []
1556
+ raise
1558
1557
except StreamIDTooLowError as e :
1559
- # The stream ID seems invalid. This is unlikely, so it's probably
1560
- # the case that this frame is actually for a stream that we've
1561
- # already reset and removed the state for. If it is, just swallow
1562
- # the error. If we didn't do that, re-raise.
1563
- if frame .stream_id not in self ._reset_streams :
1558
+ # The stream ID seems invalid. This may happen when the closed
1559
+ # stream has been cleaned up, or when the remote peer has opened a
1560
+ # new stream with a higher stream ID than this one, forcing it
1561
+ # closed implicitly.
1562
+ #
1563
+ # Check how the stream was closed: depending on the mechanism, it
1564
+ # is either a stream error or a connection error.
1565
+ if self ._stream_is_closed_by_reset (e .stream_id ):
1566
+ # Closed by RST_STREAM is a stream error.
1567
+ f = RstStreamFrame (e .stream_id )
1568
+ f .error_code = ErrorCodes .STREAM_CLOSED
1569
+ self ._prepare_for_sending ([f ])
1570
+ events = []
1571
+ elif self ._stream_is_closed_by_end (e .stream_id ):
1572
+ # Closed by END_STREAM is a connection error.
1573
+ raise StreamClosedError (e .stream_id )
1574
+ else :
1575
+ # Closed implicitly, also a connection error, but of type
1576
+ # PROTOCOL_ERROR.
1564
1577
raise
1565
- events = []
1566
1578
except KeyError as e : # pragma: no cover
1567
1579
# We don't have a function for handling this frame. Let's call this
1568
1580
# a PROTOCOL_ERROR and exit.
@@ -1647,7 +1659,8 @@ def _receive_push_promise_frame(self, frame):
1647
1659
# PROTOCOL_ERROR: pushing a stream on a naturally closed stream is
1648
1660
# a real problem because it creates a brand new stream that the
1649
1661
# remote peer now believes exists.
1650
- if frame .stream_id in self ._reset_streams :
1662
+ if (self ._stream_closed_by (frame .stream_id ) ==
1663
+ StreamClosedBy .SEND_RST_STREAM ):
1651
1664
f = RstStreamFrame (frame .promised_stream_id )
1652
1665
f .error_code = ErrorCodes .REFUSED_STREAM
1653
1666
return [f ], events
@@ -1662,11 +1675,19 @@ def _receive_push_promise_frame(self, frame):
1662
1675
if (frame .stream_id % 2 ) == 0 :
1663
1676
raise ProtocolError ("Cannot recursively push streams." )
1664
1677
1665
- frames , stream_events = stream .receive_push_promise_in_band (
1666
- frame .promised_stream_id ,
1667
- pushed_headers ,
1668
- self .config .header_encoding ,
1669
- )
1678
+ try :
1679
+ frames , stream_events = stream .receive_push_promise_in_band (
1680
+ frame .promised_stream_id ,
1681
+ pushed_headers ,
1682
+ self .config .header_encoding ,
1683
+ )
1684
+ except StreamClosedError :
1685
+ # The parent stream was reset by us, so we presume that
1686
+ # PUSH_PROMISE was in flight when we reset the parent stream.
1687
+ # So we just reset the new stream.
1688
+ f = RstStreamFrame (frame .promised_stream_id )
1689
+ f .error_code = ErrorCodes .REFUSED_STREAM
1690
+ return [f ], events
1670
1691
1671
1692
new_stream = self ._begin_new_stream (
1672
1693
frame .promised_stream_id , AllowedStreamIDs .EVEN
@@ -1931,6 +1952,40 @@ def _stream_id_is_outbound(self, stream_id):
1931
1952
"""
1932
1953
return (stream_id % 2 == int (self .config .client_side ))
1933
1954
1955
+ def _stream_closed_by (self , stream_id ):
1956
+ """
1957
+ Returns how the stream was closed.
1958
+
1959
+ The return value will be either a member of
1960
+ ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was
1961
+ closed implicitly by the peer opening a stream with a higher stream ID
1962
+ before opening this one.
1963
+ """
1964
+ if stream_id in self .streams :
1965
+ return self .streams [stream_id ].closed_by
1966
+ if stream_id in self ._closed_streams :
1967
+ return self ._closed_streams [stream_id ]
1968
+ return None
1969
+
1970
+ def _stream_is_closed_by_reset (self , stream_id ):
1971
+ """
1972
+ Returns ``True`` if the stream was closed by sending or receiving a
1973
+ RST_STREAM frame. Returns ``False`` otherwise.
1974
+ """
1975
+ return self ._stream_closed_by (stream_id ) in (
1976
+ StreamClosedBy .RECV_RST_STREAM , StreamClosedBy .SEND_RST_STREAM
1977
+ )
1978
+
1979
+ def _stream_is_closed_by_end (self , stream_id ):
1980
+ """
1981
+ Returns ``True`` if the stream was closed by sending or receiving an
1982
+ END_STREAM flag in a HEADERS or DATA frame. Returns ``False``
1983
+ otherwise.
1984
+ """
1985
+ return self ._stream_closed_by (stream_id ) in (
1986
+ StreamClosedBy .RECV_END_STREAM , StreamClosedBy .SEND_END_STREAM
1987
+ )
1988
+
1934
1989
1935
1990
def _add_frame_priority (frame , weight = None , depends_on = None , exclusive = None ):
1936
1991
"""
0 commit comments