Skip to content

Commit 0ef2a8a

Browse files
authored
Merge pull request #198 from avast/publisher-confirms
introduced publisher confirms
2 parents 5a739b9 + bd1a667 commit 0ef2a8a

File tree

12 files changed

+400
-51
lines changed

12 files changed

+400
-51
lines changed

README.md

+19
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,25 @@ If you don't specify the strategy by yourself, `CorrelationIdStrategy.FromProper
517517
properties (or headers) and generate a new one if it doesn't succeed. In any way, the CID will be part of both logs and resulting (outgoing)
518518
RabbitMQ message.
519519

520+
#### Publisher confirms
521+
522+
By using following configuration
523+
```hocon
524+
producer {
525+
properties {
526+
confirms {
527+
enabled = true
528+
sendAttempts = 2
529+
}
530+
}
531+
}
532+
```
533+
clients can enable [publisher confirms](https://www.rabbitmq.com/confirms.html#publisher-confirms). Each `send` call will wait for ack/nack from broker.
534+
This wait is of course non-blocking. `sendAttempts` is number of all attempts including initial one. If number of `sendAttempts` is greater than 1 it will try to resend messages again
535+
right after it obtains nack from broker.
536+
537+
From implementation point of view, it uses asynchronous acks/nacks combined with [Deferred](https://typelevel.org/cats-effect/docs/std/deferred) from cats library.
538+
520539
#### Consumers
521540

522541
You can also get the CorrelationId from the message properties on the consumer side. The CID is taken from both AMQP properties

api/src/main/scala/com/avast/clients/rabbitmq/api/exceptions.scala

+4
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,7 @@ case class ConversionException(desc: String, cause: Throwable = null) extends Ru
66
case class ChannelNotRecoveredException(desc: String, cause: Throwable = null) extends IOException(desc, cause)
77

88
case class TooBigMessage(desc: String, cause: Throwable = null) extends IllegalArgumentException(desc, cause)
9+
10+
case class MaxAttemptsReached(desc: String, cause: Throwable = null) extends RuntimeException(desc, cause)
11+
12+
case class NotAcknowledgedPublish(desc: String, cause: Throwable = null, messageId: Long) extends RuntimeException(desc, cause)

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQClientFactory.scala

+47-17
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
package com.avast.clients.rabbitmq
22

33
import cats.effect._
4+
import cats.effect.concurrent.{Deferred, Ref}
45
import cats.implicits.{catsSyntaxFlatMapOps, toFunctorOps, toTraverseOps}
56
import com.avast.bytes.Bytes
67
import com.avast.clients.rabbitmq.DefaultRabbitMQClientFactory.startConsumingQueue
78
import com.avast.clients.rabbitmq.api._
89
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
10+
import com.avast.clients.rabbitmq.publisher.{BaseRabbitMQProducer, DefaultRabbitMQProducer, PublishConfirmsRabbitMQProducer}
911
import com.avast.metrics.scalaeffectapi.Monitor
1012
import com.rabbitmq.client.Consumer
1113

1214
import scala.collection.compat._
1315
import scala.collection.immutable
1416
import scala.jdk.CollectionConverters._
1517
import scala.language.implicitConversions
18+
import scala.reflect.ClassTag
1619

1720
private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Timer: ContextShift](
1821
connection: RabbitMQConnection[F],
@@ -26,7 +29,7 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
2629

2730
object Producer {
2831

29-
def create[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor[F]): Resource[F, DefaultRabbitMQProducer[F, A]] = {
32+
def create[A: ProductConverter](producerConfig: ProducerConfig, monitor: Monitor[F]): Resource[F, BaseRabbitMQProducer[F, A]] = {
3033
prepareProducer[A](producerConfig, connection, monitor)
3134
}
3235
}
@@ -263,34 +266,61 @@ private[rabbitmq] class DefaultRabbitMQClientFactory[F[_]: ConcurrentEffect: Tim
263266

264267
private def prepareProducer[A: ProductConverter](producerConfig: ProducerConfig,
265268
connection: RabbitMQConnection[F],
266-
monitor: Monitor[F]): Resource[F, DefaultRabbitMQProducer[F, A]] = {
267-
val logger = ImplicitContextLogger.createLogger[F, DefaultRabbitMQProducer[F, A]]
269+
monitor: Monitor[F]): Resource[F, BaseRabbitMQProducer[F, A]] = {
270+
producerConfig.properties.confirms match {
271+
case Some(PublisherConfirmsConfig(true, sendAttempts)) =>
272+
prepareProducer(producerConfig, connection) { (defaultProperties, channel, logger) =>
273+
Ref.of(Map.empty[Long, Deferred[F, Either[NotAcknowledgedPublish, Unit]]])
274+
.map {
275+
new PublishConfirmsRabbitMQProducer[F, A](
276+
producerConfig.name,
277+
producerConfig.exchange,
278+
channel,
279+
defaultProperties,
280+
_,
281+
sendAttempts,
282+
producerConfig.reportUnroutable,
283+
producerConfig.sizeLimitBytes,
284+
blocker,
285+
logger,
286+
monitor)
287+
}
288+
}
289+
case _ =>
290+
prepareProducer(producerConfig, connection) { (defaultProperties, channel, logger) =>
291+
F.pure {
292+
new DefaultRabbitMQProducer[F, A](producerConfig.name,
293+
producerConfig.exchange,
294+
channel,
295+
defaultProperties,
296+
producerConfig.reportUnroutable,
297+
producerConfig.sizeLimitBytes,
298+
blocker,
299+
logger,
300+
monitor)
301+
}
302+
}
303+
}
304+
}
305+
306+
private def prepareProducer[T: ClassTag, A: ProductConverter](producerConfig: ProducerConfig, connection: RabbitMQConnection[F])(
307+
createProducer: (MessageProperties, ServerChannel, ImplicitContextLogger[F]) => F[T]) = {
308+
val logger: ImplicitContextLogger[F] = ImplicitContextLogger.createLogger[F, T]
268309

269310
connection
270311
.newChannel()
271312
.evalTap { channel =>
272313
// auto declare exchange; if configured
273314
producerConfig.declare.map { declareExchange(producerConfig.exchange, channel, _)(logger) }.getOrElse(F.unit)
274315
}
275-
.map { channel =>
316+
.evalMap[F, T] { channel =>
276317
val defaultProperties = MessageProperties(
277318
deliveryMode = DeliveryMode.fromCode(producerConfig.properties.deliveryMode),
278319
contentType = producerConfig.properties.contentType,
279320
contentEncoding = producerConfig.properties.contentEncoding,
280321
priority = producerConfig.properties.priority.map(Integer.valueOf)
281-
)
282-
283-
new DefaultRabbitMQProducer[F, A](
284-
producerConfig.name,
285-
producerConfig.exchange,
286-
channel,
287-
defaultProperties,
288-
producerConfig.reportUnroutable,
289-
producerConfig.sizeLimitBytes,
290-
blocker,
291-
logger,
292-
monitor
293-
)
322+
)
323+
createProducer(defaultProperties, channel, logger)
294324
}
295325
}
296326

core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ final case class ProducerConfig(name: String,
8484
final case class ProducerPropertiesConfig(deliveryMode: Int = 2,
8585
contentType: Option[String] = None,
8686
contentEncoding: Option[String] = None,
87-
priority: Option[Int] = None)
87+
priority: Option[Int] = None,
88+
confirms: Option[PublisherConfirmsConfig] = None)
89+
90+
final case class PublisherConfirmsConfig(enabled: Boolean = false, sendAttempts: Int = 1)
8891

8992
final case class AutoDeclareExchangeConfig(enabled: Boolean,
9093
`type`: ExchangeType,

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala core/src/main/scala/com/avast/clients/rabbitmq/publisher/BaseRabbitMQProducer.scala

+45-33
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,31 @@
1-
package com.avast.clients.rabbitmq
1+
package com.avast.clients.rabbitmq.publisher
22

33
import cats.effect.{Blocker, ContextShift, Effect, Sync}
4-
import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFlatMapOps}
4+
import cats.syntax.applicativeError._
5+
import cats.syntax.flatMap._
6+
import cats.syntax.functor._
57
import com.avast.bytes.Bytes
68
import com.avast.clients.rabbitmq.JavaConverters._
79
import com.avast.clients.rabbitmq.api.CorrelationIdStrategy.FromPropertiesOrRandomNew
810
import com.avast.clients.rabbitmq.api._
911
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
12+
import com.avast.clients.rabbitmq.{CorrelationId, ProductConverter, ServerChannel, startAndForget}
1013
import com.avast.metrics.scalaeffectapi.Monitor
1114
import com.rabbitmq.client.AMQP.BasicProperties
1215
import com.rabbitmq.client.{AlreadyClosedException, ReturnListener}
1316

1417
import java.util.UUID
1518
import scala.util.control.NonFatal
1619

17-
class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
18-
exchangeName: String,
19-
channel: ServerChannel,
20-
defaultProperties: MessageProperties,
21-
reportUnroutable: Boolean,
22-
sizeLimitBytes: Option[Int],
23-
blocker: Blocker,
24-
logger: ImplicitContextLogger[F],
25-
monitor: Monitor[F])(implicit F: Effect[F], cs: ContextShift[F])
20+
abstract class BaseRabbitMQProducer[F[_], A: ProductConverter](name: String,
21+
exchangeName: String,
22+
channel: ServerChannel,
23+
defaultProperties: MessageProperties,
24+
reportUnroutable: Boolean,
25+
sizeLimitBytes: Option[Int],
26+
blocker: Blocker,
27+
logger: ImplicitContextLogger[F],
28+
monitor: Monitor[F])(implicit F: Effect[F], cs: ContextShift[F])
2629
extends RabbitMQProducer[F, A] {
2730

2831
private val sentMeter = monitor.meter("sent")
@@ -35,6 +38,8 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
3538

3639
channel.addReturnListener(if (reportUnroutable) LoggingReturnListener else NoOpReturnListener)
3740

41+
def sendMessage(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit]
42+
3843
override def send(routingKey: String, body: A, properties: Option[MessageProperties] = None)(
3944
implicit cidStrategy: CorrelationIdStrategy = FromPropertiesOrRandomNew(properties)): F[Unit] = {
4045
implicit val correlationId: CorrelationId = CorrelationId(cidStrategy.toCIDValue)
@@ -49,33 +54,40 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
4954
}
5055

5156
converter.convert(body) match {
52-
case Right(convertedBody) => send(routingKey, convertedBody, finalProperties)
57+
case Right(convertedBody) =>
58+
for {
59+
_ <- checkSize(convertedBody, routingKey)
60+
_ <- processErrors(sendMessage(routingKey, convertedBody, finalProperties), routingKey)
61+
} yield ()
5362
case Left(ce) => Sync[F].raiseError(ce)
5463
}
5564
}
5665

57-
private def send(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
58-
checkSize(body, routingKey) >>
59-
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties") >>
60-
blocker
61-
.delay {
62-
sendLock.synchronized {
63-
// see https://www.rabbitmq.com/api-guide.html#channel-threads
64-
channel.basicPublish(exchangeName, routingKey, properties.asAMQP, body.toByteArray)
65-
}
66-
}
67-
.flatTap(_ => sentMeter.mark)
68-
.recoverWith {
69-
case ce: AlreadyClosedException =>
70-
logger.debug(ce)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
71-
sentFailedMeter.mark >>
72-
F.raiseError[Unit](ChannelNotRecoveredException("Channel closed, wait for recovery", ce))
73-
74-
case NonFatal(e) =>
75-
logger.debug(e)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
76-
sentFailedMeter.mark >>
77-
F.raiseError[Unit](e)
66+
protected def basicSend(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] = {
67+
for {
68+
_ <- logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties")
69+
_ <- blocker.delay {
70+
sendLock.synchronized {
71+
// see https://www.rabbitmq.com/api-guide.html#channel-threads
72+
channel.basicPublish(exchangeName, routingKey, properties.asAMQP, body.toByteArray)
7873
}
74+
}
75+
_ <- sentMeter.mark
76+
} yield ()
77+
}
78+
79+
private def processErrors(from: F[Unit], routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
80+
from.recoverWith {
81+
case ce: AlreadyClosedException =>
82+
logger.debug(ce)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
83+
sentFailedMeter.mark >>
84+
F.raiseError[Unit](ChannelNotRecoveredException("Channel closed, wait for recovery", ce))
85+
86+
case NonFatal(e) =>
87+
logger.debug(e)(s"[$name] Failed to send message with routing key '$routingKey' to exchange '$exchangeName'") >>
88+
sentFailedMeter.mark >>
89+
F.raiseError[Unit](e)
90+
}
7991
}
8092

8193
private def checkSize(bytes: Bytes, routingKey: String)(implicit correlationId: CorrelationId): F[Unit] = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.avast.clients.rabbitmq.publisher
2+
3+
import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
4+
import com.avast.bytes.Bytes
5+
import com.avast.clients.rabbitmq.api.MessageProperties
6+
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
7+
import com.avast.clients.rabbitmq.{CorrelationId, ProductConverter, ServerChannel}
8+
import com.avast.metrics.scalaeffectapi.Monitor
9+
10+
class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
11+
exchangeName: String,
12+
channel: ServerChannel,
13+
defaultProperties: MessageProperties,
14+
reportUnroutable: Boolean,
15+
sizeLimitBytes: Option[Int],
16+
blocker: Blocker,
17+
logger: ImplicitContextLogger[F],
18+
monitor: Monitor[F])(implicit F: ConcurrentEffect[F], cs: ContextShift[F])
19+
extends BaseRabbitMQProducer[F, A](name,
20+
exchangeName,
21+
channel,
22+
defaultProperties,
23+
reportUnroutable,
24+
sizeLimitBytes,
25+
blocker,
26+
logger,
27+
monitor) {
28+
override def sendMessage(routingKey: String, body: Bytes, properties: MessageProperties)(implicit correlationId: CorrelationId): F[Unit] =
29+
basicSend(routingKey, body, properties)
30+
}

0 commit comments

Comments
 (0)