Skip to content

enhancement(topology): add per-sink authoritative acknowledgement control#25326

Draft
connoryy wants to merge 14 commits intovectordotdev:masterfrom
connoryy:connor/upstream-authoritative-acks
Draft

enhancement(topology): add per-sink authoritative acknowledgement control#25326
connoryy wants to merge 14 commits intovectordotdev:masterfrom
connoryy:connor/upstream-authoritative-acks

Conversation

@connoryy
Copy link
Copy Markdown

Summary

Implements per-sink "authoritative" acknowledgement control, as proposed in RFC 6517 but never built.

When a source has end-to-end acknowledgements enabled, it currently waits for all connected sinks to finalize events before acknowledging — including sinks that don't need delivery guarantees. This PR adds an authoritative field to the sink acknowledgements config. When at least one sink sets authoritative: true, only authoritative sinks participate in the ack chain. Non-authoritative sinks have their event finalizers stripped at the fan-out level, so the source's BatchNotifier resolves as soon as all authoritative sinks have processed the events.

When no sink sets authoritative: true, behavior is identical to current Vector — zero breaking changes.

Motivation

This addresses a long-standing design limitation where a slow, failing, or unreachable non-critical sink blocks acknowledgements for critical sinks:

#10983 and #11346 moved ack config from sources to sinks but explicitly did not change the wait-for-all runtime behavior. This PR completes that work.

Approach

Config

A single Option<bool> field added to AcknowledgementsConfig:

pub struct AcknowledgementsConfig {
    enabled: Option<bool>,
    authoritative: Option<bool>,  // NEW — defaults to false per RFC 6517
}

All 63 sink configs pick this up automatically via the shared AcknowledgementsConfig struct. No individual sink changes needed.

Runtime: fan-out level stripping

Non-authoritative sinks have their EventFinalizer references stripped at the fan-out point — where events are distributed to downstream components — rather than at the sink input stream. This is critical because stateful transforms (like aggregate, reduce) between the fan-out and the sink can hold events for extended periods. Sink-level stripping would be too late — the transform would hold finalizer references, blocking the BatchNotifier.

The stripping calls events.take_finalizers() in Sender::flush() for non-authoritative senders. The dropped finalizers call update_batch(EventStatus::Dropped), which is a no-op on BatchStatus.

Design decision: stripping vs RFC's "immediate status update"

RFC 6517 proposed having authoritative sinks immediately set EventStatus::Recorded on finalization. This PR takes a different approach: strip finalizers from non-authoritative paths entirely. The semantic guarantee is identical (source only waits for authoritative sinks), but stripping is simpler:

  • No changes to the finalization state machine
  • No new status variants
  • Works with any sink type without per-sink code changes
  • Naturally handles stateful transforms (finalizers never enter the non-authoritative path)

Backwards compatibility

The authoritative field defaults to false. The feature activates only when at least one sink explicitly sets authoritative: true. When inactive:

  • compute_authoritative_components() returns None
  • No strip flags are set on any fan-out sender
  • The if self.strip_finalizers branch in Sender::flush() is never taken
  • Behavior is identical to current Vector

This is enforced by the any_sink_explicitly_authoritative guard in the topology builder and verified by two dedicated backwards-compatibility tests.

Performance

When inactive (no authoritative sinks): zero per-event overhead. The strip_finalizers bool check in Sender::flush() is always false and branch-predicted away. The compute_authoritative_components() BFS runs once at topology build time.

When active: the stripping work (take_finalizers() + Arc::make_mut + ref count operations) is the same work that would happen at the sink regardless — it's just done earlier. Non-authoritative transforms and buffers no longer carry unnecessary Arc<EventFinalizer> references, slightly reducing memory pressure.

Vector configuration

sinks:
  s3_archive:
    type: aws_s3
    inputs: ["transform"]
    acknowledgements:
      enabled: true
      authoritative: true
      # Source waits for this sink before acknowledging

  console_debug:
    type: console
    inputs: ["transform"]
    acknowledgements:
      enabled: true
      # No authoritative field — defaults to false
      # Source does NOT wait for this sink

  loki_monitoring:
    type: loki
    inputs: ["transform"]
    # No acknowledgements block at all
    # Source does NOT wait for this sink

How did you test this PR?

28 tests across 4 categories:

Config defaults and merging (2 tests):

  • authoritative defaults to false for all combinations of enabled/authoritative/unset
  • Local authoritative overrides global; global falls through when local is None

Graph walk / BFS (9 tests):

  • Returns None when no sink is explicitly authoritative (backwards compat)
  • Returns None for empty config
  • Linear pipeline: all components on auth path included
  • Fan-out: shared upstream marked authoritative; dedicated non-auth branch excluded
  • Multiple authoritative sinks: both paths included
  • Chain of transforms: entire chain included
  • Diamond topology: all components included
  • Multi-component realistic config with no authoritative: None

