Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions conformance/test/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,13 +674,13 @@ async def send_unary_request(

test_response.response.payloads.extend(payloads)

for name in meta.headers():
for name in meta.headers:
test_response.response.response_headers.add(
name=name, value=meta.headers().getall(name)
name=name, value=meta.headers.getall(name)
)
for name in meta.trailers():
for name in meta.trailers:
test_response.response.response_trailers.add(
name=name, value=meta.trailers().getall(name)
name=name, value=meta.trailers.getall(name)
)

return test_response
Expand Down
10 changes: 5 additions & 5 deletions conformance/test/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,22 @@ def _send_headers(
) -> None:
for header in definition.response_headers:
for value in header.value:
ctx.response_headers().add(header.name, value)
ctx.response_headers.add(header.name, value)
for trailer in definition.response_trailers:
for value in trailer.value:
ctx.response_trailers().add(trailer.name, value)
ctx.response_trailers.add(trailer.name, value)


def _create_request_info(
ctx: RequestContext, reqs: list[Any]
) -> ConformancePayload.RequestInfo:
request_info = ConformancePayload.RequestInfo(requests=reqs)
timeout_ms = ctx.timeout_ms()
timeout_ms = ctx.timeout_ms
if timeout_ms is not None:
request_info.timeout_ms = int(timeout_ms)
for key in ctx.request_headers():
for key in ctx.request_headers:
request_info.request_headers.add(
name=key, value=ctx.request_headers().getall(key)
name=key, value=ctx.request_headers.getall(key)
)
return request_info

Expand Down
2 changes: 1 addition & 1 deletion connectrpc-otel/connectrpc_otel/_instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from opentelemetry.metrics import MeterProvider
from opentelemetry.trace import TracerProvider

_instruments = ("connectrpc>=0.9.0",)
_instruments = ("connectrpc>=0.11.0",)

P = ParamSpec("P")
R = TypeVar("R")
Expand Down
10 changes: 5 additions & 5 deletions connectrpc-otel/connectrpc_otel/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ async def on_start(self, ctx: RequestContext) -> Token:
def on_start_sync(self, ctx: RequestContext) -> Token:
start_time = time.perf_counter()

rpc_method = f"{ctx.method().service_name}/{ctx.method().name}"
rpc_method = f"{ctx.method.service_name}/{ctx.method.name}"
shared_attrs: dict[str, AttributeValue] = {
RPC_SYSTEM_NAME: RpcSystemNameValues.CONNECTRPC.value,
RPC_METHOD: rpc_method,
}

if sa := ctx.server_address():
if sa := ctx.server_address:
addr, port = sa.rsplit(":", 1)
shared_attrs[SERVER_ADDRESS] = addr
shared_attrs[SERVER_PORT] = int(port)
Expand Down Expand Up @@ -150,18 +150,18 @@ def _start_span(
parent_otel_ctx = None
if self._client:
span_kind = SpanKind.CLIENT
carrier = ctx.request_headers()
carrier = ctx.request_headers
self._propagator.inject(carrier, setter=_DEFAULT_TEXTMAP_SETTER)
else:
span_kind = SpanKind.SERVER
parent_span = get_current_span()
if not parent_span.get_span_context().is_valid:
carrier = ctx.request_headers()
carrier = ctx.request_headers
parent_otel_ctx = self._propagator.extract(carrier)

attrs: dict[str, AttributeValue] = shared_attrs.copy()

if ca := ctx.client_address():
if ca := ctx.client_address:
addr, port = ca.rsplit(":", 1)
attrs[CLIENT_ADDRESS] = addr
attrs[CLIENT_PORT] = int(port)
Expand Down
2 changes: 1 addition & 1 deletion connectrpc-otel/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ dev = [
# have a real dependency to avoid any possible version conflicts. But for Python,
# the ecosystem vastly favors auto-instrumentation, and it is easier to add than remove
# a transitive dependency, so we go ahead and leave it out.
"connectrpc>=0.8.0",
"connectrpc>=0.11.0",

"connect-python-example",
"pytest",
Expand Down
2 changes: 1 addition & 1 deletion protoc-gen-connect-python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "protoc-gen-connectrpc"
version = "0.10.1"
version = "0.11.0"
description = "Code generator for connect-python"
readme = "README.md"
requires-python = ">= 3.10"
Expand Down
2 changes: 1 addition & 1 deletion protoc-gen-connect-python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "connectrpc"
version = "0.10.1"
version = "0.11.0"
description = "Server and client runtime library for Connect RPC"
readme = "README.md"
requires-python = ">= 3.10"
Expand Down
18 changes: 9 additions & 9 deletions src/connectrpc/_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ async def _send_request_unary(
self._send_request_bidi_stream(_yield_single_message(request), ctx)
)

request_headers = HTTPHeaders(ctx.request_headers().allitems())
url = f"{self._address}/{ctx.method().service_name}/{ctx.method().name}"
if (timeout_ms := ctx.timeout_ms()) is not None:
request_headers = HTTPHeaders(ctx.request_headers.allitems())
url = f"{self._address}/{ctx.method.service_name}/{ctx.method.name}"
if (timeout_ms := ctx.timeout_ms) is not None:
timeout_s = timeout_ms / 1000.0
else:
timeout_s = None
Expand All @@ -295,7 +295,7 @@ async def _send_request_unary(
if self._send_compression:
request_data = self._send_compression.compress(request_data)

if ctx.http_method() == "GET":
if ctx.http_method == "GET":
params = _client_shared.prepare_get_params(
self._codec, request_data, request_headers
)
Expand Down Expand Up @@ -333,7 +333,7 @@ async def _send_request_unary(
f"message is larger than configured max {self._read_max_bytes}",
)

response = ctx.method().output()
response = ctx.method.output()
self._codec.decode(resp.content, response)
return response
raise ConnectWireError.from_response(resp).to_exception()
Expand Down Expand Up @@ -361,9 +361,9 @@ def _send_request_server_stream(
async def _send_request_bidi_stream(
self, request: AsyncIterator[REQ], ctx: RequestContext[REQ, RES]
) -> AsyncIterator[RES]:
request_headers = HTTPHeaders(ctx.request_headers().allitems())
url = f"{self._address}/{ctx.method().service_name}/{ctx.method().name}"
if (timeout_ms := ctx.timeout_ms()) is not None:
request_headers = HTTPHeaders(ctx.request_headers.allitems())
url = f"{self._address}/{ctx.method.service_name}/{ctx.method.name}"
if (timeout_ms := ctx.timeout_ms) is not None:
timeout_s = timeout_ms / 1000.0
else:
timeout_s = None
Expand All @@ -390,7 +390,7 @@ async def _send_request_bidi_stream(
resp.headers, self._response_compressions, stream=True
)
reader = self._protocol.create_envelope_reader(
ctx.method().output,
ctx.method.output,
self._codec,
compression,
self._read_max_bytes,
Expand Down
2 changes: 1 addition & 1 deletion src/connectrpc/_client_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def maybe_map_stream_reset(
case StreamErrorCode.CANCEL:
# Some servers use CANCEL when deadline expires. We can't differentiate
# that from normal cancel without checking our own deadline.
if (t := ctx.timeout_ms()) is not None and t <= 0:
if (t := ctx.timeout_ms) is not None and t <= 0:
return ConnectError(Code.DEADLINE_EXCEEDED, msg)
return ConnectError(Code.CANCELED, msg)
case StreamErrorCode.ENHANCE_YOUR_CALM:
Expand Down
20 changes: 10 additions & 10 deletions src/connectrpc/_client_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ def _send_request_unary(self, request: REQ, ctx: RequestContext[REQ, RES]) -> RE
self._send_request_bidi_stream(iter([request]), ctx)
)

request_headers = HTTPHeaders(ctx.request_headers().allitems())
url = f"{self._address}/{ctx.method().service_name}/{ctx.method().name}"
if (timeout_ms := ctx.timeout_ms()) is not None:
request_headers = HTTPHeaders(ctx.request_headers.allitems())
url = f"{self._address}/{ctx.method.service_name}/{ctx.method.name}"
if (timeout_ms := ctx.timeout_ms) is not None:
timeout_s = timeout_ms / 1000.0
else:
timeout_s = None
Expand All @@ -292,7 +292,7 @@ def _send_request_unary(self, request: REQ, ctx: RequestContext[REQ, RES]) -> RE
if self._send_compression:
request_data = self._send_compression.compress(request_data)

if ctx.http_method() == "GET":
if ctx.http_method == "GET":
params = _client_shared.prepare_get_params(
self._codec, request_data, request_headers
)
Expand Down Expand Up @@ -330,7 +330,7 @@ def _send_request_unary(self, request: REQ, ctx: RequestContext[REQ, RES]) -> RE
f"message is larger than configured max {self._read_max_bytes}",
)

response = ctx.method().output()
response = ctx.method.output()
self._codec.decode(resp.content, response)
return response
raise ConnectWireError.from_response(resp).to_exception()
Expand All @@ -354,9 +354,9 @@ def _send_request_server_stream(
def _send_request_bidi_stream(
self, request: Iterator[REQ], ctx: RequestContext[REQ, RES]
) -> Iterator[RES]:
request_headers = HTTPHeaders(ctx.request_headers().allitems())
url = f"{self._address}/{ctx.method().service_name}/{ctx.method().name}"
if (timeout_ms := ctx.timeout_ms()) is not None:
request_headers = HTTPHeaders(ctx.request_headers.allitems())
url = f"{self._address}/{ctx.method.service_name}/{ctx.method.name}"
if (timeout_ms := ctx.timeout_ms) is not None:
timeout_s = timeout_ms / 1000.0
else:
timeout_s = None
Expand Down Expand Up @@ -386,7 +386,7 @@ def _send_request_bidi_stream(
resp.headers, self._response_compressions, stream=True
)
reader = self._protocol.create_envelope_reader(
ctx.method().output,
ctx.method.output,
self._codec,
compression,
self._read_max_bytes,
Expand All @@ -402,7 +402,7 @@ def _send_request_bidi_stream(
# correctly which is used for server timeouts. We go ahead and check
# the timeout ourselves too.
# https://github.com/hyperium/hyper/issues/3681#issuecomment-3734084436
if (t := ctx.timeout_ms()) is not None and t <= 0:
if (t := ctx.timeout_ms) is not None and t <= 0:
raise TimeoutError

reader.handle_response_complete(resp)
Expand Down
4 changes: 3 additions & 1 deletion src/connectrpc/_response_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def handle_response_trailers(
response = _current_response.get(None)
if not response:
return
response_trailers = response.trailers()
response_trailers = response.trailers
for key, value in trailers.items():
if isinstance(value, str):
response_trailers.add(key, value)
Expand Down Expand Up @@ -95,12 +95,14 @@ def __exit__(
_current_response.reset(self._token)
self._token = None

@property
def headers(self) -> Headers:
"""Returns the response headers."""
if self._headers is None:
return Headers()
return self._headers

@property
def trailers(self) -> Headers:
"""Returns the response trailers."""
if self._trailers is None:
Expand Down
10 changes: 4 additions & 6 deletions src/connectrpc/_server_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ async def _watch_for_disconnect() -> None:
error = e
finally:
end_message = writer.end(
ctx.response_trailers(),
ctx.response_trailers,
ConnectWireError.from_exception(error) if error else None,
)
if not sent_headers:
Expand Down Expand Up @@ -546,8 +546,7 @@ async def _send_stream_response_headers(
(protocol.compression_header_name().encode(), compression_name.encode()),
]
response_headers.extend(
(key.encode(), value.encode())
for key, value in ctx.response_headers().allitems()
(key.encode(), value.encode()) for key, value in ctx.response_headers.allitems()
)
await send(
{
Expand Down Expand Up @@ -624,12 +623,11 @@ def _add_context_headers(
headers: list[tuple[bytes, bytes]], ctx: RequestContext
) -> None:
headers.extend(
(key.encode(), value.encode())
for key, value in ctx.response_headers().allitems()
(key.encode(), value.encode()) for key, value in ctx.response_headers.allitems()
)
headers.extend(
(f"trailer-{key}".encode(), value.encode())
for key, value in ctx.response_trailers().allitems()
for key, value in ctx.response_trailers.allitems()
)


Expand Down
12 changes: 6 additions & 6 deletions src/connectrpc/_server_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def _handle_stream(
# without error.
return [
_end_response(
writer.end(ctx.response_trailers(), None), send_trailers
writer.end(ctx.response_trailers, None), send_trailers
)
]

Expand All @@ -499,7 +499,7 @@ def _handle_stream(
return [
_end_response(
writer.end(
ctx.response_trailers(), ConnectWireError.from_exception(e)
ctx.response_trailers, ConnectWireError.from_exception(e)
),
send_trailers,
)
Expand Down Expand Up @@ -542,9 +542,9 @@ def _end_response(


def _add_context_headers(headers: list[tuple[str, str]], ctx: RequestContext) -> None:
headers.extend((key, value) for key, value in ctx.response_headers().allitems())
headers.extend((key, value) for key, value in ctx.response_headers.allitems())
headers.extend(
(f"trailer-{key}", value) for key, value in ctx.response_trailers().allitems()
(f"trailer-{key}", value) for key, value in ctx.response_trailers.allitems()
)


Expand All @@ -560,7 +560,7 @@ def _send_stream_response_headers(
(protocol.compression_header_name(), compression_name),
]
response_headers.extend(
(key, value) for key, value in ctx.response_headers().allitems()
(key, value) for key, value in ctx.response_headers.allitems()
)
start_response("200 OK", response_headers)

Expand Down Expand Up @@ -598,7 +598,7 @@ def _response_stream(

yield _end_response(
writer.end(
ctx.response_trailers(),
ctx.response_trailers,
ConnectWireError.from_exception(error) if error else None,
),
send_trailers,
Expand Down
Loading
Loading