Skip to content

Commit 5363ec6

Browse files
committed
wip
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 0856520 commit 5363ec6

File tree

11 files changed

+212
-77
lines changed

11 files changed

+212
-77
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/BaseControl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void expect(BiPredicate<P, ExpectationContext<P>> expectation, Duration t
4141
this.expectation = new ExpectationAdapter<>(expectation, timeout);
4242
}
4343

44-
public Expectation<P> getExpectation() {
45-
return expectation;
44+
public Optional<Expectation<P>> getExpectation() {
45+
return Optional.ofNullable(expectation);
4646
}
4747
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.javaoperatorsdk.operator.api.reconciler;
2+
3+
import java.util.Optional;
4+
import java.util.Set;
5+
import java.util.stream.Collectors;
6+
import java.util.stream.Stream;
7+
8+
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
10+
import io.javaoperatorsdk.operator.processing.Controller;
11+
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
12+
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
13+
14+
public class DefaultCacheAware<P extends HasMetadata> implements CacheAware<P> {
15+
16+
protected final Controller<P> controller;
17+
protected final P primaryResource;
18+
19+
public DefaultCacheAware(Controller<P> controller, P primaryResource) {
20+
this.controller = controller;
21+
this.primaryResource = primaryResource;
22+
}
23+
24+
@Override
25+
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
26+
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
27+
}
28+
29+
@Override
30+
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
31+
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
32+
.map(es -> es.getSecondaryResources(primaryResource))
33+
.flatMap(Set::stream);
34+
}
35+
36+
@Override
37+
public <T> Optional<T> getSecondaryResource(Class<T> expectedType, String eventSourceName) {
38+
try {
39+
return controller
40+
.getEventSourceManager()
41+
.getEventSourceFor(expectedType, eventSourceName)
42+
.getSecondaryResource(primaryResource);
43+
} catch (NoEventSourceForClassException e) {
44+
/*
45+
* If a workflow has an activation condition there can be event sources which are only
46+
* registered if the activation condition holds, but to provide a consistent API we return an
47+
* Optional instead of throwing an exception.
48+
*
49+
* Note that not only the resource which has an activation condition might not be registered
50+
* but dependents which depend on it.
51+
*/
52+
if (eventSourceName == null && controller.workflowContainsDependentForType(expectedType)) {
53+
return Optional.empty();
54+
} else {
55+
throw e;
56+
}
57+
}
58+
}
59+
60+
@Override
61+
public EventSourceRetriever<P> eventSourceRetriever() {
62+
return controller.getEventSourceManager();
63+
}
64+
65+
@Override
66+
public ControllerConfiguration<P> getControllerConfiguration() {
67+
return controller.getConfiguration();
68+
}
69+
70+
@Override
71+
public P getPrimaryResource() {
72+
return primaryResource;
73+
}
74+
75+
@Override
76+
public IndexedResourceCache<P> getPrimaryCache() {
77+
return controller.getEventSourceManager().getControllerEventSource();
78+
}
79+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 13 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,96 +1,48 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

33
import java.util.Optional;
4-
import java.util.Set;
54
import java.util.concurrent.ExecutorService;
6-
import java.util.stream.Collectors;
7-
import java.util.stream.Stream;
85

96
import io.fabric8.kubernetes.api.model.HasMetadata;
107
import io.fabric8.kubernetes.client.KubernetesClient;
11-
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
128
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
139
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedWorkflowAndDependentResourceContext;
1410
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationResult;
1511
import io.javaoperatorsdk.operator.processing.Controller;
16-
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
17-
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
1812
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1913

