Skip to content

Commit b95a57a

Browse files
committed
improved getOrCreateRlqsEngine
1 parent 8ed3b95 commit b95a57a

File tree

3 files changed

+19
-35
lines changed

3 files changed

+19
-35
lines changed

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ public class RlqsBucket {
3030

3131
private final RateLimitStrategy noAssignmentStrategy;
3232
private final RateLimitStrategy expiredAssignmentStrategy;
33+
private final DenyResponse denyResponse;
3334

3435
// TODO(sergiitk): [impl] consider AtomicLongFieldUpdater
3536
private final AtomicLong lastSnapshotTimeNanos = new AtomicLong(-1);
3637
private final AtomicLong numRequestsAllowed = new AtomicLong();
3738
private final AtomicLong numRequestsDenied = new AtomicLong();
38-
private final DenyResponse denyResponse;
3939

4040
// TODO(sergiitk): [impl] consider AtomicReferenceFieldUpdater
4141
@Nullable

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketCache.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ final class RlqsBucketCache {
3131
private final ConcurrentMap<Long, Set<RlqsBucket>> bucketsPerInterval = new ConcurrentHashMap<>();
3232
private final ConcurrentMap<RlqsBucketId, RlqsBucket> buckets = new ConcurrentHashMap<>();
3333

34-
RlqsBucket getOrCreate(
34+
public RlqsBucket getOrCreate(
3535
RlqsBucketId bucketId, RlqsBucketSettings bucketSettings, Consumer<RlqsBucket> onCreate) {
3636
// read synchronize trick
3737
RlqsBucket bucket = buckets.get(bucketId);
@@ -49,7 +49,7 @@ RlqsBucket getOrCreate(
4949
}
5050
}
5151

52-
void deleteBucket(RlqsBucketId bucketId) {
52+
public void deleteBucket(RlqsBucketId bucketId) {
5353
RlqsBucket bucket = buckets.get(bucketId);
5454
if (bucket == null) {
5555
return;
@@ -63,7 +63,8 @@ void deleteBucket(RlqsBucketId bucketId) {
6363
}
6464
}
6565

66-
void updateBucket(RlqsBucketId bucketId, RateLimitStrategy rateLimitStrategy, long ttlMillis) {
66+
public void updateBucket(
67+
RlqsBucketId bucketId, RateLimitStrategy rateLimitStrategy, long ttlMillis) {
6768
RlqsBucket bucket = buckets.get(bucketId);
6869
bucket.updateAction(rateLimitStrategy, ttlMillis);
6970
}

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsCache.java

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,14 @@
1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

2121
import com.google.common.collect.Sets;
22-
import com.google.common.util.concurrent.SettableFuture;
22+
import io.grpc.ChannelCredentials;
2323
import io.grpc.InsecureChannelCredentials;
2424
import io.grpc.SynchronizationContext;
2525
import io.grpc.xds.RlqsFilterConfig;
2626
import io.grpc.xds.client.Bootstrapper.RemoteServerInfo;
2727
import java.util.Set;
2828
import java.util.concurrent.ConcurrentHashMap;
29-
import java.util.concurrent.ExecutionException;
3029
import java.util.concurrent.ScheduledExecutorService;
31-
import java.util.concurrent.TimeUnit;
32-
import java.util.concurrent.TimeoutException;
3330
import java.util.logging.Level;
3431
import java.util.logging.Logger;
3532

@@ -80,34 +77,20 @@ public void shutdownRlqsEngine(RlqsFilterConfig oldConfig) {
8077
// TODO(sergiitk): shutdown one
8178
}
8279

83-
public RlqsEngine getOrCreateRlqsEngine(RlqsFilterConfig config) {
84-
final String configHash = hashRlqsFilterConfig(config);
85-
if (enginePool.containsKey(configHash)) {
86-
return enginePool.get(configHash);
87-
}
80+
public RlqsEngine getOrCreateRlqsEngine(final RlqsFilterConfig config) {
81+
String configHash = hashRlqsFilterConfig(config);
82+
return enginePool.computeIfAbsent(configHash, k -> newRlqsEngine(k, config));
83+
}
8884

89-
final SettableFuture<RlqsEngine> future = SettableFuture.create();
90-
syncContext.execute(() -> {
91-
// TODO(sergiitk): [IMPL] get channel creds from the bootstrap.
92-
RemoteServerInfo rlqsServer = RemoteServerInfo.create(config.rlqsService().targetUri(),
93-
InsecureChannelCredentials.create());
94-
RlqsEngine rlqsEngine = new RlqsEngine(
95-
rlqsServer,
96-
config.domain(),
97-
config.bucketMatchers(),
98-
configHash,
99-
scheduler);
100-
101-
enginePool.put(configHash, rlqsEngine);
102-
future.set(enginePool.get(configHash));
103-
});
104-
try {
105-
// TODO(sergiitk): [IMPL] clarify time
106-
return future.get(1, TimeUnit.SECONDS);
107-
} catch (InterruptedException | ExecutionException | TimeoutException e) {
108-
// TODO(sergiitk): [IMPL] handle properly
109-
throw new RuntimeException(e);
110-
}
85+
private RlqsEngine newRlqsEngine(String configHash, RlqsFilterConfig config) {
86+
// TODO(sergiitk): [IMPL] get channel creds from the bootstrap.
87+
ChannelCredentials creds = InsecureChannelCredentials.create();
88+
return new RlqsEngine(
89+
RemoteServerInfo.create(config.rlqsService().targetUri(), creds),
90+
config.domain(),
91+
config.bucketMatchers(),
92+
configHash,
93+
scheduler);
11194
}
11295

11396
private String hashRlqsFilterConfig(RlqsFilterConfig config) {

0 commit comments

Comments
 (0)