Ensure process groups are removed from the pending restart list if they are stuck in terminating or the process is missing #2325

Open · wants to merge 4 commits into main
2 changes: 1 addition & 1 deletion e2e/test_operator/operator_test.go
@@ -1758,7 +1758,7 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
fdbCluster.GetCluster().Status.ProcessGroups,
processGroupID,
)
}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(BeNil())
}).WithTimeout(10 * time.Minute).WithPolling(5 * time.Second).Should(BeNil())
Member Author:

I increased the wait time here. When the global synchronization mode is enabled, the coordination takes a bit longer, so the actual replacement often takes more than 5 minutes (in the last 2 failed tests it was around 6 minutes).


// Make sure the Pod is actually deleted after some time.
Eventually(func() bool {
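As an aside to the review comment above, here is a minimal sketch (not part of this diff) of how the removal timeout could be derived from the cluster's synchronization mode instead of being hard-coded; GetSynchronizationMode() and SynchronizationModeLocal are reused from the velocity test changes later in this PR, and the extra five minutes for the global mode is an assumption based on the roughly six-minute replacements mentioned above:

```go
// Sketch only, slotting into the existing test context: scale the removal timeout
// with the synchronization mode instead of hard-coding 10 minutes. The extra
// headroom for the global mode is an assumption.
removalTimeout := 5 * time.Minute
if fdbCluster.GetCluster().GetSynchronizationMode() != fdbv1beta2.SynchronizationModeLocal {
	removalTimeout += 5 * time.Minute
}

Eventually(func() *fdbv1beta2.ProcessGroupStatus {
	// Look up the replaced process group directly in the cluster status.
	for _, processGroup := range fdbCluster.GetCluster().Status.ProcessGroups {
		if processGroup.ProcessGroupID == processGroupID {
			return processGroup
		}
	}
	return nil
}).WithTimeout(removalTimeout).WithPolling(5 * time.Second).Should(BeNil())
```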
39 changes: 37 additions & 2 deletions e2e/test_operator_ha_upgrades/operator_ha_upgrade_test.go
@@ -388,20 +388,55 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {
log.Println("Ensure cluster(s) are not upgraded")
fdbCluster.VerifyVersion(beforeVersion)
} else {
// If we do a version compatible upgrade, ensure the partition is present for 2 minutes.
time.Sleep(2 * time.Minute)
// If we do a version compatible upgrade, ensure the partition is present for 30 seconds.
time.Sleep(30 * time.Second)
}

log.Println("Restoring connectivity")
factory.DeleteChaosMeshExperimentSafe(partitionExperiment)

// When using protocol compatible versions, the other operator instances are able to move forward. In some
// cases it can happen that new coordinators are selected and all the old coordinators are deleted. In this
// case the remote satellite operator will not be able to connect to the cluster anymore and needs an
// update to the connection string.
if fixtures.VersionsAreProtocolCompatible(beforeVersion, targetVersion) {
Eventually(func(g Gomega) {
currentConnectionString := fdbCluster.GetPrimary().
GetStatus().
Cluster.ConnectionString
remoteSat := fdbCluster.GetRemoteSatellite()
remoteConnectionString := remoteSat.GetCluster().Status.ConnectionString

// If the connection string is different we have to update it on the remote satellite side
// as the operator instances were partitioned.
if currentConnectionString != remoteConnectionString {
if !remoteSat.GetCluster().Spec.Skip {
remoteSat.SetSkipReconciliation(true)
// Wait one minute; that should be enough time for the operator to finish the reconciliation loop
// if one has started.
time.Sleep(1 * time.Minute)
}

remoteSatStatus := remoteSat.GetCluster().Status.DeepCopy()
remoteSatStatus.ConnectionString = currentConnectionString
fdbCluster.GetRemoteSatellite().
UpdateClusterStatusWithStatus(remoteSatStatus)
}

g.Expect(remoteConnectionString).To(Equal(currentConnectionString))
}).WithTimeout(5 * time.Minute).WithPolling(15 * time.Second).Should(Succeed())
}

// Delete the operator Pods to ensure they pick up the work directly; otherwise it could take a long time
// until the operator tries to reconcile the cluster again. If the operator is not able to reconcile a
// cluster, it is put back into the reconciliation queue, and over time the queue delays the next attempt
// for a long time. Since the network partition does not emit any events for the operator, it won't trigger
// a reconciliation either. So this step only speeds up the reconciliation process.
factory.RecreateOperatorPods(fdbCluster.GetRemoteSatellite().Namespace())

// Ensure that the remote satellite is not set to skip.
fdbCluster.GetRemoteSatellite().SetSkipReconciliation(false)

// Upgrade should make progress now - wait until all processes have upgraded
// to "targetVersion".
fdbCluster.VerifyVersion(targetVersion)
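For background on the requeue delays mentioned in the comment about recreating the operator Pods, here is a small standalone sketch, assuming client-go's workqueue package (which controller-runtime uses for its default rate limiter), showing how the per-item backoff grows after repeated failed reconciliations:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// The default controller rate limiter combines a per-item exponential backoff
	// (5ms base, 1000s cap) with a token-bucket limiter. After repeated failures the
	// per-item delay grows into the minutes range, which is why recreating the
	// operator Pods is used in the test above to force an immediate reconcile.
	limiter := workqueue.DefaultControllerRateLimiter()
	for attempt := 1; attempt <= 20; attempt++ {
		delay := limiter.When("remote-satellite-cluster")
		if attempt%5 == 0 {
			fmt.Printf("requeue attempt %d delayed by %v\n", attempt, delay)
		}
	}
}
```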
32 changes: 30 additions & 2 deletions e2e/test_operator_upgrades/operator_upgrades_test.go
@@ -510,7 +510,7 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
)

DescribeTable(
"one process is marked for removal",
"one process is marked for removal and is stuck in removal",
func(beforeVersion string, targetVersion string) {
if fixtures.VersionsAreProtocolCompatible(beforeVersion, targetVersion) {
Skip("this test only affects version incompatible upgrades")
@@ -555,13 +555,30 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
var processesToUpdate int

cluster := fdbCluster.GetCluster()

for _, processGroup := range cluster.Status.ProcessGroups {
if processGroup.ProcessGroupID == processGroupMarkedForRemoval {
continue
}

if len(processGroup.ProcessGroupConditions) > 0 {
// Ignore process groups that are stuck in terminating. If the global synchronization mode is active,
// this will be the case for all the transaction system process groups, as one process group is
// blocked from being removed.
if processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating) != nil {
log.Println(
"processGroup",
processGroup.ProcessGroupID,
"will be ignored since the process group is in terminating",
)
continue
}

log.Println(
"processGroup",
processGroup.ProcessGroupID,
"processes conditions:",
processGroup.ProcessGroupConditions,
)
processesToUpdate++
}
}
@@ -571,6 +588,17 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
return processesToUpdate
}).WithTimeout(30 * time.Minute).WithPolling(5 * time.Second).MustPassRepeatedly(5).Should(BeNumerically("==", 0))

// Remove the buggify option and make sure that the terminating processes are removed.
fdbCluster.SetBuggifyBlockRemoval(nil)
Eventually(func(g Gomega) {
processGroups := fdbCluster.GetCluster().Status.ProcessGroups

for _, processGroup := range processGroups {
g.Expect(processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating)).
To(BeNil())
}
}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(Succeed())

// Make sure the cluster has no data loss.
fdbCluster.EnsureTeamTrackersHaveMinReplicas()
},
21 changes: 18 additions & 3 deletions e2e/test_operator_velocity/operator_velocity_test.go
@@ -340,6 +340,11 @@ var _ = Describe("Test Operator Velocity", Label("e2e", "nightly"), func() {
fdbCluster.GetPrimary().ReplacePod(*pod, false)
})

AfterEach(func() {
// Wait until the replaced process group is removed.
Expect(fdbCluster.WaitForReconciliation()).To(Succeed())
})

It("should roll out knob changes within expected time", func() {
Expect(
fdbCluster.SetCustomParameters(
@@ -351,13 +356,23 @@
),
).To(Succeed())

knobRolloutTimeout := normalKnobRolloutTimeoutSeconds
// In our testing pipeline we see failures because the replacement can take a long time, e.g.
// when a new node must be created for the replaced Pod.
if fdbCluster.GetPrimary().
GetCluster().
GetSynchronizationMode() ==
fdbv1beta2.SynchronizationModeLocal {
knobRolloutTimeout += int(
fdbCluster.GetPrimary().GetCluster().GetLockDuration().Seconds() * 2,
)
}

CheckKnobRollout(
fdbCluster,
newGeneralCustomParameters,
newStorageCustomParameters,
normalKnobRolloutTimeoutSeconds+int(
fdbCluster.GetPrimary().GetCluster().GetLockDuration().Seconds(),
),
knobRolloutTimeout,
totalGeneralProcessCount,
totalStorageProcessCount,
)
55 changes: 53 additions & 2 deletions internal/coordination/coordination.go
@@ -408,7 +408,6 @@ func UpdateGlobalCoordinationState(
}

processes := GetProcessesFromProcessMap(processGroup.ProcessGroupID, processesMap)

var excluded bool
for _, process := range processes {
excluded = excluded || process.Excluded
@@ -418,33 +417,85 @@
// exclusion timestamp set or because the processes are excluded.
if !(processGroup.IsExcluded() || excluded) {
if _, ok := pendingForExclusion[processGroup.ProcessGroupID]; !ok {
logger.V(1).
Info("Adding to pendingForExclusion", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal but not excluded")
updatesPendingForExclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
}

if _, ok := pendingForInclusion[processGroup.ProcessGroupID]; !ok {
logger.V(1).
Info("Adding to pendingForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal but not excluded")
updatesPendingForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
}
} else {
reason := "process group is excluded and marked for removal"
// Check if the process group is present in pendingForExclusion or readyForExclusion.
// If so, add them to the set to remove those entries as the process is already excluded.
if _, ok := pendingForExclusion[processGroup.ProcessGroupID]; ok {
logger.V(1).
Info("Removing from pendingForExclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
updatesPendingForExclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
}

if _, ok := readyForExclusion[processGroup.ProcessGroupID]; ok {
logger.V(1).
Info("Removing from readyForExclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
updatesReadyForExclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
}

// Ensure the process is added to the pending for inclusion list.
if _, ok := pendingForInclusion[processGroup.ProcessGroupID]; !ok {
logger.V(1).
Info("Adding to pendingForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
updatesPendingForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
}

if processGroup.ExclusionSkipped {
if _, ok := readyForInclusion[processGroup.ProcessGroupID]; !ok {
logger.V(1).
Info("Adding to readyForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
updatesReadyForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
}
}

// if the process group is excluded, we don't need to restart it.
if _, ok := pendingForRestart[processGroup.ProcessGroupID]; ok {
logger.V(1).
Info("Removing from pendingForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
updatesPendingForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
}

if _, ok := readyForRestart[processGroup.ProcessGroupID]; ok {
logger.V(1).
Info("Removing from readyForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
updatesReadyForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
}
}

// If the process group is stuck in terminating, we can add it to the ready for inclusion list.
if processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating) != nil {
if _, ok := pendingForInclusion[processGroup.ProcessGroupID]; !ok {
logger.V(1).
Info("Adding to pendingForInclusion and readyForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal and in terminating")
updatesPendingForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
updatesReadyForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
}

// If the process group is marked for removal and the resources are stuck in terminating or the processes are not running, we should
// remove them from the restart list, because there are no processes to restart.
if processGroup.GetConditionTime(fdbv1beta2.MissingProcesses) != nil {
if _, ok := pendingForRestart[processGroup.ProcessGroupID]; ok {
logger.V(1).
Info("Removing from pendingForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal")
updatesPendingForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
}

if _, ok := readyForRestart[processGroup.ProcessGroupID]; ok {
logger.V(1).
Info("Removing from readyForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal")
updatesReadyForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
}
}
}

addresses, ok := processAddresses[processGroup.ProcessGroupID]
@@ -457,7 +508,7 @@
continue
}

// If the process groups is missing long enough to be ignored, ensure that it's removed from the pending
// If the process group is missing long enough to be ignored, ensure that it's removed from the pending
// and the ready list.
if processGroup.GetConditionTime(fdbv1beta2.IncorrectCommandLine) != nil &&
!restarts.ShouldBeIgnoredBecauseMissing(logger, cluster, processGroup) {
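To make the add/delete bookkeeping above easier to follow, here is a minimal standalone sketch (not operator code) of how one of the update maps, such as updatesPendingForRestart, could be applied to a coordination set; the import path, the UpdateAction type name, and the map[...]time.Time shape of the set are assumptions based on the constants used in this diff:

```go
package coordination

import (
	"time"

	fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2"
)

// applyUpdates applies a batch of UpdateActionAdd/UpdateActionDelete entries, as
// collected by UpdateGlobalCoordinationState, to a single coordination set.
// Sketch only: the UpdateAction type name and the set's map shape are assumptions.
func applyUpdates(
	set map[fdbv1beta2.ProcessGroupID]time.Time,
	updates map[fdbv1beta2.ProcessGroupID]fdbv1beta2.UpdateAction,
) {
	for processGroupID, action := range updates {
		switch action {
		case fdbv1beta2.UpdateActionAdd:
			// Only set the timestamp the first time the process group is added.
			if _, ok := set[processGroupID]; !ok {
				set[processGroupID] = time.Now()
			}
		case fdbv1beta2.UpdateActionDelete:
			delete(set, processGroupID)
		}
	}
}
```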
3 changes: 2 additions & 1 deletion internal/restarts/restarts.go
@@ -84,5 +84,6 @@ func ShouldBeIgnoredBecauseMissing(
return true
}

return false
// If a process group is stuck in terminating, we don't want to block further actions because of that.
return processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating) != nil
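Finally, a hedged sketch of how the new ResourcesTerminating branch in ShouldBeIgnoredBecauseMissing could be exercised in a unit test; the import paths, the ProcessGroupCondition literal, and the test setup are assumptions inferred from the call site in coordination.go and the repository layout visible in this diff:

```go
package restarts_test

import (
	"testing"
	"time"

	"github.com/go-logr/logr"

	fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2"
	"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/restarts"
)

// TestIgnoreTerminatingProcessGroup checks that a process group with only the
// ResourcesTerminating condition no longer blocks restarts (sketch, not from the PR).
func TestIgnoreTerminatingProcessGroup(t *testing.T) {
	cluster := &fdbv1beta2.FoundationDBCluster{}
	processGroup := &fdbv1beta2.ProcessGroupStatus{
		ProcessGroupID: "storage-1",
		ProcessGroupConditions: []*fdbv1beta2.ProcessGroupCondition{
			{
				ProcessGroupConditionType: fdbv1beta2.ResourcesTerminating,
				Timestamp:                 time.Now().Unix(),
			},
		},
	}

	if !restarts.ShouldBeIgnoredBecauseMissing(logr.Discard(), cluster, processGroup) {
		t.Fatal("expected a process group stuck in terminating to be ignored for restarts")
	}
}
```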
}