Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 185 additions & 8 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

use backon::BackoffBuilder;
use educe::Educe;
use futures::{stream::BoxStream, Stream, StreamExt};
use futures::{poll, stream::BoxStream, Stream, StreamExt};
use kube_client::{
api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
core::{metadata::PartialObjectMeta, ObjectList, Selector},
error::ErrorResponse,
Api, Error as ClientErr,
};
use serde::de::DeserializeOwned;
use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, task::Poll, time::Duration};
use thiserror::Error;
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -107,6 +107,16 @@
objects: VecDeque<K>,
last_bookmark: Option<String>,
},
/// Completed the first page of the LIST operation, now transitioning to the watch phase
/// while continuing to list subsequent pages
WatchedInitPage {
continue_token: Option<String>,
objects: VecDeque<K>,
last_bookmark: Option<String>,
#[educe(Debug(ignore))]
stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
stream_events: VecDeque<WatchEvent<K>>,
},
/// Kubernetes 1.27 Streaming Lists
/// The initial watch is in progress
InitialWatch {
Expand All @@ -115,6 +125,14 @@
},
/// The initial LIST was successful, so we should move on to starting the actual watch.
InitListed { resource_version: String },
/// After completing the list operation, process the backlog of cached events from the watch
/// stream.
WatchedInitListed {
resource_version: String,
#[educe(Debug(ignore))]
stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
stream_events: VecDeque<WatchEvent<K>>,
},
/// The watch is in progress, from this point we just return events from the server.
///
/// If the connection is disrupted then we propagate the error but try to restart the watch stream by
Expand Down Expand Up @@ -183,6 +201,11 @@
/// When using this mode, you can configure the `page_size` on the watcher.
#[default]
ListWatch,
/// List first, Starts watching after retrieving the first page of the list.
///
/// Suitable for clusters with a large number of resources where list operations are slow.
/// Prevents resource version expiration caused by waiting for the entire list operation to complete before starting the watch.
ListWatchParallel,
/// Kubernetes 1.27 Streaming Lists
///
/// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
Expand Down Expand Up @@ -461,11 +484,13 @@
{
match state {
State::Empty => match wc.initial_list_strategy {
InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
continue_token: None,
objects: VecDeque::default(),
last_bookmark: None,
}),
InitialListStrategy::ListWatch | InitialListStrategy::ListWatchParallel => {

Check warning on line 487 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L487

Added line #L487 was not covered by tests
(Some(Ok(Event::Init)), State::InitPage {
continue_token: None,
objects: VecDeque::default(),
last_bookmark: None,
})
}
InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await {
Ok(stream) => (None, State::InitialWatch { stream }),
Err(err) => {
Expand Down Expand Up @@ -499,6 +524,110 @@
}
let mut lp = wc.to_list_params();
lp.continue_token = continue_token;
match api.list(&lp).await {
Ok(list) => {
let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
if last_bookmark.is_none() && continue_token.is_none() {
return (Some(Err(Error::NoResourceVersion)), State::Empty);

Check warning on line 532 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L532

Added line #L532 was not covered by tests
}
match wc.initial_list_strategy {
InitialListStrategy::ListWatch => {

Check warning on line 535 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L535

Added line #L535 was not covered by tests
// we have drained the last page - move on to next stage
(None, State::InitPage {
continue_token,
objects: list.items.into_iter().collect(),
last_bookmark,
})
}
InitialListStrategy::ListWatchParallel => {

Check warning on line 543 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L543

Added line #L543 was not covered by tests
// start watch
match api
.watch(&wc.to_watch_params(), &last_bookmark.clone().unwrap())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this unwrap fallible?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a preceding last_bookmark.is_none() check, so this unwrap is guaranteed to be safe here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The preceding check only fails if continue is also none.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right. I will fix it later

.await

Check warning on line 547 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L545-L547

Added lines #L545 - L547 were not covered by tests
{
Ok(stream) => (None, State::WatchedInitPage {
continue_token,
objects: list.items.into_iter().collect(),
Copy link
Contributor

@SOF3 SOF3 Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VecDeque<T> implements From<Vec<T>>

last_bookmark: last_bookmark,

Check failure on line 552 in kube-runtime/src/watcher.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

redundant field names in struct initialization

error: redundant field names in struct initialization --> kube-runtime/src/watcher.rs:552:37 | 552 | ... last_bookmark: last_bookmark, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `last_bookmark` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names note: the lint level is defined here --> kube-runtime/src/lib.rs:11:9 | 11 | #![deny(clippy::all)] | ^^^^^^^^^^^ = note: `#[deny(clippy::redundant_field_names)]` implied by `#[deny(clippy::all)]`
stream,
stream_events: VecDeque::default(),

Check warning on line 554 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L549-L554

Added lines #L549 - L554 were not covered by tests
}),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");

Check warning on line 558 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L556-L558

Added lines #L556 - L558 were not covered by tests
} else {
debug!("watch initlist error: {err:?}");

Check warning on line 560 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L560

Added line #L560 was not covered by tests
}
(Some(Err(Error::WatchStartFailed(err))), State::Empty)

Check warning on line 562 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L562

Added line #L562 was not covered by tests
}
}
}
_ => unreachable!(),

Check failure on line 566 in kube-runtime/src/watcher.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

wildcard matches only a single variant and will also match any future added variants

error: wildcard matches only a single variant and will also match any future added variants --> kube-runtime/src/watcher.rs:566:25 | 566 | _ => unreachable!(), | ^ help: try: `InitialListStrategy::StreamingList` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#match_wildcard_for_single_variants note: the lint level is defined here --> kube-runtime/src/lib.rs:12:9 | 12 | #![deny(clippy::pedantic)] | ^^^^^^^^^^^^^^^^ = note: `#[deny(clippy::match_wildcard_for_single_variants)]` implied by `#[deny(clippy::pedantic)]`
}
}
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch list error with 403: {err:?}");

Check warning on line 571 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L569-L571

Added lines #L569 - L571 were not covered by tests
} else {
debug!("watch list error: {err:?}");

Check warning on line 573 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L573

Added line #L573 was not covered by tests
}
(Some(Err(Error::InitialListFailed(err))), State::Empty)