20-
public class DefaultContext<P extends HasMetadata> implements Context<P> {
14+
public class DefaultContext<P extends HasMetadata> extends DefaultCacheAware<P>
15+
implements Context<P> {
2116

2217
private RetryInfo retryInfo;
23-
private final Controller<P> controller;
24-
private final P primaryResource;
25-
private final ControllerConfiguration<P> controllerConfiguration;
18+
2619
private final DefaultManagedWorkflowAndDependentResourceContext<P>
2720
defaultManagedDependentResourceContext;
2821

29-
public DefaultContext(RetryInfo retryInfo, Controller<P> controller, P primaryResource) {
22+
private final ExpectationResult<P> expectationResult;
23+
24+
public DefaultContext(
25+
RetryInfo retryInfo,
26+
Controller<P> controller,
27+
P primaryResource,
28+
ExpectationResult<P> expectationResult) {
29+
super(controller, primaryResource);
3030
this.retryInfo = retryInfo;
31-
this.controller = controller;
32-
this.primaryResource = primaryResource;
33-
this.controllerConfiguration = controller.getConfiguration();
3431
this.defaultManagedDependentResourceContext =
3532
new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this);
33+
this.expectationResult = expectationResult;
3634
}
3735

3836
@Override
3937
public Optional<RetryInfo> getRetryInfo() {
4038
return Optional.ofNullable(retryInfo);
4139
}
4240

43-
@Override
44-
public <T> Set<T> getSecondaryResources(Class<T> expectedType) {
45-
return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet());
46-
}
47-
48-
@Override
49-
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
50-
return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream()
51-
.map(es -> es.getSecondaryResources(primaryResource))
52-
.flatMap(Set::stream);
53-
}
54-
55-
@Override
56-
public <T> Optional<T> getSecondaryResource(Class<T> expectedType, String eventSourceName) {
57-
try {
58-
return controller
59-
.getEventSourceManager()
60-
.getEventSourceFor(expectedType, eventSourceName)
61-
.getSecondaryResource(primaryResource);
62-
} catch (NoEventSourceForClassException e) {
63-
/*
64-
* If a workflow has an activation condition there can be event sources which are only
65-
* registered if the activation condition holds, but to provide a consistent API we return an
66-
* Optional instead of throwing an exception.
67-
*
68-
* Note that not only the resource which has an activation condition might not be registered
69-
* but dependents which depend on it.
70-
*/
71-
if (eventSourceName == null && controller.workflowContainsDependentForType(expectedType)) {
72-
return Optional.empty();
73-
} else {
74-
throw e;
75-
}
76-
}
77-
}
78-
79-
@Override
80-
public ControllerConfiguration<P> getControllerConfiguration() {
81-
return controllerConfiguration;
82-
}
83-
8441
@Override
8542
public ManagedWorkflowAndDependentResourceContext managedWorkflowAndDependentResourceContext() {
8643
return defaultManagedDependentResourceContext;
8744
}
8845

