Skip to content

Commit 0451eab

Browse files
authored
Scala 635 cancellation in cats effect (#1194)
* SCALA-635: add examples on Cancellation in Cats Effect * SCALA-635: bump cats-effect to 3.5.3
1 parent 7db6f45 commit 0451eab

File tree

3 files changed

+220
-0
lines changed

3 files changed

+220
-0
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.baeldung.scala.catseffects
2+
3+
import cats.effect.{IO, Resource}
4+
import cats.implicits.{catsSyntaxParallelTraverse1, toTraverseOps}
5+
6+
import java.io.{InputStream, OutputStream}
7+
import java.util.concurrent.atomic.AtomicBoolean
8+
import scala.concurrent.duration.DurationInt
9+
10+
object Cancellation {
11+
12+
def cancelFiberDirectly[A](io: IO[A], onCancellation: => IO[Unit]): IO[Unit] =
13+
for {
14+
fiber <- io.onCancel(onCancellation).start
15+
_ <- fiber.cancel
16+
_ <- fiber.join
17+
} yield ()
18+
19+
def naiveParMap_1[A, B, C](
20+
ioa: IO[A],
21+
iob: IO[B],
22+
onCancelA: IO[A],
23+
onCancelB: IO[B],
24+
f: (A, B) => C
25+
): IO[C] =
26+
for {
27+
fiberA <- ioa.start
28+
fiberB <- iob.start
29+
a <- fiberA.joinWith(onCancelA).onError(_ => fiberB.cancel)
30+
b <- fiberB.joinWith(onCancelB).onError(_ => fiberA.cancel)
31+
} yield f(a, b)
32+
33+
def naiveParMap_2[A, B, C](
34+
ioa: IO[A],
35+
iob: IO[B],
36+
onCancelA: IO[A],
37+
onCancelB: IO[B],
38+
f: (A, B) => C
39+
): IO[C] =
40+
for {
41+
fiberA <- ioa.start
42+
fiberB <- iob.start
43+
fiberAj = fiberA.joinWith(onCancelA).onError(_ => fiberB.cancel)
44+
fiberBj = fiberB.joinWith(onCancelB).onError(_ => fiberA.cancel)
45+
regA <- fiberAj.start
46+
regB <- fiberBj.start
47+
a <- regA.joinWith(onCancelA)
48+
b <- regB.joinWith(onCancelB)
49+
} yield f(a, b)
50+
51+
def getEmails(): IO[List[String]] = IO(
52+
53+
)
54+
55+
def send(message: String)(email: String): IO[Unit] = IO.sleep(
56+
1.seconds
57+
) >> IO.println(s"sending email to $email with $message")
58+
59+
def sendEmailsUncancelable(message: String): IO[Unit] =
60+
for {
61+
emails <- getEmails()
62+
_ <- IO.println("ready to send emails")
63+
_ <- IO.uncancelable(_ => emails.traverse(send(message)))
64+
_ <- IO.println("emails are sent")
65+
} yield ()
66+
67+
def sendEmailsUncancelableResource(message: String) =
68+
Resource
69+
.make(getEmails())(emails => IO.println(s"emails $emails are released"))
70+
.use(emails =>
71+
IO.println("ready to send emails") >>
72+
IO.uncancelable(_ => emails.traverse(send(message))) >>
73+
IO.println("emails are sent")
74+
)
75+
76+
def sendEmailsUncancelableBracket(message: String): IO[Unit] =
77+
(getEmails()).bracket(emails =>
78+
IO.println("ready to send emails") >>
79+
IO.uncancelable(_ => emails.traverse(send(message))) >>
80+
IO.println("emails are sent")
81+
)(emails => IO.println(s"[bracket] emails $emails are released"))
82+
83+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.baeldung.scala.catseffects
2+
3+
import cats.effect.IOApp
4+
import cats.effect.IO
5+
import scala.concurrent.duration.DurationInt
6+
import java.util.concurrent.atomic.AtomicBoolean
7+
import cats.effect.ExitCode
8+
9+
object CancellationApp extends IOApp {
10+
11+
def example() = {
12+
val flag = new AtomicBoolean(false)
13+
var counter = 0
14+
val ioa = IO.blocking {
15+
while (!flag.get()) {
16+
Thread.sleep(500)
17+
println(s"counter = $counter")
18+
counter += 1
19+
}
20+
}
21+
22+
ioa.cancelable(
23+
IO.sleep(3.seconds) >> IO.println("executing the finalizer...") >> IO
24+
.delay(flag.set(true))
25+
)
26+
}
27+
28+
override def run(args: List[String]) =
29+
example().as(ExitCode.Success)
30+
31+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package com.baeldung.scala.catseffects
2+
3+
import cats.effect.kernel.CancelScope.Cancelable
4+
import cats.effect.{IO, Resource}
5+
import cats.effect.unsafe.implicits.global
6+
import cats.implicits.catsSyntaxTuple2Parallel
7+
import org.scalatest.matchers.should.Matchers
8+
import org.scalatest.wordspec.AnyWordSpec
9+
import java.util.concurrent.atomic.AtomicBoolean
10+
11+
import scala.concurrent.duration.DurationInt
12+
13+
class CancellationSpec extends AnyWordSpec with Matchers {
14+
15+
"Cancellation" should {
16+
17+
"cancel the fiber directly and execute the action on cancellation" in {
18+
val flag = new AtomicBoolean(false)
19+
val io = IO.println("ready to cancel the fiber") >> IO.sleep(
20+
5.seconds
21+
) >> IO.println("Hello, World!")
22+
val onCancel =
23+
IO.println("cancellation signal received") >> IO.delay(flag.set(true))
24+
val cancelFiber = Cancellation.cancelFiberDirectly(io, onCancel)
25+
cancelFiber.unsafeRunSync()
26+
flag.get() shouldBe true
27+
}
28+
29+
"naive parMap works" in {
30+
def tickingClock: IO[Unit] =
31+
for {
32+
_ <- IO.println(s"current time = ${System.currentTimeMillis()}")
33+
_ <- IO.sleep(250.millisecond)
34+
_ <- tickingClock
35+
} yield ()
36+
val error: IO[Unit] =
37+
IO.sleep(1.second) *> IO.raiseError(new RuntimeException("boom!"))
38+
39+
val parMapNaive_2 = Cancellation.naiveParMap_2(
40+
tickingClock,
41+
error,
42+
IO.println("[2] tickingClock was cancelled"),
43+
IO.println("[2] error was cancelled"),
44+
(_: Unit, _: Unit) => println("[2] Exit")
45+
)
46+
47+
val res = parMapNaive_2.handleError(e => e.getMessage).unsafeRunSync()
48+
res shouldBe "boom!"
49+
}
50+
51+
"send emails if the cancellation signal came during the sending process" in {
52+
val sendEmails: IO[Unit] =
53+
Cancellation
54+
.sendEmailsUncancelable("Exam on FP is shifted to Jan 11, 2024")
55+
.onCancel(IO.println("cancellation signal received"))
56+
IO.race(IO.sleep(1.seconds), sendEmails).unsafeRunSync()
57+
}
58+
59+
"if one of the effects running in parallel throws an error, the second one in cancelled, but it won't affect uncancelalble regions" in {
60+
val flag = new AtomicBoolean(false)
61+
val res = (
62+
IO.sleep(500.millisecond) >> IO.raiseError(
63+
new RuntimeException("Boom!")
64+
),
65+
IO.uncancelable(_ =>
66+
IO.sleep(1.second) >> IO.println("Hey there") >> IO.delay(
67+
flag.set(true)
68+
)
69+
)
70+
).parTupled
71+
.handleError(_ => flag.get)
72+
.unsafeRunSync()
73+
res shouldBe true
74+
}
75+
76+
"partially cancel the unancelable IO" in {
77+
var counter = 0
78+
val example: IO[Unit] = {
79+
val flag = new AtomicBoolean(false)
80+
val ioa = IO.blocking {
81+
while (!flag.get()) {
82+
Thread.sleep(100)
83+
println(s"counter = $counter")
84+
counter += 1
85+
}
86+
}
87+
88+
ioa.cancelable(
89+
IO.println("executing the finalizer...") >> IO.delay(flag.set(true))
90+
)
91+
}
92+
93+
(for {
94+
fiber <- example.start
95+
_ <- IO.sleep(500.millisecond)
96+
_ <- IO.println("cancelling the fiber")
97+
_ <- fiber.cancel
98+
_ <- fiber.join
99+
} yield ()).unsafeRunSync()
100+
101+
counter <= 6 shouldBe true
102+
}
103+
104+
}
105+
106+
}

0 commit comments

Comments
 (0)