Skip to content

Commit 0d07ad3

Browse files
committed
Update state checking
1 parent 966fc19 commit 0d07ad3

File tree

11 files changed

+1663
-56
lines changed

11 files changed

+1663
-56
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3+
4+
package oracle.kubernetes.operator;
5+
6+
import io.kubernetes.client.ApiException;
7+
import oracle.kubernetes.operator.builders.WatchBuilder;
8+
import oracle.kubernetes.operator.builders.WatchI;
9+
import oracle.kubernetes.operator.watcher.WatchListener;
10+
import io.kubernetes.client.models.V1Event;
11+
12+
import java.util.concurrent.ThreadFactory;
13+
import java.util.concurrent.atomic.AtomicBoolean;
14+
15+
/**
16+
* This class handles Kubernetes Event watching. It receives event notifications and sends
17+
* them into the operator for processing.
18+
*/
19+
public class EventWatcher extends Watcher<V1Event> {
20+
private final String ns;
21+
private final String fieldSelector;
22+
23+
public static EventWatcher create(ThreadFactory factory, String ns, String fieldSelector, String initialResourceVersion, WatchListener<V1Event> listener, AtomicBoolean isStopping) {
24+
EventWatcher watcher = new EventWatcher(ns, fieldSelector, initialResourceVersion, listener, isStopping);
25+
watcher.start(factory);
26+
return watcher;
27+
}
28+
29+
private EventWatcher(String ns, String fieldSelector, String initialResourceVersion, WatchListener<V1Event> listener, AtomicBoolean isStopping) {
30+
super(initialResourceVersion, isStopping, listener);
31+
this.ns = ns;
32+
this.fieldSelector = fieldSelector;
33+
}
34+
35+
@Override
36+
public WatchI<V1Event> initiateWatch(WatchBuilder watchBuilder) throws ApiException {
37+
return watchBuilder
38+
.withFieldSelector(fieldSelector)
39+
.createEventWatch(ns);
40+
}
41+
}

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import io.kubernetes.client.models.V1ConfigMap;
2929
import io.kubernetes.client.models.V1DeleteOptions;
3030
import io.kubernetes.client.models.V1EnvVar;
31+
import io.kubernetes.client.models.V1Event;
3132
import io.kubernetes.client.models.V1ObjectMeta;
33+
import io.kubernetes.client.models.V1ObjectReference;
3234
import io.kubernetes.client.models.V1PersistentVolumeClaimList;
3335
import io.kubernetes.client.models.V1Pod;
3436
import io.kubernetes.client.models.V1PodList;
@@ -58,12 +60,14 @@
5860
import oracle.kubernetes.operator.helpers.ResponseStep;
5961
import oracle.kubernetes.operator.helpers.RollingHelper;
6062
import oracle.kubernetes.operator.helpers.ServerKubernetesObjects;
63+
import oracle.kubernetes.operator.helpers.ServerKubernetesObjectsFactory;
6164
import oracle.kubernetes.operator.helpers.ServiceHelper;
6265
import oracle.kubernetes.operator.logging.LoggingFacade;
6366
import oracle.kubernetes.operator.logging.LoggingFactory;
6467
import oracle.kubernetes.operator.logging.MessageKeys;
6568
import oracle.kubernetes.operator.rest.RestConfigImpl;
6669
import oracle.kubernetes.operator.rest.RestServer;
70+
import oracle.kubernetes.operator.utils.ConcurrentWeakHashMap;
6771
import oracle.kubernetes.operator.wlsconfig.NetworkAccessPoint;
6872
import oracle.kubernetes.operator.wlsconfig.WlsClusterConfig;
6973
import oracle.kubernetes.operator.wlsconfig.WlsRetriever;
@@ -98,6 +102,8 @@ public class Main {
98102

99103
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
100104
private static final ConcurrentMap<String, DomainPresenceInfo> domains = new ConcurrentHashMap<String, DomainPresenceInfo>();
105+
private static final ConcurrentMap<String, ServerKubernetesObjects> servers = new ConcurrentWeakHashMap<String, ServerKubernetesObjects>();
106+
private static final ServerKubernetesObjectsFactory skoFactory = new ServerKubernetesObjectsFactory(servers);
101107

102108
private static final TuningParameters tuningAndConfig;
103109
static {
@@ -117,8 +123,10 @@ public class Main {
117123
static {
118124
container.getComponents().put(
119125
ProcessingConstants.MAIN_COMPONENT_NAME,
120-
Component.createFor(ScheduledExecutorService.class, wrappedExecutorService,
121-
TuningParameters.class, tuningAndConfig, callBuilderFactory));
126+
Component.createFor(
127+
ScheduledExecutorService.class, wrappedExecutorService,
128+
TuningParameters.class, tuningAndConfig,
129+
callBuilderFactory, skoFactory));
122130
}
123131

124132
private static final Engine engine = new Engine(wrappedExecutorService);
@@ -133,6 +141,7 @@ public class Main {
133141
private static Map<String, ConfigMapWatcher> configMapWatchers = new HashMap<>();
134142
private static Map<String, DomainWatcher> domainWatchers = new HashMap<>();
135143
private static Map<String, PodWatcher> podWatchers = new HashMap<>();
144+
private static Map<String, EventWatcher> eventWatchers = new HashMap<>();
136145
private static Map<String, ServiceWatcher> serviceWatchers = new HashMap<>();
137146
private static Map<String, IngressWatcher> ingressWatchers = new HashMap<>();
138147
private static KubernetesVersion version = null;
@@ -304,9 +313,7 @@ public NextAction onSuccess(Packet packet, V1ServiceList result, int statusCode,
304313
if (info == null) {
305314
info = created;
306315
}
307-
ServerKubernetesObjects csko = new ServerKubernetesObjects();
308-
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, csko);
309-
ServerKubernetesObjects sko = current != null ? current : csko;
316+
ServerKubernetesObjects sko = skoFactory.getOrCreate(info, serverName);
310317
if (channelName != null) {
311318
sko.getChannels().put(channelName, service);
312319
} else {
@@ -341,14 +348,13 @@ public NextAction onSuccess(Packet packet, V1PodList result, int statusCode,
341348
if (info == null) {
342349
info = created;
343350
}
344-
ServerKubernetesObjects csko = new ServerKubernetesObjects();
345-
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, csko);
346-
ServerKubernetesObjects sko = current != null ? current : csko;
351+
ServerKubernetesObjects sko = skoFactory.getOrCreate(info, serverName);
347352
sko.getPod().set(pod);
348353
}
349354
}
350355
}
351356
podWatchers.put(ns, createPodWatcher(ns, result != null ? result.getMetadata().getResourceVersion() : ""));
357+
eventWatchers.put(ns, createEventWatcher(ns, ""));
352358
return doNext(packet);
353359
}
354360
})));
@@ -1527,6 +1533,40 @@ private static DomainWatcher createDomainWatcher(String namespace, String initia
15271533
initialResourceVersion, Main::dispatchDomainWatch, stopping);
15281534
}
15291535

