Skip to content

Commit d9d1435

Browse files
authored
fix: core - publisher confirmations - add support for confirmation of multiple messages (#201)
fix: core - publisher confirmations - add support for confirmation of multiple messages
1 parent 3ab1cdf commit d9d1435

File tree

2 files changed

+110
-55
lines changed

2 files changed

+110
-55
lines changed

core/src/main/scala/com/avast/clients/rabbitmq/publisher/PublishConfirmsRabbitMQProducer.scala

+26-11
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.avast.clients.rabbitmq.publisher
22

3+
import cats.effect.Concurrent.ops.toAllConcurrentOps
34
import cats.effect.concurrent.Deferred
45
import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
5-
import cats.syntax.flatMap._
6-
import cats.syntax.functor._
6+
import cats.implicits._
77
import com.avast.bytes.Bytes
88
import com.avast.clients.rabbitmq.api.{MaxAttemptsReached, MessageProperties, NotAcknowledgedPublish}
99
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
@@ -62,7 +62,8 @@ class PublishConfirmsRabbitMQProducer[F[_], A: ProductConverter](name: String,
6262
_ <- result match {
6363
case Left(err) =>
6464
val sendResult = if (sendAttempts > 1) {
65-
sendWithAck(routingKey, body, properties, attemptCount + 1)
65+
logger.plainTrace(s"Republishing nacked message with sequenceNumber $sequenceNumber") >>
66+
sendWithAck(routingKey, body, properties, attemptCount + 1)
6667
} else {
6768
F.raiseError(err)
6869
}
@@ -74,26 +75,40 @@ class PublishConfirmsRabbitMQProducer[F[_], A: ProductConverter](name: String,
7475
}
7576
}
7677

77-
private object DefaultConfirmListener extends ConfirmListener {
78+
private[rabbitmq] object DefaultConfirmListener extends ConfirmListener {
7879

7980
override def handleAck(deliveryTag: Long, multiple: Boolean): Unit = {
8081
startAndForget {
81-
logger.plainTrace(s"Acked $deliveryTag") >> completeDefer(deliveryTag, Right(()))
82+
logger.plainTrace(s"Acked $deliveryTag, multiple: $multiple") >> completeDefer(deliveryTag, multiple, Right(()))
8283
}
8384
}
8485

8586
override def handleNack(deliveryTag: Long, multiple: Boolean): Unit = {
8687
startAndForget {
87-
logger.plainTrace(s"Not acked $deliveryTag") >> completeDefer(
88+
logger.plainTrace(s"Nacked $deliveryTag, multiple: $multiple") >> completeDefer(
8889
deliveryTag,
89-
Left(NotAcknowledgedPublish(s"Message $deliveryTag not acknowledged by broker", messageId = deliveryTag)))
90+
multiple,
91+
Left(NotAcknowledgedPublish(s"Broker was unable to process the message", messageId = deliveryTag)))
9092
}
9193
}
9294

93-
private def completeDefer(deliveryTag: Long, result: Either[NotAcknowledgedPublish, Unit]): F[Unit] = {
94-
confirmationCallbacks.get(deliveryTag) match {
95-
case Some(callback) => callback.complete(result)
96-
case None => logger.plainWarn("Received confirmation for unknown delivery tag. That is unexpected state.")
95+
private def completeDefer(deliveryTag: Long, multiple: Boolean, result: Either[NotAcknowledgedPublish, Unit]): F[Unit] = {
96+
if (multiple) {
97+
confirmationCallbacks
98+
.filter {
99+
case (sequenceNumber, _) => sequenceNumber <= deliveryTag
100+
}
101+
.values
102+
.toList
103+
.traverse { callback =>
104+
callback.complete(result).start
105+
}
106+
.void
107+
} else {
108+
confirmationCallbacks.get(deliveryTag) match {
109+
case Some(callback) => callback.complete(result)
110+
case None => logger.plainError(s"Received confirmation for unknown delivery tag $deliveryTag. That is unexpected state.")
111+
}
97112
}
98113
}
99114
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.avast.clients.rabbitmq
22

3-
import cats.implicits.catsSyntaxParallelAp
43
import com.avast.bytes.Bytes
54
import com.avast.clients.rabbitmq.api.{MessageProperties, NotAcknowledgedPublish}
65
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
@@ -9,7 +8,7 @@ import com.avast.metrics.scalaeffectapi.Monitor
98
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel
109
import monix.eval.Task
1110
import monix.execution.Scheduler.Implicits.global
12-
import org.junit.runner.manipulation.InvalidOrderingException
11+
import monix.execution.atomic.AtomicInt
1312
import org.mockito.Matchers
1413
import org.mockito.Matchers.any
1514
import org.mockito.Mockito.{times, verify, when}
@@ -23,11 +22,13 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
2322
test("Message is acked after one retry") {
2423
val exchangeName = Random.nextString(10)
2524
val routingKey = Random.nextString(10)
26-
val seqNumber = 1L
27-
val seqNumber2 = 2L
2825

26+
val nextSeqNumber = AtomicInt(0)
2927
val channel = mock[AutorecoveringChannel]
30-
when(channel.getNextPublishSeqNo).thenReturn(seqNumber, seqNumber2)
28+
when(channel.getNextPublishSeqNo).thenAnswer(_ => nextSeqNumber.get())
29+
when(channel.basicPublish(any(), any(), any(), any())).thenAnswer(_ => {
30+
nextSeqNumber.increment()
31+
})
3132

3233
val producer = new PublishConfirmsRabbitMQProducer[Task, Bytes](
3334
name = "test",
@@ -44,13 +45,15 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
4445

4546
val body = Bytes.copyFrom(Array.fill(499)(32.toByte))
4647

47-
val publishTask = producer.send(routingKey, body).runToFuture
48+
val publishFuture = producer.send(routingKey, body).runToFuture
49+
50+
while (nextSeqNumber.get() < 1) { Thread.sleep(5) }
51+
producer.DefaultConfirmListener.handleNack(0, multiple = false)
4852

49-
updateMessageState(producer, seqNumber)(Left(NotAcknowledgedPublish("abcd", messageId = seqNumber))).parProduct {
50-
updateMessageState(producer, seqNumber2)(Right())
51-
}.await
53+
while (nextSeqNumber.get() < 2) { Thread.sleep(5) }
54+
producer.DefaultConfirmListener.handleAck(1, multiple = false)
5255

53-
Await.result(publishTask, 10.seconds)
56+
Await.result(publishFuture, 10.seconds)
5457

5558
verify(channel, times(2))
5659
.basicPublish(Matchers.eq(exchangeName), Matchers.eq(routingKey), any(), Matchers.eq(body.toByteArray))
@@ -59,10 +62,13 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
5962
test("Message not acked returned if number of attempts exhausted") {
6063
val exchangeName = Random.nextString(10)
6164
val routingKey = Random.nextString(10)
62-
val seqNumber = 1L
6365

66+
val nextSeqNumber = AtomicInt(0)
6467
val channel = mock[AutorecoveringChannel]
65-
when(channel.getNextPublishSeqNo).thenReturn(seqNumber)
68+
when(channel.getNextPublishSeqNo).thenAnswer(_ => nextSeqNumber.get())
69+
when(channel.basicPublish(any(), any(), any(), any())).thenAnswer(_ => {
70+
nextSeqNumber.increment()
71+
})
6672

6773
val producer = new PublishConfirmsRabbitMQProducer[Task, Bytes](
6874
name = "test",
@@ -77,27 +83,35 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
7783
sendAttempts = 1
7884
)
7985

80-
val body = Bytes.copyFrom(Array.fill(499)(32.toByte))
86+
val body = Bytes.copyFrom(Array.fill(499)(64.toByte))
8187

8288
val publishTask = producer.send(routingKey, body).runToFuture
8389

90+
while (nextSeqNumber.get() < 1) {
91+
Thread.sleep(5)
92+
}
93+
94+
producer.DefaultConfirmListener.handleNack(0, multiple = false)
95+
8496
assertThrows[NotAcknowledgedPublish] {
85-
updateMessageState(producer, seqNumber)(Left(NotAcknowledgedPublish("abcd", messageId = seqNumber))).await
86-
Await.result(publishTask, 10.seconds)
97+
Await.result(publishTask, 1.seconds)
8798
}
8899

89100
verify(channel).basicPublish(Matchers.eq(exchangeName), Matchers.eq(routingKey), any(), Matchers.eq(body.toByteArray))
90101
}
91102

92-
test("Multiple messages are fully acked") {
103+
test("Multiple messages are fully acked one by one") {
93104
val exchangeName = Random.nextString(10)
94105
val routingKey = Random.nextString(10)
95106

96-
val channel = mock[AutorecoveringChannel]
107+
val seqNumbers = (0 to 499).toList
108+
val nextSeqNumber = AtomicInt(0)
97109

98-
val seqNumbers = 1 to 500
99-
val iterator = seqNumbers.iterator
100-
when(channel.getNextPublishSeqNo).thenAnswer(_ => { iterator.next() })
110+
val channel = mock[AutorecoveringChannel]
111+
when(channel.getNextPublishSeqNo).thenAnswer(_ => nextSeqNumber.get())
112+
when(channel.basicPublish(any(), any(), any(), any())).thenAnswer(_ => {
113+
nextSeqNumber.increment()
114+
})
101115

102116
val producer = new PublishConfirmsRabbitMQProducer[Task, Bytes](
103117
name = "test",
@@ -112,38 +126,64 @@ class PublisherConfirmsRabbitMQProducerTest extends TestBase {
112126
sendAttempts = 2
113127
)
114128

115-
val body = Bytes.copyFrom(Array.fill(499)(32.toByte))
129+
val body = Bytes.copyFrom(Array.fill(499)(Random.nextInt(255).toByte))
116130

117-
val publishTasks = Task.parSequenceUnordered {
118-
seqNumbers.map { _ =>
119-
producer.send(routingKey, body)
120-
}
131+
val publishFuture = Task.parSequence {
132+
seqNumbers.map(_ => producer.send(routingKey, body))
121133
}.runToFuture
122134

123-
Task
124-
.parSequenceUnordered(seqNumbers.map { seqNumber =>
125-
updateMessageState(producer, seqNumber)(Right())
126-
})
127-
.await(15.seconds)
135+
seqNumbers.foreach { seqNumber =>
136+
while (nextSeqNumber.get() <= seqNumber) { Thread.sleep(5) }
137+
producer.DefaultConfirmListener.handleAck(seqNumber, multiple = false)
138+
}
128139

129-
Await.result(publishTasks, 15.seconds)
140+
Await.result(publishFuture, 15.seconds)
130141

142+
assertResult(seqNumbers.length)(nextSeqNumber.get())
131143
verify(channel, times(seqNumbers.length))
132144
.basicPublish(Matchers.eq(exchangeName), Matchers.eq(routingKey), any(), Matchers.eq(body.toByteArray))
133145
}
134146

135-
private def updateMessageState(producer: PublishConfirmsRabbitMQProducer[Task, Bytes], messageId: Long, attempt: Int = 1)(
136-
result: Either[NotAcknowledgedPublish, Unit]): Task[Unit] = {
137-
Task
138-
.delay(producer.confirmationCallbacks.get(messageId))
139-
.flatMap {
140-
case Some(value) => value.complete(result)
141-
case None =>
142-
if (attempt < 90) {
143-
Task.sleep(100.millis) >> updateMessageState(producer, messageId, attempt + 1)(result)
144-
} else {
145-
throw new InvalidOrderingException(s"The message ID $messageId is not present in the list of callbacks")
146-
}
147-
}
147+
test("Multiple messages are fully acked at once") {
148+
val exchangeName = Random.nextString(10)
149+
val routingKey = Random.nextString(10)
150+
151+
val messageCount = 500
152+
val seqNumbers = (0 until messageCount).toList
153+
val nextSeqNumber = AtomicInt(0)
154+
155+
val channel = mock[AutorecoveringChannel]
156+
when(channel.getNextPublishSeqNo).thenAnswer(_ => nextSeqNumber.get())
157+
when(channel.basicPublish(any(), any(), any(), any())).thenAnswer(_ => {
158+
nextSeqNumber.increment()
159+
})
160+
161+
val producer = new PublishConfirmsRabbitMQProducer[Task, Bytes](
162+
name = "test",
163+
exchangeName = exchangeName,
164+
channel = channel,
165+
monitor = Monitor.noOp(),
166+
defaultProperties = MessageProperties.empty,
167+
reportUnroutable = false,
168+
sizeLimitBytes = None,
169+
blocker = TestBase.testBlocker,
170+
logger = ImplicitContextLogger.createLogger,
171+
sendAttempts = 2
172+
)
173+
174+
val body = Bytes.copyFrom(Array.fill(499)(Random.nextInt(255).toByte))
175+
176+
val publishFuture = Task.parSequence {
177+
seqNumbers.map(_ => producer.send(routingKey, body))
178+
}.runToFuture
179+
180+
while (nextSeqNumber.get() < messageCount) { Thread.sleep(5) }
181+
producer.DefaultConfirmListener.handleAck(messageCount, multiple = true)
182+
183+
Await.result(publishFuture, 15.seconds)
184+
185+
assertResult(seqNumbers.length)(nextSeqNumber.get())
186+
verify(channel, times(seqNumbers.length))
187+
.basicPublish(Matchers.eq(exchangeName), Matchers.eq(routingKey), any(), Matchers.eq(body.toByteArray))
148188
}
149189
}

0 commit comments

Comments
 (0)