Skip to content

Commit 24b2946

Browse files
TaiJuWumjsax
authored andcommitted
KAFKA-18399 Remove ZooKeeper from KafkaApis (7/N): CREATE_TOPICS, DELETE_TOPICS, CREATE_PARTITIONS (apache#18433)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 336376d commit 24b2946

File tree

2 files changed

+1
-529
lines changed

2 files changed

+1
-529
lines changed

Diff for: core/src/main/scala/kafka/server/KafkaApis.scala

-243
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
3636
import org.apache.kafka.common.internals.{FatalExitError, Topic}
3737
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
3838
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
39-
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
40-
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
41-
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
4239
import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
43-
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
4440
import org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, ReplicaElectionResult}
4541
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
4642
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
@@ -1466,245 +1462,6 @@ class KafkaApis(val requestChannel: RequestChannel,
14661462
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
14671463
}
14681464

1469-
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
1470-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
1471-
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
1472-
1473-
def sendResponseCallback(results: CreatableTopicResultCollection): Unit = {
1474-
val responseData = new CreateTopicsResponseData()
1475-
.setTopics(results)
1476-
val response = new CreateTopicsResponse(responseData)
1477-
trace(s"Sending create topics response $responseData for correlation id " +
1478-
s"${request.header.correlationId} to client ${request.header.clientId}.")
1479-
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response)
1480-
}
1481-
1482-
val createTopicsRequest = request.body[CreateTopicsRequest]
1483-
val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
1484-
if (!zkSupport.controller.isActive) {
1485-
createTopicsRequest.data.topics.forEach { topic =>
1486-
results.add(new CreatableTopicResult().setName(topic.name)
1487-
.setErrorCode(Errors.NOT_CONTROLLER.code))
1488-
}
1489-
sendResponseCallback(results)
1490-
} else {
1491-
createTopicsRequest.data.topics.forEach { topic =>
1492-
results.add(new CreatableTopicResult().setName(topic.name))
1493-
}
1494-
val hasClusterAuthorization = authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
1495-
logIfDenied = false)
1496-
1497-
val allowedTopicNames = {
1498-
val topicNames = createTopicsRequest
1499-
.data
1500-
.topics
1501-
.asScala
1502-
.map(_.name)
1503-
.toSet
1504-
1505-
topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
1506-
}
1507-
1508-
val authorizedTopics = if (hasClusterAuthorization) {
1509-
allowedTopicNames
1510-
} else {
1511-
authHelper.filterByAuthorized(request.context, CREATE, TOPIC, allowedTopicNames)(identity)
1512-
}
1513-
val authorizedForDescribeConfigs = authHelper.filterByAuthorized(
1514-
request.context,
1515-
DESCRIBE_CONFIGS,
1516-
TOPIC,
1517-
allowedTopicNames,
1518-
logIfDenied = false
1519-
)(identity).map(name => name -> results.find(name)).toMap
1520-
1521-
results.forEach { topic =>
1522-
if (topic.name() == Topic.CLUSTER_METADATA_TOPIC_NAME) {
1523-
topic.setErrorCode(Errors.INVALID_REQUEST.code)
1524-
topic.setErrorMessage(s"Creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited.")
1525-
} else if (results.findAll(topic.name).size > 1) {
1526-
topic.setErrorCode(Errors.INVALID_REQUEST.code)
1527-
topic.setErrorMessage("Found multiple entries for this topic.")
1528-
} else if (!authorizedTopics.contains(topic.name)) {
1529-
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
1530-
topic.setErrorMessage("Authorization failed.")
1531-
}
1532-
if (!authorizedForDescribeConfigs.contains(topic.name) && topic.name() != Topic.CLUSTER_METADATA_TOPIC_NAME) {
1533-
topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
1534-
}
1535-
}
1536-
val toCreate = mutable.Map[String, CreatableTopic]()
1537-
createTopicsRequest.data.topics.forEach { topic =>
1538-
if (results.find(topic.name).errorCode == Errors.NONE.code) {
1539-
toCreate += topic.name -> topic
1540-
}
1541-
}
1542-
def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
1543-
errors.foreach { case (topicName, error) =>
1544-
val result = results.find(topicName)
1545-
result.setErrorCode(error.error.code)
1546-
.setErrorMessage(error.message)
1547-
// Reset any configs in the response if Create failed
1548-
if (error != ApiError.NONE) {
1549-
result.setConfigs(List.empty.asJava)
1550-
.setNumPartitions(-1)
1551-
.setReplicationFactor(-1)
1552-
.setTopicConfigErrorCode(Errors.NONE.code)
1553-
}
1554-
}
1555-
sendResponseCallback(results)
1556-
}
1557-
zkSupport.adminManager.createTopics(
1558-
createTopicsRequest.data.timeoutMs,
1559-
createTopicsRequest.data.validateOnly,
1560-
toCreate,
1561-
authorizedForDescribeConfigs,
1562-
controllerMutationQuota,
1563-
handleCreateTopicsResults)
1564-
}
1565-
}
1566-
1567-
def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
1568-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
1569-
val createPartitionsRequest = request.body[CreatePartitionsRequest]
1570-
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 3)
1571-
1572-
def sendResponseCallback(results: Map[String, ApiError]): Unit = {
1573-
val createPartitionsResults = results.map {
1574-
case (topic, error) => new CreatePartitionsTopicResult()
1575-
.setName(topic)
1576-
.setErrorCode(error.error.code)
1577-
.setErrorMessage(error.message)
1578-
}.toSeq
1579-
val response = new CreatePartitionsResponse(new CreatePartitionsResponseData()
1580-
.setResults(createPartitionsResults.asJava))
1581-
trace(s"Sending create partitions response $response for correlation id ${request.header.correlationId} to " +
1582-
s"client ${request.header.clientId}.")
1583-
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response)
1584-
}
1585-
1586-
if (!zkSupport.controller.isActive) {
1587-
val result = createPartitionsRequest.data.topics.asScala.map { topic =>
1588-
(topic.name, new ApiError(Errors.NOT_CONTROLLER, null))
1589-
}.toMap
1590-
sendResponseCallback(result)
1591-
} else {
1592-
// Special handling to add duplicate topics to the response
1593-
val topics = createPartitionsRequest.data.topics.asScala.toSeq
1594-
val dupes = topics.groupBy(_.name)
1595-
.filter { _._2.size > 1 }
1596-
.keySet
1597-
val notDuped = topics.filterNot(topic => dupes.contains(topic.name))
1598-
val (authorized, unauthorized) = authHelper.partitionSeqByAuthorized(request.context, ALTER, TOPIC,
1599-
notDuped)(_.name)
1600-
1601-
val (queuedForDeletion, valid) = authorized.partition { topic =>
1602-
zkSupport.controller.isTopicQueuedForDeletion(topic.name)
1603-
}
1604-
1605-
val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
1606-
unauthorized.map(_.name -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++
1607-
queuedForDeletion.map(_.name -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
1608-
1609-
zkSupport.adminManager.createPartitions(
1610-
createPartitionsRequest.data.timeoutMs,
1611-
valid,
1612-
createPartitionsRequest.data.validateOnly,
1613-
controllerMutationQuota,
1614-
result => sendResponseCallback(result ++ errors))
1615-
}
1616-
}
1617-
1618-
def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = {
1619-
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
1620-
val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 5)
1621-
1622-
def sendResponseCallback(results: DeletableTopicResultCollection): Unit = {
1623-
val responseData = new DeleteTopicsResponseData()
1624-
.setResponses(results)
1625-
val response = new DeleteTopicsResponse(responseData)
1626-
trace(s"Sending delete topics response $response for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
1627-
requestHelper.sendResponseMaybeThrottleWithControllerQuota(controllerMutationQuota, request, response)
1628-
}
1629-
1630-
val deleteTopicRequest = request.body[DeleteTopicsRequest]
1631-
val results = new DeletableTopicResultCollection(deleteTopicRequest.numberOfTopics())
1632-
val toDelete = mutable.Set[String]()
1633-
if (!zkSupport.controller.isActive) {
1634-
deleteTopicRequest.topics().forEach { topic =>
1635-
results.add(new DeletableTopicResult()
1636-
.setName(topic.name())
1637-
.setTopicId(topic.topicId())
1638-
.setErrorCode(Errors.NOT_CONTROLLER.code))
1639-
}
1640-
sendResponseCallback(results)
1641-
} else if (!config.deleteTopicEnable) {
1642-
val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
1643-
deleteTopicRequest.topics().forEach { topic =>
1644-
results.add(new DeletableTopicResult()
1645-
.setName(topic.name())
1646-
.setTopicId(topic.topicId())
1647-
.setErrorCode(error.code))
1648-
}
1649-
sendResponseCallback(results)
1650-
} else {
1651-
val topicIdsFromRequest = deleteTopicRequest.topicIds().asScala.filter(topicId => topicId != Uuid.ZERO_UUID).toSet
1652-
deleteTopicRequest.topics().forEach { topic =>
1653-
if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
1654-
throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
1655-
val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
1656-
else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull
1657-
results.add(new DeletableTopicResult()
1658-
.setName(name)
1659-
.setTopicId(topic.topicId()))
1660-
}
1661-
val authorizedDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
1662-
results.asScala.filter(result => result.name() != null))(_.name)
1663-
val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
1664-
results.asScala.filter(result => result.name() != null))(_.name)
1665-
results.forEach { topic =>
1666-
val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
1667-
if (unresolvedTopicId) {
1668-
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
1669-
} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics.contains(topic.name)) {
1670-
1671-
// Because the client does not have Describe permission, the name should
1672-
// not be returned in the response. Note, however, that we do not consider
1673-
// the topicId itself to be sensitive, so there is no reason to obscure
1674-
// this case with `UNKNOWN_TOPIC_ID`.
1675-
topic.setName(null)
1676-
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
1677-
} else if (!authorizedDeleteTopics.contains(topic.name)) {
1678-
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
1679-
} else if (!metadataCache.contains(topic.name)) {
1680-
topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
1681-
} else {
1682-
toDelete += topic.name
1683-
}
1684-
}
1685-
// If no authorized topics return immediately
1686-
if (toDelete.isEmpty)
1687-
sendResponseCallback(results)
1688-
else {
1689-
def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = {
1690-
errors.foreach {
1691-
case (topicName, error) =>
1692-
results.find(topicName)
1693-
.setErrorCode(error.code)
1694-
}
1695-
sendResponseCallback(results)
1696-
}
1697-
1698-
zkSupport.adminManager.deleteTopics(
1699-
deleteTopicRequest.data.timeoutMs,
1700-
toDelete,
1701-
controllerMutationQuota,
1702-
handleDeleteTopicsResults
1703-
)
1704-
}
1705-
}
1706-
}
1707-
17081465
def handleDeleteRecordsRequest(request: RequestChannel.Request): Unit = {
17091466
val deleteRecordsRequest = request.body[DeleteRecordsRequest]
17101467

0 commit comments

Comments
 (0)