Fan-out stripping mechanics (2 new + 8 existing):

  • Non-auth sender stripped, auth sender keeps finalizers, BatchNotifier resolves only from auth path
  • Strip flag survives pause → replace cycle
  • All 8 existing fanout tests pass unchanged

End-to-end topology (2 tests):

  • authoritative_sink_controls_ack_chain: source acks when auth sink drains, even though non-auth sink holds finalizers forever
  • authoritative_sink_blocks_ack_when_not_drained: source does NOT ack when auth sink holds finalizers, even though non-auth sink drains normally

All tests: cargo check clean, cargo test -p vector-core --lib fanout::tests (10 pass), cargo test -p vector-core config::test (7 pass), cargo test -p vector config::authoritative_tests (9 pass), cargo test -p vector topology::test::authoritative_acks (2 pass).

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Add an `authoritative` field to `AcknowledgementsConfig` that controls
whether a sink participates in the source's acknowledgement chain. In
fan-out topologies, sources normally wait for ALL connected sinks before
acknowledging events. Setting `authoritative: false` on a sink strips
event finalizers at the topology layer, causing the source's BatchNotifier
to resolve without waiting for that sink.

The field defaults to the value of `enabled`, preserving full backwards
compatibility. Only explicitly setting `authoritative = false` on an
ack-enabled sink changes behavior.

This implements the "authoritative sink" concept from the original
end-to-end acknowledgement RFC (6517), addressing the long-standing
issue where slow or failing non-critical sinks block acknowledgement
of critical sinks.
Change authoritative default from enabled() to false, matching RFC 6517:
"If no sink indicates it is authoritative, all sinks must finalize the
event before an acknowledgement may be sent."

Stripping only activates when at least one sink explicitly sets
authoritative: true. When no sink has authoritative set, all sinks
participate in the ack chain (full backwards compatibility).

This means:
- No authoritative set anywhere → current behavior, no stripping
- authoritative: true on one sink → only that sink blocks source acks,
  all others have finalizers stripped
- authoritative: true on multiple sinks → source waits for all of them
Previously, finalizer stripping for non-authoritative sinks happened in
build_sinks() after the buffer, right before sink.run() processes events.
This was too late: stateful transforms like aggregate hold event clones
with finalizer references in internal state, preventing the source's
BatchNotifier from resolving even after the authoritative sink finishes.

This commit moves stripping to the Fanout level -- the point where events
are distributed to downstream components. At topology build time, a BFS
walks backward from authoritative sinks through the config graph to
determine which components are on authoritative paths. Each Fanout sender
is tagged with a strip_finalizers flag: true if the downstream component
is NOT on an authoritative path. When a Fanout distributes events, copies
destined for non-authoritative paths have their finalizers dropped in
Sender::flush() before entering the downstream buffer.

Changes:
- src/config/mod.rs: Add compute_authoritative_components() BFS method
- lib/vector-core/src/fanout.rs: Per-sender strip_finalizers flag,
  stripping in Sender::flush(), strip_flags map on Fanout for persistence
  across Replace ops, updated ControlMessage::Add to carry strip flag
- src/topology/builder.rs: Compute authoritative set in Builder::build(),
  store on TopologyPieces, remove sink-level stripping from build_sinks()
- src/topology/running.rs: authoritative_components field on
  RunningTopology, strip flag passed in setup_inputs() and
  reattach_severed_inputs() via ControlMessage::Add
- lib/vector-tap/src/controller.rs: Updated for new ControlMessage::Add
  signature (tap sinks never strip)
B1: Fix doc comment on `authoritative` field to say "Defaults to `false`
per RFC 6517" and explain backwards compatibility via
`compute_authoritative_components()` returning `None`. Remove redundant
`#[serde(default)]` attribute.

B2: Introduce `SenderSlot` struct in fanout.rs that pairs `Option<Sender>`
with `strip_finalizers: bool`, eliminating the parallel `strip_flags`
HashMap. The strip flag now travels with the slot and survives
pause/replace cycles without a separate lookup.

S1: Convert `ControlMessage::Add` from tuple variant to named-field
struct variant for clarity. Update all pattern matches in fanout.rs,
running.rs, and controller.rs.

S2: Add explanatory comment in `compute_authoritative_components()` BFS
about how named outputs (routes) are handled.

S6: Rewrite changelog to accurately state that `authoritative` defaults
to `false`, the feature only activates when at least one sink sets
`authoritative: true`, and there is zero behavioral change otherwise.

N1: Fix import ordering in fanout.rs -- group `vector_common` with
`vector_buffers` above the `crate::` import.

