Skip to content

Commit 8bcc43a

Browse files
authored
Merge pull request #193 from avast/PMHDelayMonitoring
PMH delay monitoring
2 parents 2d94bfd + b787117 commit 8bcc43a

File tree

3 files changed

+140
-54
lines changed

3 files changed

+140
-54
lines changed

core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala

+46-25
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.avast.clients.rabbitmq
22

3-
import cats.Applicative
43
import cats.effect.{Resource, Sync, Timer}
54
import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFunctorOps}
65
import com.avast.bytes.Bytes
@@ -11,6 +10,7 @@ import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
1110
import com.avast.metrics.scalaeffectapi.{Meter, Monitor}
1211

1312
import java.time.Instant
13+
import scala.concurrent.duration.FiniteDuration
1414
import scala.reflect.ClassTag
1515
import scala.util.Try
1616
import scala.util.control.NonFatal
@@ -24,7 +24,7 @@ private trait PoisonedMessageHandlerAction[F[_], A] {
2424
/**
  * Callback run for a delivery that is about to be rejected as poisoned.
  *
  * In this file it is invoked either after the attempt counter has reached `maxAttempts`, or with
  * `maxAttempts = 0` when the delivery was poisoned directly.
  *
  * @param rawBody     raw bytes of the message body
  * @param delivery    the delivery being given up on
  * @param maxAttempts attempt limit passed by the caller (0 for direct poisoning)
  */
def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], maxAttempts: Int)(implicit dctx: DeliveryContext): F[Unit]
2525
}
2626

27-
private sealed abstract class PoisonedMessageHandlerBase[F[_]: Sync: Timer, A](maxAttempts: Int,
27+
private abstract class PoisonedMessageHandlerBase[F[_]: Sync: Timer, A](maxAttempts: Int,
2828
republishDelay: Option[ExponentialDelay],
2929
helper: PoisonedMessageHandlerHelper[F])
3030
extends PoisonedMessageHandler[F, A]
@@ -69,11 +69,13 @@ private[rabbitmq] class NoOpPoisonedMessageHandler[F[_]: Sync, A](helper: Poison
6969
extends PoisonedMessageHandler[F, A] {
7070
override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
7171
implicit dctx: DeliveryContext): F[DeliveryResult] = {
72+
import helper._
73+
7274
result match {
7375
case DeliveryResult.DirectlyPoison =>
7476
helper.logger.warn("Delivery can't be poisoned, because NoOpPoisonedMessageHandler is installed! Rejecting instead...").as(Reject)
7577

76-
case _ => Sync[F].pure(result)
78+
case _ => F.pure(result)
7779
}
7880
}
7981
}
@@ -147,13 +149,15 @@ private[rabbitmq] object PoisonedMessageHandler {
147149
helper: PoisonedMessageHandlerHelper[F],
148150
republishDelay: Option[ExponentialDelay],
149151
handler: PoisonedMessageHandlerAction[F, A])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = {
152+
import helper._
153+
150154
r match {
151155
case Republish(isPoisoned, newHeaders) if isPoisoned =>
152156
adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, helper, republishDelay, handler.handlePoisonedMessage(rawBody))
153157

154158
case DirectlyPoison => poisonRightAway(delivery, messageId, helper, handler.handlePoisonedMessage(rawBody))
155159

156-
case r => Applicative[F].pure(r) // keep other results as they are
160+
case r => F.pure(r) // keep other results as they are
157161
}
158162
}
159163

@@ -165,24 +169,23 @@ private[rabbitmq] object PoisonedMessageHandler {
165169
helper: PoisonedMessageHandlerHelper[F],
166170
republishDelay: Option[ExponentialDelay],
167171
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = {
168-
import cats.syntax.traverse._
169172
import helper._
170173

171174
// get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen
172175
// but we're giving the programmer chance to programmatically _pretend_ lower attempt number
173-
val attempt = (delivery.properties.headers ++ newHeaders)
174-
.get(RepublishCountHeaderName)
175-
.flatMap(v => Try(v.toString.toInt).toOption)
176-
.getOrElse(0) + 1
176+
val attempt = getCurrentAttempt(delivery, newHeaders)
177177

178178
logger.debug(s"Attempt $attempt/$maxAttempts for $messageId") >> {
179179
if (attempt < maxAttempts) {
180-
for {
181-
_ <- republishDelay.traverse { d =>
180+
val republish =
181+
Republish(countAsPoisoned = true, newHeaders = newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef]))
182+
183+
republishDelay match {
184+
case Some(d) =>
182185
val delay = d.getExponentialDelay(attempt)
183-
logger.debug(s"Will republish the message in $delay") >> Timer[F].sleep(delay)
184-
}
185-
} yield Republish(countAsPoisoned = true, newHeaders = newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef]))
186+
logger.debug(s"Will republish the message in $delay") >> delayRepublish(delay)(republish)
187+
case None => F.pure(republish)
188+
}
186189
} else {
187190
val now = Instant.now()
188191

@@ -201,40 +204,58 @@ private[rabbitmq] object PoisonedMessageHandler {
201204
}
202205

203206
handlePoisonedMessage(finalDelivery, maxAttempts)
204-
.recoverWith {
205-
case NonFatal(e) => logger.warn(e)("Poisoned message handler failed")
206-
}
207-
.map(_ => Reject) // always REJECT the message
207+
.recoverWith { case NonFatal(e) => logger.warn(e)("Poisoned message handler failed") }
208+
.as(Reject) // always REJECT the message
208209
}
209210
}
210211
}
211212

