Skip to content

Commit 56cda35

Browse files
committed
fix: primary cache utils mechanism
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 4755219 commit 56cda35

File tree

8 files changed

+119
-40
lines changed

8 files changed

+119
-40
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

3+
import java.time.LocalTime;
4+
import java.time.temporal.ChronoUnit;
35
import java.util.function.UnaryOperator;
46

57
import org.slf4j.Logger;
@@ -25,6 +27,8 @@
2527
public class PrimaryUpdateAndCacheUtils {
2628

2729
public static final int DEFAULT_MAX_RETRY = 10;
30+
public static final int RESOURCE_CACHE_POLL_TIMEOUT = 10000;
31+
public static final int DEFAULT_SLEEP_FOR_CACHE_POLL_MILLIS = 30;
2832

2933
private PrimaryUpdateAndCacheUtils() {}
3034

@@ -90,8 +94,8 @@ public static <P extends HasMetadata> P ssaPatchStatusAndCacheResource(
9094
}
9195

9296
/**
93-
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator,
94-
* int)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}.
97+
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator, int,
98+
* long,long)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}.
9599
*
96100
* @param resourceToUpdate original resource to update
97101
* @param context of reconciliation
@@ -106,7 +110,13 @@ public static <P extends HasMetadata> P updateAndCacheResource(
106110
UnaryOperator<P> modificationFunction,
107111
UnaryOperator<P> updateMethod) {
108112
return updateAndCacheResource(
109-
resourceToUpdate, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY);
113+
resourceToUpdate,
114+
context,
115+
modificationFunction,
116+
updateMethod,
117+
DEFAULT_MAX_RETRY,
118+
RESOURCE_CACHE_POLL_TIMEOUT,
119+
DEFAULT_SLEEP_FOR_CACHE_POLL_MILLIS);
110120
}
111121

112122
/**
@@ -133,7 +143,9 @@ public static <P extends HasMetadata> P updateAndCacheResource(
133143
Context<P> context,
134144
UnaryOperator<P> modificationFunction,
135145
UnaryOperator<P> updateMethod,
136-
int maxRetry) {
146+
int maxRetry,
147+
long cachePollTimeoutMillis,
148+
long pollDelayMillis) {
137149

138150
if (log.isDebugEnabled()) {
139151
log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate));
@@ -180,14 +192,35 @@ public static <P extends HasMetadata> P updateAndCacheResource(
180192
resourceToUpdate.getMetadata().getNamespace(),
181193
e.getCode());
182194
resourceToUpdate =
183-
(P)
184-
context
185-
.getClient()
186-
.resources(resourceToUpdate.getClass())
187-
.inNamespace(resourceToUpdate.getMetadata().getNamespace())
188-
.withName(resourceToUpdate.getMetadata().getName())
189-
.get();
195+
pollLocalCache(context, resourceToUpdate, cachePollTimeoutMillis, pollDelayMillis);
196+
}
197+
}
198+
}
199+
200+
private static <P extends HasMetadata> P pollLocalCache(
201+
Context<P> context, P staleResource, long timeoutMillis, long pollDelayMillis) {
202+
try {
203+
var resourceId = ResourceID.fromResource(staleResource);
204+
var startTime = LocalTime.now();
205+
while (startTime.plus(timeoutMillis, ChronoUnit.MILLIS).isAfter(LocalTime.now())) {
206+
log.debug("Polling cache for resource: {}", resourceId);
207+
var cachedResource = context.getPrimaryCache().get(resourceId).orElseThrow();
208+
if (!cachedResource
209+
.getMetadata()
210+
.getResourceVersion()
211+
.equals(staleResource.getMetadata().getResourceVersion())) {
212+
return context
213+
.getControllerConfiguration()
214+
.getConfigurationService()
215+
.getResourceCloner()
216+
.clone(cachedResource);
217+
}
218+
Thread.sleep(pollDelayMillis);
190219
}
220+
throw new OperatorException("Timeout of resource polling from cache for resource");
221+
} catch (InterruptedException e) {
222+
Thread.currentThread().interrupt();
223+
throw new OperatorException(e);
191224
}
192225
}
193226
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi
126126
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
127127
}
128128
var resourceId = ResourceID.fromResource(newResource);
129-
var cachedResource =
130-
getResourceFromCache(resourceId)
131-
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
129+
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
132130

133131
boolean moveAhead = false;
134132
if (previousResourceVersion == null && cachedResource == null) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

3+
import java.util.Optional;
34
import java.util.function.UnaryOperator;
45

56
import org.junit.jupiter.api.BeforeEach;
67
import org.junit.jupiter.api.Test;
78

9+
import io.fabric8.kubernetes.api.model.HasMetadata;
810
import io.fabric8.kubernetes.client.KubernetesClient;
911
import io.fabric8.kubernetes.client.KubernetesClientException;
1012
import io.fabric8.kubernetes.client.dsl.MixedOperation;
1113
import io.fabric8.kubernetes.client.dsl.Resource;
14+
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
1215
import io.javaoperatorsdk.operator.OperatorException;
1316
import io.javaoperatorsdk.operator.TestUtils;
17+
import io.javaoperatorsdk.operator.api.config.Cloner;
18+
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
19+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1420
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
1521
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
1622
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
@@ -29,6 +35,7 @@ class PrimaryUpdateAndCacheUtilsTest {
2935
Context<TestCustomResource> context = mock(Context.class);
3036
KubernetesClient client = mock(KubernetesClient.class);
3137
Resource resource = mock(Resource.class);
38+
IndexedResourceCache<TestCustomResource> primaryCache = mock(IndexedResourceCache.class);
3239

3340
@BeforeEach
3441
void setup() {
@@ -41,6 +48,20 @@ void setup() {
4148
when(mixedOp.inNamespace(any())).thenReturn(mixedOp);
4249
when(mixedOp.withName(any())).thenReturn(resource);
4350
when(resource.get()).thenReturn(TestUtils.testCustomResource1());
51+
when(context.getPrimaryCache()).thenReturn(primaryCache);
52+
53+
var controllerConfiguration = mock(ControllerConfiguration.class);
54+
when(context.getControllerConfiguration()).thenReturn(controllerConfiguration);
55+
var configService = mock(ConfigurationService.class);
56+
when(controllerConfiguration.getConfigurationService()).thenReturn(configService);
57+
when(configService.getResourceCloner())
58+
.thenReturn(
59+
new Cloner() {
60+
@Override
61+
public <R extends HasMetadata> R clone(R object) {
62+
return new KubernetesSerialization().clone(object);
63+
}
64+
});
4465
}
4566

4667
@Test
@@ -76,6 +97,10 @@ void retriesConflicts() {
7697
when(updateOperation.apply(any()))
7798
.thenThrow(new KubernetesClientException("", 409, null))
7899
.thenReturn(TestUtils.testCustomResource1());
100+
var freshResource = TestUtils.testCustomResource1();
101+
102+
freshResource.getMetadata().setResourceVersion("2");
103+
when(primaryCache.get(any())).thenReturn(Optional.of(freshResource));
79104

80105
var updated =
81106
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
@@ -89,15 +114,21 @@ void retriesConflicts() {
89114
updateOperation);
90115

91116
assertThat(updated).isNotNull();
92-
verify(resource, times(1)).get();
117+
verify(primaryCache, times(1)).get(any());
93118
}
94119

95120
@Test
96121
void throwsIfRetryExhausted() {
97122
var updateOperation = mock(UnaryOperator.class);
98123

99124
when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
125+
var stubbing = when(primaryCache.get(any()));
100126

127+
for (int i = 0; i < DEFAULT_MAX_RETRY; i++) {
128+
var resource = TestUtils.testCustomResource1();
129+
resource.getMetadata().setResourceVersion("" + i);
130+
stubbing = stubbing.thenReturn(Optional.of(resource));
131+
}
101132
assertThrows(
102133
OperatorException.class,
103134
() ->
@@ -106,6 +137,28 @@ void throwsIfRetryExhausted() {
106137
context,
107138
UnaryOperator.identity(),
108139
updateOperation));
109-
verify(resource, times(DEFAULT_MAX_RETRY)).get();
140+
verify(primaryCache, times(DEFAULT_MAX_RETRY)).get(any());
141+
}
142+
143+
@Test
144+
void cachePollTimeouts() {
145+
var updateOperation = mock(UnaryOperator.class);
146+
147+
when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
148+
when(primaryCache.get(any())).thenReturn(Optional.of(TestUtils.testCustomResource1()));
149+
150+
var ex =
151+
assertThrows(
152+
OperatorException.class,
153+
() ->
154+
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
155+
TestUtils.testCustomResource1(),
156+
context,
157+
UnaryOperator.identity(),
158+
updateOperation,
159+
2,
160+
50L,
161+
10L));
162+
assertThat(ex.getMessage()).contains("Timeout");
110163
}
111164
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@
99
@Group("sample.javaoperatorsdk")
1010
@Version("v1")
1111
@ShortNames("spwl")
12-
public class StatusPatchCacheWithLockCustomResource
13-
extends CustomResource<StatusPatchCacheWithLockSpec, StatusPatchCacheWithLockStatus>
14-
implements Namespaced {}
12+
public class StatusPatchCacheCustomResource
13+
extends CustomResource<StatusPatchCacheSpec, StatusPatchCacheStatus> implements Namespaced {}
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,19 @@
1111
import static org.assertj.core.api.Assertions.assertThat;
1212
import static org.awaitility.Awaitility.await;
1313

14-
public class StatusPatchCacheWithLockIT {
14+
public class StatusPatchCacheIT {
1515

1616
public static final String TEST_1 = "test1";
1717

1818
@RegisterExtension
1919
LocallyRunOperatorExtension extension =
2020
LocallyRunOperatorExtension.builder()
21-
.withReconciler(StatusPatchCacheWithLockReconciler.class)
21+
.withReconciler(StatusPatchCacheReconciler.class)
2222
.build();
2323

2424
@Test
2525
void testStatusAlwaysUpToDate() {
26-
var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class);
26+
var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class);
2727

2828
extension.create(testResource());
2929

@@ -39,10 +39,10 @@ void testStatusAlwaysUpToDate() {
3939
});
4040
}
4141

42-
StatusPatchCacheWithLockCustomResource testResource() {
43-
var res = new StatusPatchCacheWithLockCustomResource();
42+
StatusPatchCacheCustomResource testResource() {
43+
var res = new StatusPatchCacheCustomResource();
4444
res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build());
45-
res.setSpec(new StatusPatchCacheWithLockSpec());
45+
res.setSpec(new StatusPatchCacheSpec());
4646
return res;
4747
}
4848
}
Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@
1212
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
1313

1414
@ControllerConfiguration
15-
public class StatusPatchCacheWithLockReconciler
16-
implements Reconciler<StatusPatchCacheWithLockCustomResource> {
15+
public class StatusPatchCacheReconciler implements Reconciler<StatusPatchCacheCustomResource> {
1716

1817
public volatile int latestValue = 0;
1918
public volatile boolean errorPresent = false;
2019

2120
@Override
22-
public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
23-
StatusPatchCacheWithLockCustomResource resource,
24-
Context<StatusPatchCacheWithLockCustomResource> context) {
21+
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
22+
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {
2523

2624
if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) {
2725
errorPresent = true;
@@ -50,22 +48,20 @@ public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
5048
}
5149

5250
@Override
53-
public List<EventSource<?, StatusPatchCacheWithLockCustomResource>> prepareEventSources(
54-
EventSourceContext<StatusPatchCacheWithLockCustomResource> context) {
51+
public List<EventSource<?, StatusPatchCacheCustomResource>> prepareEventSources(
52+
EventSourceContext<StatusPatchCacheCustomResource> context) {
5553
// periodic event triggering for testing purposes
5654
return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache()));
5755
}
5856

59-
private StatusPatchCacheWithLockCustomResource createFreshCopy(
60-
StatusPatchCacheWithLockCustomResource resource) {
61-
var res = new StatusPatchCacheWithLockCustomResource();
57+
private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) {
58+
var res = new StatusPatchCacheCustomResource();
6259
res.setMetadata(
6360
new ObjectMetaBuilder()
6461
.withName(resource.getMetadata().getName())
6562
.withNamespace(resource.getMetadata().getNamespace())
6663
.build());
67-
res.setStatus(new StatusPatchCacheWithLockStatus());
68-
64+
res.setStatus(new StatusPatchCacheStatus());
6965
return res;
7066
}
7167
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

3-
public class StatusPatchCacheWithLockSpec {
3+
public class StatusPatchCacheSpec {
44

55
private int counter = 0;
66

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

3-
public class StatusPatchCacheWithLockStatus {
3+
public class StatusPatchCacheStatus {
44

55
private Integer value = 0;
66

77
public Integer getValue() {
88
return value;
99
}
1010

11-
public StatusPatchCacheWithLockStatus setValue(Integer value) {
11+
public StatusPatchCacheStatus setValue(Integer value) {
1212
this.value = value;
1313
return this;
1414
}

0 commit comments

Comments
 (0)