[WIP] Backpressure strategy #102

Open · wants to merge 2 commits into main
@@ -0,0 +1,49 @@
package io.github.nomisRev.kafka.receiver

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

/**
 * When a [KafkaReceiver] does not process messages fast enough,
 * it will start to buffer messages in a [channelFlow] using the default [Channel.BUFFERED].
 * This can be overridden using [buffer].
 *
 * When the [Channel] is full,
 * the [KafkaReceiver] is prevented from calling [KafkaConsumer.poll] again.
 *
 * If [KafkaConsumer.poll] is not called within `max.poll.interval.ms`,
 * Kafka considers the consumer dead and rebalances its partitions.
 *
 * [RebalanceStrategy] specifies how backpressure should be applied.
 */
public sealed interface RebalanceStrategy {
  /**
   * When we want to apply backpressure to Kafka, we need to _pause_ the partitions.
   * A paused consumer can keep calling [KafkaConsumer.poll] without receiving new records,
   * which prevents Kafka from considering our consumer dead.
   *
   * Whenever we have space in the [Channel] again, we _resume_ the partitions.
   *
   * NOTE: With a slow processor you may end up constantly pausing and resuming the partitions,
   * which can cause considerable overhead on Kafka.
   * Consider adding _logging_ or _alerting_ to your [RebalanceListener] to see how often this happens.
   */
  public object Backpressure : RebalanceStrategy

  /**
   * In some cases you **do not** want automatic backpressure applied to Kafka,
   * for example when you need to guarantee _high throughput_
   * and never want partitions to be _paused_ or _resumed_.
   *
   * In this scenario you can choose to fail fast instead:
   * [failFast] is invoked with the currently assigned partitions and must throw,
   * by default a [BackpressureException].
   */
  public class FailFast(
    public val failFast: (Set<TopicPartition>) -> Nothing = { throw BackpressureException(it) }
  ) : RebalanceStrategy
}

public class BackpressureException(
public val partitions: Set<TopicPartition>
) : Exception("KafkaConsumer is not processing messages fast enough for partitions: $partitions")
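
As an illustration of the FailFast escape hatch above, here is a minimal sketch of a handler that logs the stalled partitions before throwing; the slf4j logger is a placeholder and not part of this PR:

import org.apache.kafka.common.TopicPartition
import org.slf4j.LoggerFactory

// Placeholder logger; any alerting hook could be used instead.
private val log = LoggerFactory.getLogger("backpressure-alerts")

// `failFast` must return Nothing, so the lambda has to throw.
val failFastWithAlerting = RebalanceStrategy.FailFast { partitions: Set<TopicPartition> ->
  log.error("Consumer cannot keep up, failing fast for partitions: {}", partitions)
  throw BackpressureException(partitions)
}

Because the handler throws instead of pausing, the consumer never ping-pongs between pause and resume; the stream simply fails and can be restarted by whatever supervision wraps it.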
@@ -36,6 +36,7 @@ public data class ReceiverSettings<K, V>(
val maxCommitAttempts: Int = DEFAULT_MAX_COMMIT_ATTEMPTS,
val maxDeferredCommits: Int = 0,
val closeTimeout: Duration = Duration.INFINITE,
val rebalanceStrategy: RebalanceStrategy = RebalanceStrategy.Backpressure,
val properties: Properties = Properties(),
) {
init {
@@ -69,6 +70,7 @@ public fun <V> ReceiverSettings(
maxCommitAttempts: Int = DEFAULT_MAX_COMMIT_ATTEMPTS,
maxDeferredCommits: Int = 0,
closeTimeout: Duration = Long.MAX_VALUE.nanoseconds,
rebalanceStrategy: RebalanceStrategy = RebalanceStrategy.Backpressure,
properties: Properties = Properties(),
): ReceiverSettings<Nothing, V> =
ReceiverSettings(
@@ -83,6 +85,7 @@ public fun <V> ReceiverSettings(
maxCommitAttempts,
maxDeferredCommits,
closeTimeout,
rebalanceStrategy,
properties
)
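
As a hedged end-to-end sketch of the new setting, assuming the library's KafkaReceiver(settings).receive(topic) entry point and the leading ReceiverSettings parameter names (bootstrap servers, group id, and topic are placeholders):

import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.github.nomisRev.kafka.receiver.RebalanceStrategy
import kotlinx.coroutines.flow.buffer
import org.apache.kafka.common.serialization.StringDeserializer

val settings = ReceiverSettings(
  bootstrapServers = "localhost:9092",
  valueDeserializer = StringDeserializer(),
  groupId = "example-group",
  rebalanceStrategy = RebalanceStrategy.Backpressure, // the default
)

// `buffer` overrides the default Channel.BUFFERED capacity mentioned in the KDoc.
val records = KafkaReceiver(settings)
  .receive("example-topic")
  .buffer(capacity = 256)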

@@ -2,6 +2,7 @@ package io.github.nomisRev.kafka.receiver.internals

import io.github.nomisRev.kafka.receiver.CommitStrategy
import io.github.nomisRev.kafka.receiver.Offset
import io.github.nomisRev.kafka.receiver.RebalanceStrategy
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.github.nomisRev.kafka.receiver.size
import kotlinx.coroutines.CoroutineScope
@@ -220,6 +221,14 @@ internal class EventLoop<K, V>(
}
}

/**
 * Returns `true` if we still need to _pause_ the consumer;
 * `false` means the consumer was already paused by us.
 *
 * If the consumer meets all of the following conditions, we should wake it up:
 * - it is paused,
 * - the downstream can continue processing, i.e. `requesting.get() == true`,
 * - and we are not retrying commits.
 */
private fun checkAndSetPausedByUs(): Boolean {
logger.debug("checkAndSetPausedByUs")
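// getAndSet(true) is atomic: `pausedNow` is true only for the call that
// actually transitions the consumer from running to paused.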
val pausedNow = !pausedByUs.getAndSet(true)
@@ -246,41 +255,55 @@ internal class EventLoop<K, V>(

val pauseForDeferred =
(settings.maxDeferredCommits > 0 && commitBatch.deferredCount() >= settings.maxDeferredCommits)
val shouldPoll: Boolean =
  if (pauseForDeferred || retrying.get()) false
  else requesting.get()

when {
  shouldPoll && !awaitingTransaction.get() && pausedByUs.getAndSet(false) -> {
    val toResume: MutableSet<TopicPartition> = HashSet(consumer.assignment())
    toResume.removeAll(pausedByUser)
    pausedByUser.clear()
    consumer.resume(toResume)
    logger.debug("Resumed")
  }

  shouldPoll && awaitingTransaction.get() && checkAndSetPausedByUs() -> {
    pausedByUser.addAll(consumer.paused())
    consumer.pause(consumer.assignment())
    logger.debug("Paused - awaiting transaction")
  }

  // !shouldPoll, so we need to apply backpressure to Kafka according to the strategy
  !shouldPoll && checkAndSetPausedByUs() ->
    when (val strategy = settings.rebalanceStrategy) {
      RebalanceStrategy.Backpressure -> {
        pausedByUser.addAll(consumer.paused())
        consumer.pause(consumer.assignment())
        when {
          pauseForDeferred -> logger.debug("Paused - too many deferred commits")
          retrying.get() -> logger.debug("Paused - commits are retrying")
          else -> logger.debug("Paused - back pressure")
        }
      }

      is RebalanceStrategy.FailFast -> strategy.failFast(consumer.assignment())
    }
}
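// At this point the consumer is either resumed, paused with its assignment intact
// (Backpressure), or the FailFast handler has already thrown. A paused consumer
// still executes the poll below, which keeps it alive within max.poll.interval.ms.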

// Execute poll
val records: ConsumerRecords<K, V> = try {
consumer.poll(pollTimeout)
} catch (e: WakeupException) {
logger.debug("Consumer woken")
ConsumerRecords.empty()
}

// Schedule a new poll task on the single threaded dispatcher
if (isActive.get()) schedulePoll()

// Send the records downstream, and check state of buffer. Signal back-pressure in `requesting`
if (!records.isEmpty) {
if (settings.maxDeferredCommits > 0) {
commitBatch.addUncommitted(records)