Check warning on line 575 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L575

Added line #L575 was not covered by tests
}
}
}
State::WatchedInitPage {
continue_token,
mut objects,
last_bookmark,
mut stream,
mut stream_events,
} => {
if let Some(next) = objects.pop_front() {
return (Some(Ok(Event::InitApply(next))), State::WatchedInitPage {
continue_token,
objects,
last_bookmark,
stream,
stream_events,

Check warning on line 592 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L579-L592

Added lines #L579 - L592 were not covered by tests
});
}
// Attempt to asynchronously fetch events from the Watch Stream and cache them.
// If an error occurs at this stage, restart the list operation.
loop {
Copy link
Member

@clux clux Apr 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unfortunately awkward and complex 😢
The step function (which were intended to do a single step), can now potentially loop for a long time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This step only buffers incremental events from the Watch stream into memory cache. The polling operation terminates immediately when no new events are available. Since the I/O operations are asynchronous and non-blocking, their overhead is negligible compared to the inherent latency of list operations. Therefore, this does not significantly increase the total list+watch processing time.

let event = poll!(stream.next());
match event {
Poll::Ready(Some(Ok(WatchEvent::Error(err)))) => {
return (Some(Err(Error::WatchError(err))), State::default());

Check warning on line 601 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L597-L601

Added lines #L597 - L601 were not covered by tests
}
Poll::Ready(Some(Ok(event))) => {
stream_events.push_back(event);

Check warning on line 604 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L603-L604

Added lines #L603 - L604 were not covered by tests
}
Poll::Ready(Some(Err(err))) => {
return (Some(Err(Error::WatchFailed(err))), State::default());

Check warning on line 607 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L606-L607

Added lines #L606 - L607 were not covered by tests
}
Poll::Ready(None) => {

Check warning on line 609 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L609

Added line #L609 was not covered by tests
// Stream ended, we need to restart the list operation
return (None, State::default());

Check warning on line 611 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L611

Added line #L611 was not covered by tests
}
Poll::Pending => {
break;

Check warning on line 614 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L613-L614

Added lines #L613 - L614 were not covered by tests
}
}
}
// check if we need to perform more pages
if continue_token.is_none() {
if let Some(resource_version) = last_bookmark {

Check warning on line 620 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L619-L620

Added lines #L619 - L620 were not covered by tests
// we have drained the last page - move on to next stage
return (Some(Ok(Event::InitDone)), State::WatchedInitListed {
resource_version,
stream,
stream_events,

Check warning on line 625 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L622-L625

Added lines #L622 - L625 were not covered by tests
});
}
}
let mut lp = wc.to_list_params();
lp.continue_token = continue_token;

Check warning on line 630 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L629-L630

Added lines #L629 - L630 were not covered by tests
match api.list(&lp).await {
Ok(list) => {
let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
Expand All @@ -508,10 +637,12 @@
}
// Buffer page here, causing us to return to this enum branch (State::InitPage)
// until the objects buffer has drained
(None, State::InitPage {
(None, State::WatchedInitPage {

Check warning on line 640 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L640

Added line #L640 was not covered by tests
continue_token,
objects: list.items.into_iter().collect(),
last_bookmark,
stream,
stream_events,

Check warning on line 645 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L644-L645

Added lines #L644 - L645 were not covered by tests
})
}
Err(err) => {
Expand Down Expand Up @@ -589,6 +720,52 @@
}
}
}
State::WatchedInitListed {
resource_version,
stream,
mut stream_events,
} => {
if let Some(event) = stream_events.pop_front() {
match event {
WatchEvent::Added(obj) | WatchEvent::Modified(obj) => {
let resource_version = obj.resource_version().unwrap_or_default();
return if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())

Check warning on line 733 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L723-L733

Added lines #L723 - L733 were not covered by tests
} else {
(Some(Ok(Event::Apply(obj))), State::WatchedInitListed {
resource_version,
stream,
stream_events,

Check warning on line 738 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L735-L738

Added lines #L735 - L738 were not covered by tests
})
};
}
WatchEvent::Deleted(obj) => {
let resource_version = obj.resource_version().unwrap_or_default();
return if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())

Check warning on line 745 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L742-L745

Added lines #L742 - L745 were not covered by tests
} else {
(Some(Ok(Event::Delete(obj))), State::WatchedInitListed {
resource_version,
stream,
stream_events,

Check warning on line 750 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L747-L750

Added lines #L747 - L750 were not covered by tests
})
};
}
WatchEvent::Bookmark(bm) => {
return (None, State::WatchedInitListed {
resource_version: bm.metadata.resource_version,
stream,
stream_events,

Check warning on line 758 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L754-L758

Added lines #L754 - L758 were not covered by tests
})
}
_ => unreachable!(),

Check failure on line 761 in kube-runtime/src/watcher.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

wildcard matches only a single variant and will also match any future added variants

error: wildcard matches only a single variant and will also match any future added variants --> kube-runtime/src/watcher.rs:761:21 | 761 | _ => unreachable!(), | ^ help: try: `WatchEvent::Error(_)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#match_wildcard_for_single_variants
}
}
(None, State::Watching {
resource_version,
stream,

Check warning on line 766 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L764-L766

Added lines #L764 - L766 were not covered by tests
})
}
State::Watching {
resource_version,
mut stream,
Expand Down
Loading