@@ -406,58 +406,6 @@ class ClientCallsTest: AbstractCallsTest() {
406
406
}
407
407
}
408
408
409
- @FlowPreview
410
- @Test
411
- fun clientStreamingRpcCancellation () = runBlocking {
412
- val serverCancelled = Job ()
413
- val serverReceived = Job ()
414
- val serverImpl = object : GreeterGrpc .GreeterImplBase () {
415
- override fun clientStreamSayHello (
416
- responseObserver : StreamObserver <HelloReply >
417
- ): StreamObserver <HelloRequest > {
418
- return object : StreamObserver <HelloRequest > {
419
- private val names = mutableListOf<String >()
420
-
421
- override fun onNext (value : HelloRequest ) {
422
- whenContextIsCancelled { serverCancelled.complete() }
423
- Context .current().withCancellation().addListener(
424
- Context .CancellationListener {
425
- serverCancelled.complete()
426
- },
427
- directExecutor()
428
- )
429
- serverReceived.complete()
430
- names + = value.name
431
- }
432
-
433
- override fun onError (t : Throwable ) = throw t
434
-
435
- override fun onCompleted () {
436
- responseObserver.onNext(
437
- helloReply(names.joinToString(prefix = " Hello, " , separator = " , " ))
438
- )
439
- responseObserver.onCompleted()
440
- }
441
- }
442
- }
443
- }
444
-
445
- channel = makeChannel(serverImpl)
446
-
447
- val requests = Channel <HelloRequest >()
448
- val rpc = async {
449
- ClientCalls .clientStreamingRpc(
450
- channel = channel,
451
- method = clientStreamingSayHelloMethod,
452
- requests = requests.consumeAsFlow()
453
- )
454
- }
455
- requests.send(helloRequest(" Tim" ))
456
- serverReceived.join()
457
- rpc.cancel(CancellationException (" no longer needed" ))
458
- serverCancelled.join()
459
- }
460
-
461
409
@FlowPreview
462
410
@Test
463
411
fun clientStreamingRpcCancelled () = runBlocking {
0 commit comments