|
4 | 4 | "context"
|
5 | 5 | "encoding/json"
|
6 | 6 | "fmt"
|
| 7 | + "reflect" |
| 8 | + "slices" |
7 | 9 | "sort"
|
8 | 10 | "strings"
|
9 | 11 | "sync"
|
@@ -562,24 +564,27 @@ func (sc *syncContext) Sync() {
|
562 | 564 |
|
563 | 565 | // remove any tasks not in this wave
|
564 | 566 | phase := tasks.phase()
|
565 |
| - wave := tasks.wave() |
566 |
| - finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave() |
| 567 | + waves, wavesUseBinaryTreeOrdering := tasks.waves() |
| 568 | + lastWaves, lastWavesUseBinaryTreeOrdering := tasks.lastWaves() |
| 569 | + finalWaves := phase == tasks.lastPhase() && reflect.DeepEqual(waves, lastWaves) && wavesUseBinaryTreeOrdering == lastWavesUseBinaryTreeOrdering |
567 | 570 |
|
568 | 571 | // if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
|
569 | 572 | // EVEN if those objects subsequently degraded
|
570 | 573 | // This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
|
571 |
| - remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() }) |
| 574 | + remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || !slices.Contains(waves, t.wave()) || t.isHook() }) |
572 | 575 |
|
573 |
| - sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") |
574 |
| - tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave }) |
| 576 | + sc.log.WithValues("phase", phase, "wave", waves, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") |
| 577 | + tasks = tasks.Filter(func(t *syncTask) bool { |
| 578 | + return t.phase == phase && slices.Contains(waves, t.wave()) && t.waveUseBinaryTreeOrdering() == wavesUseBinaryTreeOrdering |
| 579 | + }) |
575 | 580 |
|
576 | 581 | sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
|
577 | 582 |
|
578 | 583 | sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
|
579 | 584 | runState := sc.runTasks(tasks, false)
|
580 | 585 |
|
581 | 586 | if sc.syncWaveHook != nil && runState != failed {
|
582 |
| - err := sc.syncWaveHook(phase, wave, finalWave) |
| 587 | + err := sc.syncWaveHook(phase, waves, finalWaves) |
583 | 588 | if err != nil {
|
584 | 589 | sc.deleteHooks(hooksPendingDeletionFailed)
|
585 | 590 | sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
|
@@ -899,52 +904,133 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
|
899 | 904 | }
|
900 | 905 | }
|
901 | 906 |
|
902 |
| - // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order) |
903 |
| - pruneTasks := make(map[int][]*syncTask) |
| 907 | + // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order). |
| 908 | + // if all prune tasks use normal wave ordering, use the legacy method. Otherwise, use a binary tree wave ordering |
| 909 | + // on all prune tasks and modify the waves to decreasing power of 2. |
| 910 | + // For prune tasks which already use binary tree wave ordering, set an identical syncWave to tasks which |
| 911 | + // have the same level in a complete binary tree rooted at 1 where each node n has 2*n and 2*n+1 as children. |
| 912 | + |
| 913 | + pruntTasksUsingNormalOrdering := make(map[int][]*syncTask) |
904 | 914 | for _, task := range tasks {
|
905 |
| - if task.isPrune() { |
906 |
| - pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task) |
| 915 | + if task.isPrune() && task.waveUseBinaryTreeOrdering() == "false" { |
| 916 | + pruntTasksUsingNormalOrdering[task.wave()] = append(pruntTasksUsingNormalOrdering[task.wave()], task) |
907 | 917 | }
|
908 | 918 | }
|
| 919 | + var uniquePruneWavesUsingNormalOrdering []int |
| 920 | + for k := range pruntTasksUsingNormalOrdering { |
| 921 | + uniquePruneWavesUsingNormalOrdering = append(uniquePruneWavesUsingNormalOrdering, k) |
| 922 | + } |
909 | 923 |
|
910 |
| - var uniquePruneWaves []int |
911 |
| - for k := range pruneTasks { |
912 |
| - uniquePruneWaves = append(uniquePruneWaves, k) |
| 924 | + sort.Ints(uniquePruneWavesUsingNormalOrdering) |
| 925 | + |
| 926 | + pruneTasksUsingBinaryTreeOrdering := make(map[int][]*syncTask) |
| 927 | + for _, task := range tasks { |
| 928 | + if task.isPrune() && task.waveUseBinaryTreeOrdering() == "true" { |
| 929 | + pruneTasksUsingBinaryTreeOrdering[task.wave()] = append(pruneTasksUsingBinaryTreeOrdering[task.wave()], task) |
| 930 | + } |
913 | 931 | }
|
914 |
| - sort.Ints(uniquePruneWaves) |
915 | 932 |
|
916 |
| - // reorder waves for pruning tasks using symmetric swap on prune waves |
917 |
| - n := len(uniquePruneWaves) |
918 |
| - for i := 0; i < n/2; i++ { |
919 |
| - // waves to swap |
920 |
| - startWave := uniquePruneWaves[i] |
921 |
| - endWave := uniquePruneWaves[n-1-i] |
| 933 | + if len(pruneTasksUsingBinaryTreeOrdering) > 0 { |
| 934 | + var uniquePruneWavesUsingBinaryTreeOrdering []int |
| 935 | + for k := range pruneTasksUsingBinaryTreeOrdering { |
| 936 | + uniquePruneWavesUsingBinaryTreeOrdering = append(uniquePruneWavesUsingBinaryTreeOrdering, k) |
| 937 | + } |
| 938 | + sort.Ints(uniquePruneWavesUsingBinaryTreeOrdering) |
922 | 939 |
|
923 |
| - for _, task := range pruneTasks[startWave] { |
924 |
| - task.waveOverride = &endWave |
| 940 | + pruneTasksWavesValues := []int{0} |
| 941 | + for i := 1; i < len(uniquePruneWavesUsingNormalOrdering); i++ { |
| 942 | + pruneTasksWavesValues = append(pruneTasksWavesValues, i) |
| 943 | + } |
| 944 | + nextPotentialWaveValue := len(uniquePruneWavesUsingNormalOrdering) |
| 945 | + if len(uniquePruneWavesUsingNormalOrdering) != 0 { |
| 946 | + pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) |
| 947 | + } |
| 948 | + for i := 1; i < len(uniquePruneWavesUsingBinaryTreeOrdering); i++ { |
| 949 | + currentWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i]) |
| 950 | + previousWaveValue := biggestPowerOf2InferiorThan(uniquePruneWavesUsingBinaryTreeOrdering[i-1]) |
| 951 | + if currentWaveValue == previousWaveValue { |
| 952 | + pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) |
| 953 | + } else { |
| 954 | + nextPotentialWaveValue++ |
| 955 | + pruneTasksWavesValues = append(pruneTasksWavesValues, nextPotentialWaveValue) |
| 956 | + } |
925 | 957 | }
|
926 | 958 |
|
927 |
| - for _, task := range pruneTasks[endWave] { |
928 |
| - task.waveOverride = &startWave |
| 959 | + pruneTasksWavesNewValues := PowInt(2, pruneTasksWavesValues[len(pruneTasksWavesValues)-1]) |
| 960 | + newPruneWaves := []int{pruneTasksWavesNewValues} |
| 961 | + for i := 1; i < len(pruneTasksWavesValues); i++ { |
| 962 | + if pruneTasksWavesValues[i] == pruneTasksWavesValues[i-1] { |
| 963 | + newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues) |
| 964 | + } else { |
| 965 | + pruneTasksWavesNewValues /= 2 |
| 966 | + newPruneWaves = append(newPruneWaves, pruneTasksWavesNewValues) |
| 967 | + } |
| 968 | + } |
| 969 | + |
| 970 | + syncTaskUseBinaryTreeOrdering := "true" |
| 971 | + |
| 972 | + for i := range uniquePruneWavesUsingNormalOrdering { |
| 973 | + // tasks using normal wave ordering to reorder |
| 974 | + iWave := uniquePruneWavesUsingNormalOrdering[i] |
| 975 | + |
| 976 | + for _, task := range pruntTasksUsingNormalOrdering[iWave] { |
| 977 | + task.waveOverride = &newPruneWaves[i] |
| 978 | + task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering |
| 979 | + } |
| 980 | + } |
| 981 | + |
| 982 | + n := len(uniquePruneWavesUsingNormalOrdering) |
| 983 | + for i := range uniquePruneWavesUsingBinaryTreeOrdering { |
| 984 | + // tasks using binary tree wave ordering to reorder |
| 985 | + iWave := uniquePruneWavesUsingBinaryTreeOrdering[i] |
| 986 | + |
| 987 | + for _, task := range pruneTasksUsingBinaryTreeOrdering[iWave] { |
| 988 | + task.waveOverride = &(newPruneWaves[n+i]) |
| 989 | + task.waveUseBinaryTreeOrderingOverride = &syncTaskUseBinaryTreeOrdering |
| 990 | + } |
| 991 | + } |
| 992 | + } else { |
| 993 | + // reorder waves for pruning tasks using symmetric swap on prune waves |
| 994 | + n := len(uniquePruneWavesUsingNormalOrdering) |
| 995 | + for i := 0; i < n/2; i++ { |
| 996 | + // waves to swap |
| 997 | + startWave := uniquePruneWavesUsingNormalOrdering[i] |
| 998 | + endWave := uniquePruneWavesUsingNormalOrdering[n-1-i] |
| 999 | + |
| 1000 | + for _, task := range pruntTasksUsingNormalOrdering[startWave] { |
| 1001 | + task.waveOverride = &endWave |
| 1002 | + } |
| 1003 | + |
| 1004 | + for _, task := range pruntTasksUsingNormalOrdering[endWave] { |
| 1005 | + task.waveOverride = &startWave |
| 1006 | + } |
929 | 1007 | }
|
930 | 1008 | }
|
931 | 1009 |
|
932 | 1010 | // for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
|
933 | 1011 | // to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
|
934 | 1012 | syncPhaseLastWave := 0
|
| 1013 | + syncPhaseLastWaveUseBinaryTreeOrdering := "false" |
935 | 1014 | for _, task := range tasks {
|
936 | 1015 | if task.phase == common.SyncPhaseSync {
|
937 | 1016 | if task.wave() > syncPhaseLastWave {
|
938 | 1017 | syncPhaseLastWave = task.wave()
|
| 1018 | + syncPhaseLastWaveUseBinaryTreeOrdering = task.waveUseBinaryTreeOrdering() |
939 | 1019 | }
|
940 | 1020 | }
|
941 | 1021 | }
|
942 |
| - syncPhaseLastWave = syncPhaseLastWave + 1 |
| 1022 | + |
| 1023 | + if syncPhaseLastWaveUseBinaryTreeOrdering == "false" { |
| 1024 | + syncPhaseLastWave++ |
| 1025 | + } else { |
| 1026 | + syncPhaseLastWave *= 2 |
| 1027 | + } |
943 | 1028 |
|
944 | 1029 | for _, task := range tasks {
|
945 | 1030 | if task.isPrune() &&
|
946 | 1031 | (sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
|
947 | 1032 | task.waveOverride = &syncPhaseLastWave
|
| 1033 | + task.waveUseBinaryTreeOrderingOverride = &syncPhaseLastWaveUseBinaryTreeOrdering |
948 | 1034 | }
|
949 | 1035 | }
|
950 | 1036 |
|
|
0 commit comments