Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.d/24125_fix_topology_reload_deadlock.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Fixed a deadlock during config reload that caused all sources to stop consuming
new messages whenever a sink in `wait_for_sinks` was being changed. The source
pump would block in `wait_for_replacements` after receiving a `Pause` control
message, creating a circular dependency with `shutdown_diff`, which was itself
waiting for the old sink task to finish draining.

authors: joshcoughlan
149 changes: 121 additions & 28 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ use crate::{

/// Receiving half of the unbounded channel on which component tasks report
/// fatal [`ShutdownError`]s back to the topology driver.
pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

/// Upper bound on how long `shutdown_diff` waits for an old sink task to
/// finish draining during a config reload.
///
/// If a sink's downstream is permanently unhealthy (for example, stuck
/// retrying an HTTP 429 forever), waiting without a bound would stall the
/// reload indefinitely and backpressure the entire source pipeline. When this
/// timeout fires, the old task is detached and keeps retrying in the
/// background, any `reuse_buffers` entry for that sink is discarded, and the
/// replacement sink is built with a fresh buffer instead of waiting for the
/// old one to give up ownership.
const RELOAD_DRAIN_TIMEOUT: Duration = Duration::from_secs(60);

#[derive(Debug, Snafu)]
pub enum ReloadError {
#[snafu(display("global options changed: {}", changed_fields.join(", ")))]
Expand Down Expand Up @@ -302,7 +311,7 @@ impl RunningTopology {
} else {
ConfigDiff::new(&self.config, &new_config, HashSet::new())
};
let buffers = self.shutdown_diff(&diff, &new_config).await;
let (buffers, force_removed_sinks) = self.shutdown_diff(&diff, &new_config).await;

// Gives windows some time to make available any port
// released by shutdown components.
Expand All @@ -329,7 +338,8 @@ impl RunningTopology {
.run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
.await
{
self.connect_diff(&diff, &mut new_pieces).await;
self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks)
.await;
self.spawn_diff(&diff, new_pieces);
self.config = new_config;

Expand All @@ -355,7 +365,8 @@ impl RunningTopology {
.run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
.await
{
self.connect_diff(&diff, &mut new_pieces).await;
self.connect_diff(&diff, &mut new_pieces, &force_removed_sinks)
.await;
self.spawn_diff(&diff, new_pieces);

info!("Old configuration restored successfully.");
Expand Down Expand Up @@ -418,7 +429,7 @@ impl RunningTopology {
&mut self,
diff: &ConfigDiff,
new_config: &Config,
) -> HashMap<ComponentKey, BuiltBuffer> {
) -> (HashMap<ComponentKey, BuiltBuffer>, HashSet<ComponentKey>) {
// First, we shutdown any changed/removed sources. This ensures that we can allow downstream
// components to terminate naturally by virtue of the flow of events stopping.
if diff.sources.any_changed_or_removed() {
Expand Down Expand Up @@ -472,7 +483,7 @@ impl RunningTopology {
let previous = self.tasks.remove(key).unwrap();
drop(previous); // detach and forget

self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, false).await;
self.remove_outputs(key);

if let Some(registry) = self.utilization_registry.as_ref() {
Expand All @@ -483,7 +494,7 @@ impl RunningTopology {
for key in &diff.transforms.to_change {
debug!(component_id = %key, "Changing transform.");

self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, false).await;
self.remove_outputs(key);
}

Expand Down Expand Up @@ -619,15 +630,22 @@ impl RunningTopology {
.collect::<Vec<_>>();
for key in &removed_sinks {
debug!(component_id = %key, "Removing sink.");
self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, false).await;

if let Some(registry) = self.utilization_registry.as_ref() {
registry.remove_component(key);
}
}

// After that, for any changed sinks, we temporarily detach their inputs (not remove) so
// they can naturally shutdown and allow us to recover their buffers if possible.
// After that, for any changed sinks, we disconnect their inputs so they can naturally
// shutdown and allow us to recover their buffers if possible.
//
// For sinks in wait_for_sinks, we use a full Remove (not Pause) from the upstream fanout.
// This prevents a circular dependency where the source pump blocks in
// wait_for_replacements (waiting for a Replace that only arrives after connect_diff,
// which runs after shutdown_diff, which is waiting for the old sink to finish).
// Using Remove allows the source pump to continue sending to other sinks while
// the old sink drains.
let mut buffer_tx = HashMap::new();

let sinks_to_change = diff
Expand All @@ -642,8 +660,18 @@ impl RunningTopology {
}))
.collect::<Vec<_>>();

// Track which sinks were force-removed from their upstream fanouts so that
// connect_diff knows to use Add (not Replace) when reconnecting them.
let mut force_removed_sinks = HashSet::new();

for key in &sinks_to_change {
debug!(component_id = %key, "Changing sink.");

let force_remove = wait_for_sinks.contains(key);
if force_remove {
force_removed_sinks.insert((*key).clone());
}

if reuse_buffers.contains(key) || changed_disk_buffer_sinks.contains(key) {
self.detach_triggers
.remove(key)
Expand All @@ -665,7 +693,8 @@ impl RunningTopology {
buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
}
}
self.remove_inputs(key, diff, new_config).await;
self.remove_inputs(key, diff, new_config, force_remove)
.await;
}

// Now that we've disconnected or temporarily detached the inputs to all changed/removed
Expand All @@ -678,7 +707,20 @@ impl RunningTopology {
let previous = self.tasks.remove(key).unwrap();
if wait_for_sinks.contains(key) {
debug!(message = "Waiting for sink to shutdown.", component_id = %key);
previous.await.unwrap().unwrap();
match tokio::time::timeout(RELOAD_DRAIN_TIMEOUT, previous).await {
Ok(res) => {
res.unwrap().unwrap();
}
Err(_) => {
warn!(
component_id = %key,
timeout_secs = RELOAD_DRAIN_TIMEOUT.as_secs(),
"Sink did not finish draining within reload timeout; \
detaching the old task and continuing the reload. \
In-flight events on the detached task may be lost."
);
}
}
} else {
drop(previous); // detach and forget
}
Expand All @@ -689,9 +731,30 @@ impl RunningTopology {
if wait_for_sinks.contains(key) {
let previous = self.tasks.remove(key).unwrap();
debug!(message = "Waiting for sink to shutdown.", component_id = %key);
let buffer = previous.await.unwrap().unwrap();
let buffer = match tokio::time::timeout(RELOAD_DRAIN_TIMEOUT, previous).await {
Ok(res) => Some(res.unwrap().unwrap()),
Err(_) => {
warn!(
component_id = %key,
timeout_secs = RELOAD_DRAIN_TIMEOUT.as_secs(),
reuse_buffers = reuse_buffers.contains(key),
"Sink did not finish draining within reload timeout; \
detaching the old task and continuing the reload. \
In-flight events on the detached task may be lost. \
If the sink was a buffer-reuse candidate, the new \
sink will start with a fresh buffer instead."
);
// Drop the buffer_tx entry so we don't try to reuse it; the
// builder falls through to constructing a fresh buffer when
// the buffers map has no entry for this key.
buffer_tx.remove(key);
None
}
};

if reuse_buffers.contains(key) {
if let Some(buffer) = buffer
&& reuse_buffers.contains(key)
{
// We clone instead of removing here because otherwise the input will be
// missing for the rest of the reload process, which violates the assumption
// that all previous inputs for components not being removed are still
Expand All @@ -710,14 +773,19 @@ impl RunningTopology {
}
}

buffers
(buffers, force_removed_sinks)
}

/// Connects all changed/added components in the given configuration diff.
///
/// `force_removed_sinks` contains the keys of sinks that were fully removed (not paused)
/// from their upstream fanouts during shutdown_diff. These sinks need to be re-added
/// via `ControlMessage::Add` rather than `ControlMessage::Replace`.
pub(crate) async fn connect_diff(
&mut self,
diff: &ConfigDiff,
new_pieces: &mut TopologyPieces,
force_removed_sinks: &HashSet<ComponentKey>,
) {
debug!("Connecting changed/added component(s).");

Expand Down Expand Up @@ -837,13 +905,14 @@ impl RunningTopology {
// with transforms.
for key in diff.transforms.changed_and_added() {
debug!(component_id = %key, "Connecting inputs for transform.");
self.setup_inputs(key, diff, new_pieces).await;
self.setup_inputs(key, diff, new_pieces, false).await;
}

// Now that all sources and transforms are fully configured, we can wire up sinks.
for key in diff.sinks.changed_and_added() {
debug!(component_id = %key, "Connecting inputs for sink.");
self.setup_inputs(key, diff, new_pieces).await;
self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key))
.await;
}
let added_changed_tables: Vec<&ComponentKey> = diff
.enrichment_tables
Expand All @@ -852,7 +921,8 @@ impl RunningTopology {
.collect();
for key in added_changed_tables.iter() {
debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
self.setup_inputs(key, diff, new_pieces).await;
self.setup_inputs(key, diff, new_pieces, force_removed_sinks.contains(key))
.await;
}

// We do a final pass here to reconnect unchanged components.
Expand Down Expand Up @@ -948,6 +1018,7 @@ impl RunningTopology {
key: &ComponentKey,
diff: &ConfigDiff,
new_pieces: &mut builder::TopologyPieces,
force_removed: bool,
) {
let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();

Expand All @@ -965,10 +1036,18 @@ impl RunningTopology {
for input in inputs {
let output = self.outputs.get_mut(&input).expect("unknown output");

if diff.contains(&input.component) || inputs_to_add.contains(&input) {
// If the input we're connecting to is changing, that means its outputs will have been
// recreated, so instead of replacing a paused sink, we have to add it to this new
// output for the first time, since there's nothing to actually replace at this point.
if force_removed || diff.contains(&input.component) || inputs_to_add.contains(&input) {
// Cases where we need to add (not replace) the component input:
//
// Case 1: The component was force-removed from the fanout during shutdown_diff
// (rather than paused), so there is no paused entry to replace.
//
// Case 2: If the input we're connecting to is changing, that means its outputs
// will have been recreated, so instead of replacing a paused sink, we have to add
// it to this new output for the first time, since there's nothing to actually
// replace at this point.
//
// Case 3: This is a newly added connection.
debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");

_ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
Expand All @@ -994,7 +1073,13 @@ impl RunningTopology {
self.outputs.retain(|id, _output| &id.component != key);
}

async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
async fn remove_inputs(
&mut self,
key: &ComponentKey,
diff: &ConfigDiff,
new_config: &Config,
force_remove: bool,
) {
self.inputs.remove(key);
self.detach_triggers.remove(key);

Expand All @@ -1007,20 +1092,26 @@ impl RunningTopology {

for input in old_inputs {
if let Some(output) = self.outputs.get_mut(input) {
if diff.contains(&input.component)
if force_remove
|| diff.contains(&input.component)
|| diff.is_removed(key)
|| !new_inputs.contains(input)
{
// 3 cases to remove the input:
// Cases to remove the input:
//
// Case 1: The caller has requested a full removal (force_remove). This is used
// for sinks whose old task must finish before the reload can proceed
// (wait_for_sinks). Using Pause here would block the source pump in
// wait_for_replacements, creating a circular dependency with shutdown_diff.
//
// Case 1: If the input we're removing ourselves from is changing, that means its
// Case 2: If the input we're removing ourselves from is changing, that means its
// outputs will be recreated, so instead of pausing the sink, we just delete it
// outright to ensure things are clean.
//
// Case 2: If this component itself is being removed, then pausing makes no sense
// Case 3: If this component itself is being removed, then pausing makes no sense
// because it isn't coming back.
//
// Case 3: This component is no longer connected to the input from new config.
// Case 4: This component is no longer connected to the input from new config.
debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");

_ = output.send(ControlMessage::Remove(key.clone()));
Expand Down Expand Up @@ -1374,7 +1465,9 @@ impl RunningTopology {
{
return None;
}
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology
.connect_diff(&diff, &mut pieces, &HashSet::new())
.await;
running_topology.spawn_diff(&diff, pieces);

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
Expand Down
Loading
Loading