Skip to content

Commit 2e61939

Browse files
committed
delta xds
1 parent d71b9a2 commit 2e61939

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2017
-340
lines changed

cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ public class CacheStatusInfo<T> implements StatusInfo<T> {
1717
private final T nodeGroup;
1818

1919
private final ConcurrentMap<Long, Watch> watches = new ConcurrentHashMap<>();
20+
private final ConcurrentMap<Long, DeltaWatch> deltaWatches = new ConcurrentHashMap<>();
2021
private volatile long lastWatchRequestTime;
22+
private volatile long lastDeltaWatchRequestTime;
2123

2224
public CacheStatusInfo(T nodeGroup) {
2325
this.nodeGroup = nodeGroup;
@@ -31,6 +33,11 @@ public long lastWatchRequestTime() {
3133
return lastWatchRequestTime;
3234
}
3335

36+
@Override
37+
public long lastDeltaWatchRequestTime() {
38+
return lastDeltaWatchRequestTime;
39+
}
40+
3441
/**
3542
* {@inheritDoc}
3643
*/
@@ -47,6 +54,11 @@ public int numWatches() {
4754
return watches.size();
4855
}
4956

57+
@Override
58+
public int numDeltaWatches() {
59+
return deltaWatches.size();
60+
}
61+
5062
/**
5163
* Removes the given watch from the tracked collection of watches.
5264
*
@@ -56,6 +68,15 @@ public void removeWatch(long watchId) {
5668
watches.remove(watchId);
5769
}
5870

71+
/**
72+
* Removes the given delta watch from the tracked collection of watches.
73+
*
74+
* @param watchId the ID for the delta watch that should be removed
75+
*/
76+
public void removeDeltaWatch(long watchId) {
77+
deltaWatches.remove(watchId);
78+
}
79+
5980
/**
6081
* Sets the timestamp of the last discovery watch request.
6182
*
@@ -65,6 +86,15 @@ public void setLastWatchRequestTime(long lastWatchRequestTime) {
6586
this.lastWatchRequestTime = lastWatchRequestTime;
6687
}
6788

89+
/**
90+
* Sets the timestamp of the last discovery delta watch request.
91+
*
92+
* @param lastDeltaWatchRequestTime the latest delta watch request timestamp
93+
*/
94+
public void setLastDeltaWatchRequestTime(long lastDeltaWatchRequestTime) {
95+
this.lastDeltaWatchRequestTime = lastDeltaWatchRequestTime;
96+
}
97+
6898
/**
6999
* Adds the given watch to the tracked collection of watches.
70100
*
@@ -75,13 +105,30 @@ public void setWatch(long watchId, Watch watch) {
75105
watches.put(watchId, watch);
76106
}
77107

108+
/**
109+
* Adds the given watch to the tracked collection of watches.
110+
*
111+
* @param watchId the ID for the watch that should be added
112+
* @param watch the watch that should be added
113+
*/
114+
public void setDeltaWatch(long watchId, DeltaWatch watch) {
115+
deltaWatches.put(watchId, watch);
116+
}
117+
78118
/**
79119
* Returns the set of IDs for all watched currently being tracked.
80120
*/
81121
public Set<Long> watchIds() {
82122
return ImmutableSet.copyOf(watches.keySet());
83123
}
84124

125+
/**
126+
* Returns the set of IDs for all watched currently being tracked.
127+
*/
128+
public Set<Long> deltaWatchIds() {
129+
return ImmutableSet.copyOf(deltaWatches.keySet());
130+
}
131+
85132
/**
86133
* Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is
87134
* removed from the tracked collection. If it returns {@code false}, then the watch is not removed.
@@ -91,4 +138,15 @@ public Set<Long> watchIds() {
91138
public void watchesRemoveIf(BiFunction<Long, Watch, Boolean> filter) {
92139
watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
93140
}
141+
142+
/**
143+
* Iterate over all tracked delta watches and execute the given function. If it returns {@code true},
144+
* then the watch is removed from the tracked collection. If it returns {@code false}, then
145+
* the watch is not removed.
146+
*
147+
* @param filter the function to execute on each delta watch
148+
*/
149+
public void deltaWatchesRemoveIf(BiFunction<Long, DeltaWatch, Boolean> filter) {
150+
deltaWatches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
151+
}
94152
}

cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.envoyproxy.controlplane.cache;
22

3+
import java.util.Map;
34
import java.util.Set;
45
import java.util.function.Consumer;
56
import javax.annotation.concurrent.ThreadSafe;
@@ -28,4 +29,25 @@ Watch createWatch(
2829
Set<String> knownResourceNames,
2930
Consumer<Response> responseConsumer,
3031
boolean hasClusterChanged);
32+
33+
/**
34+
* Returns a new configuration resource {@link Watch} for the given discovery request.
35+
*
36+
* @param request the discovery request (node, names, etc.) to use to generate the watch
37+
* @param requesterVersion the last version applied by the requester
38+
* @param resourceVersions resources that are already known to the requester
39+
* @param pendingResources resources that the caller is waiting for
40+
* @param isWildcard indicates if the stream is in wildcard mode
41+
* @param responseConsumer the response handler, used to process outgoing response messages
42+
* @param hasClusterChanged indicates if EDS should be sent immediately, even if version has not been changed.
43+
* Supported in ADS mode.
44+
*/
45+
DeltaWatch createDeltaWatch(
46+
DeltaXdsRequest request,
47+
String requesterVersion,
48+
Map<String, String> resourceVersions,
49+
Set<String> pendingResources,
50+
boolean isWildcard,
51+
Consumer<DeltaResponse> responseConsumer,
52+
boolean hasClusterChanged);
3153
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
import com.google.auto.value.AutoValue;
4+
import com.google.protobuf.Message;
5+
import java.util.List;
6+
import java.util.Map;
7+
8+
/**
9+
* {@code Response} is a data class that contains the response for an assumed configuration type.
10+
*/
11+
@AutoValue
12+
public abstract class DeltaResponse {
13+
14+
public static DeltaResponse create(DeltaXdsRequest request,
15+
Map<String, SnapshotResource<?>> resources,
16+
List<String> removedResources,
17+
String version) {
18+
return new AutoValue_DeltaResponse(request, resources, removedResources, version);
19+
}
20+
21+
/**
22+
* Returns the original request associated with the response.
23+
*/
24+
public abstract DeltaXdsRequest request();
25+
26+
/**
27+
* Returns the resources to include in the response.
28+
*/
29+
public abstract Map<String, SnapshotResource<? extends Message>> resources();
30+
31+
/**
32+
* Returns the removed resources to include in the response.
33+
*/
34+
public abstract List<String> removedResources();
35+
36+
/**
37+
* Returns the version of the resources as tracked by the cache for the given type. Envoy responds with this version
38+
* as an acknowledgement.
39+
*/
40+
public abstract String version();
41+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
6+
import java.util.function.Consumer;
7+
8+
/**
9+
* {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by
10+
* the xDS server.
11+
*/
12+
public class DeltaWatch {
13+
private static final AtomicIntegerFieldUpdater<DeltaWatch> isCancelledUpdater =
14+
AtomicIntegerFieldUpdater.newUpdater(DeltaWatch.class, "isCancelled");
15+
private final DeltaXdsRequest request;
16+
private final Consumer<DeltaResponse> responseConsumer;
17+
private final Map<String, String> resourceVersions;
18+
private final Set<String> pendingResources;
19+
private final boolean isWildcard;
20+
private final String version;
21+
private volatile int isCancelled = 0;
22+
private Runnable stop;
23+
24+
/**
25+
* Construct a watch.
26+
*
27+
* @param request the original request for the watch
28+
* @param version indicates the stream current version
29+
* @param isWildcard indicates if the stream is in wildcard mode
30+
* @param responseConsumer handler for outgoing response messages
31+
*/
32+
public DeltaWatch(DeltaXdsRequest request,
33+
Map<String, String> resourceVersions,
34+
Set<String> pendingResources,
35+
String version,
36+
boolean isWildcard,
37+
Consumer<DeltaResponse> responseConsumer) {
38+
this.request = request;
39+
this.resourceVersions = resourceVersions;
40+
this.pendingResources = pendingResources;
41+
this.version = version;
42+
this.isWildcard = isWildcard;
43+
this.responseConsumer = responseConsumer;
44+
}
45+
46+
/**
47+
* Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel
48+
* may be called multiple times, with each subsequent call being a no-op.
49+
*/
50+
public void cancel() {
51+
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
52+
if (stop != null) {
53+
stop.run();
54+
}
55+
}
56+
}
57+
58+
/**
59+
* Returns boolean indicating whether or not the watch has been cancelled.
60+
*/
61+
public boolean isCancelled() {
62+
return isCancelledUpdater.get(this) == 1;
63+
}
64+
65+
/**
66+
* Returns the original request for the watch.
67+
*/
68+
public DeltaXdsRequest request() {
69+
return request;
70+
}
71+
72+
/**
73+
* Returns the tracked resources for the watch.
74+
*/
75+
public Map<String, String> trackedResources() {
76+
return resourceVersions;
77+
}
78+
79+
/**
80+
* Returns the pending resources for the watch.
81+
*/
82+
public Set<String> pendingResources() {
83+
return pendingResources;
84+
}
85+
86+
/**
87+
* Returns the stream current version.
88+
*/
89+
public String version() {
90+
return version;
91+
}
92+
93+
/**
94+
* Indicates if the stream is in wildcard mode.
95+
*/
96+
public boolean isWildcard() {
97+
return isWildcard;
98+
}
99+
100+
/**
101+
* Sends the given response to the watch's response handler.
102+
*
103+
* @param response the response to be handled
104+
* @throws WatchCancelledException if the watch has already been cancelled
105+
*/
106+
public void respond(DeltaResponse response) throws WatchCancelledException {
107+
if (isCancelled()) {
108+
throw new WatchCancelledException();
109+
}
110+
111+
responseConsumer.accept(response);
112+
}
113+
114+
/**
115+
* Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it
116+
* ensures that this stop callback is only executed once.
117+
*/
118+
public void setStop(Runnable stop) {
119+
this.stop = stop;
120+
}
121+
}

0 commit comments

Comments
 (0)