MINOR: Cleanup Server Module #20180

Open
wants to merge 8 commits into base: trunk
@@ -56,7 +56,7 @@ public ClientRequestQuotaManager(
Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
) {
super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, quotaCallbackPlugin);
this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds());
this.metrics = metrics;
this.exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization percentage");
exemptSensor = getOrCreateSensor(EXEMPT_SENSOR_NAME, DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS, sensor -> sensor.add(exemptMetricName, new Rate()));
720 changes: 354 additions & 366 deletions server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java

Large diffs are not rendered by default.

@@ -25,8 +25,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -175,15 +173,12 @@ public class SocketServerConfigs {
private static final Pattern URI_PARSE_REGEXP = Pattern.compile(
"^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");

public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO;

static {
HashMap<ListenerName, SecurityProtocol> nameToSecurityProtocol = new HashMap<>();
for (SecurityProtocol securityProtocol : SecurityProtocol.values()) {
nameToSecurityProtocol.put(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
}
DEFAULT_NAME_TO_SECURITY_PROTO = Collections.unmodifiableMap(nameToSecurityProtocol);
}
public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO =
Arrays.stream(SecurityProtocol.values())
.collect(Collectors.toUnmodifiableMap(
ListenerName::forSecurityProtocol,
Function.identity()
));

public static List<Endpoint> listenerListToEndPoints(
String input,
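For readers less familiar with the collector used above: a minimal, self-contained sketch (toy Color enum and names, not Kafka types) of the Collectors.toUnmodifiableMap pattern this hunk switches to. It builds the same kind of unmodifiable map the removed static initializer produced, just declaratively:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToUnmodifiableMapSketch {
    enum Color { RED, GREEN, BLUE }

    // Key derived from each enum constant, value is the constant itself,
    // mirroring ListenerName::forSecurityProtocol + Function.identity() above.
    static final Map<String, Color> BY_NAME = Arrays.stream(Color.values())
            .collect(Collectors.toUnmodifiableMap(Color::name, Function.identity()));

    public static void main(String[] args) {
        System.out.println(BY_NAME.get("GREEN"));  // GREEN
        // BY_NAME.put("PURPLE", Color.RED);       // would throw UnsupportedOperationException
    }
}
```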
@@ -19,8 +19,8 @@
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.protocol.ApiKeys;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -34,7 +34,7 @@ public RequestChannelMetrics(Set<ApiKeys> enabledApis) {
for (ApiKeys apiKey : enabledApis) {
metricsMap.put(apiKey.name, new RequestMetrics(apiKey.name));
}
for (String name : Arrays.asList(
for (String name : List.of(
RequestMetrics.CONSUMER_FETCH_METRIC_NAME,
RequestMetrics.FOLLOW_FETCH_METRIC_NAME,
RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME,
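A small aside on the Arrays.asList to List.of swap above, shown with made-up values: the two are not fully interchangeable. List.of is immutable and rejects nulls, while Arrays.asList is a fixed-size view that still allows set() and null elements. For a constant list of metric names like the one here the difference is harmless, but it is worth knowing:

```java
import java.util.Arrays;
import java.util.List;

public class ListOfSketch {
    public static void main(String[] args) {
        List<String> fixed = Arrays.asList("a", "b");  // fixed-size, but elements can be replaced
        fixed.set(0, "z");                             // allowed

        List<String> immutable = List.of("a", "b");    // fully immutable, rejects null elements
        try {
            immutable.set(0, "z");
        } catch (UnsupportedOperationException e) {
            System.out.println("List.of result cannot be modified");
        }
    }
}
```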
@@ -27,7 +27,6 @@
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -208,27 +207,6 @@ private synchronized void removeMeter() {
}
}

private static class DeprecatedRequestRateKey {

private final short version;
private final ClientInformation clientInformation;

private DeprecatedRequestRateKey(short version, ClientInformation clientInformation) {
this.version = version;
this.clientInformation = clientInformation;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeprecatedRequestRateKey that = (DeprecatedRequestRateKey) o;
return version == that.version && Objects.equals(clientInformation, that.clientInformation);
}

@Override
public int hashCode() {
return Objects.hash(version, clientInformation);
}
private record DeprecatedRequestRateKey(short version, ClientInformation clientInformation) {
}
}
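The conversion above leans on the fact that a record derives equals, hashCode, and accessors from its components, which is exactly what the hand-written methods did. A toy sketch (RateKey and its fields are illustrative, not Kafka types):

```java
public class RecordEqualitySketch {
    // Stand-in for DeprecatedRequestRateKey: equality and hashing come from the components.
    record RateKey(short version, String clientName) { }

    public static void main(String[] args) {
        RateKey a = new RateKey((short) 3, "console-producer");
        RateKey b = new RateKey((short) 3, "console-producer");

        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
        System.out.println(a.version());                  // 3: accessor generated automatically
    }
}
```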
@@ -51,38 +51,25 @@ public class AclEntry {
.collect(Collectors.toSet());

public static Set<AclOperation> supportedOperations(ResourceType resourceType) {
switch (resourceType) {
case TOPIC:
return Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS);
case GROUP:
return Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS);
case CLUSTER:
return Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
case TRANSACTIONAL_ID:
return Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
case DELEGATION_TOKEN:
return Set.of(DESCRIBE);
case USER:
return Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
default:
throw new IllegalArgumentException("Not a concrete resource type");
}
return switch (resourceType) {
case TOPIC -> Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS);
case GROUP -> Set.of(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS);
case CLUSTER -> Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE);
case TRANSACTIONAL_ID -> Set.of(DESCRIBE, WRITE, TWO_PHASE_COMMIT);
case DELEGATION_TOKEN -> Set.of(DESCRIBE);
case USER -> Set.of(CREATE_TOKENS, DESCRIBE_TOKENS);
default -> throw new IllegalArgumentException("Not a concrete resource type");
};
}

public static Errors authorizationError(ResourceType resourceType) {
switch (resourceType) {
case TOPIC:
return Errors.TOPIC_AUTHORIZATION_FAILED;
case GROUP:
return Errors.GROUP_AUTHORIZATION_FAILED;
case CLUSTER:
return Errors.CLUSTER_AUTHORIZATION_FAILED;
case TRANSACTIONAL_ID:
return Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
case DELEGATION_TOKEN:
return Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
default:
throw new IllegalArgumentException("Authorization error type not known");
}
return switch (resourceType) {
case TOPIC -> Errors.TOPIC_AUTHORIZATION_FAILED;
case GROUP -> Errors.GROUP_AUTHORIZATION_FAILED;
case CLUSTER -> Errors.CLUSTER_AUTHORIZATION_FAILED;
case TRANSACTIONAL_ID -> Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
case DELEGATION_TOKEN -> Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED;
default -> throw new IllegalArgumentException("Authorization error type not known");
};
}
}
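If the arrow-form switch above is unfamiliar: a switch expression yields a value, its arms do not fall through, and it must be exhaustive, which is why a default arm is still needed for the resource types without a dedicated mapping. A minimal sketch with a made-up enum, not the real ResourceType:

```java
public class SwitchExpressionSketch {
    // Hypothetical enum standing in for ResourceType.
    enum Kind { TOPIC, GROUP, UNKNOWN }

    static String describe(Kind kind) {
        // The expression must cover every constant; default handles the ones
        // without a dedicated arm, matching the IllegalArgumentException above.
        return switch (kind) {
            case TOPIC -> "topic resource";
            case GROUP -> "group resource";
            default -> throw new IllegalArgumentException("Not a concrete resource type");
        };
    }

    public static void main(String[] args) {
        System.out.println(describe(Kind.TOPIC)); // topic resource
    }
}
```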
78 changes: 17 additions & 61 deletions server/src/main/java/org/apache/kafka/server/Assignment.java
@@ -26,63 +26,21 @@

/**
* The class is not converted to a Java record since record classes are meant for pure data, but this one contains a Runnable
**/
final class Assignment {
/**
* The topic ID and partition index of the replica.
*/
private final TopicIdPartition topicIdPartition;

/**
* The ID of the directory we are placing the replica into.
*/
private final Uuid directoryId;

/**
* The time in monotonic nanosecond when this assignment was created.
*/
private final long submissionTimeNs;

/**
* The callback to invoke on success.
*/
private final Runnable successCallback;

Assignment(
TopicIdPartition topicIdPartition,
Uuid directoryId,
long submissionTimeNs,
Runnable successCallback
) {
this.topicIdPartition = topicIdPartition;
this.directoryId = directoryId;
this.submissionTimeNs = submissionTimeNs;
this.successCallback = successCallback;
}

TopicIdPartition topicIdPartition() {
return topicIdPartition;
}

Uuid directoryId() {
return directoryId;
}

long submissionTimeNs() {
return submissionTimeNs;
}

Runnable successCallback() {
return successCallback;
}
*
* @param topicIdPartition The topic ID and partition index of the replica.
* @param directoryId The ID of the directory we are placing the replica into.
* @param submissionTimeNs The time in monotonic nanoseconds when this assignment was created.
* @param successCallback The callback to invoke on success.
*/
record Assignment(TopicIdPartition topicIdPartition, Uuid directoryId, long submissionTimeNs,
Runnable successCallback) {

/**
* Check if this Assignment is still valid to be sent.
*
* @param nodeId The broker ID.
* @param image The metadata image.
*
* @return True only if the Assignment is still valid.
* @param nodeId The broker ID.
* @param image The metadata image.
* @return True only if the Assignment is still valid.
*/
boolean valid(int nodeId, MetadataImage image) {
TopicImage topicImage = image.topics().getTopic(topicIdPartition.topicId());
@@ -99,13 +57,11 @@ boolean valid(int nodeId, MetadataImage image) {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("Assignment");
bld.append("(topicIdPartition=").append(topicIdPartition);
bld.append(", directoryId=").append(directoryId);
bld.append(", submissionTimeNs=").append(submissionTimeNs);
bld.append(", successCallback=").append(successCallback);
bld.append(")");
return bld.toString();
return "Assignment" +
"(topicIdPartition=" + topicIdPartition +
", directoryId=" + directoryId +
", submissionTimeNs=" + submissionTimeNs +
", successCallback=" + successCallback +
")";
}
}
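On the Assignment conversion: a record can hold a behavioural component such as a Runnable; it simply becomes one of the generated accessors, and (as the custom toString above shows) methods can still be overridden. A toy sketch with invented names:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RecordWithCallbackSketch {
    // Toy shape mirroring Assignment: plain data plus a success callback.
    record Task(String name, long submittedAtNs, Runnable onSuccess) { }

    public static void main(String[] args) {
        AtomicBoolean fired = new AtomicBoolean(false);
        Task task = new Task("reassign-0", System.nanoTime(), () -> fired.set(true));

        System.out.println(task.name());   // accessor generated from the component
        task.onSuccess().run();            // the callback travels with the record
        System.out.println(fired.get());   // true
    }
}
```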
@@ -34,5 +34,5 @@ public String toString() {
public String toString() {
return "Fetching";
}
};
}
}
@@ -15,25 +15,27 @@
* limitations under the License.
*/
package org.apache.kafka.server.config;
public class ClientQuotaManagerConfig {
public final int numQuotaSamples;
public final int quotaWindowSizeSeconds;

/**
* Configuration settings for quota management
*
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
*/
public record ClientQuotaManagerConfig(
int numQuotaSamples,
int quotaWindowSizeSeconds
) {
/**
* Configuration settings for quota management
*
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
* Default constructor with default values
*/
public ClientQuotaManagerConfig(int numQuotaSamples, int quotaWindowSizeSeconds) {
this.numQuotaSamples = numQuotaSamples;
this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
}

public ClientQuotaManagerConfig() {
this(QuotaConfig.NUM_QUOTA_SAMPLES_DEFAULT, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
}

/**
* Constructor with custom numQuotaSamples and default quotaWindowSizeSeconds
*/
public ClientQuotaManagerConfig(int numQuotaSamples) {
this(numQuotaSamples, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT);
}
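The ClientQuotaManagerConfig conversion keeps its convenience constructors by delegating to the canonical one; callers only change from field access (config.quotaWindowSizeSeconds) to the generated accessor (config.quotaWindowSizeSeconds()), as the other hunks in this PR show. A self-contained sketch of the pattern with invented names and defaults:

```java
public class RecordDefaultsSketch {
    record WindowConfig(int numSamples, int windowSizeSeconds) {
        // Invented defaults for illustration only.
        static final int DEFAULT_SAMPLES = 11;
        static final int DEFAULT_WINDOW_SIZE_SECONDS = 1;

        // Overloads delegate to the canonical constructor to fill in defaults.
        WindowConfig() {
            this(DEFAULT_SAMPLES, DEFAULT_WINDOW_SIZE_SECONDS);
        }

        WindowConfig(int numSamples) {
            this(numSamples, DEFAULT_WINDOW_SIZE_SECONDS);
        }
    }

    public static void main(String[] args) {
        WindowConfig config = new WindowConfig(5);
        System.out.println(config.numSamples() + " samples, "
                + config.windowSizeSeconds() + "s windows"); // accessors, not fields
    }
}
```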
@@ -18,7 +18,6 @@

import java.util.List;


public interface LoggingControllerMBean {
List<String> getLoggers();
String getLogLevel(String logger);
@@ -26,7 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@@ -57,7 +56,7 @@ public DelayedDeleteRecords(long delayMs,
Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback) {
super(delayMs);
this.onAcksPending = onAcksPending;
this.deleteRecordsStatus = Collections.unmodifiableMap(deleteRecordsStatus);
this.deleteRecordsStatus = Map.copyOf(deleteRecordsStatus);
Member commented:

Is the deep copy necessary for now? Or could we revisit this code after rewriting the ReplicaManager in Java?

Contributor (author) replied:

I agree with the latter. Let's keep it as Map.copyOf and revisit once the Java conversion is complete.
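For context on the thread above, a minimal sketch (toy values, not Kafka types) of the semantic difference being discussed: Map.copyOf is a detached, shallow, unmodifiable snapshot, while Collections.unmodifiableMap is a read-only view that still tracks the original map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CopyOfVsUnmodifiableSketch {
    public static void main(String[] args) {
        Map<String, Integer> source = new HashMap<>(Map.of("p0", 1));

        Map<String, Integer> view = Collections.unmodifiableMap(source); // view over source
        Map<String, Integer> snapshot = Map.copyOf(source);              // one-time shallow copy

        source.put("p1", 2);

        System.out.println(view.containsKey("p1"));     // true: the view sees later changes
        System.out.println(snapshot.containsKey("p1")); // false: the snapshot is detached
    }
}
```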

this.responseCallback = responseCallback;
// first update the acks pending variable according to the error code
deleteRecordsStatus.forEach((topicPartition, status) -> {
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.server.purgatory;


import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult;
import org.apache.kafka.common.protocol.Errors;

@@ -379,7 +379,7 @@ public double maxValueInQuotaWindow(Session session, String clientId) {
if (!quotasEnabled()) return Double.MAX_VALUE;
var clientSensors = getOrCreateQuotaSensors(session, clientId);
var limit = quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags());
if (limit != null) return limit * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds;
if (limit != null) return limit * (config.numQuotaSamples() - 1) * config.quotaWindowSizeSeconds();
return Double.MAX_VALUE;
}

@@ -495,8 +495,8 @@ protected MetricConfig getQuotaMetricConfig(Map<String, String> metricTags) {

private MetricConfig getQuotaMetricConfig(double quotaLimit) {
return new MetricConfig()
.timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
.samples(config.numQuotaSamples)
.timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS)
.samples(config.numQuotaSamples())
.quota(new Quota(quotaLimit, true));
}

@@ -575,9 +575,8 @@ private void updateQuotaTypes(KafkaQuotaEntity quotaEntity, boolean shouldAdd) {
return;
}

boolean isActive = (quotaCallback instanceof DefaultQuotaCallback defaultCallback)
? defaultCallback.getActiveQuotasEntities().contains(quotaEntity)
: true;
boolean isActive = !(quotaCallback instanceof DefaultQuotaCallback defaultCallback) ||
defaultCallback.getActiveQuotasEntities().contains(quotaEntity);

int activeQuotaType;
if (quotaEntity.userEntity() != null && quotaEntity.clientIdEntity() != null) {
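One last note, on the isActive rewrite above: negating the instanceof and short-circuiting with || is equivalent to the removed ternary, and Java's flow scoping keeps the pattern variable in scope on the right-hand side. A toy sketch with invented types:

```java
import java.util.Set;

public class FlowScopingSketch {
    interface Callback { }

    record DefaultCallback(Set<String> activeEntities) implements Callback { }

    // When the negated instanceof is true the expression short-circuits to true;
    // otherwise `dc` is definitely assigned and usable after ||.
    static boolean isActive(Callback cb, String entity) {
        return !(cb instanceof DefaultCallback dc) || dc.activeEntities().contains(entity);
    }

    public static void main(String[] args) {
        System.out.println(isActive(new Callback() { }, "user-a"));                    // true
        System.out.println(isActive(new DefaultCallback(Set.of("user-a")), "user-a")); // true
        System.out.println(isActive(new DefaultCallback(Set.of()), "user-a"));         // false
    }
}
```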