Skip to content

Commit 026c201

Browse files
fix: [WIN-NPM] process updatePods in fifo order (#1856)
* process updatePods in fifo order
* fix lint
* better UT
* comments and better naming
* stop skipping UTs
* fix lint
* redesign
* dequeue returns nil when cache is empty
* Revert "dequeue returns nil when cache is empty"
  This reverts commit 3e8d187.
* requeue if node name has changed
* Revert "Revert "dequeue returns nil when cache is empty""
  This reverts commit 3f5f99d.
* UT for nil result from dequeue
1 parent d7fc7e2 commit 026c201

File tree

5 files changed

+254
-46
lines changed

5 files changed

+254
-46
lines changed

npm/pkg/dataplane/dataplane-test-cases_windows_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ const (
1717
netpolCrudTag Tag = "netpol-crud"
1818
reconcileTag Tag = "reconcile"
1919
calicoTag Tag = "calico"
20-
skipTestTag Tag = "skip-test"
2120
)
2221

2322
const (
@@ -1378,7 +1377,6 @@ func updatePodTests() []*SerialTestCase {
13781377
},
13791378
TestCaseMetadata: &TestCaseMetadata{
13801379
Tags: []Tag{
1381-
skipTestTag,
13821380
podCrudTag,
13831381
netpolCrudTag,
13841382
},
@@ -1446,7 +1444,6 @@ func updatePodTests() []*SerialTestCase {
14461444
},
14471445
TestCaseMetadata: &TestCaseMetadata{
14481446
Tags: []Tag{
1449-
skipTestTag,
14501447
podCrudTag,
14511448
netpolCrudTag,
14521449
},

npm/pkg/dataplane/dataplane.go

+16-31
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,6 @@ type Config struct {
2424
*policies.PolicyManagerCfg
2525
}
2626

27-
type updatePodCache struct {
28-
sync.Mutex
29-
cache map[string]*updateNPMPod
30-
}
31-
32-
func newUpdatePodCache() *updatePodCache {
33-
return &updatePodCache{cache: make(map[string]*updateNPMPod)}
34-
}
35-
3627
type endpointCache struct {
3728
sync.Mutex
3829
cache map[string]*npmEndpoint
@@ -70,7 +61,7 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
7061
endpointCache: newEndpointCache(),
7162
nodeName: nodeName,
7263
ioShim: ioShim,
73-
updatePodCache: newUpdatePodCache(),
64+
updatePodCache: newUpdatePodCache(1),
7465
endpointQuery: new(endpointQuery),
7566
stopChannel: stopChannel,
7667
}
@@ -144,13 +135,7 @@ func (dp *DataPlane) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *Po
144135
dp.updatePodCache.Lock()
145136
defer dp.updatePodCache.Unlock()
146137

147-
updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
148-
if !ok {
149-
klog.Infof("[DataPlane] {AddToSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
150-
updatePod = newUpdateNPMPod(podMetadata)
151-
dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
152-
}
153-
138+
updatePod := dp.updatePodCache.enqueue(podMetadata)
154139
updatePod.updateIPSetsToAdd(setNames)
155140
}
156141

@@ -172,13 +157,7 @@ func (dp *DataPlane) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadat
172157
dp.updatePodCache.Lock()
173158
defer dp.updatePodCache.Unlock()
174159

175-
updatePod, ok := dp.updatePodCache.cache[podMetadata.PodKey]
176-
if !ok {
177-
klog.Infof("[DataPlane] {RemoveFromSet} pod key %s not found in updatePodCache. creating a new obj", podMetadata.PodKey)
178-
updatePod = newUpdateNPMPod(podMetadata)
179-
dp.updatePodCache.cache[podMetadata.PodKey] = updatePod
180-
}
181-
160+
updatePod := dp.updatePodCache.enqueue(podMetadata)
182161
updatePod.updateIPSetsToRemove(setNames)
183162
}
184163

@@ -220,7 +199,7 @@ func (dp *DataPlane) ApplyDataPlane() error {
220199
if dp.shouldUpdatePod() {
221200
// do not refresh endpoints if the updatePodCache is empty
222201
dp.updatePodCache.Lock()
223-
if len(dp.updatePodCache.cache) == 0 {
202+
if dp.updatePodCache.isEmpty() {
224203
dp.updatePodCache.Unlock()
225204
return nil
226205
}
@@ -238,15 +217,21 @@ func (dp *DataPlane) ApplyDataPlane() error {
238217
dp.updatePodCache.Lock()
239218
defer dp.updatePodCache.Unlock()
240219

241-
for podKey, pod := range dp.updatePodCache.cache {
242-
err := dp.updatePod(pod)
243-
if err != nil {
220+
for !dp.updatePodCache.isEmpty() {
221+
pod := dp.updatePodCache.dequeue()
222+
if pod == nil {
223+
// should never happen because of isEmpty check above and lock on updatePodCache
224+
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "[DataPlane] failed to dequeue pod while applying the dataplane")
225+
// break to avoid infinite loop (something weird happened since isEmpty returned false above)
226+
break
227+
}
228+
229+
if err := dp.updatePod(pod); err != nil {
244230
// move on to the next and later return as success since this can be retried irrespective of other operations
245-
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", podKey, err.Error())
231+
metrics.SendErrorLogAndMetric(util.DaemonDataplaneID, "failed to update pod while applying the dataplane. key: [%s], err: [%s]", pod.PodKey, err.Error())
232+
dp.updatePodCache.requeue(pod)
246233
continue
247234
}
248-
249-
delete(dp.updatePodCache.cache, podKey)
250235
}
251236
}
252237
return nil

npm/pkg/dataplane/dataplane_test.go

+142
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,148 @@ func TestUpdatePolicy(t *testing.T) {
250250
require.NoError(t, err)
251251
}
252252

253+
func TestUpdatePodCache(t *testing.T) {
254+
m1 := NewPodMetadata("x/a", "10.0.0.1", nodeName)
255+
m2 := NewPodMetadata("x/b", "10.0.0.2", nodeName)
256+
m3 := NewPodMetadata("x/c", "10.0.0.3", nodeName)
257+
258+
c := newUpdatePodCache(3)
259+
require.True(t, c.isEmpty())
260+
261+
p1 := c.enqueue(m1)
262+
require.False(t, c.isEmpty())
263+
require.Equal(t, *newUpdateNPMPod(m1), *p1)
264+
require.Equal(t, c.queue, []string{m1.PodKey})
265+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1})
266+
267+
p2 := c.enqueue(m2)
268+
require.False(t, c.isEmpty())
269+
require.Equal(t, *newUpdateNPMPod(m2), *p2)
270+
require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey})
271+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2})
272+
273+
p3 := c.enqueue(m3)
274+
require.False(t, c.isEmpty())
275+
require.Equal(t, *newUpdateNPMPod(m3), *p3)
276+
require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey, m3.PodKey})
277+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3})
278+
279+
// Test that enqueueing an existing pod does not change the queue or cache.
280+
pairs := []struct {
281+
m *PodMetadata
282+
p *updateNPMPod
283+
}{
284+
{m1, p1},
285+
{m2, p2},
286+
{m3, p3},
287+
}
288+
for _, pair := range pairs {
289+
p := c.enqueue(pair.m)
290+
require.False(t, c.isEmpty())
291+
require.Equal(t, pair.p, p)
292+
require.Equal(t, c.queue, []string{m1.PodKey, m2.PodKey, m3.PodKey})
293+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m2.PodKey: p2, m3.PodKey: p3})
294+
}
295+
296+
// test dequeue
297+
p := c.dequeue()
298+
require.False(t, c.isEmpty())
299+
require.Equal(t, p1, p)
300+
require.Equal(t, c.queue, []string{m2.PodKey, m3.PodKey})
301+
require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2, m3.PodKey: p3})
302+
303+
p = c.dequeue()
304+
require.False(t, c.isEmpty())
305+
require.Equal(t, p2, p)
306+
require.Equal(t, c.queue, []string{m3.PodKey})
307+
require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p3})
308+
309+
// test requeuing
310+
c.requeue(p)
311+
require.False(t, c.isEmpty())
312+
require.Equal(t, c.queue, []string{m3.PodKey, m2.PodKey})
313+
require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p3, m2.PodKey: p2})
314+
315+
p = c.dequeue()
316+
require.False(t, c.isEmpty())
317+
require.Equal(t, p3, p)
318+
require.Equal(t, c.queue, []string{m2.PodKey})
319+
require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2})
320+
321+
// test enqueuing again
322+
p = c.enqueue(m1)
323+
require.Equal(t, *p1, *p)
324+
require.False(t, c.isEmpty())
325+
require.Equal(t, c.queue, []string{m2.PodKey, m1.PodKey})
326+
require.Equal(t, c.cache, map[string]*updateNPMPod{m2.PodKey: p2, m1.PodKey: p1})
327+
328+
p = c.dequeue()
329+
require.False(t, c.isEmpty())
330+
require.Equal(t, p2, p)
331+
require.Equal(t, c.queue, []string{m1.PodKey})
332+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1})
333+
334+
p = c.dequeue()
335+
require.True(t, c.isEmpty())
336+
require.Equal(t, p1, p)
337+
require.Equal(t, c.queue, []string{})
338+
require.Equal(t, c.cache, map[string]*updateNPMPod{})
339+
340+
// test requeue on empty queue
341+
c.requeue(p)
342+
require.False(t, c.isEmpty())
343+
require.Equal(t, c.queue, []string{m1.PodKey})
344+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1})
345+
346+
p = c.dequeue()
347+
require.True(t, c.isEmpty())
348+
require.Equal(t, p1, p)
349+
require.Equal(t, c.queue, []string{})
350+
require.Equal(t, c.cache, map[string]*updateNPMPod{})
351+
352+
// test nil result on empty queue
353+
p = c.dequeue()
354+
require.True(t, c.isEmpty())
355+
require.Nil(t, p)
356+
357+
// test enqueue on empty queue
358+
p = c.enqueue(m3)
359+
require.False(t, c.isEmpty())
360+
require.Equal(t, *p3, *p)
361+
require.Equal(t, c.queue, []string{m3.PodKey})
362+
require.Equal(t, c.cache, map[string]*updateNPMPod{m3.PodKey: p})
363+
364+
// test enqueue with different node on only item in queue
365+
m3Node2 := *m3
366+
m3Node2.NodeName = "node2"
367+
p3Node2 := *newUpdateNPMPod(&m3Node2)
368+
p = c.enqueue(&m3Node2)
369+
require.False(t, c.isEmpty())
370+
require.Equal(t, p3Node2, *p)
371+
require.Equal(t, c.queue, []string{m3Node2.PodKey})
372+
require.Equal(t, c.cache, map[string]*updateNPMPod{m3Node2.PodKey: p})
373+
374+
// test enqueue with different node on first item in queue
375+
p = c.enqueue(m1)
376+
require.False(t, c.isEmpty())
377+
require.Equal(t, *p1, *p)
378+
require.Equal(t, c.queue, []string{m3Node2.PodKey, m1.PodKey})
379+
require.Equal(t, c.cache, map[string]*updateNPMPod{m3Node2.PodKey: &p3Node2, m1.PodKey: p1})
380+
381+
p = c.enqueue(m3)
382+
require.False(t, c.isEmpty())
383+
require.Equal(t, *p3, *p)
384+
require.Equal(t, c.queue, []string{m1.PodKey, m3.PodKey})
385+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m3.PodKey: p})
386+
387+
// test enqueue with different node on last item in queue
388+
p = c.enqueue(&m3Node2)
389+
require.False(t, c.isEmpty())
390+
require.Equal(t, p3Node2, *p)
391+
require.Equal(t, c.queue, []string{m1.PodKey, m3Node2.PodKey})
392+
require.Equal(t, c.cache, map[string]*updateNPMPod{m1.PodKey: p1, m3Node2.PodKey: p})
393+
}
394+
253395
func getBootupTestCalls() []testutils.TestCmd {
254396
return append(policies.GetBootupTestCalls(), ipsets.GetResetTestCalls()...)
255397
}

npm/pkg/dataplane/dataplane_windows_test.go

-12
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ func testSerialCases(t *testing.T, tests []*SerialTestCase) {
4343
i := i
4444
tt := tt
4545

46-
for _, tag := range tt.Tags {
47-
if tag == skipTestTag {
48-
continue
49-
}
50-
}
51-
5246
t.Run(tt.Description, func(t *testing.T) {
5347
t.Logf("beginning test #%d. Description: [%s]. Tags: %+v", i, tt.Description, tt.Tags)
5448

@@ -85,12 +79,6 @@ func testMultiJobCases(t *testing.T, tests []*MultiJobTestCase) {
8579
i := i
8680
tt := tt
8781

88-
for _, tag := range tt.Tags {
89-
if tag == skipTestTag {
90-
continue
91-
}
92-
}
93-
9482
t.Run(tt.Description, func(t *testing.T) {
9583
t.Logf("beginning test #%d. Description: [%s]. Tags: %+v", i, tt.Description, tt.Tags)
9684

0 commit comments

Comments
 (0)