Skip to content

Commit f798df0

Browse files
committed
Bidirectional streaming: comingBackMode
1 parent d9df9a5 commit f798df0

File tree

5 files changed

+34
-3
lines changed

5 files changed

+34
-3
lines changed

client/src/main/scala/client/ClientApp.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.fortyseven.client
33
import cats.effect.{Effect, IO, Timer}
44
import com.fortyseven.commons._
55
import com.fortyseven.commons.config.ServiceConfig
6+
import com.fortyseven.protocol.implicits._
67
import fs2.{Stream, StreamApp}
78
import io.chrisdavenport.log4cats.Logger
89
import monix.execution.Scheduler
@@ -13,13 +14,15 @@ class ClientProgram[F[_]: Effect: Logger] extends AppBoot[F] {
1314

1415
implicit val TM: Timer[F] = Timer.derive[F](Effect[F], IO.timer(S))
1516

16-
override def appStream(config: ServiceConfig): fs2.Stream[F, StreamApp.ExitCode] =
17+
override def appStream(config: ServiceConfig): fs2.Stream[F, StreamApp.ExitCode] = {
1718
for {
1819
serviceApi <- SmartHomeServiceApi.createInstance(config.host.value, config.port.value)
1920
_ <- Stream.eval(serviceApi.isEmpty)
2021
summary <- serviceApi.getTemperature
2122
_ <- Stream.eval(Logger[F].info(s"The average temperature is: ${summary.averageTemperature}"))
22-
} yield StreamApp.ExitCode.Success
23+
response <- serviceApi.comingBackMode(LocationsGenerator.get[F])
24+
} yield response.actions
25+
}.to(LogSink[F].showLines).drain.as(StreamApp.ExitCode.Success)
2326
}
2427

2528
object ClientApp extends ClientProgram[IO]

client/src/main/scala/client/SmartHomeServiceApi.scala

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ trait SmartHomeServiceApi[F[_]] {
1818
def isEmpty: F[Boolean]
1919

2020
def getTemperature: Stream[F, TemperaturesSummary]
21+
22+
def comingBackMode(locations: Stream[F, Location]): Stream[F, ComingBackModeResponse]
2123
}
2224

2325
object SmartHomeServiceApi {
@@ -39,6 +41,12 @@ object SmartHomeServiceApi {
3941
_ <- Stream.eval(L.info(s"* Received new temperature: 👍 --> $temperature"))
4042
} yield temperature
4143
}.fold(TemperaturesSummary.empty)((summary, temperature) => summary.append(temperature))
44+
45+
def comingBackMode(locations: Stream[F, Location]): Stream[F, ComingBackModeResponse] =
46+
for {
47+
clientRPC <- Stream.eval(clientRPCF)
48+
response <- clientRPC.comingBackMode(locations)
49+
} yield response
4250
}
4351

4452
def createInstance[F[_]: Effect](

protocol/src/main/scala/protocol/Messages.scala

+3
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@ final case class IsEmptyRequest()
1717

1818
@message
1919
final case class IsEmptyResponse(result: Boolean)
20+
21+
@message
22+
final case class ComingBackModeResponse(actions: List[SmartHomeAction])

protocol/src/main/scala/protocol/SmartHomeService.scala

+2
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,6 @@ import fs2.Stream
88
def isEmpty(request: IsEmptyRequest): F[IsEmptyResponse]
99

1010
def getTemperature(empty: Empty.type): Stream[F, Temperature]
11+
12+
def comingBackMode(request: Stream[F, Location]): Stream[F, ComingBackModeResponse]
1113
}

server/src/main/scala/server/SmartHomeServiceHandler.scala

+16-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import freestyle.rpc.protocol.Empty
77
import fs2.Stream
88
import io.chrisdavenport.log4cats.Logger
99

10-
class SmartHomeServiceHandler[F[_]: Async: Logger: Timer: TemperatureReader]
10+
class SmartHomeServiceHandler[F[_]: Async: Logger: Timer: TemperatureReader: SmartHomeSupervisor]
1111
extends SmartHomeService[F] {
1212
val serviceName = "SmartHomeService"
1313

@@ -19,4 +19,19 @@ class SmartHomeServiceHandler[F[_]: Async: Logger: Timer: TemperatureReader]
1919
_ <- Stream.eval(Logger[F].info(s"$serviceName - getTemperature Request"))
2020
temperatures <- TemperatureReader[F].sendSamples.take(20)
2121
} yield temperatures
22+
23+
override def comingBackMode(request: Stream[F, Location]): Stream[F, ComingBackModeResponse] =
24+
for {
25+
_ <- Stream.eval(Logger[F].info(s"$serviceName - Enabling Coming Back Home mode"))
26+
location <- request
27+
_ <- Stream.eval(
28+
if (location.distanceToDestination > 0.0d)
29+
Logger[F]
30+
.info(s"$serviceName - Distance to destination: ${location.distanceToDestination} mi")
31+
else
32+
Logger[F]
33+
.info(s"$serviceName - You have reached your destination 🏡"))
34+
response <- Stream.eval(
35+
SmartHomeSupervisor[F].performAction(location).map(ComingBackModeResponse))
36+
} yield response
2237
}

0 commit comments

Comments
 (0)