33
33
)
34
34
from .frame_buffer import FrameBuffer
35
35
from .settings import Settings , SettingCodes
36
- from .stream import H2Stream
36
+ from .stream import H2Stream , StreamClosedBy
37
37
from .utilities import guard_increment_window
38
38
from .windows import WindowManager
39
39
@@ -378,10 +378,13 @@ def __init__(self, client_side=True, header_encoding='utf-8', config=None):
378
378
# Data that needs to be sent.
379
379
self ._data_to_send = b''
380
380
381
- # Keeps track of streams that have been forcefully reset by this peer .
381
+ # Keeps track of how streams are closed .
382
382
# Used to ensure that we don't blow up in the face of frames that were
383
- # in flight when a stream was reset.
384
- self ._reset_streams = set ()
383
+ # in flight when a RST_STREAM was sent.
384
+ # Also used to determine whether we should consider a frame received
385
+ # while a stream is closed as either a stream error or a connection
386
+ # error.
387
+ self ._closed_streams = {}
385
388
386
389
# The flow control window manager for the connection.
387
390
self ._inbound_flow_control_window_manager = WindowManager (
@@ -425,7 +428,8 @@ def _open_streams(self, remainder):
425
428
to_delete .append (stream_id )
426
429
427
430
for stream_id in to_delete :
428
- del self .streams [stream_id ]
431
+ stream = self .streams .pop (stream_id )
432
+ self ._closed_streams [stream_id ] = stream .closed_by
429
433
430
434
return count
431
435
@@ -1035,8 +1039,6 @@ def reset_stream(self, stream_id, error_code=0):
1035
1039
stream = self ._get_stream_by_id (stream_id )
1036
1040
frames = stream .reset_stream (error_code )
1037
1041
self ._prepare_for_sending (frames )
1038
- self ._reset_streams .add (stream_id )
1039
- del self .streams [stream_id ]
1040
1042
1041
1043
def close_connection (self , error_code = 0 , additional_data = None ,
1042
1044
last_stream_id = None ):
@@ -1488,27 +1490,37 @@ def _receive_frame(self, frame):
1488
1490
# I don't love using __class__ here, maybe reconsider it.
1489
1491
frames , events = self ._frame_dispatch_table [frame .__class__ ](frame )
1490
1492
except StreamClosedError as e :
1491
- # We need to send a RST_STREAM frame on behalf of the stream.
1492
- # The frame the stream wants to emit is already present in the
1493
- # exception.
1494
- # This does not require re-raising: it's an expected behaviour. The
1495
- # only time we don't do that is if this is a stream the user
1496
- # manually reset.
1497
- if frame .stream_id not in self ._reset_streams :
1493
+ # If the stream was closed by RST_STREAM, we just send a RST_STREAM
1494
+ # to the remote peer. Otherwise, this is a connection error, and so
1495
+ # we will re-raise to trigger one.
1496
+ if self ._stream_is_closed_by_reset (e .stream_id ):
1498
1497
f = RstStreamFrame (e .stream_id )
1499
1498
f .error_code = e .error_code
1500
1499
self ._prepare_for_sending ([f ])
1501
1500
events = e ._events
1502
1501
else :
1503
- events = []
1502
+ raise
1504
1503
except StreamIDTooLowError as e :
1505
- # The stream ID seems invalid. This is unlikely, so it's probably
1506
- # the case that this frame is actually for a stream that we've
1507
- # already reset and removed the state for. If it is, just swallow
1508
- # the error. If we didn't do that, re-raise.
1509
- if frame .stream_id not in self ._reset_streams :
1504
+ # The stream ID seems invalid. This may happen when the closed
1505
+ # stream has been cleaned up, or when the remote peer has opened a
1506
+ # new stream with a higher stream ID than this one, forcing it
1507
+ # closed implicitly.
1508
+ #
1509
+ # Check how the stream was closed: depending on the mechanism, it
1510
+ # is either a stream error or a connection error.
1511
+ if self ._stream_is_closed_by_reset (e .stream_id ):
1512
+ # Closed by RST_STREAM is a stream error.
1513
+ f = RstStreamFrame (e .stream_id )
1514
+ f .error_code = ErrorCodes .STREAM_CLOSED
1515
+ self ._prepare_for_sending ([f ])
1516
+ events = []
1517
+ elif self ._stream_is_closed_by_end (e .stream_id ):
1518
+ # Closed by END_STREAM is a connection error.
1519
+ raise StreamClosedError (e .stream_id )
1520
+ else :
1521
+ # Closed implicitly, also a connection error, but of type
1522
+ # PROTOCOL_ERROR.
1510
1523
raise
1511
- events = []
1512
1524
except KeyError as e : # pragma: no cover
1513
1525
# We don't have a function for handling this frame. Let's call this
1514
1526
# a PROTOCOL_ERROR and exit.
@@ -1593,7 +1605,8 @@ def _receive_push_promise_frame(self, frame):
1593
1605
# PROTOCOL_ERROR: pushing a stream on a naturally closed stream is
1594
1606
# a real problem because it creates a brand new stream that the
1595
1607
# remote peer now believes exists.
1596
- if frame .stream_id in self ._reset_streams :
1608
+ if (self ._stream_closed_by (frame .stream_id ) ==
1609
+ StreamClosedBy .SEND_RST_STREAM ):
1597
1610
f = RstStreamFrame (frame .promised_stream_id )
1598
1611
f .error_code = ErrorCodes .REFUSED_STREAM
1599
1612
return [f ], events
@@ -1608,11 +1621,19 @@ def _receive_push_promise_frame(self, frame):
1608
1621
if (frame .stream_id % 2 ) == 0 :
1609
1622
raise ProtocolError ("Cannot recursively push streams." )
1610
1623
1611
- frames , stream_events = stream .receive_push_promise_in_band (
1612
- frame .promised_stream_id ,
1613
- pushed_headers ,
1614
- self .config .header_encoding ,
1615
- )
1624
+ try :
1625
+ frames , stream_events = stream .receive_push_promise_in_band (
1626
+ frame .promised_stream_id ,
1627
+ pushed_headers ,
1628
+ self .config .header_encoding ,
1629
+ )
1630
+ except StreamClosedError :
1631
+ # The parent stream was reset by us, so we presume that
1632
+ # PUSH_PROMISE was in flight when we reset the parent stream.
1633
+ # So we just reset the new stream.
1634
+ f = RstStreamFrame (frame .promised_stream_id )
1635
+ f .error_code = ErrorCodes .REFUSED_STREAM
1636
+ return [f ], events
1616
1637
1617
1638
new_stream = self ._begin_new_stream (
1618
1639
frame .promised_stream_id , AllowedStreamIDs .EVEN
@@ -1877,6 +1898,40 @@ def _stream_id_is_outbound(self, stream_id):
1877
1898
"""
1878
1899
return (stream_id % 2 == int (self .config .client_side ))
1879
1900
1901
+ def _stream_closed_by (self , stream_id ):
1902
+ """
1903
+ Returns how the stream was closed.
1904
+
1905
+ The return value will be either a member of
1906
+ ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was
1907
+ closed implicitly by the peer opening a stream with a higher stream ID
1908
+ before opening this one.
1909
+ """
1910
+ if stream_id in self .streams :
1911
+ return self .streams [stream_id ].closed_by
1912
+ if stream_id in self ._closed_streams :
1913
+ return self ._closed_streams [stream_id ]
1914
+ return None
1915
+
1916
+ def _stream_is_closed_by_reset (self , stream_id ):
1917
+ """
1918
+ Returns ``True`` if the stream was closed by sending or receiving a
1919
+ RST_STREAM frame. Returns ``False`` otherwise.
1920
+ """
1921
+ return self ._stream_closed_by (stream_id ) in (
1922
+ StreamClosedBy .RECV_RST_STREAM , StreamClosedBy .SEND_RST_STREAM
1923
+ )
1924
+
1925
+ def _stream_is_closed_by_end (self , stream_id ):
1926
+ """
1927
+ Returns ``True`` if the stream was closed by sending or receiving an
1928
+ END_STREAM flag in a HEADERS or DATA frame. Returns ``False``
1929
+ otherwise.
1930
+ """
1931
+ return self ._stream_closed_by (stream_id ) in (
1932
+ StreamClosedBy .RECV_END_STREAM , StreamClosedBy .SEND_END_STREAM
1933
+ )
1934
+
1880
1935
1881
1936
def _add_frame_priority (frame , weight = None , depends_on = None , exclusive = None ):
1882
1937
"""
0 commit comments