Skip to content

Commit d13069d

Browse files
committed
RlqsEngine -> RlqsFilterState
1 parent 1a54283 commit d13069d

File tree

3 files changed

+42
-38
lines changed

3 files changed

+42
-38
lines changed

xds/src/main/java/io/grpc/xds/RlqsFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import io.grpc.xds.internal.matchers.OnMatch;
4545
import io.grpc.xds.internal.rlqs.RlqsBucketSettings;
4646
import io.grpc.xds.internal.rlqs.RlqsCache;
47-
import io.grpc.xds.internal.rlqs.RlqsEngine;
47+
import io.grpc.xds.internal.rlqs.RlqsFilterState;
4848
import io.grpc.xds.internal.rlqs.RlqsRateLimitResult;
4949
import java.util.concurrent.ScheduledExecutorService;
5050
import java.util.concurrent.atomic.AtomicReference;
@@ -168,7 +168,7 @@ private ServerInterceptor generateRlqsInterceptor(RlqsFilterConfig config) {
168168
return null;
169169
}
170170

171-
final RlqsEngine rlqsEngine = rlqsCache.getOrCreateRlqsEngine(config);
171+
final RlqsFilterState rlqsFilterState = rlqsCache.getOrCreateFilterState(config);
172172

173173
return new ServerInterceptor() {
174174
@Override
@@ -182,7 +182,7 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
182182
return next.startCall(call, headers);
183183
}
184184

185-
RlqsRateLimitResult result = rlqsEngine.rateLimit(httpMatchInput);
185+
RlqsRateLimitResult result = rlqsFilterState.rateLimit(httpMatchInput);
186186
if (result.isAllowed()) {
187187
return next.startCall(call, headers);
188188
}

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsCache.java

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,42 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

21-
import com.google.common.collect.Sets;
21+
import com.google.common.base.Throwables;
2222
import io.grpc.ChannelCredentials;
2323
import io.grpc.InsecureChannelCredentials;
24+
import io.grpc.InternalLogId;
2425
import io.grpc.SynchronizationContext;
2526
import io.grpc.xds.RlqsFilterConfig;
2627
import io.grpc.xds.client.Bootstrapper.RemoteServerInfo;
28+
import io.grpc.xds.client.XdsLogger;
29+
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
2730
import java.util.Objects;
28-
import java.util.Set;
2931
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentMap;
3033
import java.util.concurrent.ScheduledExecutorService;
31-
import java.util.logging.Level;
32-
import java.util.logging.Logger;
3334

3435
public final class RlqsCache {
35-
private static final Logger logger = Logger.getLogger(RlqsCache.class.getName());
36-
3736
// TODO(sergiitk): [QUESTION] always in sync context?
3837
private volatile boolean shutdown = false;
39-
private final SynchronizationContext syncContext = new SynchronizationContext((thread, error) -> {
40-
String message = "Uncaught exception in RlqsCache SynchronizationContext. Panic!";
41-
logger.log(Level.FINE, message, error);
42-
throw new RlqsPoolSynchronizationException(message, error);
43-
});
44-
45-
private final ConcurrentHashMap<Long, RlqsEngine> enginePool = new ConcurrentHashMap<>();
46-
Set<String> enginesToShutdown = Sets.newConcurrentHashSet();
38+
39+
private final XdsLogger logger;
40+
private final SynchronizationContext syncContext;
41+
42+
private final ConcurrentMap<Long, RlqsFilterState> filterStateCache = new ConcurrentHashMap<>();
4743
private final ScheduledExecutorService scheduler;
4844

4945

5046
private RlqsCache(ScheduledExecutorService scheduler) {
5147
this.scheduler = checkNotNull(scheduler, "scheduler");
48+
// TODO(sergiitk): should be filter name?
49+
logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), null));
50+
51+
syncContext = new SynchronizationContext((thread, error) -> {
52+
String message = "Uncaught exception in RlqsCache SynchronizationContext. Panic!";
53+
logger.log(XdsLogLevel.DEBUG,
54+
message + " {0} \nTrace:\n {1}", error, Throwables.getStackTraceAsString(error));
55+
throw new RlqsCacheSynchronizationException(message, error);
56+
});
5257
}
5358

5459
/** Creates an instance. */
@@ -64,37 +69,38 @@ public void shutdown() {
6469
}
6570
syncContext.execute(() -> {
6671
shutdown = true;
67-
logger.log(Level.FINER, "Shutting down RlqsCache");
68-
enginesToShutdown.clear();
69-
for (long configHash : enginePool.keySet()) {
70-
enginePool.get(configHash).shutdown();
72+
logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsCache");
73+
for (long configHash : filterStateCache.keySet()) {
74+
filterStateCache.get(configHash).shutdown();
7175
}
72-
enginePool.clear();
76+
filterStateCache.clear();
7377
shutdown = false;
7478
});
7579
}
7680

