Skip to content

Commit e9ee9a1

Browse files
Merge pull request #538 from sgeorgakis/master
[SCALA-353] Introduction to Event Sourcing using Akka Persistence
2 parents dbf54bd + f384000 commit e9ee9a1

File tree

7 files changed

+198
-18
lines changed

7 files changed

+198
-18
lines changed

build.sbt

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@ val scalaTestDeps = Seq(
1313
"org.scalatest" %% "scalatest" % "3.2.15" % Test,
1414
"org.scalatest" %% "scalatest-shouldmatchers" % "3.2.15" % Test,
1515
"org.scalatest" %% "scalatest-wordspec" % "3.2.15" % Test,
16-
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % Test,
16+
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % Test
1717
)
1818
val scalaMock = "org.scalamock" %% "scalamock" % "5.2.0" % Test
1919
val zioVersion = "2.0.6"
2020

21-
2221
lazy val scala_core = (project in file("scala-core"))
2322
.settings(
2423
name := "scala-core",
@@ -51,7 +50,7 @@ lazy val scala_core_4 = (project in file("scala-core-4"))
5150
name := "scala-core-4",
5251
libraryDependencies ++= scalaTestDeps,
5352
libraryDependencies += jUnitInterface,
54-
libraryDependencies += scalaReflection,
53+
libraryDependencies += scalaReflection
5554
)
5655

5756
lazy val scala_core_5 = (project in file("scala-core-5"))
@@ -134,14 +133,15 @@ lazy val scala_core_collections = (project in file("scala-core-collections"))
134133
name := "scala-core-collections",
135134
libraryDependencies ++= Seq(
136135
"org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4"
137-
) ++ scalaTestDeps,
136+
) ++ scalaTestDeps
138137
)
139138

140-
lazy val scala_core_collections_2 = (project in file("scala-core-collections-2"))
141-
.settings(
142-
name := "scala-core-collections-2",
143-
libraryDependencies ++= scalaTestDeps,
144-
)
139+
lazy val scala_core_collections_2 =
140+
(project in file("scala-core-collections-2"))
141+
.settings(
142+
name := "scala-core-collections-2",
143+
libraryDependencies ++= scalaTestDeps
144+
)
145145

