Skip to content

Commit 99eacf1

Browse files
KAFKA-16914: Added share group dynamic and broker configs (apache#16268)
KIP-932 introduces a bunch of broker and dynamic configs for share groups. This PR adds those new configs. The changes include: 1. Defined ShareGroupConfigs class which stores various share group configurations. 2. Use the defined share configs in KafkaConfig.scala for making it available to BrokerServer 3. Adds a few tests to validate the conditions on these new configs. Reviewers: Manikumar Reddy <[email protected]>
1 parent af86e56 commit 99eacf1

File tree

3 files changed

+216
-2
lines changed

3 files changed

+216
-2
lines changed

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import org.apache.kafka.server.ProcessRole
4747
import org.apache.kafka.server.authorizer.Authorizer
4848
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
4949
import org.apache.kafka.server.common.MetadataVersion._
50-
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, ServerConfigs, QuotaConfigs, ReplicationConfigs, ServerLogConfigs, ZkConfigs}
50+
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, ServerConfigs, QuotaConfigs, ReplicationConfigs, ServerLogConfigs, ZkConfigs, ShareGroupConfigs}
5151
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
5252
import org.apache.kafka.server.metrics.MetricConfigs
5353
import org.apache.kafka.server.record.BrokerCompressionType
@@ -292,6 +292,23 @@ object KafkaConfig {
292292
.define(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DOC)
293293
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, CaseInsensitiveValidString.in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]): _*), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DOC)
294294

295+
/** Share Group Configurations **/
296+
// Internal configuration used by integration and system tests.
297+
.defineInternal(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, ShareGroupConfigs.SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, ShareGroupConfigs.SHARE_GROUP_ENABLE_DOC)
298+
.define(ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
299+
.define(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
300+
.define(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
301+
.define(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
302+
.define(ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
303+
.define(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
304+
.define(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
305+
.define(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
306+
.define(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
307+
.define(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
308+
.define(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
309+
.define(ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_DOC)
310+
.define(ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_DOC)
311+
295312
/** ********* Offset management configuration ***********/
296313
.define(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, INT, GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_DOC)
297314
.define(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_DOC)
@@ -960,6 +977,22 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
960977
val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor])
961978
val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG))
962979

980+
/** Share group configuration **/
981+
val isShareGroupEnabled = getBoolean(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG)
982+
val shareGroupPartitionMaxRecordLocks = getInt(ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)
983+
val shareGroupDeliveryCountLimit = getInt(ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)
984+
val shareGroupMaxGroups = getShort(ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG)
985+
val shareGroupMaxSize = getShort(ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG)
986+
val shareGroupSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)
987+
val shareGroupMinSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)
988+
val shareGroupMaxSessionTimeoutMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)
989+
val shareGroupHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)
990+
val shareGroupMinHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
991+
val shareGroupMaxHeartbeatIntervalMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
992+
val shareGroupRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)
993+
val shareGroupMaxRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG)
994+
val shareGroupMinRecordLockDurationMs = getInt(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)
995+
963996
/** ********* Offset management configuration ***********/
964997
val offsetMetadataMaxSize = getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG)
965998
val offsetsLoadBufferSize = getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG)
@@ -1457,6 +1490,36 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
14571490
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
14581491
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
14591492

1493+
require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
1494+
s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
1495+
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
1496+
require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
1497+
s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
1498+
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
1499+
require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs,
1500+
s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
1501+
s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")
1502+
1503+
require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
1504+
s"${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
1505+
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
1506+
require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
1507+
s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
1508+
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
1509+
require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
1510+
s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
1511+
s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
1512+
1513+
require(shareGroupMaxRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
1514+
s"${ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
1515+
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}")
1516+
require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
1517+
s"${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
1518+
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}")
1519+
require(shareGroupMaxRecordLockDurationMs >= shareGroupRecordLockDurationMs,
1520+
s"${ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
1521+
s"to ${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG}")
1522+
14601523
if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) {
14611524
warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.")
14621525
}

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.kafka.raft.QuorumConfig
4141
import org.apache.kafka.security.PasswordEncoderConfigs
4242
import org.apache.kafka.server.common.MetadataVersion
4343
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
44-
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
44+
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ShareGroupConfigs, ZkConfigs}
4545
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
4646
import org.apache.kafka.server.metrics.MetricConfigs
4747
import org.apache.kafka.storage.internals.log.CleanerConfig
@@ -1074,6 +1074,22 @@ class KafkaConfigTest {
10741074
case GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10751075
case GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG => // ignore string
10761076

1077+
/** Share groups configs */
1078+
case ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1079+
case ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1080+
case ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1081+
case ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1082+
case ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1083+
case ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1084+
case ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1085+
case ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1086+
case ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1087+
case ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1088+
case ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1089+
case ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1090+
case ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
1091+
1092+
10771093
case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
10781094
}
10791095
}
@@ -1928,4 +1944,61 @@ class KafkaConfigTest {
19281944
props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a,/tmp/b")
19291945
assertDoesNotThrow(() => KafkaConfig.fromProps(props))
19301946
}
1947+
1948+
@Test
1949+
def testShareGroupSessionTimeoutValidation(): Unit = {
1950+
val props = new Properties()
1951+
props.putAll(kraftProps())
1952+
1953+
// Max should be greater than or equals to min.
1954+
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "20")
1955+
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "10")
1956+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
1957+
1958+
// The timeout should be within the min-max range.
1959+
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10")
1960+
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "20")
1961+
props.put(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "5")
1962+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
1963+
props.put(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "25")
1964+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
1965+
}
1966+
1967+
@Test
1968+
def testShareGroupHeartbeatIntervalValidation(): Unit = {
1969+
val props = new Properties()
1970+
props.putAll(kraftProps())
1971+
1972+
// Max should be greater than or equals to min.
1973+
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
1974+
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
1975+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
1976+
1977+
// The timeout should be within the min-max range.
1978+
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
1979+
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
1980+
props.put(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "5")
1981+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
1982+
props.put(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "25")
1983+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
1984+
}
1985+
1986+
@Test
1987+
def testShareGroupRecordLockDurationValidation(): Unit = {
1988+
val props = new Properties()
1989+
props.putAll(kraftProps())
1990+
1991+
// Max should be greater than or equals to min.
1992+
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "20")
1993+
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10")
1994+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
1995+
1996+
// The duration should be within the min-max range.
1997+
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10")
1998+
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "20")
1999+
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "5")
2000+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
2001+
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "25")
2002+
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
2003+
}
19312004
}

0 commit comments

Comments
 (0)