1536+
private static EventWatcher createEventWatcher(String namespace, String initialResourceVersion) {
1537+
return EventWatcher.create(factory, namespace,
1538+
"reason=Unhealthy,type=Warning,involvedObject.fieldPath=spec.containers{weblogic-server}",
1539+
initialResourceVersion, Main::dispatchEventWatch, stopping);
1540+
}
1541+
1542+
private static void dispatchEventWatch(Watch.Response<V1Event> item) {
1543+
V1Event e = item.object;
1544+
if (e != null) {
1545+
switch (item.type) {
1546+
case "ADDED":
1547+
case "MODIFIED":
1548+
V1ObjectReference ref = e.getInvolvedObject();
1549+
if (ref != null) {
1550+
String name = ref.getName();
1551+
String message = e.getMessage();
1552+
if (message != null) {
1553+
int idx = message.indexOf("Not ready: Server state=");
1554+
if (idx > 0) {
1555+
ServerKubernetesObjects sko = servers.get(name);
1556+
if (sko != null) {
1557+
sko.getLastKnownStatus().set(message.substring(idx + 24));
1558+
}
1559+
}
1560+
}
1561+
}
1562+
break;
1563+
case "DELETED":
1564+
case "ERROR":
1565+
default:
1566+
}
1567+
}
1568+
}
1569+
15301570
private static PodWatcher createPodWatcher(String namespace, String initialResourceVersion) {
15311571
return PodWatcher.create(factory, namespace,
15321572
initialResourceVersion, Main::dispatchPodWatch, stopping);
@@ -1541,9 +1581,7 @@ private static void dispatchPodWatch(Watch.Response<V1Pod> item) {
15411581
if (domainUID != null) {
15421582
DomainPresenceInfo info = domains.get(domainUID);
15431583
if (info != null && serverName != null) {
1544-
ServerKubernetesObjects created = new ServerKubernetesObjects();
1545-
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, created);
1546-
ServerKubernetesObjects sko = current != null ? current : created;
1584+
ServerKubernetesObjects sko = skoFactory.getOrCreate(info, serverName);
15471585
if (sko != null) {
15481586
switch (item.type) {
15491587
case "ADDED":
@@ -1594,9 +1632,7 @@ private static void dispatchServiceWatch(Watch.Response<V1Service> item) {
15941632
ServerKubernetesObjects sko = null;
15951633
if (info != null) {
15961634
if (serverName != null) {
1597-
ServerKubernetesObjects created = new ServerKubernetesObjects();
1598-
ServerKubernetesObjects current = info.getServers().putIfAbsent(serverName, created);
1599-
sko = current != null ? current : created;
1635+
sko = skoFactory.getOrCreate(info, serverName);
16001636
}
16011637
switch (item.type) {
16021638
case "ADDED":

operator/src/main/java/oracle/kubernetes/operator/PodWatcher.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414
import oracle.kubernetes.operator.helpers.CallBuilder;
1515
import oracle.kubernetes.operator.helpers.CallBuilderFactory;
1616
import oracle.kubernetes.operator.helpers.ResponseStep;
17+
import oracle.kubernetes.operator.helpers.ServerKubernetesObjects;
18+
import oracle.kubernetes.operator.helpers.ServerKubernetesObjectsFactory;
1719
import oracle.kubernetes.operator.logging.LoggingFacade;
1820
import oracle.kubernetes.operator.logging.LoggingFactory;
1921
import oracle.kubernetes.operator.logging.MessageKeys;
2022
import oracle.kubernetes.operator.watcher.WatchListener;
23+
import oracle.kubernetes.operator.work.Container;
2124
import oracle.kubernetes.operator.work.ContainerResolver;
2225
import oracle.kubernetes.operator.work.NextAction;
2326
import oracle.kubernetes.operator.work.Packet;
@@ -82,6 +85,13 @@ public void receivedResponse(Watch.Response<V1Pod> item) {
8285
Boolean isReady = isReady(pod);
8386
String podName = pod.getMetadata().getName();
8487
if (isReady) {
88+
Container c = ContainerResolver.getInstance().getContainer();
89+
ServerKubernetesObjectsFactory skoFactory = c.getSPI(ServerKubernetesObjectsFactory.class);
90+
ServerKubernetesObjects sko = skoFactory.lookup(podName);
91+
if (sko != null) {
92+
sko.getLastKnownStatus().set("RUNNING");
93+
}
94+
8595
OnReady ready = readyCallbackRegistrations.remove(podName);
8696
if (ready != null) {
8797
ready.onReady();

operator/src/main/java/oracle/kubernetes/operator/ServerStatusReader.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public NextAction apply(Packet packet) {
7676
if (pod != null) {
7777
Packet p = packet.clone();
7878
startDetails.add(new StepAndPacket(
79-
createServerStatusReaderStep(pod, serverName, timeoutSeconds, null), p));
79+
createServerStatusReaderStep(sko, pod, serverName, timeoutSeconds, null), p));
8080
}
8181
}
8282
}
@@ -90,25 +90,28 @@ public NextAction apply(Packet packet) {
9090

9191
/**
9292
* Creates asynchronous step to read WebLogic server state from a particular pod
93+
* @param sko Server objects
9394
* @param pod The pod
9495
* @param serverName Server name
9596
* @param timeoutSeconds Timeout in seconds
9697
* @param next Next step
9798
* @return Created step
9899
*/
99-
public static Step createServerStatusReaderStep(V1Pod pod, String serverName, long timeoutSeconds, Step next) {
100-
return new ServerStatusReaderStep(pod, serverName, timeoutSeconds,
100+
public static Step createServerStatusReaderStep(ServerKubernetesObjects sko, V1Pod pod, String serverName, long timeoutSeconds, Step next) {
101+
return new ServerStatusReaderStep(sko, pod, serverName, timeoutSeconds,
101102
new ServerHealthStep(serverName, next));
102103
}
103104

104105
private static class ServerStatusReaderStep extends Step {
106+
private final ServerKubernetesObjects sko;
105107
private final V1Pod pod;
106108
private final String serverName;
107109
private final long timeoutSeconds;
108110

109-
public ServerStatusReaderStep(V1Pod pod, String serverName,
111+
public ServerStatusReaderStep(ServerKubernetesObjects sko, V1Pod pod, String serverName,
110112
long timeoutSeconds, Step next) {
111113
super(next);
114+
this.sko = sko;
112115
this.pod = pod;
113116
this.serverName = serverName;
114117
this.timeoutSeconds = timeoutSeconds;
@@ -120,7 +123,11 @@ public NextAction apply(Packet packet) {
120123
ConcurrentMap<String, String> serverStateMap = (ConcurrentMap<String, String>) packet
121124
.get(ProcessingConstants.SERVER_STATE_MAP);
122125

123-
if (PodWatcher.isReady(pod, true)) {
126+
String lastKnownState = sko.getLastKnownStatus().get();
127+
if (lastKnownState != null) {
128+
serverStateMap.put(serverName, lastKnownState);
129+
return doNext(packet);
130+
} else if (PodWatcher.isReady(pod, true)) {
124131
serverStateMap.put(serverName, "RUNNING");
125132
return doNext(packet);
126133
}
@@ -155,25 +162,12 @@ public NextAction apply(Packet packet) {
155162
}
156163
}
157164

158-
serverStateMap.put(serverName, parseState(state));
165+
serverStateMap.put(serverName, state != null ? state : "UNKNOWN");
159166
fiber.resume(packet);
160167
});
161168
}
162169
}
163170

164-
private static String parseState(String state) {
165-
// Format of state is "<serverState>:<Y or N, if server started>:<Y or N, if server failed>
166-
String s = "UNKNOWN";
167-
if (state != null) {
168-
int ind = state.indexOf(':');
169-
if (ind > 0) {
170-
s = state.substring(0, ind);
171-
}
172-
}
173-
174-
return s;
175-
}
176-
177171
private static final Set<String> statesSupportingREST = new HashSet<>();
178172
static {
179173
statesSupportingREST.add("STANDBY");

operator/src/main/java/oracle/kubernetes/operator/builders/WatchBuilder.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.kubernetes.client.apis.CoreV1Api;
1111
import io.kubernetes.client.apis.ExtensionsV1beta1Api;
1212
import io.kubernetes.client.models.V1ConfigMap;
13+
import io.kubernetes.client.models.V1Event;
1314
import io.kubernetes.client.models.V1Pod;
1415
import io.kubernetes.client.models.V1Service;
1516
import io.kubernetes.client.models.V1beta1Ingress;
@@ -128,6 +129,36 @@ public Call apply(ApiClient client, CallParams callParams) {
128129
}
129130
}
130131

132+
/**
133+
* Creates a web hook object to track events
134+
* @param namespace the namespace
135+
* @return the active web hook
136+
* @throws ApiException if there is an error on the call that sets up the web hook.
137+
*/
138+
public WatchI<V1Event> createEventWatch(String namespace) throws ApiException {
139+
return FACTORY.createWatch(ClientPool.getInstance(), callParams, V1Event.class, new ListEventCall(namespace));
140+
}
141+
142+
private class ListEventCall implements BiFunction<ApiClient, CallParams, Call> {
143+
private String namespace;
144+
145+
ListEventCall(String namespace) {
146+
this.namespace = namespace;
147+
}
148+
149+
@Override
150+
public Call apply(ApiClient client, CallParams callParams) {
151+
try {
152+
return new CoreV1Api(client).listNamespacedEventCall(namespace, callParams.getPretty(),
153+
START_LIST, callParams.getFieldSelector(), callParams.getIncludeUninitialized(),
154+
callParams.getLabelSelector(), callParams.getLimit(), callParams.getResourceVersion(),
155+
callParams.getTimeoutSeconds(), WATCH, null, null);
156+
} catch (ApiException e) {
157+
throw new UncheckedApiException(e);
158+
}
159+
}
160+
}
161+
131162
/**
132163
* Creates a web hook object to track changes to the cluster ingress
133164
* @param namespace the namespace

operator/src/main/java/oracle/kubernetes/operator/helpers/ConfigMapHelper.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public NextAction apply(Packet packet) {
7676
" exit 1\n" +
7777
"fi\n" +
7878
"if [ -f ${STATEFILE} ] && [ `grep -c \"FAILED_NOT_RESTARTABLE\" ${STATEFILE}` -eq 1 ]; then\n" +
79-
" echo \"Error: WebLogic Server FAILED_NOT_RESTARTABLE.\"\n" +
79+
" echo \"Error: WebLogic Server state is FAILED_NOT_RESTARTABLE.\"\n" +
8080
" exit 1\n" +
8181
"fi\n" +
8282
"exit 0");
@@ -97,10 +97,16 @@ public NextAction apply(Packet packet) {
9797
" exit 1\n" +
9898
"fi\n" +
9999
"\n" +
100-
"if [ ! -f ${STATEFILE} ] || [ `grep -c \"RUNNING\" ${STATEFILE}` -ne 1 ]; then\n" +
101-
" exit 1\n" +
100+
"if [ ! -f ${STATEFILE} ]; then\n" +
101+
" echo \"Error: WebLogic Server state file not found.\"\n" +
102+
" exit 2\n" +
102103
"fi\n" +
103104
"\n" +
105+
"state=$(cat ${STATEFILE} | cut -f 1 -d ':')\n" +
106+
"if [ \"$state\" != \"RUNNING\" ]; then\n" +
107+
" echo \"Not ready: WebLogic Server state: ${state}\"\n" +
108+
" exit 3\n" +
109+
"fi\n" +
104110
"exit 0");
105111

106112
data.put("readState.sh",
@@ -119,11 +125,11 @@ public NextAction apply(Packet packet) {
119125
"fi\n" +
120126
"\n" +
121127
"if [ ! -f ${STATEFILE} ]; then\n" +
122-
" echo \"Error: Server state file not found.\"\n" +
123-
" exit 1\n" +
128+
" echo \"Error: WebLogic Server state file not found.\"\n" +
129+
" exit 2\n" +
124130
"fi\n" +
125131
"\n" +
126-
"cat ${STATEFILE}\n" +
132+
"cat ${STATEFILE} | cut -f 1 -d ':'\n" +
127133
"exit 0");
128134

129135
cm.setData(data);

0 commit comments

Comments
 (0)