Skip to content

Commit 1608763

Browse files
committed
adds streaming and updates from google3
1 parent 90a9383 commit 1608763

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+5037
-1841
lines changed

BUILD

+7-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@ package_group(
2222

2323
package_group(
2424
name = "approved_clients",
25-
packages = [],
25+
packages = [
26+
"//java/com/google/apps/framework/server/kotlin/...",
27+
"//java/com/google/apps/tiktok/rpc/...",
28+
"//java/com/google/corp/accessibility/tracker/...",
29+
"//javatests/com/google/apps/tiktok/rpc/...",
30+
"//javatests/com/google/corp/accessibility/tracker/...",
31+
],
2632
)
2733

2834
alias(

example/src/main/kotlin/io/grpc/examples/routeguide/BUILD

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ kt_jvm_library(
1717
"//google/protobuf:duration_java_proto",
1818
"//java/com/google/common/base",
1919
"//java/com/google/common/io",
20-
"//java/com/google/protobuf/util:javatime",
2120
"//java/com/google/protobuf/util:json",
21+
"//java/com/google/protobuf/util:time",
2222
"//third_party/java/grpc:core",
2323
"//third_party/java/grpc:netty",
2424
"//third_party/kotlin/grpc_kotlin/example/src/main/proto:route_guide_grpc_kotlin",
25-
"//third_party/kotlin/grpc_kotlin/example/src/main/proto:route_guide_proto_kotlin",
25+
"//third_party/kotlin/grpc_kotlin/example/src/main/proto:route_guide_proto_java",
2626
"//third_party/kotlin/grpc_kotlin/example/src/main/resources/io/grpc/examples/routeguide:resources",
2727
"//third_party/kotlin/kotlinx_coroutines",
2828
],
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,30 @@
11
package io.grpc.examples.routeguide
22

3-
import com.google.protobuf.util.JavaTimeConversions
43
import io.grpc.ManagedChannel
54
import io.grpc.ManagedChannelBuilder
65
import io.grpc.examples.routeguide.RouteGuideGrpcKt.RouteGuideCoroutineStub
7-
import java.io.Closeable
8-
import java.util.concurrent.Executors
9-
import java.util.concurrent.TimeUnit
10-
import kotlin.random.Random
11-
import kotlin.random.nextLong
126
import kotlinx.coroutines.CoroutineDispatcher
137
import kotlinx.coroutines.asCoroutineDispatcher
148
import kotlinx.coroutines.asExecutor
15-
import kotlinx.coroutines.async
16-
import kotlinx.coroutines.channels.Channel
179
import kotlinx.coroutines.delay
10+
import kotlinx.coroutines.flow.asFlow
11+
import kotlinx.coroutines.flow.collect
12+
import kotlinx.coroutines.flow.flow
13+
import kotlinx.coroutines.flow.onEach
1814
import kotlinx.coroutines.launch
1915
import kotlinx.coroutines.runBlocking
16+
import java.io.Closeable
17+
import java.util.concurrent.Executors
18+
import java.util.concurrent.TimeUnit
19+
import kotlin.random.Random
20+
import kotlin.random.nextLong
2021

2122
class RouteGuideClient private constructor(
2223
val channel: ManagedChannel,
2324
val stub: RouteGuideCoroutineStub,
2425
val printer: Printer
2526
) : Closeable {
26-
val random = Random(314159)
27+
private val random = Random(314159)
2728

2829
constructor(
2930
channelBuilder: ManagedChannelBuilder<*>,
@@ -55,13 +56,18 @@ class RouteGuideClient private constructor(
5556
}
5657
}
5758
}
59+
60+
private fun point(lat: Int, lon: Int): Point =
61+
Point.newBuilder()
62+
.setLatitude(lat)
63+
.setLongitude(lon)
64+
.build()
5865
}
5966

6067
override fun close() {
6168
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
6269
}
6370

64-
6571
interface Printer {
6672
companion object {
6773
val stdout = object : Printer {
@@ -76,10 +82,10 @@ class RouteGuideClient private constructor(
7682
fun getFeature(latitude: Int, longitude: Int) = runBlocking {
7783
printer.println("*** GetFeature: lat=$latitude lon=$longitude")
7884

79-
val request = point {
80-
this.latitude = latitude
81-
this.longitude = longitude
82-
}
85+
val request = Point.newBuilder()
86+
.setLatitude(latitude)
87+
.setLongitude(longitude)
88+
.build()
8389
val feature = stub.getFeature(request)
8490

8591
if (feature.exists()) {
@@ -92,90 +98,67 @@ class RouteGuideClient private constructor(
9298
fun listFeatures(lowLat: Int, lowLon: Int, hiLat: Int, hiLon: Int) = runBlocking {
9399
printer.println("*** ListFeatures: lowLat=$lowLat lowLon=$lowLon hiLat=$hiLat liLon=$hiLon")
94100

95-
val request = rectangle {
96-
lo = point {
97-
latitude = lowLat
98-
longitude = lowLon
99-
}
100-
hi = point {
101-
latitude = lowLat
102-
longitude = lowLon
103-
}
104-
}
101+
val request = Rectangle.newBuilder()
102+
.setLo(point(lowLat, lowLon))
103+
.setHi(point(hiLat, hiLon))
104+
.build()
105105
var i = 1
106-
for (feature in stub.listFeatures(request)) {
107-
printer.println("Result #$i: $feature")
108-
i++
106+
stub.listFeatures(request).collect { feature ->
107+
printer.println("Result #${i++}: $feature")
109108
}
110109
}
111110

112111
fun recordRoute(features: List<Feature>, numPoints: Int) = runBlocking {
113112
printer.println("*** RecordRoute")
114-
val requests = Channel<Point>()
113+
val requests = flow {
114+
for (i in 1..numPoints) {
115+
val feature = features.random(random)
116+
println("Visiting point ${feature.location.toStr()}")
117+
emit(feature.location)
118+
delay(timeMillis = random.nextLong(500L..1500L))
119+
}
120+
}
115121
val finish = launch {
116122
val summary = stub.recordRoute(requests)
117123
printer.println("Finished trip with ${summary.pointCount} points.")
118124
printer.println("Passed ${summary.featureCount} features.")
119125
printer.println("Travelled ${summary.distance} meters.")
120-
val duration = JavaTimeConversions.toJavaDuration(summary.elapsedTime).seconds
126+
val duration = summary.elapsedTime.seconds
121127
printer.println("It took $duration seconds.")
122128
}
123-
for (i in 1..numPoints) {
124-
val feature = features.random(random)
125-
println("Visiting point ${feature.location.toStr()}")
126-
requests.send(feature.location)
127-
delay(timeMillis = random.nextLong(500L..1500L))
128-
}
129-
requests.close()
130129
finish.join()
131130
}
132131

133132
fun routeChat() = runBlocking {
134133
printer.println("*** RouteChat")
135-
val requests = Channel<RouteNote>()
136-
val rpc = launch {
137-
val responses = stub.routeChat(requests)
138-
for (note in responses) {
139-
printer.println("Got message \"${note.message}\" at ${note.location.toStr()}")
140-
}
141-
println("Finished RouteChat")
142-
}
143134
val requestList = listOf(
144-
routeNote {
135+
RouteNote.newBuilder().apply {
145136
message = "First message"
146-
location = point {
147-
latitude = 0
148-
longitude = 0
149-
}
150-
},
151-
routeNote {
137+
location = point(0, 0)
138+
}.build(),
139+
RouteNote.newBuilder().apply {
152140
message = "Second message"
153-
location = point {
154-
latitude = 0
155-
longitude = 1
156-
}
157-
},
158-
routeNote {
141+
location = point(0, 0)
142+
}.build(),
143+
RouteNote.newBuilder().apply {
159144
message = "Third message"
160-
location = point {
161-
latitude = 1
162-
longitude = 0
163-
}
164-
},
165-
routeNote {
145+
location = point(1, 0)
146+
}.build(),
147+
RouteNote.newBuilder().apply {
166148
message = "Fourth message"
167-
location = point {
168-
latitude = 1
169-
longitude = 1
170-
}
171-
}
149+
location = point(1, 1)
150+
}.build()
172151
)
173-
174-
for (request in requestList) {
152+
val requests = requestList.asFlow().onEach { request ->
175153
printer.println("Sending message \"${request.message}\" at ${request.location.toStr()}")
176-
requests.send(request)
177154
}
178-
requests.close()
155+
val rpc = launch {
156+
stub.routeChat(requests).collect { note ->
157+
printer.println("Got message \"${note.message}\" at ${note.location.toStr()}")
158+
}
159+
println("Finished RouteChat")
160+
}
161+
179162
rpc.join()
180163
}
181164
}

example/src/main/kotlin/io/grpc/examples/routeguide/RouteGuideServer.kt

+30-28
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@ package io.grpc.examples.routeguide
33
import com.google.common.base.Stopwatch
44
import com.google.common.base.Ticker
55
import com.google.common.io.ByteSource
6-
import com.google.protobuf.util.JavaTimeConversions
6+
import com.google.protobuf.util.Durations
77
import io.grpc.Server
88
import io.grpc.ServerBuilder
9+
import kotlinx.coroutines.flow.Flow
10+
import kotlinx.coroutines.flow.asFlow
11+
import kotlinx.coroutines.flow.collect
12+
import kotlinx.coroutines.flow.filter
13+
import kotlinx.coroutines.flow.flow
914
import java.util.Collections
1015
import java.util.concurrent.ConcurrentHashMap
11-
import kotlinx.coroutines.channels.ReceiveChannel
12-
import kotlinx.coroutines.channels.SendChannel
16+
import java.util.concurrent.TimeUnit
1317

1418
/**
1519
* Kotlin adaptation of RouteGuideServer from the Java gRPC example.
@@ -74,53 +78,51 @@ class RouteGuideServer private constructor(
7478

7579
override suspend fun getFeature(request: Point): Feature {
7680
return features.find { it.location == request }
77-
?: feature { location = request } // No feature was found, return an unnamed feature.
81+
?: Feature.newBuilder().apply { location = request }.build()
82+
// No feature was found, return an unnamed feature.
7883
}
7984

80-
override suspend fun listFeatures(request: Rectangle, responses: SendChannel<Feature>) {
81-
for (feature in features) {
82-
if (feature.exists() && feature.location in request) {
83-
responses.send(feature)
84-
}
85-
}
85+
override fun listFeatures(request: Rectangle): Flow<Feature> {
86+
return features.asFlow().filter { it.exists() && it.location in request }
8687
}
8788

88-
override suspend fun recordRoute(requests: ReceiveChannel<Point>): RouteSummary {
89+
override suspend fun recordRoute(requests: Flow<Point>): RouteSummary {
8990
var pointCount = 0
9091
var featureCount = 0
9192
var distance = 0
9293
var previous: Point? = null
9394
val stopwatch = Stopwatch.createStarted(ticker)
94-
for (request in requests) {
95+
requests.collect { request ->
9596
pointCount++
9697
if (getFeature(request).exists()) {
9798
featureCount++
9899
}
99-
if (previous != null) {
100-
distance += previous distanceTo request
100+
val prev = previous
101+
if (prev != null) {
102+
distance += prev distanceTo request
101103
}
102104
previous = request
103105
}
104-
return routeSummary {
106+
return RouteSummary.newBuilder().apply {
105107
this.pointCount = pointCount
106108
this.featureCount = featureCount
107109
this.distance = distance
108-
this.elapsedTime = JavaTimeConversions.toProtoDuration(stopwatch.elapsed())
109-
}
110+
this.elapsedTime = Durations.fromMicros(stopwatch.elapsed(TimeUnit.MICROSECONDS))
111+
}.build()
110112
}
111113

112-
override suspend fun routeChat(
113-
requests: ReceiveChannel<RouteNote>,
114-
responses: SendChannel<RouteNote>
115-
) {
116-
for (note in requests) {
117-
val notes: MutableList<RouteNote> = routeNotes.computeIfAbsent(note.location) {
118-
Collections.synchronizedList(mutableListOf<RouteNote>())
119-
}
120-
for (prevNote in notes.toTypedArray()) { // thread-safe snapshot
121-
responses.send(prevNote)
114+
override fun routeChat(requests: Flow<RouteNote>): Flow<RouteNote> {
115+
return flow {
116+
// could use transform, but it's currently experimental
117+
requests.collect { note ->
118+
val notes: MutableList<RouteNote> = routeNotes.computeIfAbsent(note.location) {
119+
Collections.synchronizedList(mutableListOf<RouteNote>())
120+
}
121+
for (prevNote in notes.toTypedArray()) { // thread-safe snapshot
122+
emit(prevNote)
123+
}
124+
notes += note
122125
}
123-
notes += note
124126
}
125127
}
126128
}

example/src/main/proto/BUILD

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("//tools/build_defs/kotlin:rules.bzl", "kt_jvm_grpc_library", "kt_jvm_proto_library")
1+
load("//tools/build_defs/kotlin:rules.bzl", "kt_jvm_grpc_library")
22

33
package(
44
default_visibility = ["//third_party/kotlin/grpc_kotlin:__subpackages__"],
@@ -13,8 +13,8 @@ proto_library(
1313
deps = ["//google/protobuf:duration"],
1414
)
1515

16-
kt_jvm_proto_library(
17-
name = "route_guide_proto_kotlin",
16+
java_proto_library(
17+
name = "route_guide_proto_java",
1818
deps = [":route_guide_proto"],
1919
)
2020

0 commit comments

Comments
 (0)