146146
lazy val scala_test = (project in file("scala-test"))
147147
.settings(
@@ -187,11 +187,13 @@ lazy val scala_akka_2 = (project in file("scala-akka-2"))
187187
.settings(
188188
name := "scala-akka-2",
189189
libraryDependencies ++= Seq(
190-
"com.typesafe.akka" %% "akka-actor-typed" % "2.6.19",
191-
"com.typesafe.akka" %% "akka-http" % "10.2.10",
192-
"com.typesafe.akka" %% "akka-http-spray-json" % "10.2.10",
193-
"com.lightbend.akka" %% "akka-stream-alpakka-sse" % "4.0.0",
194-
"com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.6.19" % Test,
190+
"com.typesafe.akka" %% "akka-actor-typed" % "2.7.0",
191+
"com.typesafe.akka" %% "akka-stream" % "2.7.0",
192+
"com.typesafe.akka" %% "akka-http" % "10.4.0",
193+
"com.typesafe.akka" %% "akka-http-spray-json" % "10.4.0",
194+
"com.lightbend.akka" %% "akka-stream-alpakka-sse" % "5.0.0",
195+
"com.typesafe.akka" %% "akka-persistence-typed" % "2.7.0",
196+
"com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.7.0" % Test
195197
) ++ scalaTestDeps
196198
)
197199
val monocleVersion = "2.1.0"
@@ -359,7 +361,7 @@ lazy val scala3_lang = (project in file("scala3-lang")).settings(
359361

360362
lazy val scala3_lang_2 = (project in file("scala3-lang-2")).settings(
361363
libraryDependencies ++= scalaTestDeps
362-
)
364+
)
363365

364366
lazy val cats_effects = (project in file("cats-effects"))
365367
.settings(

project/plugins.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.2")
33
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.4")
44
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.10.1")
55

6-
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.6")
6+
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.2.1")

scala-akka-2/src/main/resources/application.conf

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,15 @@ akka.grpc.client {
77
}
88
}
99

10-
akka.http.server.preview.enable-http2 = on
10+
akka.http.server.preview.enable-http2 = on
11+
12+
akka.persistence {
13+
journal {
14+
plugin = "akka.persistence.journal.inmem"
15+
}
16+
17+
snapshot-store {
18+
plugin = "akka.persistence.snapshot-store.local"
19+
local.dir = "target/snapshot"
20+
}
21+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.baeldung.scala.akka_2.persistence
2+
3+
import akka.actor.typed.{ActorRef, Behavior}
4+
import akka.pattern.StatusReply
5+
import akka.persistence.typed.PersistenceId
6+
import akka.persistence.typed.scaladsl.{
7+
Effect,
8+
EventSourcedBehavior,
9+
ReplyEffect,
10+
RetentionCriteria
11+
}
12+
13+
object BankAccount {
14+
sealed trait Command
15+
final case class WithdrawCommand(
16+
amount: Int,
17+
replyTo: ActorRef[StatusReply[Response]]
18+
) extends Command
19+
final case class DepositCommand(
20+
amount: Int,
21+
replyTo: ActorRef[StatusReply[Response]]
22+
) extends Command
23+
final case class BalanceCheckCommand(replyTo: ActorRef[StatusReply[Response]])
24+
extends Command
25+
26+
sealed trait Event
27+
final case class WithdrawalEvent(amount: Int) extends Event
28+
final case class DepositEvent(amount: Int) extends Event
29+
30+
final case class State(amount: Int)
31+
32+
sealed trait Response
33+
final case class ActionResponse(message: String) extends Response
34+
final case class BalanceCheckResponse(amount: Int) extends Response
35+
36+
def apply(id: Long): Behavior[Command] =
37+
EventSourcedBehavior
38+
.withEnforcedReplies[Command, Event, State](
39+
persistenceId = PersistenceId(id.toString, "bank-account"),
40+
emptyState = State(0),
41+
commandHandler = commandHandler,
42+
eventHandler = eventHandler
43+
)
44+
.withRetention(
45+
RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 3)
46+
)
47+
48+
val commandHandler: (State, Command) => ReplyEffect[Event, State] = {
49+
(state, command) =>
50+
command match {
51+
case WithdrawCommand(amount, replyTo) =>
52+
if (state.amount >= amount) {
53+
Effect
54+
.persist(WithdrawalEvent(amount))
55+
.thenReply(replyTo)(_ =>
56+
StatusReply.success(
57+
ActionResponse(s"Amount $amount was withdrawn")
58+
)
59+
)
60+
} else {
61+
Effect.reply(replyTo)(
62+
StatusReply.error(
63+
s"Account has insufficient funds. Available balance ${state.amount}"
64+
)
65+
)
66+
}
67+
case DepositCommand(amount, replyTo) =>
68+
Effect
69+
.persist(DepositEvent(amount))
70+
.thenReply(replyTo)(_ =>
71+
StatusReply.success(
72+
ActionResponse(s"Amount $amount was deposited")
73+
)
74+
)
75+
case BalanceCheckCommand(replyTo) =>
76+
Effect.reply(replyTo)(
77+
StatusReply.Success(BalanceCheckResponse(state.amount))
78+
)
79+
}
80+
}
81+
82+
val eventHandler: (State, Event) => State = { (state, event) =>
83+
event match {
84+
case WithdrawalEvent(amount) => state.copy(state.amount - amount)
85+
case DepositEvent(amount) => state.copy(state.amount + amount)
86+
}
87+
}
88+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
akka.persistence {
2+
journal {
3+
plugin = "akka.persistence.journal.inmem"
4+
}
5+
snapshot-store {
6+
plugin = "akka.persistence.snapshot-store.local"
7+
local.dir = "target/snapshot"
8+
}
9+
}

scala-akka-2/src/test/resources/application.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ akka.grpc.client {
77
}
88
}
99

10-
akka.http.server.preview.enable-http2 = on
10+
akka.http.server.preview.enable-http2 = on
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.baeldung.scala.akka_2.persistence
2+
3+
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
4+
import akka.pattern.StatusReply
5+
import org.scalatest.wordspec.AnyWordSpecLike
6+
7+
class BankAccountTest extends ScalaTestWithActorTestKit with AnyWordSpecLike {
8+
9+
"A Bank Account" should {
10+
"deposit money to a bank account" in {
11+
val bankAccount = testKit.spawn(BankAccount(1))
12+
val probe = createTestProbe[StatusReply[BankAccount.Response]]
13+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
14+
probe.expectMessage(
15+
StatusReply.Success(BankAccount.BalanceCheckResponse(0))
16+
)
17+
bankAccount ! BankAccount.DepositCommand(100, probe.ref)
18+
probe.expectMessage(StatusReply.Success(BankAccount.ActionResponse("Amount 100 was deposited")))
19+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
20+
probe.expectMessage(
21+
StatusReply.Success(BankAccount.BalanceCheckResponse(100))
22+
)
23+
testKit.stop(bankAccount)
24+
}
25+
26+
"withdraw money from a bank account" in {
27+
val bankAccount = testKit.spawn(BankAccount(2))
28+
val probe = createTestProbe[StatusReply[BankAccount.Response]]
29+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
30+
probe.expectMessage(
31+
StatusReply.Success(BankAccount.BalanceCheckResponse(0))
32+
)
33+
bankAccount ! BankAccount.DepositCommand(100, probe.ref)
34+
probe.expectMessage(StatusReply.Success(BankAccount.ActionResponse("Amount 100 was deposited")))
35+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
36+
probe.expectMessage(
37+
StatusReply.Success(BankAccount.BalanceCheckResponse(100))
38+
)
39+
bankAccount ! BankAccount.WithdrawCommand(20, probe.ref)
40+
probe.expectMessage(StatusReply.Success(BankAccount.ActionResponse("Amount 20 was withdrawn")))
41+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
42+
probe.expectMessage(
43+
StatusReply.Success(BankAccount.BalanceCheckResponse(80))
44+
)
45+
testKit.stop(bankAccount)
46+
}
47+
48+
"fail to withdraw money from a bank account" in {
49+
val bankAccount = testKit.spawn(BankAccount(3))
50+
val probe = createTestProbe[StatusReply[BankAccount.Response]]
51+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
52+
probe.expectMessage(
53+
StatusReply.Success(BankAccount.BalanceCheckResponse(0))
54+
)
55+
bankAccount ! BankAccount.DepositCommand(100, probe.ref)
56+
probe.expectMessage(StatusReply.Success(BankAccount.ActionResponse("Amount 100 was deposited")))
57+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
58+
probe.expectMessage(
59+
StatusReply.Success(BankAccount.BalanceCheckResponse(100))
60+
)
61+
bankAccount ! BankAccount.WithdrawCommand(120, probe.ref)
62+
probe.expectMessage(StatusReply.Error("Account has insufficient funds. Available balance 100"))
63+
bankAccount ! BankAccount.BalanceCheckCommand(probe.ref)
64+
probe.expectMessage(
65+
StatusReply.Success(BankAccount.BalanceCheckResponse(100))
66+
)
67+
testKit.stop(bankAccount)
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)