Skip to content

Commit 702ad1c

Browse files
committed
simpify logic
1 parent 2499172 commit 702ad1c

File tree

2 files changed

+36
-13
lines changed

2 files changed

+36
-13
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

+15-13
Original file line numberDiff line numberDiff line change
@@ -168,16 +168,6 @@ class KafkaApis(val requestChannel: RequestChannel,
168168
maybeForwardToController(request, errorHandler)
169169
}
170170

171-
private def forwardToControllerOrThrow(request: RequestChannel.Request,
172-
createException: RequestChannel.Request => Exception
173-
): Unit = {
174-
def errorHandler(request: RequestChannel.Request): Unit = {
175-
throw createException(request)
176-
}
177-
178-
maybeForwardToController(request, errorHandler)
179-
}
180-
181171
/**
182172
* Top-level method that handles all requests and multiplexes to the right api
183173
*/
@@ -218,8 +208,8 @@ class KafkaApis(val requestChannel: RequestChannel,
218208
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request).exceptionally(handleError)
219209
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
220210
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
221-
case ApiKeys.CREATE_TOPICS => forwardToControllerOrThrow(request, KafkaApis.shouldAlwaysForward)
222-
case ApiKeys.DELETE_TOPICS => forwardToControllerOrThrow(request, KafkaApis.shouldAlwaysForward)
211+
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
212+
case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
223213
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
224214
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
225215
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
@@ -236,7 +226,7 @@ class KafkaApis(val requestChannel: RequestChannel,
236226
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
237227
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
238228
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
239-
case ApiKeys.CREATE_PARTITIONS => forwardToControllerOrThrow(request, KafkaApis.shouldAlwaysForward)
229+
case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
240230
// Create, renew and expire DelegationTokens must first validate that the connection
241231
// itself is not authenticated with a delegation token before maybeForwardToController.
242232
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
@@ -1778,6 +1768,18 @@ class KafkaApis(val requestChannel: RequestChannel,
17781768
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
17791769
}
17801770

1771+
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
1772+
throw KafkaApis.shouldAlwaysForward(request)
1773+
}
1774+
1775+
def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
1776+
throw KafkaApis.shouldAlwaysForward(request)
1777+
}
1778+
1779+
def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = {
1780+
throw KafkaApis.shouldAlwaysForward(request)
1781+
}
1782+
17811783
def handleDeleteRecordsRequest(request: RequestChannel.Request): Unit = {
17821784
val deleteRecordsRequest = request.body[DeleteRecordsRequest]
17831785

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

+21
Original file line numberDiff line numberDiff line change
@@ -10514,6 +10514,27 @@ class KafkaApisTest extends Logging {
1051410514
verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
1051510515
}
1051610516

10517+
@Test
10518+
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
10519+
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
10520+
kafkaApis = createKafkaApis(raftSupport = true)
10521+
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest)
10522+
}
10523+
10524+
@Test
10525+
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
10526+
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
10527+
kafkaApis = createKafkaApis(raftSupport = true)
10528+
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest)
10529+
}
10530+
10531+
@Test
10532+
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
10533+
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
10534+
kafkaApis = createKafkaApis(raftSupport = true)
10535+
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest)
10536+
}
10537+
1051710538
@Test
1051810539
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
1051910540
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)

0 commit comments

Comments
 (0)