77-
public void shutdownRlqsEngine(RlqsFilterConfig oldConfig) {
81+
public void shutdownFilterState(RlqsFilterConfig oldConfig) {
7882
// TODO(sergiitk): shutdown one
83+
// make it async.
7984
}
8085

81-
public RlqsEngine getOrCreateRlqsEngine(final RlqsFilterConfig config) {
82-
long configHash = hashRlqsFilterConfig(config);
83-
return enginePool.computeIfAbsent(configHash, k -> newRlqsEngine(k, config));
86+
public RlqsFilterState getOrCreateFilterState(final RlqsFilterConfig config) {
87+
// TODO(sergiitk): handle being shut down.
88+
long configHash = hashFilterConfig(config);
89+
return filterStateCache.computeIfAbsent(configHash, k -> newFilterState(k, config));
8490
}
8591

86-
private RlqsEngine newRlqsEngine(long configHash, RlqsFilterConfig config) {
92+
private RlqsFilterState newFilterState(long configHash, RlqsFilterConfig config) {
8793
// TODO(sergiitk): [IMPL] get channel creds from the bootstrap.
8894
ChannelCredentials creds = InsecureChannelCredentials.create();
89-
return new RlqsEngine(
95+
return new RlqsFilterState(
9096
RemoteServerInfo.create(config.rlqsService().targetUri(), creds),
9197
config.domain(),
9298
config.bucketMatchers(),
9399
configHash,
94100
scheduler);
95101
}
96102

97-
private long hashRlqsFilterConfig(RlqsFilterConfig config) {
103+
private long hashFilterConfig(RlqsFilterConfig config) {
98104
// TODO(sergiitk): [QUESTION] better name? - ask Eric.
99105
// TODO(sergiitk): [DESIGN] the key should be hashed (domain + buckets) merged config?
100106
// TODO(sergiitk): [IMPL] Hash buckets
@@ -111,13 +117,11 @@ private long hashRlqsFilterConfig(RlqsFilterConfig config) {
111117
/**
112118
* Throws when fail to bootstrap or initialize the XdsClient.
113119
*/
114-
public static final class RlqsPoolSynchronizationException extends RuntimeException {
120+
public static final class RlqsCacheSynchronizationException extends RuntimeException {
115121
private static final long serialVersionUID = 1L;
116122

117-
public RlqsPoolSynchronizationException(String message, Throwable cause) {
123+
public RlqsCacheSynchronizationException(String message, Throwable cause) {
118124
super(message, cause);
119125
}
120126
}
121-
122-
123127
}

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsEngine.java renamed to xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsFilterState.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import java.util.concurrent.ScheduledFuture;
3434
import java.util.concurrent.TimeUnit;
3535

36-
public class RlqsEngine {
36+
public class RlqsFilterState {
3737
private final XdsLogger logger;
3838

3939
private final RlqsClient rlqsClient;
@@ -43,7 +43,7 @@ public class RlqsEngine {
4343
private final ScheduledExecutorService scheduler;
4444
private final ConcurrentMap<Long, ScheduledFuture<?>> timers = new ConcurrentHashMap<>();
4545

46-
public RlqsEngine(
46+
public RlqsFilterState(
4747
RemoteServerInfo rlqsServer, String domain,
4848
Matcher<HttpMatchInput, RlqsBucketSettings> bucketMatchers, long configHash,
4949
ScheduledExecutorService scheduler) {
@@ -54,7 +54,7 @@ public RlqsEngine(
5454
String prettyHash = "0x" + Long.toHexString(configHash);
5555
logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), prettyHash));
5656
logger.log(XdsLogLevel.DEBUG,
57-
"Initialized RlqsEngine for hash={0}, domain={1}", prettyHash, domain);
57+
"Initialized RlqsFilterState for hash={0}, domain={1}", prettyHash, domain);
5858

5959
bucketCache = new RlqsBucketCache();
6060
rlqsClient = new RlqsClient(rlqsServer, domain, this::onBucketsUpdate, prettyHash);
@@ -130,8 +130,8 @@ private void reportBucketsWithInterval(long intervalMillis) {
130130

131131
public void shutdown() {
132132
// TODO(sergiitk): [IMPL] Timers shutdown
133-
// TODO(sergiitk): [IMPL] RlqsEngine shutdown
134-
logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsEngine with hash {0}", configHash);
133+
// TODO(sergiitk): [IMPL] RlqsFilterState shutdown
134+
logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsFilterState with hash {0}", configHash);
135135
rlqsClient.shutdown();
136136
}
137137
}

0 commit comments

Comments
 (0)