213+
/**
  * Computes the number of the attempt that is currently being processed.
  *
  * The republish counter is looked up in the incoming headers overlaid with `newHeaders` (so a value in
  * `newHeaders` wins). A missing or non-numeric counter counts as 0; the result is that counter plus one.
  */
private def getCurrentAttempt[F[_]: Sync, A](delivery: Delivery[A], newHeaders: Map[String, AnyRef]): Int = {
  val effectiveHeaders = delivery.properties.headers ++ newHeaders

  val previousAttempts = for {
    rawCount <- effectiveHeaders.get(RepublishCountHeaderName)
    parsed <- Try(rawCount.toString.toInt).toOption // unparsable values are treated as absent
  } yield parsed

  previousAttempts.getOrElse(0) + 1
}
219+
212220
private def poisonRightAway[F[_]: Sync, A](
213221
delivery: Delivery[A],
214222
messageId: MessageId,
215223
helper: PoisonedMessageHandlerHelper[F],
216224
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = {
217-
helper.logger.info(s"Directly poisoning delivery $messageId") >>
225+
import helper._
226+
227+
logger.info(s"Directly poisoning delivery $messageId") >>
218228
handlePoisonedMessage(delivery, 0) >>
219-
helper.directlyPoisonedMeter.mark >>
220-
Sync[F].pure(Reject: DeliveryResult)
229+
directlyPoisonedMeter.mark >>
230+
F.pure(Reject: DeliveryResult)
221231
}
222232

223233
}
224234

