Skip to content

Commit caade66

Browse files
committed
fix: fix several bugs and clean-up code
1 parent 1622e4a commit caade66

File tree

5 files changed

+360
-299
lines changed

5 files changed

+360
-299
lines changed

rustrial-k8s-object-syncer-apis/src/lib.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::HashSet;
22

33
use k8s_openapi::chrono::{SecondsFormat, Utc};
4-
use kube::{api::ObjectMeta, CustomResource};
4+
use kube::{api::ObjectMeta, CustomResource, ResourceExt};
55

66
use schemars::JsonSchema;
77
use serde::{Deserialize, Serialize};
@@ -283,15 +283,11 @@ impl ObjectSync {
283283
)
284284
}
285285

286-
pub fn all_destinations(&self) -> Vec<&DestinationStatus> {
287-
let status: &Option<ObjectSyncStatus> = &self.status;
288-
let mut all: Vec<&DestinationStatus> = Default::default();
289-
if let Some(status) = status {
290-
while let Some(destinations) = &status.destinations {
291-
all.extend(destinations.iter());
292-
}
293-
}
294-
all
286+
pub fn status_destinations(&self) -> Option<&Vec<DestinationStatus>> {
287+
self.status
288+
.as_ref()
289+
.map(|v| v.destinations.as_ref())
290+
.flatten()
295291
}
296292

297293
pub fn update_condition(&mut self, c: Condition) {
@@ -311,6 +307,20 @@ impl ObjectSync {
311307
status.update_destinations(destinations);
312308
self.status = Some(status);
313309
}
310+
311+
pub fn source_name(&self) -> &str {
312+
&self.spec.source.name
313+
}
314+
315+
/// Return the namespace of the source object, which defaults
316+
/// to the namespace of the ObjectSync object.
317+
pub fn source_namespace(&self) -> Option<String> {
318+
self.spec
319+
.source
320+
.namespace
321+
.clone()
322+
.or_else(|| self.namespace())
323+
}
314324
}
315325

316326
impl ObjectSyncStatus {

rustrial-k8s-object-syncer/src/object_sync_controller.rs

Lines changed: 108 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
use crate::{
2+
Configuration, FINALIZER,
23
errors::{ControllerError, ExtKubeApiError},
34
object_sync_modifications::ObjectSyncModifications,
4-
resource_controller::{ObjectSyncHandle, ResourceController},
5-
utils::{delete_destinations, metric_name, remove_finalizer},
6-
Configuration, FINALIZER,
5+
resource_controller::{IN_SYNC, ObjectSyncHandle, ResourceController},
6+
utils::{
7+
ObjectSyncRef, SourceRef, delete_destinations, ensure_gvk, metric_name, remove_finalizer,
8+
},
79
};
810

911
use futures::StreamExt;
1012
use k8s_openapi::api::core::v1::Namespace;
1113
use kube::{
12-
api::{ApiResource, DynamicObject, GroupVersionKind, TypeMeta},
13-
discovery, Api, Client, ResourceExt,
14+
Api, Client, ResourceExt,
15+
api::{ApiResource, DynamicObject, GroupVersionKind},
16+
discovery,
1417
};
1518
use kube_runtime::{
1619
controller::{Action, Controller},
@@ -19,9 +22,8 @@ use kube_runtime::{
1922
};
2023
use log::{debug, info};
2124
use opentelemetry::{
22-
global,
25+
KeyValue, global,
2326
metrics::{Counter, Histogram, Meter},
24-
KeyValue,
2527
};
2628
use rustrial_k8s_object_syncer_apis::{Condition, ObjectSync, SourceObject};
2729
use std::{
@@ -32,7 +34,7 @@ use std::{
3234
};
3335
use tokio::{
3436
sync::RwLock,
35-
time::{sleep, Duration},
37+
time::{Duration, sleep},
3638
};
3739

3840
const READY: &'static str = "Ready";
@@ -42,18 +44,34 @@ pub(crate) const FAILURE: &'static str = "Failure";
4244
const OBJECT_SYNC_CONTROLLER: &'static str = "object_sync_controller";
4345

4446
/// Opaque handle for a [`ObjectSync`] object's registration with a [`ResourceController`].
47+
/// Basically, this is the registration of one specific source object with the
48+
/// [`ResourceController`] of its GVK.
4549
///
50+
#[derive(Clone)]
4651
struct ObjectSyncInstance {
4752
/// Strong reference to [`ResourceController`] to make sure the corresponding
4853
/// controller is alive as long as at least one [`ObjectSync`] is registered
4954
/// with it.
5055
_resource_controller: Arc<ResourceController>,
5156
/// If async_dropped will remove the [ObjectSync] from the corresponding [ResourceController].
52-
_resource_controller_handle: ObjectSyncHandle,
57+
/// Also used to track whether [ObjectSync] spec changed (e.g. whether .spec.source.name or
58+
/// .spec.source.namespace changed).
59+
resource_controller_handle: Arc<ObjectSyncHandle>,
5360
/// The GVK of the [ObjectSync] source object, used to track whether [ObjectSync] spec changed.
5461
gvk: GroupVersionKind,
5562
}
5663

64+
impl ObjectSyncInstance {
65+
/// Check if GVK or name/namespace of source object changed.
66+
fn source_changed(&self, new: &ObjectSync, gvk: &GroupVersionKind) -> bool {
67+
let old_src = &self.resource_controller_handle.src;
68+
let new_source_namespace = new.source_namespace();
69+
self.gvk != *gvk
70+
|| old_src.name != new.spec.source.name
71+
|| Some(old_src.namespace.as_ref()) != new_source_namespace.as_deref()
72+
}
73+
}
74+
5775
/// The main controller which will spawn one [`ResourceController`] instance per
5876
/// GVK (GroupVersionKind) combination.
5977
pub(crate) struct ObjectSyncController {
@@ -63,15 +81,15 @@ pub(crate) struct ObjectSyncController {
6381
/// Weak reference to [`ResourceController`] instances needed to register [ObjectSyncInstance] instances.
6482
/// We use weak references to make sure that unused [`ResourceController`] instances are dropped
6583
/// to make sure there is no unecessary load on the Kubernetes API servers and that there are
66-
/// no stale controllers for remove API resoures (e.g. deleted CustomResourceDefinitions).
84+
/// no stale controllers for removed API resoures (e.g. deleted CustomResourceDefinitions).
6785
resource_controllers: Arc<RwLock<HashMap<GroupVersionKind, Weak<ResourceController>>>>,
6886

6987
/// Mapping of [ObjectSync] (namespace/name) to the corresponding [ObjectSyncInstance] object.
7088
/// Note, we use the peristent ID `{namespasce}/{name}` instead of the object's UID as key to make
7189
/// sure the system is eventual consistent in case the controller should ever miss any events
7290
/// caused by replacement (delete/create) of the underlying object. Practically, it should see all
7391
/// events, but let's be defensive it can't hurt.
74-
instances: Arc<RwLock<HashMap<String, ObjectSyncInstance>>>,
92+
instances: Arc<RwLock<HashMap<ObjectSyncRef, ObjectSyncInstance>>>,
7593

7694
reconcile_object_sync_count: Counter<u64>,
7795
reconcile_object_sync_duration: Histogram<u64>,
@@ -187,7 +205,7 @@ impl ObjectSyncController {
187205
);
188206
Some(gvk)
189207
// The opaque ObjectSyncInstance handle `instance` is dropped here and
190-
// its `Drop` implementation will make sure it is deregisterd from its
208+
// its `Drop` implementation will make sure it is deregistered from its
191209
// `ResourceController`.
192210
} else {
193211
warn!(
@@ -226,7 +244,7 @@ impl ObjectSyncController {
226244
event.id(),
227245
ObjectSyncInstance {
228246
_resource_controller: resource_controller,
229-
_resource_controller_handle: resource_controller_handle,
247+
resource_controller_handle: Arc::new(resource_controller_handle),
230248
gvk,
231249
},
232250
);
@@ -237,50 +255,74 @@ impl ObjectSyncController {
237255
self.configuration.client.clone()
238256
}
239257

240-
async fn delete(&self, event: &mut ObjectSyncModifications) -> Result<(), ControllerError> {
241-
// Remove from ResourceController
242-
self.remove(event).await;
243-
// Delete all remaining destinations
244-
if let Some(destinations) = event
245-
.status
246-
.as_ref()
247-
.map(|v| v.destinations.as_ref())
248-
.flatten()
249-
{
250-
let remaining = delete_destinations(self.client(), destinations).await?;
251-
if !remaining.is_empty() {
252-
let errors = remaining.len();
253-
event.update_destinations(remaining);
254-
event.replace_status(self.client()).await?;
258+
pub async fn delete_destinations(
259+
client: Client,
260+
event: &mut ObjectSyncModifications,
261+
) -> Result<(), ControllerError> {
262+
if let Some(destinations) = event.status_destinations() {
263+
let dest_count = destinations.len();
264+
let remaining = delete_destinations(client.clone(), destinations).await?;
265+
let remaining_count = remaining.len();
266+
let in_sync_condition = if remaining_count > 0 {
267+
Condition::new(
268+
IN_SYNC,
269+
Some(false),
270+
FAILURE,
271+
format!(
272+
"failed to remove {} out of {} destination objects",
273+
remaining_count,
274+
destinations.len()
275+
),
276+
)
277+
} else {
278+
Condition::new(
279+
IN_SYNC,
280+
Some(true),
281+
SUCCESS,
282+
format!(
283+
"successfully removed all {} destination objects",
284+
destinations.len()
285+
),
286+
)
287+
};
288+
event.update_condition(in_sync_condition);
289+
event.update_destinations(remaining);
290+
event.replace_status(client).await?;
291+
if remaining_count > 0 {
255292
return Err(ControllerError::DestinationRemovalError(format!(
256-
"failed to remove {} destinations of {}",
257-
errors,
293+
"failed to deleted {} out of {} destinations of {}",
294+
remaining_count,
295+
dest_count,
258296
event.id()
259297
)));
298+
} else {
299+
info!(
300+
"successfully deleted all {} destination objects of {}",
301+
dest_count,
302+
event.id()
303+
);
260304
}
261305
}
262-
info!(
263-
"successfully removed all destination objects of {}",
264-
event.id()
265-
);
306+
Ok(())
307+
}
308+
309+
async fn delete(&self, event: &mut ObjectSyncModifications) -> Result<(), ControllerError> {
310+
// Remove from ResourceController
311+
self.remove(event).await;
312+
// Delete all remaining destinations
313+
Self::delete_destinations(self.client(), event).await?;
266314
// Remove finalizer from source object.
267315
let gvk = self.get_gvk(&event.spec.source).await?;
268316
let api_resource = ApiResource::from_gvk(&gvk);
269-
let namespace = event
270-
.spec
271-
.source
272-
.namespace
273-
.clone()
274-
.or(event.namespace())
275-
.unwrap_or_else(|| "".to_string());
317+
let namespace = event.source_namespace().unwrap_or_else(|| "".to_string());
276318
let api: Api<DynamicObject> =
277319
Api::namespaced_with(self.client(), namespace.as_str(), &api_resource);
278-
match api.get(event.spec.source.name.as_str()).await {
320+
match api
321+
.get(event.source_name())
322+
.await
323+
.map(|v| ensure_gvk(v, &gvk))
324+
{
279325
Ok(mut source) => {
280-
source.types = source.types.or(Some(TypeMeta {
281-
kind: gvk.kind.clone(),
282-
api_version: gvk.api_version(),
283-
}));
284326
remove_finalizer(api, &mut source, FINALIZER).await?;
285327
}
286328
Err(e) if e.is_not_found() => (),
@@ -301,28 +343,32 @@ impl ObjectSyncController {
301343
let current_gvk = self.get_gvk(&event.spec.source).await?;
302344
// Obtain the configuration checksum and immediately release the read lock
303345
// guard again.
304-
let gvk = {
346+
let active = {
305347
let instances = self.instances.read().await;
306-
instances.get(&event.id()).map(|v| v.gvk.clone())
348+
instances.get(&event.id()).map(|v| v.clone())
307349
};
308-
let condition = if let Some(gvk) = gvk {
309-
if gvk != current_gvk {
310-
// GVK changed so remove from current ResourceController
350+
let condition = if let Some(active) = active {
351+
if active.source_changed(event, &current_gvk) {
352+
// GVK or source object (name or namespace) changed so remove from current ResourceController
311353
self.remove(event).await;
312-
// and add it again with its GVK
354+
// Cleanup destinations as source changed.
355+
Self::delete_destinations(self.client(), event).await?;
356+
// and add it again with new GVK and source object
313357
self.add(&event, current_gvk.clone()).await?;
314358
Some(Condition::new(
315359
READY,
316360
Some(true),
317361
SUCCESS,
318362
format!(
319-
"Successfully moved from ResourceController {}/{}/{} to {}/{}/{}",
320-
gvk.group,
321-
gvk.version,
322-
gvk.kind,
363+
"Successfully moved from ResourceController {}/{}/{} to {}/{}/{} and updated source from {} to {}",
364+
active.gvk.group,
365+
active.gvk.version,
366+
active.gvk.kind,
323367
current_gvk.group,
324368
current_gvk.version,
325-
current_gvk.kind
369+
current_gvk.kind,
370+
active.resource_controller_handle.src,
371+
SourceRef::from(&*event)
326372
),
327373
))
328374
} else {
@@ -336,8 +382,11 @@ impl ObjectSyncController {
336382
Some(true),
337383
SUCCESS,
338384
format!(
339-
"Successfully registered with ResourceController {}/{}/{}",
340-
current_gvk.group, current_gvk.version, current_gvk.kind
385+
"Successfully registered with ResourceController {}/{}/{} and source {}",
386+
current_gvk.group,
387+
current_gvk.version,
388+
current_gvk.kind,
389+
SourceRef::from(&*event)
341390
),
342391
))
343392
};

0 commit comments

Comments
 (0)