Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support create topic and consumer group for e2e tests #489

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
// TODO: Split controller to a separate port
ControllerServiceImpl controllerService = MetadataStoreBuilder.build(metadataStore);
grpcServer = new GrpcProtocolServer(brokerConfig.proxy(), messagingProcessor, controllerService);
remotingServer = new RemotingProtocolServer(messagingProcessor);
remotingServer = new RemotingProtocolServer(messagingProcessor, metadataStore);

metricsExporter = new MetricsExporter(brokerConfig, messageStore, (ExtendMessagingProcessor) messagingProcessor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.automq.rocketmq.proxy.remoting;

import com.automq.rocketmq.controller.metadata.MetadataStore;
import com.automq.rocketmq.proxy.remoting.activity.AdminActivity;
import com.automq.rocketmq.proxy.remoting.activity.ExtendConsumerManagerActivity;
import com.automq.rocketmq.proxy.remoting.activity.ExtendPullMessageActivity;
import com.automq.rocketmq.proxy.remoting.activity.ExtendSendMessageActivity;
Expand All @@ -30,8 +32,10 @@

public class RemotingProtocolServer extends org.apache.rocketmq.proxy.remoting.RemotingProtocolServer {
private RequestPipeline requestPipeline;
public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
private final MetadataStore metadataStore;
public RemotingProtocolServer(MessagingProcessor messagingProcessor, MetadataStore metadataStore) {
super(messagingProcessor);
this.metadataStore = metadataStore;

Check warning on line 38 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingProtocolServer.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingProtocolServer.java#L38

Added line #L38 was not covered by tests

// Extend some request code to support more features.
extendRequestCode();
Expand Down Expand Up @@ -89,7 +93,11 @@
* Extend the request code to support more features, like CreateTopicRequest.
*/
private void extendRequestCode() {
RemotingServer remotingServer = this.defaultRemotingServer;

Check warning on line 96 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingProtocolServer.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingProtocolServer.java#L96

Added line #L96 was not covered by tests

AdminActivity adminActivity = new AdminActivity(requestPipeline, messagingProcessor, metadataStore);
remotingServer.registerProcessor(RequestCode.UPDATE_AND_CREATE_TOPIC, adminActivity, this.defaultExecutor);
remotingServer.registerProcessor(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, adminActivity, this.defaultExecutor);

Check warning on line 100 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingProtocolServer.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/RemotingProtocolServer.java#L98-L100

Added lines #L98 - L100 were not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.proxy.remoting.activity;

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.MessageType;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.MetadataStore;
import com.automq.rocketmq.proxy.remoting.RemotingUtil;
import com.google.common.base.Strings;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.activity.AbstractRemotingActivity;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;
import static org.apache.rocketmq.remoting.protocol.RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP;
import static org.apache.rocketmq.remoting.protocol.RequestCode.UPDATE_AND_CREATE_TOPIC;

public class AdminActivity extends AbstractRemotingActivity implements CommonRemotingBehavior {
public static final Logger LOGGER = LoggerFactory.getLogger(AdminActivity.class);

Check warning on line 57 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L57

Added line #L57 was not covered by tests
private final MetadataStore metadataStore;

public AdminActivity(RequestPipeline requestPipeline,
MessagingProcessor messagingProcessor, MetadataStore metadataStore) {
super(requestPipeline, messagingProcessor);
this.metadataStore = metadataStore;
}

Check warning on line 64 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L62-L64

Added lines #L62 - L64 were not covered by tests

@Override
protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws Exception {
return switch (request.getCode()) {
case UPDATE_AND_CREATE_TOPIC -> updateAndCreateTopic(ctx, request, context);
case UPDATE_AND_CREATE_SUBSCRIPTIONGROUP -> updateAndCreateSubscriptionGroup(ctx, request, context);
default -> RemotingUtil.codeNotSupportedResponse(request);

Check warning on line 72 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L70-L72

Added lines #L70 - L72 were not covered by tests
};
}

private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) {
RemotingCommand response = RemotingUtil.buildResponseCommand(request, ResponseCode.SUCCESS);

Check warning on line 78 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L78

Added line #L78 was not covered by tests

SubscriptionGroupConfig groupConfig = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
LOGGER.info("Create or update subscription group {} by {} with {}", groupConfig.getGroupName(),
context.getRemoteAddress(), groupConfig);

Check warning on line 82 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L80-L82

Added lines #L80 - L82 were not covered by tests

// Default to standard group type.
GroupType groupType = groupConfig.isConsumeMessageOrderly() ? GroupType.GROUP_TYPE_FIFO : GroupType.GROUP_TYPE_STANDARD;

// Set DLQ to zero here to disable DLQ.
CompletableFuture<Long> groupCf = metadataStore.createGroup(groupConfig.getGroupName(), groupConfig.getRetryMaxTimes(), groupType, 0);

Check warning on line 88 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L88

Added line #L88 was not covered by tests
// TODO: Support update if the group already exists.
groupCf.whenComplete((id, ex) -> {

Check warning on line 90 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L90

Added line #L90 was not covered by tests
if (ex != null) {
LOGGER.error("Failed to create consumer group {}.", groupConfig.getGroupName(), ex);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(ex.getMessage());
return;

Check warning on line 95 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L92-L95

Added lines #L92 - L95 were not covered by tests
}
LOGGER.info("Consumer group {}/{} created or updated by {}", groupConfig.getGroupName(), id, groupConfig);
writeResponse(ctx, context, request, response);
});
return null;

Check warning on line 100 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L97-L100

Added lines #L97 - L100 were not covered by tests
}

private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request,
ProxyContext context) throws RemotingCommandException {
RemotingCommand response = RemotingUtil.buildResponseCommand(request, ResponseCode.SUCCESS);

Check warning on line 105 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L105

Added line #L105 was not covered by tests

final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);

Check warning on line 108 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L107-L108

Added lines #L107 - L108 were not covered by tests

LOGGER.info("Create or update topic {} by {} with {}", requestHeader.getTopic(), context.getRemoteAddress(),

Check warning on line 110 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L110

Added line #L110 was not covered by tests
requestHeader);

String topicName = requestHeader.getTopic();

Check warning on line 113 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L113

Added line #L113 was not covered by tests

TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topicName);

Check warning on line 115 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L115

Added line #L115 was not covered by tests
if (!result.isValid()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(result.getRemark());
return response;

Check warning on line 119 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L117-L119

Added lines #L117 - L119 were not covered by tests
}

if (TopicValidator.isSystemTopic(topicName)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The topic[" + topicName + "] conflict with system topic.");
return response;

Check warning on line 125 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L123-L125

Added lines #L123 - L125 were not covered by tests
}

int queueNums = Math.max(requestHeader.getReadQueueNums(), requestHeader.getWriteQueueNums());
Map<String, String> attributes = AttributeParser.parseToMap(requestHeader.getAttributes());

Check warning on line 129 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L128-L129

Added lines #L128 - L129 were not covered by tests

// Default to normal message type.
TopicMessageType messageType = TopicMessageType.NORMAL;

Check warning on line 132 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L132

Added line #L132 was not covered by tests

String typeVal = attributes.get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());

Check warning on line 134 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L134

Added line #L134 was not covered by tests
if (!Strings.isNullOrEmpty(typeVal)) {
messageType = TopicMessageType.valueOf(typeVal);

Check warning on line 136 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L136

Added line #L136 was not covered by tests
}

AcceptTypes.Builder builder = AcceptTypes.newBuilder();

Check warning on line 139 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L139

Added line #L139 was not covered by tests
switch (messageType) {
case FIFO -> builder.addTypes(MessageType.FIFO);
case NORMAL -> builder.addTypes(MessageType.NORMAL);
case DELAY -> builder.addTypes(MessageType.DELAY);
case TRANSACTION -> builder.addTypes(MessageType.TRANSACTION);

Check warning on line 144 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L141-L144

Added lines #L141 - L144 were not covered by tests
// For other types, set to unspecified.
default -> builder.addTypes(MessageType.MESSAGE_TYPE_UNSPECIFIED);

Check warning on line 146 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L146

Added line #L146 was not covered by tests
}

CreateTopicRequest topicRequest = CreateTopicRequest
.newBuilder()
.setTopic(topicName)
.setCount(queueNums)
.setAcceptTypes(builder)
.build();

Check warning on line 154 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L150-L154

Added lines #L150 - L154 were not covered by tests

CompletableFuture<Long> topicCf = metadataStore.createTopic(topicRequest);

Check warning on line 156 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L156

Added line #L156 was not covered by tests

// Convert to update topic request if the topic already exists.
topicCf = topicCf.exceptionallyCompose(ex -> {
Throwable t = ExceptionUtils.getRealException(ex);

Check warning on line 160 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L159-L160

Added lines #L159 - L160 were not covered by tests
if (t instanceof ControllerException controllerException) {
if (controllerException.getErrorCode() == Code.DUPLICATED_VALUE) {
return metadataStore.describeTopic(null, topicName).thenCompose(existingTopic -> {

Check warning on line 163 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L163

Added line #L163 was not covered by tests
UpdateTopicRequest updateRequest = UpdateTopicRequest
.newBuilder()
.setTopicId(existingTopic.getTopicId())
.setCount(queueNums)
.setAcceptTypes(builder)
.build();
return metadataStore.updateTopic(updateRequest).thenApply(Topic::getTopicId);

Check warning on line 170 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L165-L170

Added lines #L165 - L170 were not covered by tests
});
}
}
// Rethrow the exception if it's not a duplicated topic error.
throw new CompletionException(t);

Check warning on line 175 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L175

Added line #L175 was not covered by tests
});

topicCf.whenComplete((id, ex) -> {

Check warning on line 178 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L178

Added line #L178 was not covered by tests
if (ex != null) {
LOGGER.error("Failed to create topic {}.", topicName, ex);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(ex.getMessage());
return;

Check warning on line 183 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L180-L183

Added lines #L180 - L183 were not covered by tests
}
LOGGER.info("Topic {}/{} created or updated by {}", topicName, id, requestHeader);
writeResponse(ctx, context, request, response);
});

Check warning on line 187 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L185-L187

Added lines #L185 - L187 were not covered by tests

return null;

Check warning on line 189 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L189

Added line #L189 was not covered by tests
}

@Override
protected RemotingCommand request(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context,
long timeoutMillis) throws Exception {
// The parent class use this method to proxy the request to the broker.
// We disable this behavior here.
return null;

Check warning on line 197 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L197

Added line #L197 was not covered by tests
}

@Override
protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand request) {
return createExtendContext(super.createContext(ctx, request));

Check warning on line 202 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L202

Added line #L202 was not covered by tests
}

@Override
protected void writeResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request,
RemotingCommand response, Throwable t) {
recordRpcLatency(context, response);
super.writeResponse(ctx, context, request, response, t);
}

Check warning on line 210 in proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/AdminActivity.java#L208-L210

Added lines #L208 - L210 were not covered by tests
}