Skip to content

Commit d5b4fb5

Browse files
committed
xds: Support tracking non-xds resources in XdsDepManager
This will be used for logical dns clusters as part of gRFC A74. Swapping to EnumMap wasn't really necessary, but was easy given the new type system. I can't say I'm particularly happy with the name of the new TrackedWatcher type, but XdsConfigWatcher prevented using "Watcher" because it won't implement the new interface, and ResourceWatcher already exists in XdsClient. So we have TrackedWatcher, WatcherTracer, TypeWatchers, and TrackedWatcherType.
1 parent 8974a30 commit d5b4fb5

File tree

1 file changed

+94
-70
lines changed

1 file changed

+94
-70
lines changed

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

Lines changed: 94 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.grpc.xds.client.XdsClient.ResourceWatcher;
3737
import io.grpc.xds.client.XdsResourceType;
3838
import java.util.Collections;
39+
import java.util.EnumMap;
3940
import java.util.HashMap;
4041
import java.util.HashSet;
4142
import java.util.LinkedHashSet;
@@ -53,8 +54,19 @@
5354
* applies to a single data plane authority.
5455
*/
5556
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
56-
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
57-
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
57+
private enum TrackedWatcherTypeEnum {
58+
LDS, RDS, CDS, EDS
59+
}
60+
61+
private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
62+
new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
63+
private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
64+
new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
65+
private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
66+
new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
67+
private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
68+
new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
69+
5870
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
5971
private final String listenerName;
6072
private final XdsClient xdsClient;
@@ -63,7 +75,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
6375
private XdsConfigWatcher xdsConfigWatcher;
6476

6577
private StatusOr<XdsConfig> lastUpdate = null;
66-
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
78+
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
79+
new EnumMap<>(TrackedWatcherTypeEnum.class);
6780
private final Set<ClusterSubscription> subscriptions = new HashSet<>();
6881

