Skip to content

Commit bd1a667

Browse files
author
Karel Fajkus
committed
PR fixes
1 parent 3b103df commit bd1a667

File tree

6 files changed

+146
-46
lines changed

6 files changed

+146
-46
lines changed

api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ case class ChannelNotRecoveredException(desc: String, cause: Throwable = null) e
77

88
case class TooBigMessage(desc: String, cause: Throwable = null) extends IllegalArgumentException(desc, cause)
99

10-
case class MaxAttempts(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)
10+
case class MaxAttemptsReached(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)
1111

12-
case class NotAcknowledgedPublish(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)
12+
case class NotAcknowledgedPublish(desc: String, cause: Throwable = null, messageId: Long) extends RuntimeException(desc, cause)

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala

+43-34
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import scala.collection.compat._
1515
import scala.collection.immutable
1616
import scala.jdk.CollectionConverters._
1717
import scala.language.implicitConversions
18+
import scala.reflect.ClassTag
1819

1920
private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Timer: ContextShift](
2021
connection: RabbitMQConnection[F],
@@ -266,52 +267,60 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
266267
private def prepareProducer[A: ProductConverter](producerConfig: ProducerConfig,
267268
connection: RabbitMQConnection[F],
268269
monitor: Monitor[F]): Resource[F, BaseRabbitMQProducer[F, A]] = {
269-
val logger = ImplicitContextLogger.createLogger[F, BaseRabbitMQProducer[F, A]]
270+
producerConfig.properties.confirms match {
271+
case Some(PublisherConfirmsConfig(true, sendAttempts)) =>
272+
prepareProducer(producerConfig, connection) { (defaultProperties, channel, logger) =>
273+
Ref.of(Map.empty[Long, Deferred[F, Either[NotAcknowledgedPublish, Unit]]])
274+
.map {
275+
new PublishConfirmsRabbitMQProducer[F, A](
276+
producerConfig.name,
277+
producerConfig.exchange,
278+
channel,
279+
defaultProperties,
280+
_,
281+
sendAttempts,
282+
producerConfig.reportUnroutable,
283+
producerConfig.sizeLimitBytes,
284+
blocker,
285+
logger,
286+
monitor)
287+
}
288+
}
289+
case _ =>
290+
prepareProducer(producerConfig, connection) { (defaultProperties, channel, logger) =>
291+
F.pure {
292+
new DefaultRabbitMQProducer[F, A](producerConfig.name,
293+
producerConfig.exchange,
294+
channel,
295+
defaultProperties,
296+
producerConfig.reportUnroutable,
297+
producerConfig.sizeLimitBytes,
298+
blocker,
299+
logger,
300+
monitor)
301+
}
302+
}
303+
}
304+
}
305+
306+
private def prepareProducer[T: ClassTag, A: ProductConverter](producerConfig: ProducerConfig, connection: RabbitMQConnection[F])(
307+
createProducer: (MessageProperties, ServerChannel, ImplicitContextLogger[F]) => F[T]) = {
308+
val logger: ImplicitContextLogger[F] = ImplicitContextLogger.createLogger[F, T]
270309

271310
connection
272311
.newChannel()
273312
.evalTap { channel =>
274313
// auto declare exchange; if configured
275314
producerConfig.declare.map { declareExchange(producerConfig.exchange, channel, _)(logger) }.getOrElse(F.unit)
276315
}
277-
.evalMap[F, BaseRabbitMQProducer[F, A]] { channel =>
316+
.evalMap[F, T] { channel =>
278317
val defaultProperties = MessageProperties(
279318
deliveryMode = DeliveryMode.fromCode(producerConfig.properties.deliveryMode),
280319
contentType = producerConfig.properties.contentType,
281320
contentEncoding = producerConfig.properties.contentEncoding,
282321
priority = producerConfig.properties.priority.map(Integer.valueOf)
283-
)
284-
285-
producerConfig.properties.confirms match {
286-
case Some(PublisherConfirmsConfig(true, sendAttempts)) =>
287-
Ref.of(Map.empty[Long, Deferred[F, Either[Throwable, Unit]]])
288-
.map {
289-
new PublishConfirmsRabbitMQProducer[F, A](
290-
producerConfig.name,
291-
producerConfig.exchange,
292-
channel,
293-
defaultProperties,
294-
_,
295-
sendAttempts,
296-
producerConfig.reportUnroutable,
297-
producerConfig.sizeLimitBytes,
298-
blocker,
299-
logger,
300-
monitor)
301-
}
302-
case _ =>
303-
F.pure {
304-
new DefaultRabbitMQProducer[F, A](producerConfig.name,
305-
producerConfig.exchange,
306-
channel,
307-
defaultProperties,
308-
producerConfig.reportUnroutable,
309-
producerConfig.sizeLimitBytes,
310-
blocker,
311-
logger,
312-
monitor)
313-
}
314-
}
322+
)
323+
createProducer(defaultProperties, channel, logger)
315324
}
316325
}
317326

core/src/main/scala/com/avast/clients/rabbitmq/publisher/BaseRabbitMQProducer.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ abstract class BaseRabbitMQProducer[F[_], A: ProductConverter](name: String,
5757
case Right(convertedBody) =>
5858
for {
5959
_ <- checkSize(convertedBody, routingKey)
60-
_ <- logErrors(sendMessage(routingKey, convertedBody, finalProperties), routingKey)
60+
_ <- processErrors(sendMessage(routingKey, convertedBody, finalProperties), routingKey)
6161
} yield ()
6262
case Left(ce) => Sync[F].raiseError(ce)
6363
}
@@ -76,7 +76,7 @@ abstract class BaseRabbitMQProducer[F[_], A: ProductConverter](name: String,
7676
} yield ()
7777
}
7878

