Skip to content

Commit 41e250b

Browse files
committed
Server streaming: getTemperature
1 parent 3a97c8e commit 41e250b

File tree

5 files changed

+35
-11
lines changed

5 files changed

+35
-11
lines changed

README.md

+7-7
Original file line numberDiff line numberDiff line change
@@ -560,13 +560,13 @@ trait SmartHomeServiceApi[F[_]] {
560560
Whose interpretation could be:
561561

562562
```scala
563-
def getTemperature: Stream[F, TemperaturesSummary] = for {
564-
client <- Stream.eval(clientF)
565-
response <- client
566-
.getTemperature(Empty)
567-
.flatMap(t => Stream.eval(L.info(s"* Received new temperature: 👍 --> $t")).as(t))
568-
.fold(TemperaturesSummary.empty)((summary, temperature) => summary.append(temperature))
569-
} yield response
563+
def getTemperature: Stream[F, TemperaturesSummary] = {
564+
for {
565+
client <- Stream.eval(clientF)
566+
temperature <- client.getTemperature(Empty)
567+
_ <- Stream.eval(L.info(s"* Received new temperature: 👍 --> $temperature"))
568+
} yield temperature
569+
}.fold(TemperaturesSummary.empty)((summary, temperature) => summary.append(temperature))
570570
```
571571

572572
Basically, we are logging the incoming values and at the end we calculate the average of those values.

client/src/main/scala/client/ClientApp.scala

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ class ClientProgram[F[_]: Effect: Logger] extends AppBoot[F] {
1717
for {
1818
serviceApi <- SmartHomeServiceApi.createInstance(config.host.value, config.port.value)
1919
_ <- Stream.eval(serviceApi.isEmpty)
20+
summary <- serviceApi.getTemperature
21+
_ <- Stream.eval(Logger[F].info(s"The average temperature is: ${summary.averageTemperature}"))
2022
} yield StreamApp.ExitCode.Success
2123
}
2224

client/src/main/scala/client/SmartHomeServiceApi.scala

+12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import cats.syntax.applicative._
55
import cats.syntax.flatMap._
66
import cats.syntax.functor._
77
import com.fortyseven.protocol._
8+
import freestyle.rpc.protocol.Empty
9+
import fs2.Stream
810
import io.chrisdavenport.log4cats.Logger
911
import io.grpc.{CallOptions, ManagedChannel}
1012
import monix.execution.Scheduler
@@ -14,6 +16,8 @@ import scala.concurrent.duration._
1416
trait SmartHomeServiceApi[F[_]] {
1517

1618
def isEmpty: F[Boolean]
19+
20+
def getTemperature: Stream[F, TemperaturesSummary]
1721
}
1822

1923
object SmartHomeServiceApi {
@@ -27,6 +31,14 @@ object SmartHomeServiceApi {
2731
result <- clientRPC.isEmpty(IsEmptyRequest())
2832
_ <- L.info(s"Result: $result")
2933
} yield result.result
34+
35+
def getTemperature: Stream[F, TemperaturesSummary] = {
36+
for {
37+
clientRPC <- Stream.eval(clientRPCF)
38+
temperature <- clientRPC.getTemperature(Empty)
39+
_ <- Stream.eval(L.info(s"* Received new temperature: 👍 --> $temperature"))
40+
} yield temperature
41+
}.fold(TemperaturesSummary.empty)((summary, temperature) => summary.append(temperature))
3042
}
3143

3244
def createInstance[F[_]: Effect](
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.fortyseven.protocol
22

3-
import freestyle.rpc.protocol.{service, Protobuf}
3+
import freestyle.rpc.protocol._
4+
import fs2.Stream
45

56
@service(Protobuf) trait SmartHomeService[F[_]] {
67

78
def isEmpty(request: IsEmptyRequest): F[IsEmptyResponse]
89

10+
def getTemperature(empty: Empty.type): Stream[F, Temperature]
911
}
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
package com.fortyseven.server
22

3-
import cats.effect.Sync
3+
import cats.effect.{Async, Timer}
44
import cats.syntax.functor._
5-
import com.fortyseven.protocol.{IsEmptyRequest, IsEmptyResponse, SmartHomeService}
5+
import com.fortyseven.protocol._
6+
import freestyle.rpc.protocol.Empty
7+
import fs2.Stream
68
import io.chrisdavenport.log4cats.Logger
79

8-
class SmartHomeServiceHandler[F[_]: Sync: Logger] extends SmartHomeService[F] {
10+
class SmartHomeServiceHandler[F[_]: Async: Logger: Timer: TemperatureReader]
11+
extends SmartHomeService[F] {
912
val serviceName = "SmartHomeService"
1013

1114
override def isEmpty(request: IsEmptyRequest): F[IsEmptyResponse] =
1215
Logger[F].info(s"$serviceName - Request: $request").as(IsEmptyResponse(true))
1316

17+
override def getTemperature(empty: Empty.type): Stream[F, Temperature] =
18+
for {
19+
_ <- Stream.eval(Logger[F].info(s"$serviceName - getTemperature Request"))
20+
temperatures <- TemperatureReader[F].sendSamples.take(20)
21+
} yield temperatures
1422
}

0 commit comments

Comments
 (0)