Skip to content

Commit e3b2da3

Browse files
authored
OWLS-104759: Pause the watches during periodic LIST and restart afterward with the latest resourceVersion (#3733)
* Pause the watches during periodic LIST and restart afterward with the latest resourceVersion
1 parent: 12c5bf6 · commit: e3b2da3

26 files changed: 372 additions (+372) and 250 deletions (-250)

integration-tests/src/test/java/oracle/weblogic/kubernetes/ItDiagnosticsFailedCondition.java

Lines changed: 7 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -237,7 +237,13 @@ void testReplicasTooHigh() {
237237

238238
//check the desired completed, available and failed statuses
239239
checkStatus(domainName, "False", "False", "True");
240-
240+
241+
testUntil(
242+
domainStatusReasonMatches(domainName, domainNamespace, "ReplicasTooHigh"),
243+
getLogger(),
244+
"waiting for domain status condition reason ReplicasTooHigh exists"
245+
);
246+
241247
// remove after debug
242248
String patchStr
243249
= "["
@@ -261,11 +267,6 @@ void testReplicasTooHigh() {
261267
getLogger(),
262268
"waiting for domain status condition reason DomainInvalid exists"
263269
);
264-
testUntil(
265-
domainStatusReasonMatches(domainName, domainNamespace, "ReplicasTooHigh"),
266-
getLogger(),
267-
"waiting for domain status condition reason ReplicasTooHigh exists"
268-
);
269270

270271
patchStr
271272
= "["
@@ -280,11 +281,6 @@ void testReplicasTooHigh() {
280281
getLogger(),
281282
"waiting for domain status condition reason DomainInvalid exists"
282283
);
283-
testUntil(
284-
domainStatusReasonMatches(domainName, domainNamespace, "ReplicasTooHigh"),
285-
getLogger(),
286-
"waiting for domain status condition reason ReplicasTooHigh exists"
287-
);
288284
testPassed = true;
289285
} finally {
290286
if (!testPassed) {

integration-tests/src/test/java/oracle/weblogic/kubernetes/ItMiiClusterResource.java

Lines changed: 16 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -233,7 +233,8 @@ void testAddReplaceClusterResource() {
233233

234234
//verify the introspector pod is created and runs
235235
String introspectPodNameBase = getIntrospectJobName(domainUid);
236-
checkPodExists(introspectPodNameBase, domainUid, domainNamespace);
236+
ConditionFactory customConditionFactory = createCustomConditionFactory(0, 1, 5);
237+
checkPodExists(customConditionFactory, introspectPodNameBase, domainUid, domainNamespace);
237238
checkPodDoesNotExist(introspectPodNameBase, domainUid, domainNamespace);
238239

239240
// verify managed server services and pods are created
@@ -256,7 +257,6 @@ void testAddReplaceClusterResource() {
256257

257258
//verify the introspector pod is created and runs
258259
String introspectPodNameBase2 = getIntrospectJobName(domainUid);
259-
ConditionFactory customConditionFactory = createCustomConditionFactory(0, 1, 5);
260260
checkPodExists(customConditionFactory, introspectPodNameBase2, domainUid, domainNamespace);
261261
checkPodDoesNotExist(introspectPodNameBase2, domainUid, domainNamespace);
262262

@@ -548,7 +548,18 @@ void testSharedClusterResource() {
548548
domainUid, domainNamespace,
549549
MII_BASIC_IMAGE_NAME + ":" + MII_BASIC_IMAGE_TAG);
550550
createDomainAndVerify(domain, domainNamespace);
551-
551+
552+
logger.info("Wait for admin server pod {0} to be ready in namespace {1}",
553+
adminServerPodName, domainNamespace);
554+
checkPodReadyAndServiceExists(adminServerPodName,domainUid,domainNamespace);
555+
556+
// verify managed server services and pods are created
557+
for (int i = 1; i <= replicaCount; i++) {
558+
logger.info("Wait for managed pod {0} to be ready in namespace {1}",
559+
managedServerPrefix + i, domainNamespace);
560+
checkPodReadyAndServiceExists(managedServerPrefix + i, domainUid, domainNamespace);
561+
}
562+
552563
// create and deploy domain resource with cluster reference
553564
DomainResource domain2 = createDomainResource(domain2Uid,
554565
domainNamespace, adminSecretName,
@@ -562,17 +573,6 @@ void testSharedClusterResource() {
562573
MII_BASIC_IMAGE_NAME + ":" + MII_BASIC_IMAGE_TAG);
563574
createDomainAndVerify(domain2, domainNamespace);
564575

565-
logger.info("Wait for admin server pod {0} to be ready in namespace {1}",
566-
adminServerPodName, domainNamespace);
567-
checkPodReadyAndServiceExists(adminServerPodName,domainUid,domainNamespace);
568-
569-
// verify managed server services and pods are created
570-
for (int i = 1; i <= replicaCount; i++) {
571-
logger.info("Wait for managed pod {0} to be ready in namespace {1}",
572-
managedServerPrefix + i, domainNamespace);
573-
checkPodReadyAndServiceExists(managedServerPrefix + i, domainUid, domainNamespace);
574-
}
575-
576576
testUntil(withLongRetryPolicy,
577577
checkDomainFailedEventWithReason(opNamespace, domainNamespace,
578578
domain2Uid, "Domain validation error", "Warning", timestamp),
@@ -982,7 +982,8 @@ void testManageClusterResource() {
982982

983983
//verify the introspector pod is created and runs
984984
String introspectPodNameBase = getIntrospectJobName(domainUid);
985-
checkPodExists(introspectPodNameBase, domainUid, domainNamespace);
985+
ConditionFactory customConditionFactory = createCustomConditionFactory(0, 1, 5);
986+
checkPodExists(customConditionFactory, introspectPodNameBase, domainUid, domainNamespace);
986987
checkPodDoesNotExist(introspectPodNameBase, domainUid, domainNamespace);
987988

988989
verifyPodsNotRolled(domainNamespace,c1Time);

operator/src/main/java/oracle/kubernetes/operator/DomainNamespaces.java

Lines changed: 68 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,6 @@ void stopAllWatchers() {
113113
void stopNamespace(String ns) {
114114
namespaceStoppingMap.remove(ns).set(true);
115115
namespaceStatuses.remove(ns);
116-
117116
clusterWatchers.removeWatcher(ns);
118117
domainWatchers.removeWatcher(ns);
119118
eventWatchers.removeWatcher(ns);
@@ -188,7 +187,7 @@ static ThreadFactory getThreadFactory() {
188187
* @param processor processing to be done to bring up any found domains
189188
*/
190189
Step readExistingResources(String ns, DomainProcessor processor) {
191-
NamespacedResources resources = new NamespacedResources(ns, null);
190+
NamespacedResources resources = new NamespacedResources(ns, null, this);
192191
resources.addProcessing(new DomainResourcesValidation(ns, processor).getProcessors());
193192
resources.addProcessing(createWatcherStartupProcessing(ns, processor));
194193
return Step.chain(ConfigMapHelper.createScriptConfigMapStep(ns, productVersion), resources.createListSteps());
@@ -226,6 +225,7 @@ private WatcherControl(WatcherFactory<T, W> factory, ListenerSelector<T> selecto
226225

227226
void startWatcher(String namespace, String resourceVersion, DomainProcessor domainProcessor) {
228227
watchers.computeIfAbsent(namespace, n -> createWatcher(n, resourceVersion, selector.apply(domainProcessor)));
228+
getWatcher(namespace).withResourceVersion(resourceVersion).resume();
229229
}
230230

231231
W createWatcher(String ns, String resourceVersion, WatchListener<T> listener) {
@@ -299,4 +299,70 @@ public Consumer<ClusterList> getClusterListProcessing() {
299299
return l -> clusterWatchers.startWatcher(ns, getResourceVersion(l), domainProcessor);
300300
}
301301
}
302+
303+
Processors createWatcherResumeProcessing(String ns) {
304+
return new WatcherResumeProcessing(ns);
305+
}
306+
307+
class WatcherResumeProcessing implements Processors {
308+
private final String ns;
309+
310+
WatcherResumeProcessing(String ns) {
311+
this.ns = ns;
312+
}
313+
314+
@Override
315+
public Consumer<V1ConfigMapList> getConfigMapListProcessing() {
316+
return l -> Optional.ofNullable(configMapWatchers.getWatcher(ns))
317+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
318+
}
319+
320+
@Override
321+
public Consumer<CoreV1EventList> getEventListProcessing() {
322+
return l -> Optional.ofNullable(eventWatchers.getWatcher(ns))
323+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
324+
}
325+
326+
@Override
327+
public Consumer<CoreV1EventList> getOperatorEventListProcessing() {
328+
return l -> Optional.ofNullable(operatorEventWatchers.getWatcher(ns))
329+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
330+
}
331+
332+
@Override
333+
public Consumer<V1JobList> getJobListProcessing() {
334+
return l -> Optional.ofNullable(jobWatchers.getWatcher(ns))
335+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
336+
}
337+
338+
@Override
339+
public Consumer<V1PodList> getPodListProcessing() {
340+
return l -> Optional.ofNullable(podWatchers.getWatcher(ns))
341+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
342+
}
343+
344+
@Override
345+
public Consumer<V1ServiceList> getServiceListProcessing() {
346+
return l -> Optional.ofNullable(serviceWatchers.getWatcher(ns))
347+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
348+
}
349+
350+
@Override
351+
public Consumer<V1PodDisruptionBudgetList> getPodDisruptionBudgetListProcessing() {
352+
return l -> Optional.ofNullable(podDisruptionBudgetWatchers.getWatcher(ns))
353+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
354+
}
355+
356+
@Override
357+
public Consumer<DomainList> getDomainListProcessing() {
358+
return l -> Optional.ofNullable(domainWatchers.getWatcher(ns))
359+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
360+
}
361+
362+
@Override
363+
public Consumer<ClusterList> getClusterListProcessing() {
364+
return l -> Optional.ofNullable(clusterWatchers.getWatcher(ns))
365+
.ifPresent(w -> w.withResourceVersion(getResourceVersion(l)).resume());
366+
}
367+
}
302368
}

operator/src/main/java/oracle/kubernetes/operator/DomainProcessor.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,9 @@
33

44
package oracle.kubernetes.operator;
55

6+
import java.util.Map;
67
import java.util.Set;
8+
import java.util.concurrent.ConcurrentHashMap;
79
import java.util.stream.Stream;
810

911
import io.kubernetes.client.openapi.models.CoreV1Event;
@@ -100,4 +102,12 @@ default void reportSuspendedFibers() {
100102
default Stream<DomainPresenceInfo> findStrandedDomainPresenceInfos(String namespace, Set<String> domainUids) {
101103
return Stream.empty();
102104
}
105+
106+
/**
107+
* Get the map of domain presence infos.
108+
* @return Map of cached domain presence infos.
109+
*/
110+
default Map<String, Map<String,DomainPresenceInfo>> getDomainPresenceInfoMap() {
111+
return new ConcurrentHashMap<>();
112+
}
103113
}

operator/src/main/java/oracle/kubernetes/operator/DomainProcessorDelegate.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,4 +54,6 @@ default MakeRightDomainOperation createMakeRightOperation(MakeRightExecutor exec
5454
default MakeRightClusterOperation createMakeRightOperation(MakeRightExecutor executor, ClusterPresenceInfo info) {
5555
return new MakeRightClusterOperationImpl(executor, this, info);
5656
}
57+
58+
DomainNamespaces getDomainNamespaces();
5759
}

operator/src/main/java/oracle/kubernetes/operator/DomainProcessorImpl.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -136,6 +136,11 @@ private static DomainPresenceInfo getExistingDomainPresenceInfo(DomainPresenceIn
136136
return getExistingDomainPresenceInfo(newPresence.getNamespace(), newPresence.getDomainUid());
137137
}
138138

139+
@Override
140+
public Map<String, Map<String,DomainPresenceInfo>> getDomainPresenceInfoMap() {
141+
return domains;
142+
}
143+
139144
private static List<DomainPresenceInfo> getExistingDomainPresenceInfoForCluster(String ns, String cluster) {
140145
List<DomainPresenceInfo> referencingDomains = new ArrayList<>();
141146
Optional.ofNullable(domains.get(ns)).ifPresent(d -> d.values().stream()

operator/src/main/java/oracle/kubernetes/operator/DomainResourcesValidation.java

Lines changed: 29 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33

44
package oracle.kubernetes.operator;
55

6+
import java.util.Collection;
67
import java.util.List;
78
import java.util.Map;
89
import java.util.Optional;
@@ -40,7 +41,6 @@
4041
class DomainResourcesValidation {
4142
private final String namespace;
4243
private final DomainProcessor processor;
43-
private final Map<String, DomainPresenceInfo> domainPresenceInfoMap = new ConcurrentHashMap<>();
4444
private ClusterList activeClusterResources;
4545

4646
DomainResourcesValidation(String namespace, DomainProcessor processor) {
@@ -102,9 +102,18 @@ private boolean isForDomain(ClusterResource clusterResource, DomainPresenceInfo
102102
}
103103

104104
private void addPodList(V1PodList list) {
105+
getDomainPresenceInfoMap().values().stream().forEach(dpi -> removeDeletedPodsFromDPI(list, dpi));
105106
list.getItems().forEach(this::addPod);
106107
}
107108

109+
private void removeDeletedPodsFromDPI(V1PodList list, DomainPresenceInfo dpi) {
110+
Collection<String> serverNamesFromPodList = list.getItems().stream()
111+
.map(PodHelper::getPodServerName).collect(Collectors.toList());
112+
113+
dpi.getServerNames().stream().filter(s -> !serverNamesFromPodList.contains(s)).collect(Collectors.toList())
114+
.forEach(name -> dpi.deleteServerPodFromEvent(name, null));
115+
}
116+
108117
private void addEvent(CoreV1Event event) {
109118
DomainProcessorImpl.updateEventK8SObjects(event);
110119
}
@@ -117,12 +126,16 @@ private void addPod(V1Pod pod) {
117126
String domainUid = PodHelper.getPodDomainUid(pod);
118127
String serverName = PodHelper.getPodServerName(pod);
119128
if (domainUid != null && serverName != null) {
120-
getDomainPresenceInfo(domainUid).setServerPod(serverName, pod);
129+
getDomainPresenceInfo(domainUid).setServerPodFromEvent(serverName, pod);
121130
}
122131
}
123132

124133
private DomainPresenceInfo getDomainPresenceInfo(String domainUid) {
125-
return domainPresenceInfoMap.computeIfAbsent(domainUid, k -> new DomainPresenceInfo(namespace, domainUid));
134+
return getDomainPresenceInfoMap().computeIfAbsent(domainUid, k -> new DomainPresenceInfo(namespace, domainUid));
135+
}
136+
137+
private Map<String,DomainPresenceInfo> getDomainPresenceInfoMap() {
138+
return processor.getDomainPresenceInfoMap().computeIfAbsent(namespace, k -> new ConcurrentHashMap<>());
126139
}
127140

128141
private void addServiceList(V1ServiceList list) {
@@ -148,9 +161,19 @@ private void addPodDisruptionBudget(V1PodDisruptionBudget pdb) {
148161
}
149162

150163
private void addDomainList(DomainList list) {
164+
getDomainPresenceInfoMap().values().stream().forEach(dpi -> updateDeletedDomainsinDPI(list));
151165
list.getItems().forEach(this::addDomain);
152166
}
153167

168+
private void updateDeletedDomainsinDPI(DomainList list) {
169+
Collection<String> domainNamesFromList = list.getItems().stream()
170+
.map(DomainResource::getDomainUid).collect(Collectors.toList());
171+
172+
getDomainPresenceInfoMap().values().stream()
173+
.filter(dpi -> !domainNamesFromList.contains(dpi.getDomainUid())).collect(Collectors.toList())
174+
.forEach(i -> getDomainPresenceInfo(i.getDomainUid()).setDomain(null));
175+
}
176+
154177
private void addDomain(DomainResource domain) {
155178
getDomainPresenceInfo(domain.getDomainUid()).setDomain(domain);
156179
}
@@ -161,8 +184,8 @@ private void addClusterList(ClusterList list) {
161184

162185
private Stream<DomainPresenceInfo> getStrandedDomainPresenceInfos(DomainProcessor dp) {
163186
return Stream.concat(
164-
domainPresenceInfoMap.values().stream().filter(this::isStranded),
165-
dp.findStrandedDomainPresenceInfos(namespace, domainPresenceInfoMap.keySet()));
187+
getDomainPresenceInfoMap().values().stream().filter(this::isStranded),
188+
dp.findStrandedDomainPresenceInfos(namespace, getDomainPresenceInfoMap().keySet()));
166189
}
167190

168191
private boolean isStranded(DomainPresenceInfo dpi) {
@@ -176,7 +199,7 @@ private static void removeStrandedDomainPresenceInfo(DomainProcessor dp, DomainP
176199
}
177200

178201
private Stream<DomainPresenceInfo> getActiveDomainPresenceInfos() {
179-
return domainPresenceInfoMap.values().stream().filter(this::isActive);
202+
return getDomainPresenceInfoMap().values().stream().filter(this::isActive);
180203
}
181204

182205
private boolean isActive(DomainPresenceInfo dpi) {

operator/src/main/java/oracle/kubernetes/operator/DomainStatusUpdater.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737
import oracle.kubernetes.operator.helpers.DomainPresenceInfo;
3838
import oracle.kubernetes.operator.helpers.EventHelper;
3939
import oracle.kubernetes.operator.helpers.EventHelper.EventData;
40+
import oracle.kubernetes.operator.helpers.LastKnownStatus;
4041
import oracle.kubernetes.operator.helpers.PodHelper;
4142
import oracle.kubernetes.operator.helpers.ResponseStep;
4243
import oracle.kubernetes.operator.logging.LoggingFacade;
@@ -1280,10 +1281,15 @@ private String getRunningState(String serverName) {
12801281
} else if (isDeleting(serverName)) {
12811282
return SHUTTING_DOWN_STATE;
12821283
} else {
1283-
return Optional.ofNullable(serverState).map(m -> m.get(serverName)).orElse(null);
1284+
return Optional.ofNullable(getInfo().getLastKnownServerStatus(serverName))
1285+
.map(LastKnownStatus::getStatus).orElse(getStateFromPacket(serverName));
12841286
}
12851287
}
12861288

1289+
private String getStateFromPacket(String serverName) {
1290+
return Optional.ofNullable(serverState).map(m -> m.get(serverName)).orElse(null);
1291+
}
1292+
12871293
private boolean isDeleting(String serverName) {
12881294
return Optional.ofNullable(getInfo().getServerPod(serverName)).map(PodHelper::isDeleting).orElse(false);
12891295
}

operator/src/main/java/oracle/kubernetes/operator/MakeRightExecutor.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,9 +30,13 @@ public interface MakeRightExecutor {
3030
* @param processors the processing to be done
3131
* @param info the presence info which encapsulates the domain
3232
*/
33-
default Step createNamespacedResourceSteps(Processors processors, DomainPresenceInfo info) {
34-
NamespacedResources resources = new NamespacedResources(info.getNamespace(), info.getDomainUid());
33+
default Step createNamespacedResourceSteps(Processors processors, DomainPresenceInfo info,
34+
DomainNamespaces domainNamespaces) {
35+
NamespacedResources resources = new NamespacedResources(info.getNamespace(), info.getDomainUid(), domainNamespaces);
3536
resources.addProcessing(processors);
37+
if (domainNamespaces != null) {
38+
resources.addProcessing(domainNamespaces.createWatcherResumeProcessing(info.getNamespace()));
39+
}
3640
return resources.createListSteps();
3741
}
3842

0 commit comments

Comments (0)