6982
XdsDependencyManager(XdsClient xdsClient,
@@ -86,7 +99,7 @@ public void start(XdsConfigWatcher xdsConfigWatcher) {
8699
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
87100
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
88101
// start the ball rolling
89-
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
102+
syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
90103
}
91104

92105
@Override
@@ -96,7 +109,7 @@ public XdsConfig.Subscription subscribeToCluster(String clusterName) {
96109
ClusterSubscription subscription = new ClusterSubscription(clusterName);
97110

98111
syncContext.execute(() -> {
99-
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
112+
if (getWatchers(LDS_TYPE).isEmpty()) {
100113
subscription.closed = true;
101114
return; // shutdown() called
102115
}
@@ -107,33 +120,28 @@ public XdsConfig.Subscription subscribeToCluster(String clusterName) {
107120
return subscription;
108121
}
109122

110-
private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
123+
private <T extends ResourceUpdate> void addWatcher(
124+
TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
111125
syncContext.throwIfNotInThisSynchronizationContext();
112126
XdsResourceType<T> type = watcher.type;
113127
String resourceName = watcher.resourceName;
114128

115-
getWatchers(type).put(resourceName, watcher);
129+
getWatchers(watcherType).put(resourceName, watcher);
116130
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
117131
}
118132

119133
public void shutdown() {
120134
syncContext.execute(() -> {
121135
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
122-
shutdownWatchersForType(watchers);
136+
for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
137+
watcher.close();
138+
}
123139
}
124140
resourceWatchers.clear();
125141
subscriptions.clear();
126142
});
127143
}
128144

129-
private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
130-
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
131-
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
132-
watcherEntry.getValue());
133-
watcherEntry.getValue().cancelled = true;
134-
}
135-
}
136-
137145
private void releaseSubscription(ClusterSubscription subscription) {
138146
checkNotNull(subscription, "subscription");
139147
syncContext.execute(() -> {
@@ -154,12 +162,12 @@ private void releaseSubscription(ClusterSubscription subscription) {
154162
*/
155163
private void maybePublishConfig() {
156164
syncContext.throwIfNotInThisSynchronizationContext();
157-
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
165+
if (getWatchers(LDS_TYPE).isEmpty()) {
158166
return; // shutdown() called
159167
}
160168
boolean waitingOnResource = resourceWatchers.values().stream()
161169
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
162-
.anyMatch(XdsWatcherBase::missingResult);
170+
.anyMatch(TrackedWatcher::missingResult);
163171
if (waitingOnResource) {
164172
return;
165173
}
@@ -194,8 +202,8 @@ private static StatusOr<XdsConfig> buildUpdate(
194202

195203
// Iterate watchers and build the XdsConfig
196204

197-
XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
198-
= tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
205+
TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
206+
= tracer.getWatcher(LDS_TYPE, listenerName);
199207
if (ldsWatcher == null) {
200208
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
201209
"Bug: No listener watcher found for " + listenerName));
@@ -241,14 +249,13 @@ private static StatusOr<XdsConfig> buildUpdate(
241249
return StatusOr.fromValue(builder.build());
242250
}
243251

244-
private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
245-
XdsResourceType<T> resourceType) {
246-
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
252+
private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
253+
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
247254
if (typeWatchers == null) {
248-
typeWatchers = new TypeWatchers<T>(resourceType);
249-
resourceWatchers.put(resourceType, typeWatchers);
255+
typeWatchers = new TypeWatchers<T>(watcherType);
256+
resourceWatchers.put(watcherType.typeEnum, typeWatchers);
250257
}
251-
assert typeWatchers.resourceType == resourceType;
258+
assert typeWatchers.watcherType == watcherType;
252259
@SuppressWarnings("unchecked")
253260
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
254261
return tTypeWatchers.watchers;
@@ -275,7 +282,7 @@ private static void addConfigForCluster(
275282
return;
276283
}
277284

278-
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
285+
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
279286
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
280287
if (!cdsWatcherDataOr.hasValue()) {
281288
clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
@@ -318,8 +325,8 @@ private static void addConfigForCluster(
318325
child = new AggregateConfig(ImmutableList.copyOf(leafNames));
319326
break;
320327
case EDS:
321-
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
322-
tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
328+
TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
329+
tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
323330
if (edsWatcher != null) {
324331
child = new EndpointConfig(edsWatcher.getData());
325332
} else {
@@ -346,27 +353,27 @@ private static void addConfigForCluster(
346353
}
347354

348355
private void addRdsWatcher(String resourceName) {
349-
if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
356+
if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
350357
return;
351358
}
352359

353-
addWatcher(new RdsWatcher(resourceName));
360+
addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
354361
}
355362

356363
private void addEdsWatcher(String edsServiceName) {
357-
if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
364+
if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
358365
return;
359366
}
360367

361-
addWatcher(new EdsWatcher(edsServiceName));
368+
addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
362369
}
363370

364371
private void addClusterWatcher(String clusterName) {
365-
if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
372+
if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
366373
return;
367374
}
368375

369-
addWatcher(new CdsWatcher(clusterName));
376+
addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
370377
}
371378

372379
private void updateRoutes(List<VirtualHost> virtualHosts) {
@@ -404,13 +411,13 @@ private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHos
404411
return clusters;
405412
}
406413

407-
private static class TypeWatchers<T extends ResourceUpdate> {
414+
private static class TypeWatchers<T> {
408415
// Key is resource name
409-
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
410-
final XdsResourceType<T> resourceType;
416+
final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
417+
final TrackedWatcherType<T> watcherType;
411418

412-
TypeWatchers(XdsResourceType<T> resourceType) {
413-
this.resourceType = resourceType;
419+
TypeWatchers(TrackedWatcherType<T> watcherType) {
420+
this.watcherType = checkNotNull(watcherType, "watcherType");
414421
}
415422
}
416423

@@ -442,48 +449,46 @@ public void close() {
442449

443450
/** State for tracing garbage collector. */
444451
private static final class WatcherTracer {
445-
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
446-
private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
452+
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
453+
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;
447454

448-
public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
455+
public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
449456
this.resourceWatchers = resourceWatchers;
450457

451-
this.usedWatchers = new HashMap<>();
452-
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
453-
usedWatchers.put(type, newTypeWatchers(type));
458+
this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
459+
for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
460+
usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
454461
}
455462
}
456463

457-
private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
458-
XdsResourceType<T> type) {
464+
private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
459465
return new TypeWatchers<T>(type);
460466
}
461467

462-
public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
463-
XdsResourceType<T> resourceType, String name) {
464-
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
468+
public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
469+
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
465470
if (typeWatchers == null) {
466471
return null;
467472
}
468-
assert typeWatchers.resourceType == resourceType;
473+
assert typeWatchers.watcherType == watcherType;
469474
@SuppressWarnings("unchecked")
470475
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
471-
XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
476+
TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
472477
if (watcher == null) {
473478
return null;
474479
}
475480
@SuppressWarnings("unchecked")
476-
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
481+
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
477482
usedTypeWatchers.watchers.put(name, watcher);
478483
return watcher;
479484
}
480485

481486
/** Shut down unused watchers. */
482487
public void closeUnusedWatchers() {
483488
boolean changed = false; // Help out the GC by preferring old objects
484-
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
485-
TypeWatchers<?> orig = resourceWatchers.get(type);
486-
TypeWatchers<?> used = usedWatchers.get(type);
489+
for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
490+
TypeWatchers<?> orig = resourceWatchers.get(key);
491+
TypeWatchers<?> used = usedWatchers.get(key);
487492
for (String name : orig.watchers.keySet()) {
488493
if (used.watchers.containsKey(name)) {
489494
continue;
@@ -498,8 +503,33 @@ public void closeUnusedWatchers() {
498503
}
499504
}
500505

506+
@SuppressWarnings("UnusedTypeParameter")
507+
private static final class TrackedWatcherType<T> {
508+
public final TrackedWatcherTypeEnum typeEnum;
509+
510+
public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
511+
this.typeEnum = checkNotNull(typeEnum, "typeEnum");
512+
}
513+
}
514+
515+
private interface TrackedWatcher<T> {
516+
@Nullable
517+
StatusOr<T> getData();
518+
519+
default boolean missingResult() {
520+
return getData() == null;
521+
}
522+
523+
default boolean hasDataValue() {
524+
StatusOr<T> data = getData();
525+
return data != null && data.hasValue();
526+
}
527+
528+
void close();
529+
}
530+
501531
private abstract class XdsWatcherBase<T extends ResourceUpdate>
502-
implements ResourceWatcher<T> {
532+
implements ResourceWatcher<T>, TrackedWatcher<T> {
503533
private final XdsResourceType<T> type;
504534
private final String resourceName;
505535
boolean cancelled;
@@ -554,24 +584,18 @@ public void onChanged(T update) {
554584

555585
protected abstract void subscribeToChildren(T update);
556586

587+
@Override
557588
public void close() {
558589
cancelled = true;
559590
xdsClient.cancelXdsResourceWatch(type, resourceName, this);
560591
}
561592

562-
boolean missingResult() {
563-
return data == null;
564-
}
565-
593+
@Override
566594
@Nullable
567-
StatusOr<T> getData() {
595+
public StatusOr<T> getData() {
568596
return data;
569597
}
570598

571-
boolean hasDataValue() {
572-
return data != null && data.hasValue();
573-
}
574-
575599
public String toContextString() {
576600
return toContextStr(type.typeName(), resourceName);
577601
}
@@ -622,7 +646,7 @@ private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTr
622646
if (rdsName == null) {
623647
return null;
624648
}
625-
return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
649+
return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
626650
}
627651

628652
public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
@@ -688,7 +712,7 @@ public StatusOr<RdsUpdate> getRdsUpdate() {
688712

689713
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
690714
CdsWatcher(String resourceName) {
691-
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
715+
super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
692716
}
693717

694718
@Override
@@ -721,7 +745,7 @@ public String getEdsServiceName() {
721745

722746
private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
723747
private EdsWatcher(String resourceName) {
724-
super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
748+
super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
725749
}
726750

727751
@Override

0 commit comments

Comments
 (0)