Skip to content

Commit 5c9d45f

Browse files
committed
- Normalize namespace configuration such that it either is empty (None) if all namespaces must be considered or contains as set of unique namespace names. This also fixes a bug where namespaced API instances were created for wildcard entries ("*").
- Remove some redundant code
1 parent a6fa355 commit 5c9d45f

File tree

2 files changed

+62
-52
lines changed

2 files changed

+62
-52
lines changed

rustrial-k8s-object-syncer/src/main.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#[macro_use]
22
extern crate log;
33

4-
use std::{collections::HashSet, iter::FromIterator};
4+
use std::collections::HashSet;
55

66
use futures::TryStreamExt;
77
use k8s_openapi::api::core::v1::Namespace;
@@ -46,18 +46,30 @@ struct Configuration {
4646

4747
impl Configuration {
4848
pub fn new(client: Client) -> Self {
49-
let watch_namespaces: Option<Vec<String>> =
50-
env_var("WATCH_NAMESPACES").map(|v| v.split(",").map(|v| v.to_string()).collect());
51-
let source_namespaces: Option<HashSet<String>> =
52-
env_var("SOURCE_NAMESPACES").map(|v| v.split(",").map(|v| v.to_string()).collect());
53-
let target_namespaces: Option<HashSet<String>> =
54-
env_var("TARGET_NAMESPACES").map(|v| v.split(",").map(|v| v.to_string()).collect());
55-
let object_sync_api = if let Some([ns]) = &watch_namespaces.as_deref() {
49+
fn normalize(hs: HashSet<String>) -> Option<HashSet<String>> {
50+
if hs.is_empty() || hs.contains("*") || hs.contains("") {
51+
None
52+
} else {
53+
Some(hs)
54+
}
55+
}
56+
let watch_namespaces: Option<HashSet<String>> = env_var("WATCH_NAMESPACES")
57+
.map(|v| normalize(v.split(",").map(|v| v.to_string()).collect()))
58+
.flatten();
59+
let source_namespaces: Option<HashSet<String>> = env_var("SOURCE_NAMESPACES")
60+
.map(|v| normalize(v.split(",").map(|v| v.to_string()).collect()))
61+
.flatten();
62+
let target_namespaces: Option<HashSet<String>> = env_var("TARGET_NAMESPACES")
63+
.map(|v| normalize(v.split(",").map(|v| v.to_string()).collect()))
64+
.flatten();
65+
let mut tmp = watch_namespaces.iter().flatten();
66+
let object_sync_api = if let (Some(ns), None) = (tmp.next(), tmp.next()) {
5667
// Optimize for the use-case where exactly one watch-namespace is provided.
5768
info!("Controller is only watching resources in namespace {}", ns);
5869
Api::<ObjectSync>::namespaced(client.clone(), ns.as_str())
5970
} else {
6071
if let Some(namespaces) = &watch_namespaces {
72+
let namespaces: Vec<&str> = namespaces.iter().map(|v| v.as_str()).collect();
6173
info!(
6274
"Controller is watching resources in namespaces: {}",
6375
namespaces.join(",")
@@ -70,7 +82,7 @@ impl Configuration {
7082
Configuration {
7183
client: client,
7284
resource_sync: object_sync_api,
73-
watch_namespaces: watch_namespaces.map(HashSet::from_iter),
85+
watch_namespaces,
7486
source_namespaces,
7587
target_namespaces,
7688
}

rustrial-k8s-object-syncer/src/resource_controller.rs

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,13 @@ impl ResourceControllerImpl {
305305
// it gets properly removed by the API server and that we can recreate and sync
306306
// it.
307307
match remove_finalizer(api.clone(), &mut current, FINALIZER).await {
308-
Ok(true) => debug!(
309-
"removed finalizer from deleted object {} {}/{}",
310-
self.gvk.kind, d.namespace, d.name
311-
),
308+
Ok(true) => {
309+
debug!(
310+
"removed finalizer from deleted object {} {}/{}",
311+
self.gvk.kind, d.namespace, d.name
312+
);
313+
continue;
314+
}
312315
Err(e) => warn!(
313316
"failed to remove finalizer from deleted object {} {}/{}: {}",
314317
self.gvk.kind, d.namespace, d.name, e
@@ -325,7 +328,6 @@ impl ResourceControllerImpl {
325328
{
326329
template.metadata.uid = current.uid();
327330
template.metadata.resource_version = current.resource_version();
328-
329331
let result = match &d.strategy() {
330332
SyncStrategy::Replace => {
331333
api.replace(d.name.as_str(), &pp, &template).await
@@ -539,6 +541,11 @@ impl ResourceControllerImpl {
539541
let me = ctx.get_ref();
540542
let namespaced_name = NamespacedName::from(&source);
541543

544+
let source_id = format!(
545+
"{}/{}/{} {}",
546+
me.gvk.group, me.gvk.version, me.gvk.kind, namespaced_name
547+
);
548+
542549
let is_source_namespace = me
543550
.configuration
544551
.source_namespaces
@@ -552,7 +559,9 @@ impl ResourceControllerImpl {
552559
guard.get(&namespaced_name).cloned()
553560
} else {
554561
None
555-
};
562+
}
563+
.filter(|c| !c.is_empty());
564+
556565
// Add type information required by server-side apply.
557566
source.types = Some(TypeMeta {
558567
api_version: me.gvk.version.clone(),
@@ -586,10 +595,7 @@ impl ResourceControllerImpl {
586595
)
587596
.await
588597
{
589-
error!(
590-
"failed to remove finalizer from {}/{}/{} {}: {}",
591-
me.gvk.group, me.gvk.version, me.gvk.kind, namespaced_name, e
592-
);
598+
error!("failed to remove finalizer from {}: {}", source_id, e);
593599
}
594600
}
595601
}
@@ -601,26 +607,15 @@ impl ResourceControllerImpl {
601607
)
602608
.await
603609
{
604-
error!(
605-
"failed to add finalizer to {}/{}/{} {}: {}",
606-
me.gvk.group, me.gvk.version, me.gvk.kind, namespaced_name, e
607-
);
610+
error!("failed to add finalizer to {}: {}", source_id, e);
608611
}
609612
for sync_configuration in sync_configurations {
610613
if let Some(mut rs) = me
611614
.get_object_sync(&namespaced_name, &sync_configuration)
612615
.await
613616
{
614617
if let Err(e) = me.reconcile_source(&mut rs, &source).await {
615-
error!(
616-
"failed to reconcile {}/{}/{} {} for {}: {}",
617-
me.gvk.group,
618-
me.gvk.version,
619-
me.gvk.kind,
620-
namespaced_name,
621-
rs.id(),
622-
e
623-
);
618+
error!("failed to reconcile {} for {}: {}", source_id, rs.id(), e);
624619
}
625620
}
626621
}
@@ -636,6 +631,10 @@ impl ResourceControllerImpl {
636631
requeue_after: Some(Duration::from_secs(300)),
637632
})
638633
} else {
634+
debug!(
635+
"ignoring {} as it is not references by any ObjectSync instance",
636+
source_id,
637+
);
639638
// No need to requeue objects not tracked by any ObjectSync configuration.
640639
Ok(ReconcilerAction {
641640
requeue_after: None,
@@ -654,6 +653,17 @@ impl ResourceControllerImpl {
654653
}
655654
}
656655

656+
/// Get an optimized API instance.
657+
fn api(&self, namespaces: &Option<HashSet<String>>) -> Api<DynamicObject> {
658+
match namespaces {
659+
Some(namespaces) if namespaces.len() == 1 => match namespaces.iter().next() {
660+
Some(ns) => Api::namespaced_with(self.client(), ns.as_str(), &self.api_resource),
661+
_ => Api::all_with(self.client(), &self.api_resource),
662+
},
663+
_ => Api::all_with(self.client(), &self.api_resource),
664+
}
665+
}
666+
657667
pub async fn start(
658668
self,
659669
reload: Receiver<()>,
@@ -663,20 +673,8 @@ impl ResourceControllerImpl {
663673
let api_resource = self.api_resource.clone();
664674
let api_resource2 = self.api_resource.clone();
665675
let api_resource3 = self.api_resource.clone();
666-
let src_api = match &self.configuration.source_namespaces {
667-
Some(hs) if hs.len() == 1 => match hs.iter().next() {
668-
Some(ns) => Api::namespaced_with(self.client(), ns.as_str(), &self.api_resource),
669-
None => Api::all_with(self.client(), &self.api_resource),
670-
},
671-
_ => Api::all_with(self.client(), &self.api_resource),
672-
};
673-
let dst_api = match &self.configuration.target_namespaces {
674-
Some(hs) if hs.len() == 1 => match hs.iter().next() {
675-
Some(ns) => Api::namespaced_with(self.client(), ns.as_str(), &self.api_resource),
676-
None => Api::all_with(self.client(), &self.api_resource),
677-
},
678-
_ => Api::all_with(self.client(), &self.api_resource),
679-
};
676+
let src_api = self.api(&self.configuration.source_namespaces);
677+
let dst_api = self.api(&self.configuration.target_namespaces);
680678
let list_params = ListParams::default();
681679
let controller = Controller::new_with(src_api, list_params, self.api_resource.clone());
682680
let sources = self.sources.clone();
@@ -691,9 +689,9 @@ impl ResourceControllerImpl {
691689
Api::<Namespace>::all(self.client()),
692690
ListParams::default(),
693691
move |namespace| {
694-
let is_target_namespace = target_namespaces2.as_ref().map_or(true, |v| {
695-
v.is_empty() || v.contains(namespace.name().as_str()) || v.contains("*")
696-
});
692+
let is_target_namespace = target_namespaces2
693+
.as_ref()
694+
.map_or(true, |v| v.contains(namespace.name().as_str()));
697695
if let Some(ct) = &namespace.metadata.creation_timestamp {
698696
let age = Utc::now() - ct.0;
699697
if is_target_namespace && age.num_seconds() < 300 {
@@ -736,9 +734,9 @@ impl ResourceControllerImpl {
736734
lp_dst,
737735
move |rs: DynamicObject| {
738736
let namespace = rs.namespace().unwrap_or_else(|| "".to_string());
739-
let is_target_namespace = target_namespaces.as_ref().map_or(true, |v| {
740-
v.is_empty() || v.contains(namespace.as_str()) || v.contains("*")
741-
});
737+
let is_target_namespace = target_namespaces
738+
.as_ref()
739+
.map_or(true, |v| v.contains(namespace.as_str()));
742740
if let Some(annotation) =
743741
rs.annotations().get(source_object_annotation_key.as_str())
744742
{

0 commit comments

Comments
 (0)