@@ -13,7 +13,10 @@ use futures::{
13
13
} ;
14
14
use k8s_openapi:: { api:: core:: v1:: Namespace , chrono:: Utc } ;
15
15
use kube:: {
16
- api:: { ApiResource , DynamicObject , GroupVersionKind , ListParams , PostParams , TypeMeta } ,
16
+ api:: {
17
+ ApiResource , DynamicObject , GroupVersionKind , ListParams , Patch , PatchParams , PostParams ,
18
+ TypeMeta ,
19
+ } ,
17
20
Api , Client , ResourceExt ,
18
21
} ;
19
22
use kube_runtime:: {
@@ -27,7 +30,8 @@ use opentelemetry::{
27
30
KeyValue ,
28
31
} ;
29
32
use rustrial_k8s_object_syncer_apis:: {
30
- Condition , DestinationStatus , ObjectRevision , ObjectSync , ObjectSyncSpec , API_GROUP ,
33
+ Condition , DestinationStatus , ObjectRevision , ObjectSync , ObjectSyncSpec , SyncStrategy ,
34
+ API_GROUP ,
31
35
} ;
32
36
use std:: {
33
37
borrow:: BorrowMut ,
@@ -212,27 +216,27 @@ impl ResourceControllerImpl {
212
216
fn expected_destinations < ' a > (
213
217
& self ,
214
218
event : & ' a ObjectSyncModifications ,
215
- ) -> impl Iterator < Item = ( String , String ) > + ' a {
219
+ ) -> impl Iterator < Item = ( String , String , Option < SyncStrategy > ) > + ' a {
216
220
let spec: & ObjectSyncSpec = & event. spec ;
217
221
let cache = self . namespace_cache . state ( ) ;
218
222
spec. destinations . iter ( ) . flat_map ( move |d| {
219
- let mut tmp: Vec < ( String , String ) > = Default :: default ( ) ;
223
+ let mut tmp: Vec < ( String , String , Option < SyncStrategy > ) > = Default :: default ( ) ;
220
224
if d. applies_to_all_namespaces ( ) {
221
225
for ns in cache. iter ( ) {
222
226
// Make sure we skip deleted namespaces, as otherwise the finalizers on the synced
223
227
// destination objects will prevent the namespace from being deleted.
224
228
if ns. metadata . deletion_timestamp . is_none ( ) {
225
- if let Some ( x ) = d. applies_to ( event, ns. name ( ) . as_str ( ) ) {
226
- tmp. push ( x ) ;
229
+ if let Some ( ( ns , name ) ) = d. applies_to ( event, ns. name ( ) . as_str ( ) ) {
230
+ tmp. push ( ( ns , name , d . strategy ) ) ;
227
231
}
228
232
}
229
233
}
230
- } else if let Some ( x ) = d. applies_to ( event, d. namespace . as_str ( ) ) {
234
+ } else if let Some ( ( namespace , name ) ) = d. applies_to ( event, d. namespace . as_str ( ) ) {
231
235
if let Some ( ns) = cache. iter ( ) . find ( |ns| ns. name ( ) == d. namespace ) {
232
236
// Make sure we skip deleted namespaces, as otherwise the finalizers on the synced
233
237
// destination objects will prevent the namespace from being deleted.
234
238
if ns. metadata . deletion_timestamp . is_none ( ) {
235
- tmp. push ( x ) ;
239
+ tmp. push ( ( namespace , name , d . strategy ) ) ;
236
240
}
237
241
}
238
242
}
@@ -280,8 +284,6 @@ impl ResourceControllerImpl {
280
284
}
281
285
template. metadata . namespace = Some ( d. namespace . clone ( ) ) ;
282
286
template. metadata . name = Some ( d. name . clone ( ) ) ;
283
- template. metadata . uid = Default :: default ( ) ;
284
- template. metadata . resource_version = Default :: default ( ) ;
285
287
template. metadata . generation = Default :: default ( ) ;
286
288
template. metadata . generate_name = Default :: default ( ) ;
287
289
template. metadata . managed_fields = Default :: default ( ) ;
@@ -293,6 +295,8 @@ impl ResourceControllerImpl {
293
295
let api = self . namespaced_api ( d. namespace . as_str ( ) ) ;
294
296
let mut retry_attempts = 3i32 ;
295
297
while retry_attempts > 0 {
298
+ template. metadata . uid = Default :: default ( ) ;
299
+ template. metadata . resource_version = Default :: default ( ) ;
296
300
retry_attempts -= 1 ;
297
301
match api. get ( d. name . as_str ( ) ) . await {
298
302
Ok ( mut current) => {
@@ -317,11 +321,24 @@ impl ResourceControllerImpl {
317
321
resource_version : current. resource_version ( ) ,
318
322
} ;
319
323
if & Some ( dst_version) != & d. synced_version
320
- || & Some ( src_version. clone ( ) ) != & d. source_version
324
+ || & Some ( & src_version) != & d. source_version . as_ref ( )
321
325
{
322
326
template. metadata . uid = current. uid ( ) ;
323
327
template. metadata . resource_version = current. resource_version ( ) ;
324
- match api. replace ( d. name . as_str ( ) , & pp, & template) . await {
328
+
329
+ let result = match & d. strategy ( ) {
330
+ SyncStrategy :: Replace => {
331
+ api. replace ( d. name . as_str ( ) , & pp, & template) . await
332
+ }
333
+ SyncStrategy :: Apply => {
334
+ let mut pp = PatchParams :: default ( ) ;
335
+ pp. field_manager = Some ( MANAGER . to_string ( ) ) ;
336
+ pp. force = true ;
337
+ api. patch ( d. name . as_str ( ) , & pp, & Patch :: Apply ( & template) )
338
+ . await
339
+ }
340
+ } ;
341
+ match result {
325
342
Ok ( updated) => {
326
343
d. synced_version = Some ( ObjectRevision {
327
344
uid : updated. uid ( ) ,
@@ -410,22 +427,32 @@ impl ResourceControllerImpl {
410
427
. flatten ( )
411
428
. unwrap_or_default ( ) ;
412
429
let mut expected_destinations: Vec < DestinationStatus > = Default :: default ( ) ;
413
- for ( dst_namespace, dst_name) in self . expected_destinations ( event) {
414
- let expected_dst = DestinationStatus {
430
+ for ( dst_namespace, dst_name, strategy ) in self . expected_destinations ( event) {
431
+ let mut expected_dst = DestinationStatus {
415
432
name : dst_name,
416
433
namespace : dst_namespace,
417
434
source_version : None ,
418
435
synced_version : None ,
419
436
group : self . gvk . group . clone ( ) ,
420
437
version : self . gvk . version . clone ( ) ,
421
438
kind : self . gvk . kind . clone ( ) ,
439
+ strategy,
422
440
} ;
423
- let expected_dst = stale_destinations
441
+ if let Some ( status ) = stale_destinations
424
442
. iter ( )
425
443
. find ( |d| Self :: is_same_destination ( d, & expected_dst) )
426
- . map ( |d| ( * d) . clone ( ) )
427
- . unwrap_or ( expected_dst) ;
428
- stale_destinations. retain ( |d| !Self :: is_same_destination ( d, & expected_dst) ) ;
444
+ {
445
+ // If strategy (sync config) changed do not set version to make sure the destination
446
+ // object is update. Note, this is required as we cannot track the ObjectSync's resourceVersion
447
+ // in its own status as this would lead to an infinit reconciliation cycle.
448
+ if status. strategy ( ) == expected_dst. strategy ( ) {
449
+ expected_dst. source_version = status. source_version . clone ( ) ;
450
+ expected_dst. synced_version = status. synced_version . clone ( ) ;
451
+ }
452
+ // As destination is in set of expected destinations, remove it from the set of
453
+ // stale destinations.
454
+ stale_destinations. retain ( |d| !Self :: is_same_destination ( d, & expected_dst) ) ;
455
+ }
429
456
expected_destinations. push ( expected_dst) ;
430
457
}
431
458
// 1. Remove stale destinations.
0 commit comments