Skip to content

Commit 4b4bc0c

Browse files
Add support for fetch.wait.max.ms Kafka consumer config (#197)
Motivation: Currently the API doesn't provide a way to set the `fetch.wait.max.ms` Kafka consumer config. This PR enables the user to set it. Modifications: Added support for `fetch.wait.max.ms` inside of `KafkaConsumerConfiguration`. Result: We can set `fetch.wait.max.msa`.
1 parent 62da8d9 commit 4b4bc0c

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,20 @@ public struct KafkaConsumerConfiguration {
188188
}
189189
}
190190

191+
/// The maximum amount of time the server will block before answering the fetch request
192+
/// there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.
193+
/// Default: `.milliseconds(500)`
194+
public var maximumFetchWaitTime: Duration = .milliseconds(500) {
195+
didSet {
196+
if maximumFetchWaitTime != .zero {
197+
precondition(
198+
maximumFetchWaitTime.canBeRepresentedAsMilliseconds,
199+
"Lowest granularity is milliseconds"
200+
)
201+
}
202+
}
203+
}
204+
191205
/// Topic metadata options.
192206
public var topicMetadata: KafkaConfiguration.TopicMetadataOptions = .init()
193207

@@ -260,6 +274,7 @@ extension KafkaConsumerConfiguration {
260274
resultDict["receive.message.max.bytes"] = String(maximumReceiveMessageBytes)
261275
resultDict["max.in.flight.requests.per.connection"] = String(maximumInFlightRequestsPerConnection)
262276
resultDict["metadata.max.age.ms"] = String(maximumMetadataAge.inMilliseconds)
277+
resultDict["fetch.wait.max.ms"] = String(maximumFetchWaitTime.inMilliseconds)
263278
resultDict["topic.metadata.refresh.interval.ms"] = String(topicMetadata.refreshInterval.rawValue)
264279
resultDict["topic.metadata.refresh.fast.interval.ms"] = String(topicMetadata.refreshFastInterval.inMilliseconds)
265280
resultDict["topic.metadata.refresh.sparse"] = String(topicMetadata.isSparseRefreshingEnabled)

0 commit comments

Comments
 (0)