Skip to content

Commit 746cf4d

Browse files
author
Matteo Di Pirro
committed
SCALA-355 Add code for Akka Stream article
1 parent a70fa3e commit 746cf4d

File tree

4 files changed

+104
-1
lines changed

4 files changed

+104
-1
lines changed

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,8 @@ lazy val scala_akka_2 = (project in file("scala-akka-2"))
217217
"com.lightbend.akka" %% "akka-stream-alpakka-sse" % "5.0.0",
218218
"com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
219219
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
220-
"com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion % Test
220+
"com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion % Test,
221+
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
221222
) ++ scalaTestDeps
222223
)
223224
val monocleVersion = "2.1.0"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.baeldung.scala.akka.stream
2+
3+
import akka.actor.ActorSystem
4+
5+
import scala.util.{Failure, Success}
6+
7+
object Main extends App {
8+
implicit val system: ActorSystem = ActorSystem("baeldung")
9+
10+
source
11+
.via(parse)
12+
.via(compare)
13+
.runWith(sink)
14+
.andThen {
15+
case Failure(exception) => println(exception)
16+
case Success((correct, total)) =>
17+
println(s"$correct/$total correct answers")
18+
}(system.dispatcher)
19+
.onComplete(_ => system.terminate())(system.dispatcher)
20+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.baeldung.scala.akka
2+
3+
import akka.NotUsed
4+
import akka.stream.scaladsl.{Flow, Sink, Source}
5+
6+
import scala.concurrent.Future
7+
8+
package object stream {
9+
val source: Source[String, NotUsed] = Source(
10+
Seq("5,10", "15,15", "78,79", "12,12", "0,0", "456,456")
11+
)
12+
13+
val parse: Flow[String, (Int, Int), NotUsed] =
14+
Flow[String]
15+
.map { pair =>
16+
val parts = pair.split(",")
17+
(parts(0).toInt, parts(1).toInt)
18+
}
19+
20+
val compare: Flow[(Int, Int), Boolean, NotUsed] =
21+
Flow[(Int, Int)]
22+
.map { case (userAnswer, correctAnswer) => userAnswer == correctAnswer }
23+
24+
val sink: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
25+
case ((correctCount, total), wasCorrect) =>
26+
if (wasCorrect) (correctCount + 1, total + 1)
27+
else (correctCount, total + 1)
28+
}
29+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.baeldung.scala.akka.stream
2+
3+
import akka.actor.ActorSystem
4+
import akka.stream.scaladsl.Keep
5+
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
6+
import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
7+
import org.scalatest.flatspec.AnyFlatSpec
8+
import org.scalatest.matchers.should.Matchers
9+
10+
class StreamTest extends AnyFlatSpec with Matchers {
11+
implicit val system: ActorSystem = ActorSystem("baeldung")
12+
13+
"The parse flow" should "parse pairs of integers" in {
14+
val (pub, sub) = TestSource[String]()
15+
.via(parse)
16+
.toMat(TestSink[(Int, Int)]())(Keep.both)
17+
.run()
18+
19+
pub.sendNext("1,1")
20+
pub.sendNext("145,146")
21+
pub.sendComplete()
22+
23+
sub.requestNext((1, 1))
24+
sub.requestNext((145, 146))
25+
sub.expectComplete()
26+
}
27+
28+
"The compare flow" should "compare pairs of integers" in {
29+
val (pub, sub) = TestSource[(Int, Int)]()
30+
.via(compare)
31+
.toMat(TestSink[Boolean]())(Keep.both)
32+
.run()
33+
34+
pub.sendNext((1, 1))
35+
pub.sendNext((145, 146))
36+
pub.sendComplete()
37+
38+
sub.requestNext(true)
39+
sub.requestNext(false)
40+
sub.expectComplete()
41+
}
42+
43+
"The sink sink" should "count the number of trues" in {
44+
val (probe, result) = TestSource[Boolean]().toMat(sink)(Keep.both).run()
45+
46+
probe.sendNext(true)
47+
probe.sendNext(false)
48+
probe.sendNext(false)
49+
probe.sendComplete()
50+
51+
result.futureValue shouldBe (1, 3)
52+
}
53+
}

0 commit comments

Comments
 (0)