
Implement pause and resume #300

@gabrieljones

I have an algorithm for delayed consumption from a topic that relies on the consumer's pause and resume functions. In this API those functions are stubbed out as no-ops. Please implement pause and resume so that I can use my delayed message consume algorithm with Pub/Sub Lite.

Additional context
https://github.com/googleapis/java-pubsublite-kafka/blob/v0.6.7/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java#L585-L600

Message Delayed Consume Algorithm

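  import java.time.Clock
  import java.util.Collections

  import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
  import org.apache.kafka.common.TopicPartition

  import scala.collection.mutable
  import scala.concurrent.duration.{Duration, FiniteDuration}

  // Assumption: KafkaRecord is not defined in this snippet; it is taken here to be this alias.
  type KafkaRecord = ConsumerRecord[Array[Byte], Array[Byte]]

  trait Delayer {
    def delay(records: Seq[KafkaRecord]): Seq[KafkaRecord]
  }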

  object Delayer {

    /**
     * No op delayer, effectively zero delay.
     */
    object Zero extends Delayer {
      override def delay(records: Seq[KafkaRecord]): Seq[KafkaRecord] = records
    }

    /**
     * Delayer.Some
     * Non-blocking delayer.
     * On every batch it:
     * - resumes all assigned partitions
     * - combines previously delayed records with newly polled records
     * - separates the records into
     *   - records that are old enough to process
     *   - records that are still too young to process
     * - pauses the partitions of any records that are too young
     * - returns the records that are old enough
     * - keeps the too-young records in a buffer to reconsider on the next pass
     *
     * If this consumer dies before the held-back records are processed, their offsets will not have been
     * committed, and they will be reconsidered by the next consumer to be assigned their respective partitions.
     *
     * Not thread safe. It is expected this is only accessed from a single thread.
     */
    class Some(consumer: KafkaConsumer[Array[Byte], Array[Byte]], delayBy: FiniteDuration, clock: Clock = Clock.systemUTC()) extends Delayer {
      // delayBy is the minimum record age; the name is kept distinct from the delay(...) method below.
      // Records held back by earlier passes because they were still too young.
      var recordsLater: Seq[KafkaRecord] = Seq.empty

      /** Determine which records are too young to process
       *
       * @param recordsNew records received in the recent poll
       * @param recordsLater records received in previous polls that were too young
       * @return (records ready now, records that are still too young)
       */
      def nowAndDelayed(recordsNew: Seq[KafkaRecord], recordsLater: Seq[KafkaRecord]): (Seq[KafkaRecord], Seq[KafkaRecord]) = {
        if (delayBy <= Duration.Zero) {
          (recordsLater ++ recordsNew, Seq.empty) // no delay configured, everything is ready
        } else {
          val now = clock.millis()
          val bufferLater = new mutable.ArrayBuffer[KafkaRecord]
          val bufferNow = new mutable.ArrayBuffer[KafkaRecord]
          // The loop only has side effects, so it is a plain for loop rather than for/yield.
          for {
            records <- Seq(recordsLater, recordsNew) // previously held records first
            record <- records
          } {
            if (record.timestamp() + delayBy.toMillis > now) { // record timestamp plus delay is in the future
              bufferLater += record // save it for later
            } else {
              bufferNow += record
            }
          }
          (bufferNow.toSeq, bufferLater.toSeq)
        }
      }

      def partitionsPause(recordsLater: Seq[KafkaRecord]): Unit = {
        if (recordsLater.nonEmpty) {
          val tpsToPause: Set[TopicPartition] = recordsLater.map(r => new TopicPartition(r.topic(), r.partition())).toSet
          for {
            tpToPause <- tpsToPause
          } {
            try {
              consumer.pause(Collections.singletonList(tpToPause))
            } catch {
              case _: IllegalStateException => //skip this topic partition as it is no longer assigned to me
            }
          }
        }
      }

      def partitionsResumeAll(): Unit = consumer.resume(consumer.assignment())

      override def delay(recordsNew: Seq[KafkaRecord]): Seq[KafkaRecord] = {
        partitionsResumeAll() //resume all partitions, we will pause any that should still be paused before exiting
        val (recordsNow, recordsDelayed) = nowAndDelayed(recordsNew, recordsLater)
        recordsLater = recordsDelayed
        partitionsPause(recordsLater) //pause any partitions for records still too young to process
        recordsNow
      }

    }
  }
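
To make the split step easier to follow without a Kafka dependency, here is a minimal sketch of one pass of the algorithm on plain data. It is not the code above: `Rec`, `Pass`, and `DelaySplit` are hypothetical names introduced only for illustration, a record is reduced to its topic, partition, and timestamp, and the "partitions to pause" are returned as a set instead of being passed to `consumer.pause`. It assumes the same rule as the snippet: a record is ready once its timestamp plus the delay is no longer in the future.

```scala
import java.time.{Clock, Instant, ZoneOffset}

// Hypothetical stand-in for a Kafka record: only the fields the split needs.
final case class Rec(topic: String, partition: Int, timestamp: Long)

// Outcome of one pass: records to process now, records to hold back,
// and the (topic, partition) pairs a real consumer would pause.
final case class Pass(ready: Seq[Rec], held: Seq[Rec], toPause: Set[(String, Int)])

object DelaySplit {
  /** Split previously held and newly polled records by age, held records first. */
  def pass(held: Seq[Rec], polled: Seq[Rec], delayMillis: Long, clock: Clock): Pass = {
    val now = clock.millis()
    // Ready when timestamp + delay is not in the future; partition keeps input order.
    val (ready, tooYoung) = (held ++ polled).partition(r => r.timestamp + delayMillis <= now)
    Pass(ready, tooYoung, tooYoung.map(r => (r.topic, r.partition)).toSet)
  }
}

object DelaySplitDemo extends App {
  // Fixed clocks make the passes deterministic.
  val clockAt10s = Clock.fixed(Instant.ofEpochMilli(10000L), ZoneOffset.UTC)
  val clockAt15s = Clock.fixed(Instant.ofEpochMilli(15000L), ZoneOffset.UTC)

  val polled = Seq(Rec("t", 0, 4000L), Rec("t", 1, 9000L))

  // First pass: the record on partition 0 is old enough, the one on partition 1 is held.
  val first = DelaySplit.pass(Seq.empty, polled, delayMillis = 5000L, clockAt10s)
  assert(first.ready == Seq(Rec("t", 0, 4000L)))
  assert(first.toPause == Set(("t", 1)))

  // Second pass, five seconds later, with nothing newly polled: the held record is released.
  val second = DelaySplit.pass(first.held, Seq.empty, delayMillis = 5000L, clockAt15s)
  assert(second.ready == Seq(Rec("t", 1, 9000L)))
  assert(second.toPause.isEmpty)
}
```

The second pass is the part that depends on pause and resume actually working: while partition 1 is paused, the consumer is expected to keep polling without fetching more records from it, and if pause is a no-op the held buffer can keep growing.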

Labels

api: pubsublite (Issues related to the googleapis/java-pubsublite-kafka API.)
type: feature request ('Nice-to-have' improvement, new feature or different behavior or design.)
