|
17 | 17 | - [Server](#server-1)
|
18 | 18 | - [Client](#client-1)
|
19 | 19 | - [Result](#result)
|
| 20 | +- [Server-streaming RPC service: `GetTemperature`](#server-streaming-rpc-service-gettemperature) |
| 21 | + - [Protocol](#protocol-3) |
| 22 | + - [Server](#server-2) |
| 23 | + - [Client](#client-2) |
| 24 | + - [Result](#result-1) |
20 | 25 |
|
21 | 26 | <!-- END doctoc generated TOC please keep comment here to allow auto update -->
|
22 | 27 |
|
@@ -457,6 +462,185 @@ And the server log the request as expected:
|
457 | 462 | INFO - SmartHomeService - Request: IsEmptyRequest()
|
458 | 463 | ```
|
459 | 464 |
|
| 465 | + |
| 466 | +## Server-streaming RPC service: `GetTemperature` |
| 467 | + |
| 468 | +Following the established plan, the next step is building the service that returns a stream of temperature values, to let clients subscribe to collect real-time info. |
| 469 | + |
| 470 | +### Protocol |
| 471 | + |
| 472 | +As usual we should add this operation in the protocol. |
| 473 | + |
| 474 | +**_Messages.scala_** |
| 475 | + |
| 476 | +Adding new models: |
| 477 | + |
| 478 | +```scala |
| 479 | +case class TemperatureUnit(value: String) extends AnyVal |
| 480 | +case class Temperature(value: Double, unit: TemperatureUnit) |
| 481 | +``` |
| 482 | + |
| 483 | +**_SmartHomeService.scala_** |
| 484 | + |
| 485 | +And the `getTemperature` operation: |
| 486 | + |
| 487 | +```scala |
| 488 | +@service(Protobuf) trait SmartHomeService[F[_]] { |
| 489 | + |
| 490 | + def isEmpty(request: IsEmptyRequest): F[IsEmptyResponse] |
| 491 | + |
| 492 | + def getTemperature(empty: Empty.type): Stream[F, Temperature] |
| 493 | +} |
| 494 | +``` |
| 495 | + |
| 496 | +### Server |
| 497 | + |
| 498 | +If we want to emit a stream of `Temperature` values, we would be well advised to develop a producer of `Temperature` in the server side. For instance: |
| 499 | + |
| 500 | +```scala |
| 501 | +trait TemperatureReader[F[_]] { |
| 502 | + def sendSamples: Stream[F, Temperature] |
| 503 | +} |
| 504 | + |
| 505 | +object TemperatureReader { |
| 506 | + implicit def instance[F[_]: Sync: Logger: Timer]: TemperatureReader[F] = |
| 507 | + new TemperatureReader[F] { |
| 508 | + val seed = Temperature(77d, TemperatureUnit("Fahrenheit")) |
| 509 | + |
| 510 | + def readTemperature(current: Temperature): F[Temperature] = |
| 511 | + Timer[F] |
| 512 | + .sleep(1.second) |
| 513 | + .flatMap(_ => |
| 514 | + Sync[F].delay { |
| 515 | + val increment: Double = Random.nextDouble() / 2d |
| 516 | + val signal = if (Random.nextBoolean()) 1 else -1 |
| 517 | + val currentValue = current.value |
| 518 | + |
| 519 | + current.copy( |
| 520 | + value = BigDecimal(currentValue + (signal * increment)) |
| 521 | + .setScale(2, RoundingMode.HALF_UP) |
| 522 | + .doubleValue) |
| 523 | + }) |
| 524 | + |
| 525 | + override def sendSamples: Stream[F, Temperature] = |
| 526 | + Stream.iterateEval(seed) { t => |
| 527 | + Logger[F].info(s"* New Temperature 👍 --> $t").flatMap(_ => readTemperature(t)) |
| 528 | + } |
| 529 | + } |
| 530 | + |
| 531 | + def apply[F[_]](implicit ev: TemperatureReader[F]): TemperatureReader[F] = ev |
| 532 | +} |
| 533 | +``` |
| 534 | + |
| 535 | +And this can be returned as response of the new service, in the interpreter. |
| 536 | + |
| 537 | +```scala |
| 538 | +override def getTemperature(empty: Empty.type): Stream[F, Temperature] = for { |
| 539 | + _ <- Stream.eval(Logger[F].info(s"$serviceName - getTemperature Request")) |
| 540 | + temperatures <- TemperatureReader[F].sendSamples.take(20) |
| 541 | +} yield temperatures |
| 542 | +``` |
| 543 | + |
| 544 | +### Client |
| 545 | + |
| 546 | +We have nothing less than adapt the client to consume the new service when it starting up. To this, a couple of changes are needed: |
| 547 | + |
| 548 | +Firstly we should enrich the algebra |
| 549 | + |
| 550 | +```scala |
| 551 | +trait SmartHomeServiceApi[F[_]] { |
| 552 | + |
| 553 | + def isEmpty(): F[Boolean] |
| 554 | + |
| 555 | + def getTemperature(): Stream[F, Temperature] |
| 556 | + |
| 557 | +} |
| 558 | +``` |
| 559 | + |
| 560 | +Whose interpretation could be: |
| 561 | + |
| 562 | +```scala |
| 563 | +def getTemperature: Stream[F, TemperaturesSummary] = for { |
| 564 | + client <- Stream.eval(clientF) |
| 565 | + response <- client |
| 566 | + .getTemperature(Empty) |
| 567 | + .flatMap(t => Stream.eval(L.info(s"* Received new temperature: 👍 --> $t")).as(t)) |
| 568 | + .fold(TemperaturesSummary.empty)((summary, temperature) => summary.append(temperature)) |
| 569 | +} yield response |
| 570 | +``` |
| 571 | + |
| 572 | +Basically, we are logging the incoming values and at the end we calculate the average of those values. |
| 573 | + |
| 574 | +Now, the client app calls to both services: `isEmpty` and `getTemperature`. |
| 575 | +And finally, to call it: |
| 576 | + |
| 577 | +```scala |
| 578 | +for { |
| 579 | + serviceApi <- SmartHomeServiceApi.createInstance(config.host.value, config.port.value) |
| 580 | + _ <- Stream.eval(serviceApi.isEmpty) |
| 581 | + summary <- serviceApi.getTemperature |
| 582 | + _ <- Stream.eval(Logger[F].info(s"The average temperature is: ${summary.averageTemperature}")) |
| 583 | +} yield StreamApp.ExitCode.Success |
| 584 | +``` |
| 585 | + |
| 586 | +### Result |
| 587 | + |
| 588 | +When we run the client now with `sbt runClient` we get: |
| 589 | + |
| 590 | +```bash |
| 591 | +INFO - Created new RPC client for (localhost,19683) |
| 592 | +INFO - Result: IsEmptyResponse(true) |
| 593 | +INFO - * Received new temperature: 👍 --> Temperature(77.0,TemperatureUnit(Fahrenheit)) |
| 594 | +INFO - * Received new temperature: 👍 --> Temperature(77.25,TemperatureUnit(Fahrenheit)) |
| 595 | +INFO - * Received new temperature: 👍 --> Temperature(77.58,TemperatureUnit(Fahrenheit)) |
| 596 | +INFO - * Received new temperature: 👍 --> Temperature(78.02,TemperatureUnit(Fahrenheit)) |
| 597 | +INFO - * Received new temperature: 👍 --> Temperature(77.67,TemperatureUnit(Fahrenheit)) |
| 598 | +INFO - * Received new temperature: 👍 --> Temperature(77.5,TemperatureUnit(Fahrenheit)) |
| 599 | +INFO - * Received new temperature: 👍 --> Temperature(77.58,TemperatureUnit(Fahrenheit)) |
| 600 | +INFO - * Received new temperature: 👍 --> Temperature(77.15,TemperatureUnit(Fahrenheit)) |
| 601 | +INFO - * Received new temperature: 👍 --> Temperature(76.66,TemperatureUnit(Fahrenheit)) |
| 602 | +INFO - * Received new temperature: 👍 --> Temperature(76.45,TemperatureUnit(Fahrenheit)) |
| 603 | +INFO - * Received new temperature: 👍 --> Temperature(76.77,TemperatureUnit(Fahrenheit)) |
| 604 | +INFO - * Received new temperature: 👍 --> Temperature(76.74,TemperatureUnit(Fahrenheit)) |
| 605 | +INFO - * Received new temperature: 👍 --> Temperature(76.41,TemperatureUnit(Fahrenheit)) |
| 606 | +INFO - * Received new temperature: 👍 --> Temperature(76.59,TemperatureUnit(Fahrenheit)) |
| 607 | +INFO - * Received new temperature: 👍 --> Temperature(76.77,TemperatureUnit(Fahrenheit)) |
| 608 | +INFO - * Received new temperature: 👍 --> Temperature(76.49,TemperatureUnit(Fahrenheit)) |
| 609 | +INFO - * Received new temperature: 👍 --> Temperature(76.04,TemperatureUnit(Fahrenheit)) |
| 610 | +INFO - * Received new temperature: 👍 --> Temperature(76.42,TemperatureUnit(Fahrenheit)) |
| 611 | +INFO - * Received new temperature: 👍 --> Temperature(75.95,TemperatureUnit(Fahrenheit)) |
| 612 | +INFO - * Received new temperature: 👍 --> Temperature(75.97,TemperatureUnit(Fahrenheit)) |
| 613 | +INFO - The average temperature is: Temperature(76.85,TemperatureUnit(Fahrenheit)) |
| 614 | +INFO - Removed 1 RPC clients from cache. |
| 615 | +``` |
| 616 | + |
| 617 | +And the server log the request as expected: |
| 618 | + |
| 619 | +```bash |
| 620 | +INFO - ServiceName(seedServer) - Starting app.server at Host(localhost):Port(19683) |
| 621 | +INFO - SmartHomeService - Request: IsEmptyRequest() |
| 622 | +INFO - SmartHomeService - getTemperature Request |
| 623 | +INFO - * New Temperature 👍 --> Temperature(77.0,TemperatureUnit(Fahrenheit)) |
| 624 | +INFO - * New Temperature 👍 --> Temperature(77.25,TemperatureUnit(Fahrenheit)) |
| 625 | +INFO - * New Temperature 👍 --> Temperature(77.58,TemperatureUnit(Fahrenheit)) |
| 626 | +INFO - * New Temperature 👍 --> Temperature(78.02,TemperatureUnit(Fahrenheit)) |
| 627 | +INFO - * New Temperature 👍 --> Temperature(77.67,TemperatureUnit(Fahrenheit)) |
| 628 | +INFO - * New Temperature 👍 --> Temperature(77.5,TemperatureUnit(Fahrenheit)) |
| 629 | +INFO - * New Temperature 👍 --> Temperature(77.58,TemperatureUnit(Fahrenheit)) |
| 630 | +INFO - * New Temperature 👍 --> Temperature(77.15,TemperatureUnit(Fahrenheit)) |
| 631 | +INFO - * New Temperature 👍 --> Temperature(76.66,TemperatureUnit(Fahrenheit)) |
| 632 | +INFO - * New Temperature 👍 --> Temperature(76.45,TemperatureUnit(Fahrenheit)) |
| 633 | +INFO - * New Temperature 👍 --> Temperature(76.77,TemperatureUnit(Fahrenheit)) |
| 634 | +INFO - * New Temperature 👍 --> Temperature(76.74,TemperatureUnit(Fahrenheit)) |
| 635 | +INFO - * New Temperature 👍 --> Temperature(76.41,TemperatureUnit(Fahrenheit)) |
| 636 | +INFO - * New Temperature 👍 --> Temperature(76.59,TemperatureUnit(Fahrenheit)) |
| 637 | +INFO - * New Temperature 👍 --> Temperature(76.77,TemperatureUnit(Fahrenheit)) |
| 638 | +INFO - * New Temperature 👍 --> Temperature(76.49,TemperatureUnit(Fahrenheit)) |
| 639 | +INFO - * New Temperature 👍 --> Temperature(76.04,TemperatureUnit(Fahrenheit)) |
| 640 | +INFO - * New Temperature 👍 --> Temperature(76.42,TemperatureUnit(Fahrenheit)) |
| 641 | +INFO - * New Temperature 👍 --> Temperature(75.95,TemperatureUnit(Fahrenheit)) |
| 642 | +``` |
| 643 | + |
460 | 644 | <!-- DOCTOC SKIP -->
|
461 | 645 | # Copyright
|
462 | 646 |
|
|
0 commit comments