@@ -406,58 +406,6 @@ class ClientCallsTest: AbstractCallsTest() {
406
406
}
407
407
}
408
408
409
- @FlowPreview
410
- @Test
411
- fun clientStreamingRpcCancellation () = runBlocking {
412
- val serverCancelled = Job ()
413
- val serverReceived = Job ()
414
- val serverImpl = object : GreeterGrpc .GreeterImplBase () {
415
- override fun clientStreamSayHello (
416
- responseObserver : StreamObserver <HelloReply >
417
- ): StreamObserver <HelloRequest > {
418
- return object : StreamObserver <HelloRequest > {
419
- private val names = mutableListOf<String >()
420
-
421
- override fun onNext (value : HelloRequest ) {
422
- whenContextIsCancelled { serverCancelled.complete() }
423
- Context .current().withCancellation().addListener(
424
- Context .CancellationListener {
425
- serverCancelled.complete()
426
- },
427
- directExecutor()
428
- )
429
- serverReceived.complete()
430
- names + = value.name
431
- }
432
-
433
- override fun onError (t : Throwable ) = throw t
434
-
435
- override fun onCompleted () {
436
- responseObserver.onNext(
437
- helloReply(names.joinToString(prefix = " Hello, " , separator = " , " ))
438
- )
439
- responseObserver.onCompleted()
440
- }
441
- }
442
- }
443
- }
444
-
445
- channel = makeChannel(serverImpl)
446
-
447
- val requests = Channel <HelloRequest >()
448
- val rpc = async {
449
- ClientCalls .clientStreamingRpc(
450
- channel = channel,
451
- method = clientStreamingSayHelloMethod,
452
- requests = requests.consumeAsFlow()
453
- )
454
- }
455
- requests.send(helloRequest(" Tim" ))
456
- serverReceived.join()
457
- rpc.cancel(CancellationException (" no longer needed" ))
458
- serverCancelled.join()
459
- }
460
-
461
409
@FlowPreview
462
410
@Test
463
411
fun clientStreamingRpcCancelled () = runBlocking {
@@ -661,39 +609,4 @@ class ClientCallsTest: AbstractCallsTest() {
661
609
assertThat(responses.single()).isEqualTo(helloReply(" Hello, Sunstone" ))
662
610
assertThat(requestsEvaluations.get()).isEqualTo(2 )
663
611
}
664
-
665
- @ExperimentalCoroutinesApi
666
- @Test
667
- fun bidiStreamingCancelResponsesCancelsRequests () = runBlocking {
668
- val serverImpl = object : GreeterGrpc .GreeterImplBase () {
669
- override fun bidiStreamSayHello (
670
- responseObserver : StreamObserver <HelloReply >
671
- ): StreamObserver <HelloRequest > {
672
- return object : StreamObserver <HelloRequest > {
673
- override fun onNext (value : HelloRequest ) {
674
- responseObserver.onNext(helloReply(" Hello, ${value.name} " ))
675
- }
676
-
677
- override fun onError (t : Throwable ) = throw t
678
-
679
- override fun onCompleted () {
680
- responseObserver.onCompleted()
681
- }
682
- }
683
- }
684
- }
685
-
686
- val channel = makeChannel(serverImpl)
687
-
688
- val cancelled = Job ()
689
- val requests = flow {
690
- emit(helloRequest(" Steven" ))
691
- emit(helloRequest(" Garnet" ))
692
- suspendUntilCancelled { cancelled.complete() }
693
- }
694
- assertThat(
695
- ClientCalls .bidiStreamingRpc(channel, bidiStreamingSayHelloMethod, requests).take(2 ).toList()
696
- ).containsExactly(helloReply(" Hello, Steven" ), helloReply(" Hello, Garnet" )).inOrder()
697
- cancelled.join()
698
- }
699
612
}
0 commit comments