N6: Add `BatchStatus::Delivered` assertion to
`fanout_strip_flag_preserved_across_replace` test.
@connoryy connoryy requested a review from a team as a code owner April 29, 2026 15:55
@github-actions github-actions Bot added domain: topology Anything related to Vector's topology code domain: core Anything related to core crates i.e. vector-core, core-common, etc labels Apr 29, 2026
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c11c9dd62e

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

// sequence of control operations has been applied.
assert!(
sender.replace(Sender::new(sink)).is_none(),
slot.sender.replace(Sender::new(sink, strip)).is_none(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Recompute strip flag when replacing fanout senders

replace always reuses the previously stored slot.strip_finalizers, so a reload that changes acknowledgement authority does not update existing fanout links that go through pause/replace (the normal path for unchanged wiring in RunningTopology::setup_inputs). In practice, toggling authoritative (or introducing the first authoritative sink) via hot reload can leave old strip behavior in place until a full restart, causing stale ack semantics where non-authoritative sinks may still block or newly authoritative paths may still be stripped.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 24e9db5991

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/config/mod.rs Outdated
Comment on lines +297 to +301
.filter(|(_, sink)| {
sink.inner
.acknowledgements()
.merge_default(&self.global.acknowledgements)
.authoritative()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Require acknowledgements.enabled for authoritative sinks

compute_authoritative_components marks sinks authoritative using only authoritative(), but source ack propagation is still gated by enabled() in propagate_acknowledgements (src/config/mod.rs:214-223). This lets a sink with acknowledgements.enabled = false and authoritative = true become authoritative, which can strip finalizers from sinks that are actually ack-enabled and make sources wait on (or prioritize) a sink that was explicitly opted out of end-to-end acknowledgements.

Useful? React with 👍 / 👎.

Comment thread src/topology/running.rs Outdated
Comment on lines +975 to +979
let strip_finalizers = self
.authoritative_components
.as_ref()
.map(|auth| !auth.contains(key))
.unwrap_or(false);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve legacy acking for pipelines without auth sinks

This strip decision is global once authoritative_components is Some: every component not in that set has finalizers removed. If one authoritative sink exists elsewhere in the topology, a source whose downstream graph contains only non-authoritative sinks will still have all of its branches stripped and can acknowledge events before any of its connected sinks finalize them, instead of retaining the previous wait-for-all behavior for that source path.

Useful? React with 👍 / 👎.

A sink with authoritative: true but enabled: false should not activate
the stripping feature. Without this check, such a sink would cause
other ack-enabled sinks to have their finalizers stripped while the
'authoritative' sink doesn't actually participate in acks (because
propagate_acks_rec only traces from enabled sinks).
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f609fead1d

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +382 to +386
.send(fanout::ControlMessage::Add {
id: ComponentKey::from(sink_id.as_str()),
sender: tap_buffer_tx,
strip_finalizers: false,
})
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Strip finalizers for tap fanout attachments

When authoritative acknowledgements are enabled, tap sinks are still attached with strip_finalizers: false, so tapped events keep finalizers even though taps are not authoritative sinks. In a topology where sources wait on acknowledgements, a slow or stalled vector tap consumer can therefore hold batch finalizers and delay/override the intended "only authoritative sinks block ack" behavior. This affects runs with active tap subscriptions and can make acknowledgement latency depend on observability tooling rather than configured authoritative sinks.

Useful? React with 👍 / 👎.

…horitative sinks

Use per-edge strip decisions instead of per-component: only strip when
the upstream component IS in the authoritative set and the downstream
is NOT. This preserves legacy wait-for-all behavior for sources whose
downstream graph has no authoritative sinks, even when other pipelines
in the same topology use authoritative mode.

Also fixes tap fanout attachments: tap sinks now use per-edge stripping
so they cannot block acknowledgements in authoritative mode.
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 6257497c8d

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

pub fn merge_default(&self, other: &Self) -> Self {
let enabled = self.enabled.or(other.enabled);
Self { enabled }
let authoritative = self.authoritative.or(other.authoritative);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Detect conflicting authoritative ack defaults during merge

Merging authoritative with self.authoritative.or(other.authoritative) silently picks one value when multiple config files set different global.acknowledgements.authoritative values. Unlike enabled, there is no corresponding conflict check in GlobalOptions::merge, so contradictory defaults can be accepted and change runtime ack-chain behavior based on merge order instead of failing fast. This is user-visible in multi-file deployments that rely on deterministic global ack semantics.

Useful? React with 👍 / 👎.

@connoryy connoryy marked this pull request as draft April 30, 2026 14:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

domain: core Anything related to core crates i.e. vector-core, core-common, etc domain: topology Anything related to Vector's topology code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Sources should only wait to ack for sinks with acks enabled

1 participant