Skip to content

enhancement(kubernetes_logs source): add end-to-end acknowledgement support#25325

Draft
connoryy wants to merge 9 commits intovectordotdev:masterfrom
connoryy:connor/upstream-k8s-logs-acks
Draft

enhancement(kubernetes_logs source): add end-to-end acknowledgement support#25325
connoryy wants to merge 9 commits intovectordotdev:masterfrom
connoryy:connor/upstream-k8s-logs-acks

Conversation

@connoryy
Copy link
Copy Markdown

Summary

Adds end-to-end acknowledgement support to the kubernetes_logs source. When acknowledgements are enabled (via a downstream sink), file checkpoints only advance after downstream sinks confirm event delivery. This prevents data loss when the source crashes or restarts — unacknowledged events are re-read from the checkpoint position.

Based on initial work by @ganelo (Orri), cleaned up and rebased onto current master.

Motivation

The kubernetes_logs source currently returns can_acknowledge() -> false, meaning it cannot participate in Vector's end-to-end acknowledgement system. When a downstream sink fails or the source crashes, events between the last checkpoint and the crash point are lost — the checkpoint was advanced before delivery was confirmed.

The file source already supports acknowledgements using the same OrderedFinalizer + BatchNotifier pattern. This PR brings the same capability to kubernetes_logs, which shares the underlying file_source infrastructure.

Approach

The implementation mirrors the file source's acknowledgement pattern:

  1. can_acknowledge() returns true — enables the topology's ack propagation
  2. OrderedFinalizer<FinalizerEntry> — receives ack status from downstream sinks in order
  3. BatchNotifier per batch — attached to events before emitting; downstream sinks update status on delivery
  4. Checkpoint gated on ackcheckpoints.update() only called when BatchStatus::Delivered is received
  5. Graceful shutdown — separate shutdown signal ties the finalizer stream to the checkpoint writer, ensuring all pending acks are processed before stopping

Key design decisions:

  • FinalizerEntry is defined locally (not imported from file source) since sources-kubernetes_logs doesn't depend on sources-file
  • Source::new_test() method added for mock-based testing with a pre-built Kubernetes client
  • The acknowledgements config field uses the standard SourceAcknowledgementsConfig + bool_or_struct pattern

Vector configuration

sources:
  kubernetes_logs:
    type: kubernetes_logs
    # Acknowledgements are controlled at the sink level.
    # When any downstream sink has acknowledgements enabled,
    # the source automatically participates.

No source-level configuration is needed. Acknowledgements activate automatically when a downstream sink has acknowledgements.enabled = true, via propagate_acknowledgements().

How did you test this PR?

3 new tests + 51 existing tests pass:

Test What it proves
file_start_position_server_restart_with_file_rotation_no_acknowledge Without acks: checkpoint advances immediately, no re-read after restart
file_start_position_server_restart_with_file_rotation_acknowledged With acks: checkpoint advances after ack, no duplicates after restart
checkpoint_does_not_advance_without_ack Core safety: rejected events → checkpoint does NOT advance → data re-read after restart

Tests use a mock Kubernetes client (Source::new_test()) with a configurable logs directory, following the same pattern as the file source tests.

All 51 existing kubernetes_logs tests pass unchanged.

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

…upport

Wire acknowledgements through the kubernetes_logs source so file
checkpoints only advance after downstream sinks confirm delivery.
Adds SourceAcknowledgementsConfig, FinalizerEntry, and OrderedFinalizer
integration. Includes mock-based tests for the ack flow.
B1: Add Source::new_test() that accepts a pre-built Client and custom
    logs directory, bypassing kubeconfig/env-var resolution. Add
    logs_dir_override to K8sPathsProvider and path_helpers so tests
    can glob a tempdir instead of /var/log/pods.

S1: Remove dead AckingMode::Unfinalized variant and its handling code.

S3/S5: Remove SourceConfigTest trait entirely. The test now calls
       Source::new_test() directly, which is simpler and avoids
       duplicating SourceConfig's doc comments and method signatures.

S4: Add checkpoint_does_not_advance_without_ack test that verifies
    checkpoints do NOT advance when events are rejected (not acked).

N1: Combine super::super imports into a single use statement.

N2: Remove redundant #[cfg(any(test, feature = "all-integration-tests"))]
    inside the already-#[cfg(test)] module.

N3: Add Clone, Copy derives to FinalizerEntry.
Add required changelog fragment for kubernetes_logs acknowledgement
feature. Fix end-to-end acknowledgements URL in CUE documentation
to match the pattern used by all other source docs.
@connoryy connoryy requested review from a team as code owners April 29, 2026 15:55
@github-actions github-actions Bot added work in progress domain: sources Anything related to the Vector's sources domain: external docs Anything related to Vector's external, public documentation labels Apr 29, 2026
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0780694bd4

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +1046 to +1050
let (batch, receiver) = BatchNotifier::new_with_receiver();
event = event.with_batch_notifier(&batch);
let entry = FinalizerEntry {
file_id: line.file_id,
offset: line.end_offset,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Move checkpoint finalizers after partial-line merge

When acknowledgements are enabled, this attaches a batch notifier/finalizer to every raw line before parser/merge_partial_events runs, but PartialEventMergeState::add_event keeps the first partial event and drops later fragments after folding their bytes into it. Those dropped fragments still finalize their own notifiers as Delivered, so on a merged multiline message that is ultimately rejected by a sink, the first fragment reports Rejected while later fragments report Delivered, and OrderedFinalizer will still advance the checkpoint to a later offset. With auto_partial_merge enabled by default, this can skip unacknowledged data on restart.

Useful? React with 👍 / 👎.

When auto_partial_merge is enabled (the default), partial log lines are
merged into a single event. Previously, each raw line received its own
BatchNotifier before the merge step. When the merger dropped fragment
events after extracting their bytes, those fragments' finalizers fired
as Delivered, advancing the checkpoint past unacknowledged data. If the
merged event was later rejected by a sink, earlier fragments had already
been checkpointed.

Fix: transfer finalizers from each fragment to the bucket event during
merge, so the merged event carries all fragments' finalizers. The
checkpoint only advances after the complete merged event is delivered.
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 42650c5a52

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +996 to +999
// When not dealing with end-to-end acknowledgements, just
// clone the global shutdown to stop the checkpoint writer.
(None, global_shutdown.clone().map(|_| ()).boxed())
};
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Use lifecycle shutdown for checkpointer in non-ack mode

In the non-ack path, shutdown_checkpointer is tied to global_shutdown instead of the lifecycle slot shutdown, so when Lifecycle::run initiates shutdown because the event-processing task ends first (e.g. downstream closes), file_server.run waits forever on checkpoint_task_handle because shutdown_checkpointer never resolves until a global shutdown happens. This can deadlock source teardown/reloads outside full process shutdown; shutdown_checkpointer should be driven by the same per-lifecycle shutdown signal that stops file_server.

Useful? React with 👍 / 👎.

@connoryy connoryy marked this pull request as draft April 30, 2026 14:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

domain: external docs Anything related to Vector's external, public documentation domain: sources Anything related to the Vector's sources work in progress

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant