Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/authoritative_sink_acks.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added `authoritative` field to sink acknowledgements configuration. The field defaults to `false`; the feature only activates when at least one sink explicitly sets `authoritative: true`. When active, only **authoritative** sinks block source acknowledgements -- non-authoritative sinks have their event finalizers stripped so the source can acknowledge events as soon as all authoritative sinks have processed them. This prevents non-critical sinks (console, metrics, etc.) from blocking acknowledgement of critical sinks. When no sink sets `authoritative: true`, there is zero behavioral change from previous versions.

authors: connoryy
139 changes: 136 additions & 3 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
fn from(config: SourceAcknowledgementsConfig) -> Self {
Self {
enabled: config.enabled,
authoritative: None,
}
}
}
Expand All @@ -365,25 +366,71 @@ pub struct AcknowledgementsConfig {
///
/// [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
enabled: Option<bool>,

/// Whether this sink is authoritative for acknowledgements.
///
/// When a source has end-to-end acknowledgements enabled, it waits for all
/// **authoritative** sinks to finalize events before acknowledging them.
/// Non-authoritative sinks have their finalizers stripped, so the source
/// does not wait for them.
///
/// Defaults to `false` per RFC 6517. Backwards compatibility is preserved
/// because `compute_authoritative_components()` returns `None` when no sink
/// has `authoritative: true`, causing all sinks to participate in the ack
/// chain as before.
authoritative: Option<bool>,
}

impl AcknowledgementsConfig {
pub const DEFAULT: Self = Self { enabled: None };
pub const DEFAULT: Self = Self {
enabled: None,
authoritative: None,
};

#[must_use]
pub fn merge_default(&self, other: &Self) -> Self {
let enabled = self.enabled.or(other.enabled);
Self { enabled }
let authoritative = self.authoritative.or(other.authoritative);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Detect conflicting authoritative ack defaults during merge

Merging authoritative with self.authoritative.or(other.authoritative) silently picks one value when multiple config files set different global.acknowledgements.authoritative values. Unlike enabled, there is no corresponding conflict check in GlobalOptions::merge, so contradictory defaults can be accepted and change runtime ack-chain behavior based on merge order instead of failing fast. This is user-visible in multi-file deployments that rely on deterministic global ack semantics.

Useful? React with 👍 / 👎.

Self {
enabled,
authoritative,
}
}

pub fn enabled(&self) -> bool {
self.enabled.unwrap_or(false)
}

/// Returns whether this sink is authoritative for acknowledgements.
///
/// Defaults to `false` per RFC 6517. Stripping only activates when at
/// least one sink in the topology has `authoritative: true` explicitly
/// set, because `compute_authoritative_components()` returns `None`
/// otherwise, preserving backwards compatibility.
pub fn authoritative(&self) -> bool {
self.authoritative.unwrap_or(false)
}

/// Returns whether the `authoritative` field has been explicitly set to `true`.
pub fn is_explicitly_authoritative(&self) -> bool {
self.authoritative == Some(true)
}

/// Create a new config with the given enabled and authoritative values.
pub const fn new(enabled: Option<bool>, authoritative: Option<bool>) -> Self {
Self {
enabled,
authoritative,
}
}
}

impl From<Option<bool>> for AcknowledgementsConfig {
fn from(enabled: Option<bool>) -> Self {
Self { enabled }
Self {
enabled,
authoritative: None,
}
}
}

Expand Down Expand Up @@ -711,4 +758,90 @@ mod test {
let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
assert_eq!(output.schema_definition(true), Some(definition));
}

#[test]
fn ack_config_authoritative_defaults_to_false() {
// Default: both None -> enabled=false, authoritative=false
let config = AcknowledgementsConfig::DEFAULT;
assert!(!config.enabled());
assert!(!config.authoritative());
assert!(!config.is_explicitly_authoritative());

// Enabled true, authoritative unset -> authoritative=false (RFC 6517 default)
let config = AcknowledgementsConfig::from(true);
assert!(config.enabled());
assert!(!config.authoritative());
assert!(!config.is_explicitly_authoritative());

// Enabled true, authoritative explicitly true
let config = AcknowledgementsConfig {
enabled: Some(true),
authoritative: Some(true),
};
assert!(config.enabled());
assert!(config.authoritative());
assert!(config.is_explicitly_authoritative());

// Enabled true, authoritative explicitly false
let config = AcknowledgementsConfig {
enabled: Some(true),
authoritative: Some(false),
};
assert!(config.enabled());
assert!(!config.authoritative());
assert!(!config.is_explicitly_authoritative());

// Enabled false, authoritative explicitly true (edge case)
let config = AcknowledgementsConfig {
enabled: Some(false),
authoritative: Some(true),
};
assert!(!config.enabled());
assert!(config.authoritative());
assert!(config.is_explicitly_authoritative());
}

#[test]
fn ack_config_merge_default_authoritative() {
// Global authoritative falls through when local is None
let local = AcknowledgementsConfig {
enabled: Some(true),
authoritative: None,
};
let global = AcknowledgementsConfig {
enabled: None,
authoritative: Some(true),
};
let merged = local.merge_default(&global);
assert!(merged.enabled());
assert!(merged.authoritative());
assert!(merged.is_explicitly_authoritative());

// Local authoritative takes precedence over global
let local = AcknowledgementsConfig {
enabled: Some(true),
authoritative: Some(false),
};
let global = AcknowledgementsConfig {
enabled: None,
authoritative: Some(true),
};
let merged = local.merge_default(&global);
assert!(merged.enabled());
assert!(!merged.authoritative());

// Both None -> authoritative defaults to false, not explicitly set
let local = AcknowledgementsConfig {
enabled: Some(true),
authoritative: None,
};
let global = AcknowledgementsConfig {
enabled: None,
authoritative: None,
};
let merged = local.merge_default(&global);
assert!(merged.enabled());
assert!(!merged.authoritative());
assert!(!merged.is_explicitly_authoritative());
}
}
Loading
Loading