Skip to content
Draft
3 changes: 3 additions & 0 deletions changelog.d/kubernetes_logs_acknowledgements.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `kubernetes_logs` source now supports end-to-end acknowledgements. When enabled, file checkpoints advance only after downstream sinks confirm event delivery, preventing data loss when the source crashes or restarts.

authors: connoryy
38 changes: 31 additions & 7 deletions src/sources/kubernetes_logs/k8s_paths_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use k8s_openapi::api::core::v1::{Namespace, Pod};
use kube::runtime::reflector::{ObjectRef, store::Store};
use vector_lib::file_source::paths_provider::PathsProvider;

use super::path_helpers::build_pod_logs_directory;
use super::path_helpers::{build_pod_logs_directory, build_pod_logs_directory_with_root};
use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum;

/// A paths provider implementation that uses the state obtained from the
Expand All @@ -19,23 +19,32 @@ pub struct K8sPathsProvider {
include_paths: Vec<glob::Pattern>,
exclude_paths: Vec<glob::Pattern>,
insert_namespace_fields: bool,
/// Optional override for the pod logs root directory.
/// When `None`, defaults to `/var/log/pods`. Used in tests
/// to point at a temporary directory.
logs_dir_override: Option<PathBuf>,
}

impl K8sPathsProvider {
/// Create a new [`K8sPathsProvider`].
///
/// Every argument is stored unchanged in the field of the same name; the
/// constructor performs no validation and no I/O.
///
/// If `logs_dir_override` is `Some`, the given path is used as the pod logs
/// root instead of the default `/var/log/pods`. Pass `None` for production use.
///
/// Note: an override path must be valid UTF-8. It is converted with
/// `Path::to_str` when pod log directories are built, and that conversion
/// panics with "non-utf8 logs dir override is not supported" otherwise.
pub const fn new(
pod_state: Store<Pod>,
namespace_state: Store<Namespace>,
include_paths: Vec<glob::Pattern>,
exclude_paths: Vec<glob::Pattern>,
insert_namespace_fields: bool,
logs_dir_override: Option<PathBuf>,
) -> Self {
Self {
pod_state,
namespace_state,
include_paths,
exclude_paths,
insert_namespace_fields,
logs_dir_override,
}
}
}
Expand Down Expand Up @@ -67,7 +76,8 @@ impl PathsProvider for K8sPathsProvider {
})
.flat_map(|pod| {
trace!(message = "Providing log paths for pod.", pod = ?pod.metadata.name);
let paths_iter = list_pod_log_paths(real_glob, pod.as_ref());
let paths_iter =
list_pod_log_paths(real_glob, pod.as_ref(), self.logs_dir_override.as_deref());
filter_paths(
filter_paths(paths_iter, &self.include_paths, true),
&self.exclude_paths,
Expand Down Expand Up @@ -98,7 +108,10 @@ impl PathsProvider for K8sPathsProvider {
/// See <https://github.com/vectordotdev/vector/issues/6001>
/// See <https://github.com/kubernetes/kubernetes/blob/ef3337a443b402756c9f0bfb1f844b1b45ce289d/pkg/kubelet/pod/pod_manager.go#L30-L44>
/// See <https://github.com/kubernetes/kubernetes/blob/cea1d4e20b4a7886d8ff65f34c6d4f95efcb4742/pkg/kubelet/pod/mirror_client.go#L80-L81>
fn extract_pod_logs_directory(pod: &Pod) -> Option<PathBuf> {
fn extract_pod_logs_directory(
pod: &Pod,
logs_dir_override: Option<&std::path::Path>,
) -> Option<PathBuf> {
let metadata = &pod.metadata;
let namespace = metadata.namespace.as_ref()?;
let name = metadata.name.as_ref()?;
Expand All @@ -111,7 +124,17 @@ fn extract_pod_logs_directory(pod: &Pod) -> Option<PathBuf> {
metadata.uid.as_ref()?
};

Some(build_pod_logs_directory(namespace, name, uid))
match logs_dir_override {
Some(root) => {
let root_str = root
.to_str()
.expect("non-utf8 logs dir override is not supported");
Some(build_pod_logs_directory_with_root(
root_str, namespace, name, uid,
))
}
None => Some(build_pod_logs_directory(namespace, name, uid)),
}
}

const CONTAINER_EXCLUSION_ANNOTATION_KEY: &str = "vector.dev/exclude-containers";
Expand Down Expand Up @@ -145,12 +168,13 @@ fn build_container_exclusion_patterns<'a>(
fn list_pod_log_paths<'a, G, GI>(
mut glob_impl: G,
pod: &'a Pod,
logs_dir_override: Option<&'a std::path::Path>,
) -> impl Iterator<Item = PathBuf> + 'a
where
G: FnMut(&str) -> GI + 'a,
GI: Iterator<Item = PathBuf> + 'a,
{
extract_pod_logs_directory(pod)
extract_pod_logs_directory(pod, logs_dir_override)
.into_iter()
.flat_map(move |dir| {
let dir = dir
Expand Down Expand Up @@ -300,7 +324,7 @@ mod tests {

for (pod, expected) in cases {
assert_eq!(
extract_pod_logs_directory(&pod),
extract_pod_logs_directory(&pod, None),
expected.map(PathBuf::from)
);
}
Expand Down Expand Up @@ -457,7 +481,7 @@ mod tests {
paths_to_return.into_iter().map(PathBuf::from)
};

let actual_paths: Vec<_> = list_pod_log_paths(mock_glob, &pod).collect();
let actual_paths: Vec<_> = list_pod_log_paths(mock_glob, &pod, None).collect();
let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
assert_eq!(actual_paths, expected_paths)
}
Expand Down
Loading
Loading