225-
private[rabbitmq] class PoisonedMessageHandlerHelper[F[_]: Sync](val logger: ImplicitContextLogger[F],
226-
val monitor: Monitor[F],
227-
redactPayload: Boolean) {
235+
private[rabbitmq] class PoisonedMessageHandlerHelper[F[_]: Sync: Timer](val logger: ImplicitContextLogger[F],
236+
val monitor: Monitor[F],
237+
redactPayload: Boolean) {
238+
239+
val F: Sync[F] = implicitly
228240

229241
val directlyPoisonedMeter: Meter[F] = monitor.meter("directlyPoisoned")
230242

243+
private val delayingRepublishGauge = monitor.gauge.settableLong("delayingRepublish")
244+
245+
/**
  * Holds back a republish for the given time while tracking it in the `delayingRepublish` gauge.
  *
  * The gauge is incremented before the sleep starts and decremented once it ends. The whole sequence is
  * wrapped in `bracket`, so the decrement also runs when the sleep fails or the surrounding effect is
  * cancelled; without that the gauge would keep counting a delivery that is no longer waiting.
  *
  * @param time how long to wait before handing the result back
  * @param r    the republish result to return after the delay
  * @return `r`, widened to [[DeliveryResult]], once the delay has elapsed
  */
def delayRepublish(time: FiniteDuration)(r: Republish): F[DeliveryResult] = {
  F.bracket(delayingRepublishGauge.inc)(_ => Timer[F].sleep(time))(_ => delayingRepublishGauge.dec.void)
    .as(r: DeliveryResult)
}
251+
231252
/**
  * Returns `delivery.withRedactedBody` when payload redaction is switched on for this helper,
  * otherwise the delivery as it was passed in.
  */
def redactIfConfigured(delivery: Delivery[_]): Delivery[Any] = {
  if (redactPayload) delivery.withRedactedBody else delivery
}
234255
}
235256

236257
private[rabbitmq] object PoisonedMessageHandlerHelper {
237-
def apply[F[_]: Sync, PMH: ClassTag](monitor: Monitor[F], redactPayload: Boolean): PoisonedMessageHandlerHelper[F] = {
258+
def apply[F[_]: Sync: Timer, PMH: ClassTag](monitor: Monitor[F], redactPayload: Boolean): PoisonedMessageHandlerHelper[F] = {
238259
val logger: ImplicitContextLogger[F] = ImplicitContextLogger.createLogger[F, PMH]
239260
new PoisonedMessageHandlerHelper[F](logger, monitor, redactPayload)
240261
}

core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala

+93-29
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,23 @@ package com.avast.clients.rabbitmq
22

33
import com.avast.bytes.Bytes
44
import com.avast.clients.rabbitmq.PoisonedMessageHandler._
5-
import com.avast.clients.rabbitmq.api.DeliveryResult.{DirectlyPoison, Republish}
5+
import com.avast.clients.rabbitmq.api.DeliveryResult.{DirectlyPoison, Reject, Republish}
66
import com.avast.clients.rabbitmq.api._
77
import com.avast.metrics.scalaeffectapi.Monitor
88
import monix.eval.Task
99
import monix.execution.Scheduler.Implicits.global
1010

1111
import java.time.Instant
1212
import java.util.concurrent.atomic.AtomicInteger
13+
import scala.concurrent.Await
1314
import scala.util.Random
1415

1516
class PoisonedMessageHandlerTest extends TestBase {
1617

1718
implicit val dctx: DeliveryContext = TestDeliveryContext.create()
1819

19-
private val pmhHelper = PoisonedMessageHandlerHelper[Task, PoisonedMessageHandlerTest](Monitor.noOp(), redactPayload = false)
20+
private val monitor = new TestMonitor[Task]
21+
private val pmhHelper = PoisonedMessageHandlerHelper[Task, PoisonedMessageHandlerTest](monitor, redactPayload = false)
2022

2123
test("PoisonedMessageHandler.handleResult ignores non-poisoned") {
2224
val movedCount = new AtomicInteger(0)
@@ -77,33 +79,6 @@ class PoisonedMessageHandlerTest extends TestBase {
7779
assertResult(DeliveryResult.Reject)(run(handler, readAction, properties))
7880
}
7981

80-
test("LoggingPoisonedMessageHandler exponential delay") {
81-
import scala.concurrent.duration._
82-
83-
def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = {
84-
Task.now(Republish())
85-
}
86-
87-
val delay = Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds))
88-
val handler = new LoggingPoisonedMessageHandler[Task, Bytes](5, delay, pmhHelper)
89-
val timeBeforeExecution = Instant.now()
90-
val properties = (1 to 4).foldLeft(MessageProperties.empty) {
91-
case (p, _) =>
92-
run(handler, readAction, p) match {
93-
case Republish(_, h) => MessageProperties(headers = h)
94-
case _ => MessageProperties.empty
95-
}
96-
}
97-
98-
val now = Instant.now()
99-
assert(now.minusSeconds(7).isAfter(timeBeforeExecution) && now.minusSeconds(8).isBefore(timeBeforeExecution))
100-
// check it increases the header with count
101-
assertResult(MessageProperties(headers = Map(RepublishCountHeaderName -> 4.asInstanceOf[AnyRef])))(properties)
102-
103-
// check it will Reject the message on 5th attempt
104-
assertResult(DeliveryResult.Reject)(run(handler, readAction, properties))
105-
}
106-
10782
test("LoggingPoisonedMessageHandler direct poisoning") {
10883
import scala.concurrent.duration._
10984

@@ -235,6 +210,95 @@ class PoisonedMessageHandlerTest extends TestBase {
235210
assertResult(1)(movedCount.get())
236211
}
237212

213+
test("PoisonedMessageHandlerBase exponential delay of Republish") {
  import scala.concurrent.duration._

  // every read asks for the message to be republished
  def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = Task.now(Republish())

  val republishDelay = Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds))

  // handler allowing 5 attempts whose poisoned-message callback does nothing
  val handler = new PoisonedMessageHandlerBase[Task, Bytes](5, republishDelay, pmhHelper) {
    override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[Bytes], maxAttempts: Int)(
        implicit dctx: DeliveryContext): Task[Unit] = Task.unit
  }

  val startedAt = Instant.now()

  // run four attempts in a row, feeding the headers of each Republish into the next delivery
  val properties = (1 to 4).foldLeft(MessageProperties.empty) { (props, _) =>
    run(handler, readAction, props) match {
      case Republish(_, headers) => MessageProperties(headers = headers)
      case _ => MessageProperties.empty
    }
  }

  val finishedAt = Instant.now()

  // the four republishes must together have taken at least 7 and less than 8 seconds
  val tookOverSevenSeconds = finishedAt.minusSeconds(7).isAfter(startedAt)
  val tookUnderEightSeconds = finishedAt.minusSeconds(8).isBefore(startedAt)
  assert(tookOverSevenSeconds && tookUnderEightSeconds)

  // the republish counter header must have been increased with every attempt
  assertResult(MessageProperties(headers = Map(RepublishCountHeaderName -> 4.asInstanceOf[AnyRef])))(properties)

  // the 5th attempt must end with Reject
  assertResult(DeliveryResult.Reject)(run(handler, readAction, properties))
}
245+
246+
test("PoisonedMessageHandlerBase no delay of throw-away") {
  import scala.concurrent.duration._

  // every read asks for the message to be republished
  def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = Task.now(Republish())

  val republishDelay = Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds))

  // only a single attempt is allowed, so the very first Republish has to be turned into Reject
  val handler = new PoisonedMessageHandlerBase[Task, Bytes](1, republishDelay, pmhHelper) {
    override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[Bytes], maxAttempts: Int)(
        implicit dctx: DeliveryContext): Task[Unit] = Task.unit
  }

  val startedAt = Instant.now()

  val outcome = run(handler, readAction, MessageProperties.empty)
  assertResult(Reject)(outcome)

  val finishedAt = Instant.now()

  // rejecting must not wait for the republish delay (500 ms tolerance)
  assert(finishedAt.minusMillis(500).isBefore(startedAt))
}
268+
269+
test("PoisonedMessageHandlerBase puts delayed republish into gauge") {
  import scala.concurrent.duration._

  def delayedNow: Long = monitor.registry.gaugeLongValue("delayingRepublish")

  val delay = Some(new ExponentialDelay(1.seconds, 1.seconds, 2, 2.seconds))
  val handler = new PoisonedMessageHandlerBase[Task, Bytes](5, delay, pmhHelper) {
    override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[Bytes], maxAttempts: Int)(
        implicit dctx: DeliveryContext): Task[Unit] = Task.unit
  }

  // nothing is being delayed before the test starts
  assertResult(0)(delayedNow)

  // five poisoned republishes (already attempted once) intercepted in parallel
  val parallelRepublishes = Task
    .parSequence((1 to 5).map { _ =>
      val r = Republish(countAsPoisoned = true, Map("X-Republish-Count" -> 1.asInstanceOf[AnyRef]))
      handler.interceptResult(Delivery(Bytes.empty(), MessageProperties.empty, ""), MessageId("msg-id"), Bytes.empty())(r)
    })
    .map(_.toList)

  val future = parallelRepublishes.runToFuture

  // The tasks are started asynchronously, so the gauge may not have been incremented five times yet
  // when `runToFuture` returns. Poll for a bounded time instead of asserting immediately.
  val deadline = System.nanoTime() + 1.second.toNanos
  while (delayedNow != 5 && System.nanoTime() < deadline) Thread.sleep(10)

  // running!
  assertResult(5)(delayedNow)

  // await and check result
  assertResult {
    List.fill(5)(Republish(countAsPoisoned = true, Map("X-Republish-Count" -> 2.asInstanceOf[AnyRef])))
  }(Await.result(future, 5.seconds))

  // every finished delay must have been taken out of the gauge again
  assertResult(0)(delayedNow)
}
301+
238302
test("pretend lower no. of attempts") {
239303
def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = {
240304
Task.now(Republish())

core/src/test/scala/com/avast/clients/rabbitmq/TestMonitor.scala

+1
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@ class Registry(registry: MetricRegistry) {
3636
/** Count of the timer registered under `path`; dots in the path are replaced by slashes for the lookup. */
def timerCount(path: String): Long = {
  val timers = registry.getTimers.asScala
  timers(path.replace('.', '/')).getCount
}
3737
/** Count of the timer registered under `path` (dots replaced by slashes) with the "Successes" suffix. */
def timerPairCountSuccesses(path: String): Long = {
  val timers = registry.getTimers.asScala
  timers(path.replace('.', '/') + "Successes").getCount
}
3838
/** Count of the timer registered under `path` (dots replaced by slashes) with the "Failures" suffix. */
def timerPairCountFailures(path: String): Long = {
  val timers = registry.getTimers.asScala
  timers(path.replace('.', '/') + "Failures").getCount
}
39+
/** Current value of the gauge registered under `path` (dots replaced by slashes), cast to `Long`. */
def gaugeLongValue(path: String): Long = {
  val gauges = registry.getGauges.asScala
  gauges(path.replace('.', '/')).getValue.asInstanceOf[Long]
}
3940
}

0 commit comments

Comments
 (0)