Skip to content

Commit method respond slow (was: Make poll interval adaptive) #128

Open
@blindspotbounty

Description

@blindspotbounty

Currently, even if there are move than 100 events (which is hardcoded), we won't continue polling but call Task.sleep() which seems pretty strange.
I would like to suggest some adaptive polling interval, which would be depending on events: e.g.:

func eventPoll(events: inout [KafkaEvent], maxEvents: inout Int) -> Bool /* or enum { case shouldSleep, case shouldYield } */{
        events.removeAll(keepingCapacity: true)
        events.reserveCapacity(maxEvents)
        
        var shouldSleep = true

for ...
            switch eventType {
            case .deliveryReport:
                let forwardEvent = self.handleDeliveryReportEvent(event)
                events.append(forwardEvent)
                shouldSleep = false
            case .fetch:
                if let forwardEvent = self.handleFetchEvent(event) {
                    events.append(forwardEvent)
                    shouldSleep = false
                }
            case .log:
                self.handleLogEvent(event)
            case .offsetCommit:
                self.handleOffsetCommitEvent(event)
                shouldSleep = false
            case .statistics:
                events.append(self.handleStatistics(event))
            case .rebalance:
                events.append(self.handleRebalance(event))
                shouldSleep = false
            case .error:
                break
            case .none:
                // Finished reading events, return early
                return shouldSleep
            default:
                break // Ignored Event
            }

The idea is to then adjust polling time the following way:

            case .pollForAndYieldMessage(let client, let source, let eventSource):
                let shouldSleep = client.eventPoll(events: &events, maxEvents: &maxEvents)
                for event in events { ... }
                if shouldSleep {
                    pollInterval = min(self.configuration.pollInterval, pollInterval * 2)
                    try await Task.sleep(for: pollInterval)
                } else {
                    pollInterval = max(pollInterval / 3, .milliseconds(1))
                    await Task.yield()
                }

That might be also useful to adjust events size but not sure if it is feasible.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions