@@ -19,6 +19,7 @@ package kafka.server
19
19
20
20
import kafka .cluster .{Broker , Partition }
21
21
import kafka .controller .KafkaController
22
+ import kafka .controller .KafkaController
22
23
import kafka .coordinator .transaction .{InitProducerIdResult , TransactionCoordinator }
23
24
import kafka .log .UnifiedLog
24
25
import kafka .network .RequestChannel
@@ -835,109 +836,6 @@ class KafkaApisTest extends Logging {
835
836
unauthorizedTopic, AuthorizationResult .DENIED , logIfAllowed, logIfDenied)
836
837
}
837
838
838
- private def createCombinedTopicAuthorization (authorizer : Authorizer ,
839
- operation : AclOperation ,
840
- authorizedTopic : String ,
841
- unauthorizedTopic : String ,
842
- logIfAllowed : Boolean = true ,
843
- logIfDenied : Boolean = true ): Unit = {
844
- val expectedAuthorizedActions = Seq (
845
- new Action (operation,
846
- new ResourcePattern (ResourceType .TOPIC , authorizedTopic, PatternType .LITERAL ),
847
- 1 , logIfAllowed, logIfDenied),
848
- new Action (operation,
849
- new ResourcePattern (ResourceType .TOPIC , unauthorizedTopic, PatternType .LITERAL ),
850
- 1 , logIfAllowed, logIfDenied))
851
-
852
- when(authorizer.authorize(
853
- any[RequestContext ], argThat((t : java.util.List [Action ]) => t != null && t.containsAll(expectedAuthorizedActions.asJava))
854
- )).thenAnswer { invocation =>
855
- val actions = invocation.getArgument(1 ).asInstanceOf [util.List [Action ]]
856
- actions.asScala.map { action =>
857
- if (action.resourcePattern().name().equals(authorizedTopic))
858
- AuthorizationResult .ALLOWED
859
- else
860
- AuthorizationResult .DENIED
861
- }.asJava
862
- }
863
- }
864
-
865
- private def verifyCreateTopicsResult (response : CreateTopicsResponse ,
866
- expectedErrorCodes : Map [String , Errors ],
867
- expectedTopicConfigErrorCodes : Map [String , Errors ]): Unit = {
868
- val actualErrorCodes = response.data.topics().asScala.map { topicResponse =>
869
- topicResponse.name() -> Errors .forCode(topicResponse.errorCode)
870
- }.toMap
871
-
872
- assertEquals(expectedErrorCodes, actualErrorCodes)
873
-
874
- val actualTopicConfigErrorCodes = response.data.topics().asScala.map { topicResponse =>
875
- topicResponse.name() -> Errors .forCode(topicResponse.topicConfigErrorCode())
876
- }.toMap
877
-
878
- assertEquals(expectedTopicConfigErrorCodes, actualTopicConfigErrorCodes)
879
- }
880
-
881
- @ Test
882
- def testCreateAclWithForwarding (): Unit = {
883
- val requestBuilder = new CreateAclsRequest .Builder (new CreateAclsRequestData ())
884
- testForwardableApi(ApiKeys .CREATE_ACLS , requestBuilder)
885
- }
886
-
887
- @ Test
888
- def testDeleteAclWithForwarding (): Unit = {
889
- val requestBuilder = new DeleteAclsRequest .Builder (new DeleteAclsRequestData ())
890
- testForwardableApi(ApiKeys .DELETE_ACLS , requestBuilder)
891
- }
892
-
893
- @ Test
894
- def testCreateDelegationTokenWithForwarding (): Unit = {
895
- val requestBuilder = new CreateDelegationTokenRequest .Builder (new CreateDelegationTokenRequestData ())
896
- testForwardableApi(ApiKeys .CREATE_DELEGATION_TOKEN , requestBuilder)
897
- }
898
-
899
- @ Test
900
- def testRenewDelegationTokenWithForwarding (): Unit = {
901
- val requestBuilder = new RenewDelegationTokenRequest .Builder (new RenewDelegationTokenRequestData ())
902
- testForwardableApi(ApiKeys .RENEW_DELEGATION_TOKEN , requestBuilder)
903
- }
904
-
905
- @ Test
906
- def testExpireDelegationTokenWithForwarding (): Unit = {
907
- val requestBuilder = new ExpireDelegationTokenRequest .Builder (new ExpireDelegationTokenRequestData ())
908
- testForwardableApi(ApiKeys .EXPIRE_DELEGATION_TOKEN , requestBuilder)
909
- }
910
-
911
- @ Test
912
- def testAlterPartitionReassignmentsWithForwarding (): Unit = {
913
- val requestBuilder = new AlterPartitionReassignmentsRequest .Builder (new AlterPartitionReassignmentsRequestData ())
914
- testForwardableApi(ApiKeys .ALTER_PARTITION_REASSIGNMENTS , requestBuilder)
915
- }
916
-
917
- @ Test
918
- def testCreatePartitionsWithForwarding (): Unit = {
919
- val requestBuilder = new CreatePartitionsRequest .Builder (new CreatePartitionsRequestData ())
920
- testForwardableApi(ApiKeys .CREATE_PARTITIONS , requestBuilder)
921
- }
922
-
923
- @ Test
924
- def testUpdateFeaturesWithForwarding (): Unit = {
925
- val requestBuilder = new UpdateFeaturesRequest .Builder (new UpdateFeaturesRequestData ())
926
- testForwardableApi(ApiKeys .UPDATE_FEATURES , requestBuilder)
927
- }
928
-
929
- @ Test
930
- def testDeleteTopicsWithForwarding (): Unit = {
931
- val requestBuilder = new DeleteTopicsRequest .Builder (new DeleteTopicsRequestData ())
932
- testForwardableApi(ApiKeys .DELETE_TOPICS , requestBuilder)
933
- }
934
-
935
- @ Test
936
- def testAlterScramWithForwarding (): Unit = {
937
- val requestBuilder = new AlterUserScramCredentialsRequest .Builder (new AlterUserScramCredentialsRequestData ())
938
- testForwardableApi(ApiKeys .ALTER_USER_SCRAM_CREDENTIALS , requestBuilder)
939
- }
940
-
941
839
@ Test
942
840
def testFindCoordinatorAutoTopicCreationForOffsetTopic (): Unit = {
943
841
testFindCoordinatorWithTopicCreation(CoordinatorType .GROUP )
@@ -10054,34 +9952,6 @@ class KafkaApisTest extends Logging {
10054
9952
verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
10055
9953
}
10056
9954
10057
- @ Test
10058
- def testRaftShouldNeverHandleEnvelope (): Unit = {
10059
- metadataCache = MetadataCache .kRaftMetadataCache(brokerId, () => KRaftVersion .KRAFT_VERSION_0 )
10060
- kafkaApis = createKafkaApis(raftSupport = true )
10061
- verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_, RequestLocal .withThreadConfinedCaching))
10062
- }
10063
-
10064
- @ Test
10065
- def testRaftShouldAlwaysForwardCreateTopicsRequest (): Unit = {
10066
- metadataCache = MetadataCache .kRaftMetadataCache(brokerId, () => KRaftVersion .KRAFT_VERSION_0 )
10067
- kafkaApis = createKafkaApis(raftSupport = true )
10068
- verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest)
10069
- }
10070
-
10071
- @ Test
10072
- def testRaftShouldAlwaysForwardCreatePartitionsRequest (): Unit = {
10073
- metadataCache = MetadataCache .kRaftMetadataCache(brokerId, () => KRaftVersion .KRAFT_VERSION_0 )
10074
- kafkaApis = createKafkaApis(raftSupport = true )
10075
- verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest)
10076
- }
10077
-
10078
- @ Test
10079
- def testRaftShouldAlwaysForwardDeleteTopicsRequest (): Unit = {
10080
- metadataCache = MetadataCache .kRaftMetadataCache(brokerId, () => KRaftVersion .KRAFT_VERSION_0 )
10081
- kafkaApis = createKafkaApis(raftSupport = true )
10082
- verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest)
10083
- }
10084
-
10085
9955
10086
9956
@ Test
10087
9957
def testEmptyLegacyAlterConfigsRequestWithKRaft (): Unit = {
0 commit comments