Skip to content

Commit 3c17848

Browse files
authored
Merge pull request grpc#24576 from lidizheng/unary-stream-exp
Add support for unary-stream benchmarking for Python
2 parents 96dc42e + 85228ef commit 3c17848

File tree

5 files changed

+68
-0
lines changed

5 files changed

+68
-0
lines changed

src/python/grpcio_tests/tests/qps/benchmark_client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class GenericStub(object):
3434
def __init__(self, channel):
3535
self.UnaryCall = channel.unary_unary(
3636
'/grpc.testing.BenchmarkService/UnaryCall')
37+
self.StreamingFromServer = channel.unary_stream(
38+
'/grpc.testing.BenchmarkService/StreamingFromServer')
3739
self.StreamingCall = channel.stream_stream(
3840
'/grpc.testing.BenchmarkService/StreamingCall')
3941

@@ -200,3 +202,31 @@ def stop(self):
200202
stream.stop()
201203
self._pool.shutdown(wait=True)
202204
self._stub = None
205+
206+
207+
class ServerStreamingSyncBenchmarkClient(BenchmarkClient):
208+
209+
def __init__(self, server, config, hist):
210+
super(ServerStreamingSyncBenchmarkClient,
211+
self).__init__(server, config, hist)
212+
self._pool = futures.ThreadPoolExecutor(
213+
max_workers=config.outstanding_rpcs_per_channel)
214+
self._rpcs = []
215+
216+
def send_request(self):
217+
self._pool.submit(self._one_stream_streaming_rpc)
218+
219+
def _one_stream_streaming_rpc(self):
220+
response_stream = self._stub.StreamingFromServer(
221+
self._request, _TIMEOUT)
222+
self._rpcs.append(response_stream)
223+
start_time = time.time()
224+
for _ in response_stream:
225+
self._handle_response(self, time.time() - start_time)
226+
start_time = time.time()
227+
228+
def stop(self):
229+
for call in self._rpcs:
230+
call.cancel()
231+
self._pool.shutdown(wait=False)
232+
self._stub = None

src/python/grpcio_tests/tests/qps/qps_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"""The entry point for the qps worker."""
1515

1616
import argparse
17+
import logging
1718
import time
1819

1920
import grpc
@@ -35,6 +36,7 @@ def run_worker_server(driver_port, server_port):
3536

3637

3738
if __name__ == '__main__':
39+
logging.basicConfig(level=logging.DEBUG)
3840
parser = argparse.ArgumentParser(
3941
description='gRPC Python performance testing worker')
4042
parser.add_argument('--driver_port',

src/python/grpcio_tests/tests/qps/worker_server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ def _create_client_runner(self, server, config, qps_data):
151151
elif config.rpc_type == control_pb2.STREAMING:
152152
client = benchmark_client.StreamingSyncBenchmarkClient(
153153
server, config, qps_data)
154+
elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
155+
client = benchmark_client.ServerStreamingSyncBenchmarkClient(
156+
server, config, qps_data)
154157
elif config.client_type == control_pb2.ASYNC_CLIENT:
155158
if config.rpc_type == control_pb2.UNARY:
156159
client = benchmark_client.UnaryAsyncBenchmarkClient(

src/python/grpcio_tests/tests_aio/benchmark/benchmark_client.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class GenericStub(object):
3333
def __init__(self, channel: aio.Channel):
3434
self.UnaryCall = channel.unary_unary(
3535
'/grpc.testing.BenchmarkService/UnaryCall')
36+
self.StreamingFromServer = channel.unary_stream(
37+
'/grpc.testing.BenchmarkService/StreamingFromServer')
3638
self.StreamingCall = channel.stream_stream(
3739
'/grpc.testing.BenchmarkService/StreamingCall')
3840

@@ -153,3 +155,32 @@ async def stop(self):
153155
self._running = False
154156
await self._stopped.wait()
155157
await super().stop()
158+
159+
160+
class ServerStreamingAsyncBenchmarkClient(BenchmarkClient):
161+
162+
def __init__(self, address: str, config: control_pb2.ClientConfig,
163+
hist: histogram.Histogram):
164+
super().__init__(address, config, hist)
165+
self._running = None
166+
self._stopped = asyncio.Event()
167+
168+
async def _one_server_streaming_call(self):
169+
call = self._stub.StreamingFromServer(self._request)
170+
while self._running:
171+
start_time = time.time()
172+
await call.read()
173+
self._record_query_time(time.time() - start_time)
174+
175+
async def run(self):
176+
await super().run()
177+
self._running = True
178+
senders = (
179+
self._one_server_streaming_call() for _ in range(self._concurrency))
180+
await asyncio.gather(*senders)
181+
self._stopped.set()
182+
183+
async def stop(self):
184+
self._running = False
185+
await self._stopped.wait()
186+
await super().stop()

src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ def _create_client(server: str, config: control_pb2.ClientConfig,
133133
client_type = benchmark_client.UnaryAsyncBenchmarkClient
134134
elif config.rpc_type == control_pb2.STREAMING:
135135
client_type = benchmark_client.StreamingAsyncBenchmarkClient
136+
elif config.rpc_type == control_pb2.STREAMING_FROM_SERVER:
137+
client_type = benchmark_client.ServerStreamingAsyncBenchmarkClient
136138
else:
137139
raise NotImplementedError(
138140
f'Unsupported rpc_type [{config.rpc_type}]')

0 commit comments

Comments
 (0)