89-
@Override
90-
public EventSourceRetriever<P> eventSourceRetriever() {
91-
return controller.getEventSourceManager();
92-
}
93-
9446
@Override
9547
public KubernetesClient getClient() {
9648
return controller.getClient();
@@ -103,16 +55,6 @@ public ExecutorService getWorkflowExecutorService() {
10355
return controller.getExecutorServiceManager().workflowExecutorService();
10456
}
10557

106-
@Override
107-
public P getPrimaryResource() {
108-
return primaryResource;
109-
}
110-
111-
@Override
112-
public IndexedResourceCache<P> getPrimaryCache() {
113-
return controller.getEventSourceManager().getControllerEventSource();
114-
}
115-
11658
@Override
11759
public boolean isNextReconciliationImminent() {
11860
return controller
@@ -122,7 +64,7 @@ public boolean isNextReconciliationImminent() {
12264

12365
@Override
12466
public Optional<ExpectationResult<P>> expectationResult() {
125-
return Optional.empty();
67+
return Optional.ofNullable(expectationResult);
12668
}
12769

12870
public DefaultContext<P> setRetryInfo(RetryInfo retryInfo) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.javaoperatorsdk.operator.api.reconciler.expectation;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.javaoperatorsdk.operator.api.reconciler.DefaultCacheAware;
5+
import io.javaoperatorsdk.operator.processing.Controller;
6+
7+
public class DefaultExpectationContext<P extends HasMetadata> extends DefaultCacheAware<P>
8+
implements ExpectationContext<P> {
9+
public DefaultExpectationContext(Controller<P> controller, P primaryResource) {
10+
super(controller, primaryResource);
11+
}
12+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1818
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
1919
import io.javaoperatorsdk.operator.api.reconciler.Constants;
20+
import io.javaoperatorsdk.operator.api.reconciler.expectation.DefaultExpectationContext;
21+
import io.javaoperatorsdk.operator.api.reconciler.expectation.ExpectationContext;
2022
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2123
import io.javaoperatorsdk.operator.processing.MDCUtils;
2224
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
@@ -136,6 +138,10 @@ private void submitReconciliationExecution(ResourceState state) {
136138
Optional<P> maybeLatest = cache.get(resourceID);
137139
maybeLatest.ifPresent(MDCUtils::addResourceInfo);
138140
if (!controllerUnderExecution && maybeLatest.isPresent()) {
141+
if (!shouldProceedWithExpectation(state, maybeLatest.orElseThrow())) {
142+
return;
143+
}
144+
139145
var rateLimit = state.getRateLimit();
140146
if (rateLimit == null) {
141147
rateLimit = rateLimiter.initState();
@@ -174,6 +180,21 @@ private void submitReconciliationExecution(ResourceState state) {
174180
}
175181
}
176182

183+
boolean shouldProceedWithExpectation(ResourceState state, P primary) {
184+
var optionalHolder = state.getExpectationHolder();
185+
if (optionalHolder.isEmpty()) {
186+
return true;
187+
}
188+
var holder = optionalHolder.orElseThrow();
189+
if (holder.isTimedOut()) {
190+
return true;
191+
}
192+
// todo cleanup state etc
193+
ExpectationContext<P> expectationContext =
194+
new DefaultExpectationContext<>(this.eventSourceManager.getController(), primary);
195+
return holder.getExpectation().isFulfilled(primary, expectationContext);
196+
}
197+
177198
private void handleEventMarking(Event event, ResourceState state) {
178199
final var relatedCustomResourceID = event.getRelatedCustomResourceID();
179200
if (event instanceof ResourceEvent resourceEvent) {
@@ -257,6 +278,9 @@ synchronized void eventProcessingFinished(
257278
state.markProcessedMarkForDeletion();
258279
metrics.cleanupDoneFor(resourceID, metricsMetadata);
259280
} else {
281+
// TODO what should be the relation between re-schedule and expectation
282+
// should we add a flag if trigger if expectation fails
283+
setExpectation(state, postExecutionControl);
260284
if (state.eventPresent()) {
261285
submitReconciliationExecution(state);
262286
} else {
@@ -265,6 +289,10 @@ synchronized void eventProcessingFinished(
265289
}
266290
}
267291

292+
private void setExpectation(ResourceState state, PostExecutionControl<P> postExecutionControl) {
293+
postExecutionControl.getExpectation().ifPresent(state::setExpectation);
294+
}
295+
268296
/**
269297
* In case retry is configured more complex error logging takes place, see handleRetryOnException
270298
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.javaoperatorsdk.operator.processing.event;
2+
3+
import java.time.LocalDateTime;
4+
5+
import io.fabric8.kubernetes.api.model.HasMetadata;
6+
import io.javaoperatorsdk.operator.api.reconciler.expectation.Expectation;
7+
8+
public class ExpectationHolder<P extends HasMetadata> {
9+
10+
private LocalDateTime expectationCreationTime;
11+
private Expectation<P> expectation;
12+
13+
public ExpectationHolder(LocalDateTime expectationCreationTime, Expectation<P> expectation) {
14+
this.expectationCreationTime = expectationCreationTime;
15+
this.expectation = expectation;
16+
}
17+
18+
public LocalDateTime getExpectationCreationTime() {
19+
return expectationCreationTime;
20+
}
21+
22+
public void setExpectationCreationTime(LocalDateTime expectationCreationTime) {
23+
this.expectationCreationTime = expectationCreationTime;
24+
}
25+
26+
public Expectation<?> getExpectation() {
27+
return expectation;
28+
}
29+
30+
public void setExpectation(Expectation<P> expectation) {
31+
this.expectation = expectation;
32+
}
33+
34+
public boolean isTimedOut() {
35+
return expectationCreationTime.plus(expectation.timeout()).isBefore(LocalDateTime.now());
36+
}
37+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/PostExecutionControl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import java.util.Optional;
44

55
import io.fabric8.kubernetes.api.model.HasMetadata;
6+
import io.javaoperatorsdk.operator.api.reconciler.expectation.Expectation;
67

78
final class PostExecutionControl<R extends HasMetadata> {
89

910
private final boolean finalizerRemoved;
1011
private final R updatedCustomResource;
1112
private final boolean updateIsStatusPatch;
1213
private final Exception runtimeException;
14+
private Expectation<R> expectation;
1315

1416
private Long reScheduleDelay = null;
1517

@@ -66,6 +68,11 @@ public PostExecutionControl<R> withReSchedule(long delay) {
6668
return this;
6769
}
6870

71+
public PostExecutionControl<R> withExpectation(Expectation<R> expectation) {
72+
this.expectation = expectation;
73+
return this;
74+
}
75+
6976
public Optional<Exception> getRuntimeException() {
7077
return Optional.ofNullable(runtimeException);
7178
}
@@ -93,4 +100,8 @@ public String toString() {
93100
public boolean isFinalizerRemoved() {
94101
return finalizerRemoved;
95102
}
103+
104+
public Optional<Expectation<R>> getExpectation() {
105+
return Optional.ofNullable(expectation);
106+
}
96107
}

0 commit comments

Comments
 (0)