Description
We are using a singleton AsyncClient from httpx, configured with HTTP/2.
During investigation of intermittent HTTP failures, we observed the following exception:
httpcore.LocalProtocolError: Max outbound streams is 100, 100 open
This indicates a mismatch in stream state between httpcore and h2. Each maintains its own count of open streams, but in certain edge cases, they fall out of sync.
How the Mismatch Happens
When httpcore wants to close a stream, it calls:
self._h2_state.end_stream(stream_id)
await self._write_outgoing_data(request)
end_stream() prepares a DataFrame with the END_STREAM flag set. _write_outgoing_data() transmits that frame over the network, but it must first acquire a lock via async with self._write_lock:.
So for h2 to register a stream as closed, the END_STREAM frame must actually be sent—which depends on acquiring the write lock.
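A minimal asyncio sketch of how the frame can be lost. The hypothetical write_outgoing_data() below stands in for httpcore's _write_outgoing_data() and only records a frame once it holds the lock. If the task is cancelled while it is queued on the lock, the body never runs, so nothing is recorded as sent. This models the lock behaviour only and does not use httpcore or h2.

```python
import asyncio


async def write_outgoing_data(lock: asyncio.Lock, sent: list, frame: str) -> None:
    # Hypothetical stand-in for _write_outgoing_data(): the frame goes out only under the lock.
    async with lock:
        sent.append(frame)


async def demo() -> list:
    lock = asyncio.Lock()
    sent: list = []
    await lock.acquire()  # another stream is busy writing and holds the lock
    task = asyncio.create_task(write_outgoing_data(lock, sent, "END_STREAM(1)"))
    await asyncio.sleep(0)  # let the task start and block on the lock
    task.cancel()  # cancelled while still waiting for the lock
    try:
        await task
    except asyncio.CancelledError:
        pass
    lock.release()
    return sent


sent_frames = asyncio.run(demo())
print(sent_frames)  # []
```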
If an asyncio task is cancelled before that happens, a catch-all except BaseException in the cancellation path triggers _response_closed(). That method does the following:
- Releases a slot on the semaphore: self._max_streams_semaphore.release()
- Removes the stream from self._events
However, at this point, h2 may never have received the END_STREAM frame, so its internal stream count does not decrement. The result: httpcore believes a slot is free, while h2 still considers the stream open. This eventually leads to:
httpcore.LocalProtocolError: Max outbound streams is 100, 100 open
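The catch-all handler matters because asyncio.CancelledError derives from BaseException (not Exception) since Python 3.8, so only an except BaseException clause sees it. In this minimal sketch the hypothetical handle_request() stands in for the request path, and appending to closed_streams stands in for _response_closed(); the cleanup runs even though the task never got past the lock.

```python
import asyncio

closed_streams: list[int] = []


async def handle_request(stream_id: int, lock: asyncio.Lock) -> None:
    try:
        async with lock:  # stands in for waiting on _write_lock
            pass
    except BaseException:  # CancelledError is a BaseException, not an Exception
        closed_streams.append(stream_id)  # stands in for _response_closed(stream_id)
        raise


async def main() -> None:
    lock = asyncio.Lock()
    await lock.acquire()  # keep the lock busy so the request has to wait
    task = asyncio.create_task(handle_request(1, lock))
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    lock.release()


asyncio.run(main())
print(closed_streams)  # [1]
print(issubclass(asyncio.CancelledError, Exception))  # False
```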
Scenarios That Trigger the Bug
- Mass Cancellation:
  - All 100 streams (limit = 100) are cancelled at once.
  - httpcore releases 100 semaphore slots.
  - But if none of the END_STREAM frames were sent, h2 still sees 100 open streams.
  - Attempting to open a 101st stream causes the exception.
- Lock Contention:
  - One stream holds the _write_lock and is sending data.
  - Other streams are created and cancelled faster than data can be written.
  - Again, httpcore releases the semaphore, but h2 does not observe stream closure.
- Other Race Conditions:
  - Variants of the above where timing differences between cancellation, lock acquisition, and data flush lead to desynchronization.
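A standard-library toy model of the Mass Cancellation and Lock Contention scenarios. ToyConnection, h2_open and the limit of 3 are hypothetical stand-ins: a semaphore plays httpcore's _max_streams_semaphore, a set plays h2's count of open outbound streams, and a stream only leaves that set once it gets through the write lock. Cancelling every request while the lock is held frees all semaphore slots but leaves the set full, so the next request fails with an error shaped like the one above.

```python
import asyncio

MAX_STREAMS = 3  # hypothetical small limit standing in for the real 100


class ToyConnection:
    """Hypothetical model: the semaphore plays httpcore's slot count and the
    h2_open set plays h2's count of open outbound streams."""

    def __init__(self) -> None:
        self.semaphore = asyncio.Semaphore(MAX_STREAMS)
        self.write_lock = asyncio.Lock()
        self.h2_open: set[int] = set()
        self.next_id = 1

    async def request(self) -> None:
        await self.semaphore.acquire()
        try:
            if len(self.h2_open) >= MAX_STREAMS:
                raise RuntimeError(
                    f"Max outbound streams is {MAX_STREAMS}, {len(self.h2_open)} open"
                )
            stream_id = self.next_id
            self.next_id += 2
            self.h2_open.add(stream_id)
            async with self.write_lock:
                # Writing END_STREAM is the only thing that closes the stream on the h2 side.
                self.h2_open.discard(stream_id)
        finally:
            # Like _response_closed(): the slot is freed even if END_STREAM never went out.
            self.semaphore.release()


async def main() -> str:
    conn = ToyConnection()
    await conn.write_lock.acquire()  # another stream is busy writing (lock contention)
    tasks = [asyncio.create_task(conn.request()) for _ in range(MAX_STREAMS)]
    await asyncio.sleep(0)  # every request opens a stream and queues on the lock
    for task in tasks:  # mass cancellation
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    conn.write_lock.release()
    try:
        await conn.request()  # the semaphore has free slots again...
    except RuntimeError as exc:
        return str(exc)  # ...but the h2 side still counts every stream as open
    return "no error"


result = asyncio.run(main())
print(result)  # Max outbound streams is 3, 3 open
```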
Reproducing the Issue
This can be reliably reproduced with a custom script that opens and cancels many streams rapidly.
Potential Solutions
1. Reset Streams on Cancellation
Modify _response_closed() to explicitly reset the stream in h2 if it's still open:
async def _response_closed(self, stream_id: int) -> None:
    # START ADDITION
    if stream_id in self._h2_state.streams:
        h2_stream = self._h2_state.streams[stream_id]
        if not h2_stream.closed:
            self._h2_state.reset_stream(stream_id)
    # END ADDITION
    await self._max_streams_semaphore.release()
    del self._events[stream_id]
    async with self._state_lock:
        if self._connection_terminated and not self._events:
            await self.aclose()
        elif self._state == HTTPConnectionState.ACTIVE and not self._events:
            self._state = HTTPConnectionState.IDLE
            if self._keepalive_expiry is not None:
                now = time.monotonic()
                self._expire_at = now + self._keepalive_expiry
            if self._used_all_stream_ids:  # pragma: nocover
                await self.aclose()
This reduces, but does not fully eliminate, the race condition by explicitly notifying h2 that the stream is no longer active.
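A toy model (all names hypothetical, limit of 3) comparing cleanup with and without the proposed reset. In h2, reset_stream() closes the stream in h2's state machine immediately and queues an RST_STREAM frame; the model mirrors only the first part, by removing the stream from the set that stands in for h2's open streams. It does not attempt to reproduce the residual races the proposal leaves open, so it illustrates why the reset helps rather than proving the fix complete.

```python
import asyncio

MAX_STREAMS = 3  # hypothetical small limit


class ToyConnection:
    """Hypothetical model: h2_open mirrors h2's open streams, the semaphore mirrors httpcore's slots."""

    def __init__(self, reset_on_close: bool) -> None:
        self.h2_open: set[int] = set()
        self.semaphore = asyncio.Semaphore(MAX_STREAMS)
        self.write_lock = asyncio.Lock()
        self.reset_on_close = reset_on_close
        self.next_id = 1

    def response_closed(self, stream_id: int) -> None:
        if self.reset_on_close and stream_id in self.h2_open:
            # Analogue of reset_stream(): the stream is closed on the h2 side at once.
            self.h2_open.discard(stream_id)
        self.semaphore.release()

    async def request(self) -> None:
        await self.semaphore.acquire()
        stream_id = self.next_id
        self.next_id += 2
        try:
            if len(self.h2_open) >= MAX_STREAMS:
                raise RuntimeError(
                    f"Max outbound streams is {MAX_STREAMS}, {len(self.h2_open)} open"
                )
            self.h2_open.add(stream_id)
            async with self.write_lock:
                self.h2_open.discard(stream_id)  # END_STREAM written
        finally:
            self.response_closed(stream_id)


async def run(reset_on_close: bool) -> str:
    conn = ToyConnection(reset_on_close)
    await conn.write_lock.acquire()  # another stream holds the write lock
    tasks = [asyncio.create_task(conn.request()) for _ in range(MAX_STREAMS)]
    await asyncio.sleep(0)
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    conn.write_lock.release()
    try:
        await conn.request()
    except RuntimeError as exc:
        return str(exc)
    return "ok"


without_fix = asyncio.run(run(reset_on_close=False))
with_fix = asyncio.run(run(reset_on_close=True))
print(without_fix)  # Max outbound streams is 3, 3 open
print(with_fix)  # ok
```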
2. Release Semaphore Only When h2 Confirms Closure
Alternatively, only release the semaphore once h2 has confirmed the stream is closed. This would eliminate the mismatch but requires a mechanism to defer semaphore release until END_STREAM or RST_STREAM is successfully sent.
This approach is more correct but also more complex. It introduces the need for additional state tracking or callbacks from the h2 state machine.
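One possible shape for the bookkeeping behind option 2, as a hypothetical sketch rather than a proposed patch: DeferredReleaseSlots, stream_cancelled() and flush() are invented names. Cancellation only records that a closing frame is owed; the semaphore slot is released in flush(), which here stands for the point where a task holding the write lock has actually written END_STREAM or RST_STREAM. The httpcore-side and h2-side counts therefore move together at every step.

```python
import asyncio


class DeferredReleaseSlots:
    """Hypothetical bookkeeping for option 2: a slot is handed back only after the
    closing frame (END_STREAM or RST_STREAM) for that stream has been written."""

    def __init__(self, max_streams: int) -> None:
        self.semaphore = asyncio.Semaphore(max_streams)  # httpcore-side slots
        self.slots_in_use = 0
        self.h2_open: set[int] = set()  # streams the h2 side still counts as open
        self.pending_close: set[int] = set()  # cancelled, closing frame not yet written

    async def open_stream(self, stream_id: int) -> None:
        await self.semaphore.acquire()
        self.slots_in_use += 1
        self.h2_open.add(stream_id)

    def stream_cancelled(self, stream_id: int) -> None:
        # Do NOT release the semaphore here; only note that a closing frame is owed.
        self.pending_close.add(stream_id)

    def flush(self) -> None:
        # Stands in for a task that holds the write lock and writes the owed frames.
        for stream_id in sorted(self.pending_close):
            self.h2_open.discard(stream_id)  # frame written: h2 now sees it closed
            self.slots_in_use -= 1
            self.semaphore.release()  # only now is the slot handed back
        self.pending_close.clear()

    def in_sync(self) -> bool:
        return self.slots_in_use == len(self.h2_open)


async def main() -> list[tuple[bool, bool]]:
    slots = DeferredReleaseSlots(max_streams=2)
    await slots.open_stream(1)
    await slots.open_stream(3)
    slots.stream_cancelled(1)
    slots.stream_cancelled(3)
    before_flush = (slots.semaphore.locked(), slots.in_sync())
    slots.flush()
    after_flush = (slots.semaphore.locked(), slots.in_sync())
    return [before_flush, after_flush]


states = asyncio.run(main())
print(states)  # [(True, True), (False, True)]
```

The cost mentioned above shows up directly: a cancelled stream keeps its slot until some task flushes, so when nothing else is written the slots stay occupied instead of becoming inconsistent.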
Once the issue is triggered, the connection cannot recover from it unless some remaining stream has data to send.
Reproduction
Set up an HTTP/2 server
- Create a virtual environment (optional but recommended):
python -m venv venv
source venv/bin/activate  # On Windows use: venv\Scripts\activate
- Install required dependencies:
pip install hypercorn cryptography
- Create a server.py file with the following content:
import asyncio
import tempfile
import os

from hypercorn.config import Config
from hypercorn.asyncio import serve

PORT = 8000


def create_self_signed_cert():
    """Create a self-signed certificate for testing"""
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    import datetime

    # Generate private key
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )

    # Create certificate (timezone-aware timestamps; utcnow() is deprecated)
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
    ])
    now = datetime.datetime.now(datetime.timezone.utc)
    cert = x509.CertificateBuilder().subject_name(
        subject
    ).issuer_name(
        issuer
    ).public_key(
        private_key.public_key()
    ).serial_number(
        x509.random_serial_number()
    ).not_valid_before(
        now
    ).not_valid_after(
        now + datetime.timedelta(days=1)
    ).add_extension(
        x509.SubjectAlternativeName([
            x509.DNSName("localhost"),
        ]),
        critical=False,
    ).sign(private_key, hashes.SHA256())

    # Write to temporary files
    cert_file = tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pem')
    key_file = tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pem')
    cert_file.write(cert.public_bytes(serialization.Encoding.PEM))
    key_file.write(private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    ))
    cert_file.close()
    key_file.close()
    return cert_file.name, key_file.name


async def app(scope, receive, send):
    assert scope["type"] == "http"
    path = scope["path"]
    if path == "/post":
        # Read the whole request body
        body = b""
        while True:
            message = await receive()
            body += message.get("body", b"")
            if not message.get("more_body", False):
                break
        # Small delay so that requests overlap on the connection
        await asyncio.sleep(0.01)
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [[b"content-type", b"application/json"]],
        })
        await send({
            "type": "http.response.body",
            "body": f'{{"data": "received {len(body)} bytes"}}'.encode(),
        })


async def run_server():
    cert_file, key_file = create_self_signed_cert()
    try:
        config = Config()
        config.bind = [f"localhost:{PORT}"]
        config.certfile = cert_file
        config.keyfile = key_file
        config.alpn_protocols = ["h2"]  # Offer only HTTP/2 via ALPN
        print(f"Starting HTTPS HTTP/2 server on https://localhost:{PORT}")
        await serve(app, config)
    finally:
        # Cleanup temp files
        os.unlink(cert_file)
        os.unlink(key_file)


if __name__ == "__main__":
    asyncio.run(run_server())
- Run the server
After installing the dependencies, save your script (e.g., server.py) and run it:
python server.py
You should see:
Starting HTTPS HTTP/2 server on https://localhost:8000
You can now make a POST request to https://localhost:8000/post (e.g., using curl or httpx with SSL verification disabled since it’s a self-signed cert):
curl -k -X POST https://localhost:8000/post -d "hello"
This will return something like:
{"data": "received 5 bytes"}
Script to reproduce the issue
- Install dependencies (the http2 extra pulls in h2, the asyncio extra pulls in anyio):
pip install "httpcore[asyncio,http2]"
- Create a reproduce.py file with the following content:
import asyncio
import ssl

import httpcore


async def h2_stream_desync_realistic():
    # Create SSL context that accepts self-signed certificates
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Use local HTTP/2 server
    async with httpcore.AsyncConnectionPool(http2=True, max_connections=1, ssl_context=ssl_context) as pool:
        print("=== Starting realistic desync test ===")

        # Give it time to start and acquire the write lock
        await asyncio.sleep(0.2)

        print("Starting requests that will be cancelled...")
        cancelled_tasks = []

        # Create multiple POST requests that will all need to write data
        for i in range(700):
            post_data = b"data" * 1000  # 4KB per request
            task = asyncio.create_task(
                pool.request("POST", "https://localhost:8000/post", content=post_data)
            )
            cancelled_tasks.append(task)

        # Let them start and queue up waiting for write lock
        await asyncio.sleep(0.1)

        # Cancel all the queued requests
        print("Cancelling queued requests...")
        for task in cancelled_tasks:
            task.cancel()

        # New requests on the same connection now hit the desynchronized stream count
        tasks = []
        for i in range(50):
            post_data = b"data" * 1000
            tasks.append(asyncio.create_task(
                pool.request("POST", "https://localhost:8000/post", content=post_data)
            ))

        for task in cancelled_tasks:
            try:
                await task
            except asyncio.CancelledError:
                pass

        for task in tasks:
            try:
                await task
            except asyncio.CancelledError:
                pass


if __name__ == "__main__":
    asyncio.run(h2_stream_desync_realistic())
- Run the script
You should see the error message.
Task exception was never retrieved
future: <Task finished name='Task-723' coro=<AsyncRequestInterface.request() done, defined at /Users/florian.courtial/Documents/code/httpcore/httpcore/_async/interfaces.py:21> exception=LocalProtocolError(TooManyStreamsError('Max outbound streams is 100, 100 open'))>
Traceback (most recent call last):
File "/Users/florian.courtial/Documents/code/httpcore/httpcore/_async/interfaces.py", line 45, in request
response = await self.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/florian.courtial/Documents/code/httpcore/httpcore/_async/connection_pool.py", line 256, in handle_async_request
raise exc from None
File "/Users/florian.courtial/Documents/code/httpcore/httpcore/_async/connection_pool.py", line 236, in handle_async_request
response = await connection.handle_async_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/florian.courtial/Documents/code/httpcore/httpcore/_async/connection.py", line 103, in handle_async_request
return await self._connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/florian.courtial/Documents/code/httpcore/httpcore/_async/http2.py", line 185, in handle_async_request
raise LocalProtocolError(exc) # pragma: nocover
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
httpcore.LocalProtocolError: Max outbound streams is 100, 100 open