79-
private def logErrors(from: F[Unit], routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
79+
private def processErrors(from: F[Unit], routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
8080
from.recoverWith {
8181
case ce: AlreadyClosedException =>
8282
logger.debug(ce)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>

core/src/main/scala/com/avast/clients/rabbitmq/publisher/PublishConfirmsRabbitMQProducer.scala

+7-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
55
import cats.syntax.flatMap._
66
import cats.syntax.functor._
77
import com.avast.bytes.Bytes
8-
import com.avast.clients.rabbitmq.api.{MaxAttempts, MessageProperties, NotAcknowledgedPublish}
8+
import com.avast.clients.rabbitmq.api.{MaxAttemptsReached, MessageProperties, NotAcknowledgedPublish}
99
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
1010
import com.avast.clients.rabbitmq.publisher.PublishConfirmsRabbitMQProducer.SentMessages
1111
import com.avast.clients.rabbitmq.{CorrelationId, ProductConverter, ServerChannel, startAndForget}
@@ -46,11 +46,11 @@ class PublishConfirmsRabbitMQProducer[F[_], A: ProductConverter](name: String,
4646
implicit correlationId: CorrelationId): F[Unit] = {
4747

4848
if (attemptCount > sendAttempts) {
49-
F.raiseError(MaxAttempts("Exhausted max number of attempts"))
49+
F.raiseError(MaxAttemptsReached("Exhausted max number of attempts"))
5050
} else {
5151
val messageId = channel.getNextPublishSeqNo
5252
for {
53-
defer <- Deferred.apply[F, Either[Throwable, Unit]]
53+
defer <- Deferred.apply[F, Either[NotAcknowledgedPublish, Unit]]
5454
_ <- sentMessages.update(_ + (messageId -> defer))
5555
_ <- basicSend(routingKey, body, properties)
5656
result <- defer.get
@@ -59,7 +59,7 @@ class PublishConfirmsRabbitMQProducer[F[_], A: ProductConverter](name: String,
5959
val sendResult = if (sendAttempts > 1) {
6060
clearProcessedMessage(messageId) >> sendWithAck(routingKey, body, properties, attemptCount + 1)
6161
} else {
62-
F.raiseError(NotAcknowledgedPublish(s"Broker did not acknowledge publish of message $messageId", err))
62+
F.raiseError(err)
6363
}
6464

6565
nacked.mark >> sendResult
@@ -87,17 +87,17 @@ class PublishConfirmsRabbitMQProducer[F[_], A: ProductConverter](name: String,
8787
startAndForget {
8888
logger.plainTrace(s"Not acked $deliveryTag") >> completeDefer(
8989
deliveryTag,
90-
Left(new Exception(s"Message $deliveryTag not acknowledged by broker")))
90+
Left(NotAcknowledgedPublish(s"Message $deliveryTag not acknowledged by broker", messageId = deliveryTag)))
9191
}
9292
}
9393

94-
private def completeDefer(deliveryTag: Long, result: Either[Throwable, Unit]): F[Unit] = {
94+
private def completeDefer(deliveryTag: Long, result: Either[NotAcknowledgedPublish, Unit]): F[Unit] = {
9595
sentMessages.get.flatMap(_.get(deliveryTag).traverse_(_.complete(result)))
9696
}
9797
}
9898

9999
}
100100

101101
object PublishConfirmsRabbitMQProducer {
102-
type SentMessages[F[_]] = Ref[F, Map[Long, Deferred[F, Either[Throwable, Unit]]]]
102+
type SentMessages[F[_]] = Ref[F, Map[Long, Deferred[F, Either[NotAcknowledgedPublish, Unit]]]]
103103
}

core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.avast.clients.rabbitmq
22

3+
import cats.effect.ConcurrentEffect
34
import com.avast.bytes.Bytes
45
import com.avast.clients.rabbitmq.api._
56
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
@@ -192,7 +193,7 @@ class DefaultRabbitMQProducerTest extends TestBase {
192193
reportUnroutable = false,
193194
sizeLimitBytes = Some(limit),
194195
blocker = TestBase.testBlocker,
195-
logger = ImplicitContextLogger.createLogger,
196+
logger = ImplicitContextLogger.createLogger
196197
)
197198

198199
// don't test anything except it doesn't fail
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package com.avast.clients.rabbitmq
2+
3+
import cats.effect.concurrent.{Deferred, Ref}
4+
import cats.syntax.parallel._
5+
import com.avast.bytes.Bytes
6+
import com.avast.clients.rabbitmq.api.{MessageProperties, NotAcknowledgedPublish}
7+
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
8+
import com.avast.clients.rabbitmq.publisher.PublishConfirmsRabbitMQProducer
9+
import com.avast.clients.rabbitmq.publisher.PublishConfirmsRabbitMQProducer.SentMessages
10+
import com.avast.metrics.scalaeffectapi.Monitor
11+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel
12+
import monix.eval.Task
13+
import monix.execution.Scheduler.Implicits.global
14+
import org.mockito.Matchers
15+
import org.mockito.Matchers.any
16+
import org.mockito.Mockito.{times, verify, when}
17+
18+
import scala.util.Random
19+
20+
class PublisherConfirmsRabbitMQProducerTest extends TestBase {
21+
test("message is acked after one retry") {
22+
val exchangeName = Random.nextString(10)
23+
val routingKey = Random.nextString(10)
24+
val seqNumber = 1L
25+
val seqNumber2 = 2L
26+
27+
val channel = mock[AutorecoveringChannel]
28+
val ref = Ref.of[Task, Map[Long, Deferred[Task, Either[NotAcknowledgedPublish, Unit]]]](Map.empty).await
29+
val updatedState1 = updateMessageState(ref, seqNumber)(Left(NotAcknowledgedPublish("abcd", messageId = seqNumber)))
30+
val updatedState2 = updateMessageState(ref, seqNumber2)(Right())
31+
32+
val producer = new PublishConfirmsRabbitMQProducer[Task, Bytes](
33+
name = "test",
34+
exchangeName = exchangeName,
35+
channel = channel,
36+
monitor = Monitor.noOp(),
37+
defaultProperties = MessageProperties.empty,
38+
reportUnroutable = false,
39+
sizeLimitBytes = None,
40+
blocker = TestBase.testBlocker,
41+
logger = ImplicitContextLogger.createLogger,
42+
sentMessages = ref,
43+
sendAttempts = 2
44+
)
45+
when(channel.getNextPublishSeqNo).thenReturn(seqNumber, seqNumber2)
46+
47+
producer.send(routingKey, Bytes.copyFrom(Array.fill(499)(32.toByte))).parProduct(updatedState1.parProduct(updatedState2)).await
48+
49+
verify(channel, times(2))
50+
.basicPublish(Matchers.eq(exchangeName), Matchers.eq(routingKey), any(), Matchers.eq(Bytes.copyFrom(Array.fill(499)(32.toByte)).toByteArray))
51+
}
52+
53+
test("Message not acked returned if number of attempts exhausted") {
54+
val exchangeName = Random.nextString(10)
55+
val routingKey = Random.nextString(10)
56+
val seqNumber = 1L
57+
58+
val channel = mock[AutorecoveringChannel]
59+
val ref = Ref.of[Task, Map[Long, Deferred[Task, Either[NotAcknowledgedPublish, Unit]]]](Map.empty).await
60+
val updatedState = updateMessageState(ref, seqNumber)(Left(NotAcknowledgedPublish("abcd", messageId = seqNumber)))
61+
62+
val producer = new PublishConfirmsRabbitMQProducer[Task, Bytes](
63+
name = "test",
64+
exchangeName = exchangeName,
65+
channel = channel,
66+
monitor = Monitor.noOp(),
67+
defaultProperties = MessageProperties.empty,
68+
reportUnroutable = false,
69+
sizeLimitBytes = None,
70+
blocker = TestBase.testBlocker,
71+
logger = ImplicitContextLogger.createLogger,
72+
sentMessages = ref,
73+
sendAttempts = 1
74+
)
75+
when(channel.getNextPublishSeqNo).thenReturn(seqNumber)
76+
77+
assertThrows[NotAcknowledgedPublish] {
78+
producer.send(routingKey, Bytes.copyFrom(Array.fill(499)(32.toByte))).parProduct(updatedState).await
79+
}
80+
81+
verify(channel).basicPublish(Matchers.eq(exchangeName), Matchers.eq(routingKey), any(), Matchers.eq(Bytes.copyFrom(Array.fill(499)(32.toByte)).toByteArray))
82+
}
83+
84+
private def updateMessageState(ref: SentMessages[Task], messageId: Long)(result: Either[NotAcknowledgedPublish, Unit]): Task[Unit] = {
85+
ref.get.flatMap(map => map.get(messageId) match {
86+
case Some(value) => value.complete(result)
87+
case None => updateMessageState(ref, messageId)(result)
88+
})
89+
}
90+
}

0 commit comments

Comments
 (0)