Skip to content

Commit 3b68c9b

Browse files
committed
- Fix a logic bug in tracking of new namespaces, which caused long delays before objects were synced to new namespaces.
- Fix a bug in condition update logic, which caused the condition message to not be updated on removal of destinations. - Remove finalizer from deleted destination objects to make sure they are properly removed otherwise they could block namespace removal. - Skip destinations in deleted namespaces when building the set of expected destinations to make sure we do not place destinations (with finalizers) into deleted namespaces as this would block namespace removal.
1 parent 2d08f3b commit 3b68c9b

File tree

2 files changed

+40
-10
lines changed

2 files changed

+40
-10
lines changed

rustrial-k8s-object-syncer-apis/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ impl Destination {
129129
self.namespace.as_str() == namespace || self.applies_to_all_namespaces()
130130
}
131131

132+
/// Check whether this [`Destination`] applies to given namespace and if so
133+
/// returns a tuple `(namespace,name)`.
132134
pub fn applies_to(&self, obj: &ObjectSync, namespace: &str) -> Option<(String, String)> {
133135
let spec: &ObjectSyncSpec = &obj.spec;
134136
let src_name = spec.source.name.as_str();

rustrial-k8s-object-syncer/src/resource_controller.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,22 @@ impl ResourceControllerImpl {
219219
let mut tmp: Vec<(String, String)> = Default::default();
220220
if d.applies_to_all_namespaces() {
221221
for ns in cache.iter() {
222-
if let Some(x) = d.applies_to(event, ns.name().as_str()) {
223-
tmp.push(x);
222+
// Make sure we skip deleted namespaces, as otherwise the finalizers on the synced
223+
// destination objects will prevent the namespace from being deleted.
224+
if ns.metadata.deletion_timestamp.is_none() {
225+
if let Some(x) = d.applies_to(event, ns.name().as_str()) {
226+
tmp.push(x);
227+
}
224228
}
225229
}
226230
} else if let Some(x) = d.applies_to(event, d.namespace.as_str()) {
227-
tmp.push(x);
231+
if let Some(ns) = cache.iter().find(|ns| ns.name() == d.namespace) {
232+
// Make sure we skip deleted namespaces, as otherwise the finalizers on the synced
233+
// destination objects will prevent the namespace from being deleted.
234+
if ns.metadata.deletion_timestamp.is_none() {
235+
tmp.push(x);
236+
}
237+
}
228238
}
229239
tmp
230240
})
@@ -253,7 +263,7 @@ impl ResourceControllerImpl {
253263
source.name()
254264
),
255265
);
256-
let mut upserted = 0usize;
266+
let mut changed = false;
257267
let mut pp = PostParams::default();
258268
pp.field_manager = Some(MANAGER.to_string());
259269

@@ -285,7 +295,23 @@ impl ResourceControllerImpl {
285295
while retry_attempts > 0 {
286296
retry_attempts -= 1;
287297
match api.get(d.name.as_str()).await {
288-
Ok(current) => {
298+
Ok(mut current) => {
299+
if current.metadata.deletion_timestamp.is_some() {
300+
// If a destination object has been deleted, remove the finalizer to make sure
301+
// it gets properly removed by the API server and that we can recreate and sync
302+
// it.
303+
match remove_finalizer(api.clone(), &mut current, FINALIZER).await {
304+
Ok(true) => debug!(
305+
"removed finalizer from deleted object {} {}/{}",
306+
self.gvk.kind, d.namespace, d.name
307+
),
308+
Err(e) => warn!(
309+
"failed to remove finalizer from deleted object {} {}/{}: {}",
310+
self.gvk.kind, d.namespace, d.name, e
311+
),
312+
_ => (),
313+
}
314+
}
289315
let dst_version = ObjectRevision {
290316
uid: current.uid(),
291317
resource_version: current.resource_version(),
@@ -302,7 +328,7 @@ impl ResourceControllerImpl {
302328
resource_version: updated.resource_version(),
303329
});
304330
d.source_version = Some(src_version.clone());
305-
upserted += 1;
331+
changed = true;
306332
observed_success += 1;
307333
break;
308334
}
@@ -333,7 +359,7 @@ impl ResourceControllerImpl {
333359
resource_version: created.resource_version(),
334360
});
335361
d.source_version = Some(src_version.clone());
336-
upserted += 1;
362+
changed = true;
337363
observed_success += 1;
338364
break;
339365
}
@@ -362,7 +388,7 @@ impl ResourceControllerImpl {
362388
}
363389
}
364390
}
365-
Ok((upserted > 0, expected_success, observed_success))
391+
Ok((changed, expected_success, observed_success))
366392
}
367393

368394
fn is_same_destination(me: &DestinationStatus, other: &DestinationStatus) -> bool {
@@ -403,7 +429,9 @@ impl ResourceControllerImpl {
403429
expected_destinations.push(expected_dst);
404430
}
405431
// 1. Remove stale destinations.
432+
let stales = stale_destinations.len();
406433
let stale_remnants = delete_destinations(self.client(), &stale_destinations).await?;
434+
let removed_count = stales - stale_remnants.len();
407435
// 2. Update status sub-resources with active destinations.
408436
// Deterministically sort destinations to avoid unnecessary updates.
409437
// Note, it is important that we update the status sub-resource before
@@ -420,7 +448,7 @@ impl ResourceControllerImpl {
420448
let (updated, expected, observed) = self
421449
.upsert_destinations(&source, destinations, &stale_remnants)
422450
.await?;
423-
if updated {
451+
if updated || removed_count > 0 {
424452
let errors = expected - observed;
425453
Some(Condition::new(
426454
IN_SYNC,
@@ -641,7 +669,7 @@ impl ResourceControllerImpl {
641669
});
642670
if let Some(ct) = &namespace.metadata.creation_timestamp {
643671
let age = Utc::now() - ct.0;
644-
if is_target_namespace && age.num_seconds() > 300 {
672+
if is_target_namespace && age.num_seconds() < 300 {
645673
// reconcile all source if namespace has been created in the last 5 minutes.
646674
let guard = futures::executor::block_on(sources.read());
647675
let tmp: Vec<ObjectRef<DynamicObject>> = guard

0 commit comments

Comments
 (0)