@@ -0,0 +1,3 @@
# Discarded threads

(none yet)
@@ -0,0 +1,3 @@
# Open questions

(none yet)
@@ -0,0 +1,31 @@
# Progress log

## 2026-03-20

### Initial analysis
- Read FlushTracker, ShapeLogCollector, Consumer, ConsumerRegistry, ShapeCleaner source
- Read existing test files for both FlushTracker and ShapeLogCollector
- Identified the exact code path: Consumer.handle_materializer_down → {:stop, :shutdown, state} → terminate → handle_writer_termination clause 3 (:ok, no cleanup)
- Existing tests cover: consumer crash during broadcast (detected by ConsumerRegistry), multi-fragment crash between fragments
- Missing: consumer dying out-of-band AFTER successful broadcast delivery, with no subsequent transactions to that shape

### Implementation
- Wrote 3 FlushTracker unit tests showing stale entries blocking advancement indefinitely
- Wrote 3 ShapeLogCollector (SLC) integration tests:
  - Main bug test: kill consumer with `:kill` (skips `terminate`/`remove_shape`), send txns only to other shapes, verify flush stuck
  - Contrast test: graceful termination (runs `terminate` → `remove_shape`) allows flush to advance
  - Recovery test: when a txn finally touches the dead shape's table, undeliverable detection cleans up
- Key design decisions:
  - Used two separate tables (`table_a`, `table_b`) so transactions can selectively target one shape
  - Used `:kill` to simulate the end state of the `handle_materializer_down` path (dead process, no cleanup)
  - `@describetag` is needed (not `@tag`) to propagate inspector config to setup functions
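The `@describetag` point can be illustrated with a minimal self-contained ExUnit sketch (module name, tag name, and value are illustrative, not from this PR):

```elixir
ExUnit.start()

defmodule DescribetagExampleTest do
  use ExUnit.Case, async: true

  describe "with a stub inspector" do
    # @describetag tags EVERY test in this describe block, so a
    # describe-level setup sees the tag in its context for each test.
    # A plain @tag only tags the single test that follows it.
    @describetag inspector: :stub_inspector

    setup ctx do
      # The tag set via @describetag is available here for all tests.
      assert ctx.inspector == :stub_inspector
      :ok
    end

    test "first test sees the tag", ctx do
      assert ctx.inspector == :stub_inspector
    end

    test "second test sees it too", ctx do
      assert ctx.inspector == :stub_inspector
    end
  end
end

ExUnit.run()
```

With a per-test `@tag` instead, only the tagged test's context would carry the inspector, so the shared setup would break for the other tests.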

### Operational issues
- Initially edited files in ~/code/electric-sql/electric instead of ~/agents/github/erik/repos/electric
- Had to create a patch and git apply it to the correct repo
- Used @tag instead of @describetag, which doesn't propagate to setup functions
- First version of tests used `refute_receive` for the initial flush notification, but FlushTracker does partially advance when one shape catches up (to one below the stuck shape's offset). Fixed to `assert_receive` the partial advance, then `refute_receive` further advances.
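The fixed assertion pattern from that last point, sketched with the offsets and message shape (`{:flush_boundary_updated, n}`) used in this PR's tests:

```elixir
# First assert the PARTIAL advance: when the alive shape catches up,
# the boundary moves to just below the stuck shape's offset...
assert_receive {:flush_boundary_updated, 8}, 100

# ...and only then refute any FURTHER advance while the stale entry
# remains. A bare refute_receive from the start would fail on the
# partial-advance message.
refute_receive {:flush_boundary_updated, _}, 100
```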

### PR
- Created PR #4035: https://github.com/electric-sql/electric/pull/4035
- Added "claude" label for automated review
11 changes: 11 additions & 0 deletions .agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/prompt.md
@@ -0,0 +1,11 @@
# Task prompt

Issue #8 from electric-sql/alco-agent-tasks: "FlushTracker stalling when tracked consumer dies out-of-band"

Write unit tests that exercise the edge case where a consumer process is tracked by FlushTracker but then dies independently (via handle_materializer_down with :shutdown reason), leaving a stale shape entry that permanently blocks flush advancement.

Key constraints:
- Use different approaches for each test
- Avoid mocking too many components
- Avoid inventing calls or messages inside the test body
- Try setting conditions so the app hits the edge case naturally
20 changes: 20 additions & 0 deletions .agent-tasks/2026-03-20--8--flush-tracker-stale-consumer/task.md
@@ -0,0 +1,20 @@
# Task: FlushTracker stale consumer edge case tests

## Problem

When a consumer dies via `handle_materializer_down` with `:shutdown` reason:
1. Consumer stops with `{:stop, :shutdown, state}` (skipping `stop_and_clean`)
2. `terminate/2` calls `handle_writer_termination` clause 3 which returns `:ok` without cleanup
3. ConsumerRegistry ETS entry is NOT removed (unregister_name is a no-op)
4. ShapeLogCollector is NOT notified to remove the shape from FlushTracker
5. If no future transactions affect that shape, the stale FlushTracker entry persists indefinitely
6. This blocks `last_global_flushed_offset` advancement → unbounded WAL growth

This only affects `allow_subqueries` stacks since `handle_materializer_down` requires materializers.
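The pinning behavior in step 6 can be seen in a toy model: treat the tracker as a map of pending shape offsets whose global boundary sits one below the minimum (assumed semantics for illustration only; this is not the real `FlushTracker` API):

```elixir
# Toy model: the global flushed offset can only advance to just below
# the smallest tx offset still pending for ANY tracked shape.
defmodule ToyFlushTracker do
  def global_flushed(pending) when map_size(pending) == 0, do: :infinity
  def global_flushed(pending), do: Enum.min(Map.values(pending)) - 1
end

pending = %{"alive" => 11, "dead" => 10}
ToyFlushTracker.global_flushed(pending)
#=> 9

# "alive" keeps flushing and advancing; "dead" never does and is
# never removed, so the boundary stays pinned:
pending = %{pending | "alive" => 100}
ToyFlushTracker.global_flushed(pending)
#=> still 9

# Removing the stale entry is the only thing that unpins it:
pending = Map.delete(pending, "dead")
ToyFlushTracker.global_flushed(pending)
#=> 99
```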

## Goal

Write tests demonstrating this edge case at different levels:
1. FlushTracker unit level: a shape tracked but never flushed/removed blocks advancement
2. ShapeLogCollector integration level: a consumer that dies out-of-band after receiving a transaction leaves FlushTracker stuck
3. Full integration: demonstrate the materializer-death path that triggers handle_materializer_down → consumer death → stale FlushTracker
@@ -457,6 +457,83 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTrackerTest do
end
end

describe "out-of-band consumer death leaves stale FlushTracker entry" do
test "shape tracked across many transactions never advances global offset", %{
tracker: tracker
} do
# Simulate: two shapes receive txn at lsn 10. One shape's consumer dies
# out-of-band (no handle_shape_removed ever called). Subsequent txns only
# affect the alive shape. The dead shape's entry pins the global offset.
tracker = handle_txn(tracker, batch(lsn: 10, last_offset: 10), ["alive", "dead"])

# alive flushes lsn 10
tracker = FlushTracker.handle_flush_notification(tracker, "alive", LogOffset.new(10, 10))
# Global offset is pinned one below dead's initial position (tx_offset=9)
assert_receive {:flush_confirmed, 8}

# Many subsequent transactions only affect the alive shape
tracker =
Enum.reduce(11..20, tracker, fn lsn, tracker ->
tracker
|> handle_txn(batch(lsn: lsn, last_offset: 10), ["alive"])
|> FlushTracker.handle_flush_notification("alive", LogOffset.new(lsn, 10))
end)

# Despite 10 more transactions fully flushed by the alive shape,
# global offset has not moved past the dead shape's stuck position
refute_receive {:flush_confirmed, 10}
refute FlushTracker.empty?(tracker)
end

test "removing the stale shape after prolonged stall unblocks to latest seen offset", %{
tracker: tracker
} do
# Same setup: dead shape pins the offset
tracker = handle_txn(tracker, batch(lsn: 10, last_offset: 10), ["alive", "dead"])

tracker = FlushTracker.handle_flush_notification(tracker, "alive", LogOffset.new(10, 10))
assert_receive {:flush_confirmed, 8}

# More txns flow through alive shape only
tracker =
Enum.reduce(11..15, tracker, fn lsn, tracker ->
tracker
|> handle_txn(batch(lsn: lsn, last_offset: 10), ["alive"])
|> FlushTracker.handle_flush_notification("alive", LogOffset.new(lsn, 10))
end)

# Now simulate detection of the dead consumer (e.g. a sweep or monitor)
tracker = FlushTracker.handle_shape_removed(tracker, "dead")

# Global offset jumps all the way to the latest seen offset (lsn 15)
assert_receive {:flush_confirmed, 15}
assert FlushTracker.empty?(tracker)
end

test "stale shape blocks advancement even when its tracked offset is much older than current",
%{tracker: tracker} do
# Dead shape gets tracked at a very early offset
tracker = handle_txn(tracker, batch(lsn: 1, last_offset: 10), ["dead"])

# Alive shape joins later and processes many transactions
tracker =
Enum.reduce(2..50, tracker, fn lsn, tracker ->
tracker
|> handle_txn(batch(lsn: lsn, last_offset: 10), ["alive"])
|> FlushTracker.handle_flush_notification("alive", LogOffset.new(lsn, 10))
end)

# The global offset is still stuck at lsn 1's prev_log_offset position
# because the dead shape was never removed
refute_receive {:flush_confirmed, 2}
refute FlushTracker.empty?(tracker)

# Clean it up
_tracker = FlushTracker.handle_shape_removed(tracker, "dead")
assert_receive {:flush_confirmed, 50}
end
end

# Helper: calls handle_txn_fragment with shapes_with_changes defaulting to
# all affected shapes (the common case for single-fragment transactions).
defp handle_txn(tracker, fragment, affected_shapes) do
@@ -1280,6 +1280,235 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
end
end

describe "FlushTracker stalling when tracked consumer dies out-of-band" do
@two_table_inspector Support.StubInspector.new(%{
{1234, {"public", "table_a"}} => [
%{name: "id", type: "int8", pk_position: 0}
],
{5678, {"public", "table_b"}} => [
%{name: "id", type: "int8", pk_position: 0}
]
})

@shape_a Shape.new!("table_a", inspector: @two_table_inspector)
@shape_b Shape.new!("table_b", inspector: @two_table_inspector)

@describetag inspector: @two_table_inspector
setup :setup_log_collector

setup ctx do
parent = self()

consumers = [
{:alive,
start_supervised!(
{Support.TransactionConsumer,
id: :alive,
stack_id: ctx.stack_id,
parent: parent,
shape: @shape_a,
shape_handle: "shape-alive"},
id: {:consumer, :alive}
)},
{:doomed,
start_supervised!(
{Support.TransactionConsumer,
id: :doomed,
stack_id: ctx.stack_id,
parent: parent,
shape: @shape_b,
shape_handle: "shape-doomed"},
id: {:consumer, :doomed}
)}
]

%{consumers: consumers}
end

# This test documents a known bug (electric-sql/electric#3980):
# when a consumer dies without calling remove_shape (e.g. via handle_materializer_down
# with :shutdown reason), and no subsequent transactions touch that shape's table,
# FlushTracker stays stuck indefinitely. When a fix is implemented (e.g. PID monitoring
# or periodic liveness sweep), this test should be updated to assert the fix instead.
test "consumer killed out-of-band after delivery permanently blocks flush advancement", ctx do
register_as_replication_client(ctx.stack_id)

lsn = Lsn.from_integer(10)

# Transaction that inserts into both tables → both shapes get tracked in FlushTracker
txn =
transaction(100, lsn, [
%Changes.NewRecord{
relation: {"public", "table_a"},
record: %{"id" => "1"},
log_offset: LogOffset.new(lsn, 0)
},
%Changes.NewRecord{
relation: {"public", "table_b"},
record: %{"id" => "1"},
log_offset: LogOffset.new(lsn, 1)
}
])

assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id)

# Both consumers receive the transaction
assert_receive {Support.TransactionConsumer, {:alive, _}, [_]}
assert_receive {Support.TransactionConsumer, {:doomed, _}, [_]}

# Kill the doomed consumer with :kill to prevent terminate from running.
# This simulates a consumer dying via handle_materializer_down → {:stop, :shutdown}
# where handle_writer_termination clause 3 returns :ok without cleanup.
# Using :kill here achieves the same end state: dead process, no remove_shape call.
{_, doomed_pid} = List.keyfind!(ctx.consumers, :doomed, 0)
kill_consumer(doomed_pid, :kill)

# The alive consumer flushes its data
ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn, 1))

# Flush boundary partially advances: the alive shape caught up, but the dead
# shape pins the global offset at its prev_log_offset (tx_offset=9) minus 1 = 8.
assert_receive {:flush_boundary_updated, 8}, 100

# Send more transactions that ONLY affect table_a (the alive shape).
# Since no transaction touches table_b, SLC never discovers the dead consumer.
for i <- 11..15 do
lsn_i = Lsn.from_integer(i)

txn_i =
transaction(100 + i, lsn_i, [
%Changes.NewRecord{
relation: {"public", "table_a"},
record: %{"id" => "#{i}"},
log_offset: LogOffset.new(lsn_i, 0)
}
])

assert :ok = ShapeLogCollector.handle_event(txn_i, ctx.stack_id)
assert_receive {Support.TransactionConsumer, {:alive, _}, [_]}
ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn_i, 0))
end

# Despite 5 more fully-flushed transactions through lsn 15, the flush boundary
# is permanently stuck at 8. It never reaches the current lsn (15).
# This is the bug: the dead consumer's stale FlushTracker entry blocks WAL flush.
refute_receive {:flush_boundary_updated, _}, 100
> **P1:** Don't lock in the stale-flush stall as expected behavior
>
> The commit description calls this exact scenario the bug, but this assertion makes that bug part of the passing contract. If we later fix the out-of-band death path by monitoring/sweeping dead consumers, the correct `{:flush_boundary_updated, 15}` would fail here, so the new test suite would block the fix instead of guarding it. This should be pending/skipped or inverted into a regression test for the desired recovery behavior.

end

# NOTE: This test uses Support.TransactionConsumer whose terminate/2 always
# calls ShapeLogCollector.remove_shape. The real Consumer does NOT do this
# when dying with :shutdown (handle_writer_termination clause 3 returns :ok).
# This test demonstrates that the cleanup mechanism works when invoked,
# serving as a contrast to the "killed out-of-band" test above where it isn't.
test "consumer that calls remove_shape on termination allows flush to advance", ctx do
register_as_replication_client(ctx.stack_id)

lsn = Lsn.from_integer(10)

# Transaction affecting both tables
txn =
transaction(100, lsn, [
%Changes.NewRecord{
relation: {"public", "table_a"},
record: %{"id" => "1"},
log_offset: LogOffset.new(lsn, 0)
},
%Changes.NewRecord{
relation: {"public", "table_b"},
record: %{"id" => "1"},
log_offset: LogOffset.new(lsn, 1)
}
])

assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id)
assert_receive {Support.TransactionConsumer, {:alive, _}, [_]}
assert_receive {Support.TransactionConsumer, {:doomed, _}, [_]}

# Alive consumer flushes — partial advance to 8 (doomed still pending)
ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn, 1))
assert_receive {:flush_boundary_updated, 8}, 100

# Stop the doomed consumer gracefully — terminate runs, calls remove_shape,
# which triggers FlushTracker.handle_shape_removed. This is the CORRECT path.
{_, doomed_pid} = List.keyfind!(ctx.consumers, :doomed, 0)
Process.unlink(doomed_pid)
stop_supervised!({:consumer, :doomed})
Comment on lines +1431 to +1435:

> **P1:** Exercise the real shutdown path in the graceful-termination test
>
> This contrast case currently passes only because `Support.TransactionConsumer.terminate/2` always calls `ShapeLogCollector.remove_shape/2`, regardless of exit reason. The production consumer does not do that for `:shutdown`/`{:shutdown, _}`: `ShapeCleaner.handle_writer_termination/3` explicitly returns `:ok` in that case, so the test is asserting a cleanup path that the real code never takes. As written, it can give false confidence that a graceful shutdown clears the stale FlushTracker entry when the production path still leaves it behind.


# Now the flush boundary advances all the way to lsn because the stale shape
# was cleaned up by the graceful termination path
expected_lsn = Lsn.to_integer(lsn)
assert_receive {:flush_boundary_updated, ^expected_lsn}, 200
end

test "transaction to dead shape's table eventually unblocks flush via undeliverable detection",
ctx do
register_as_replication_client(ctx.stack_id)

lsn = Lsn.from_integer(10)

txn =
transaction(100, lsn, [
%Changes.NewRecord{
relation: {"public", "table_a"},
record: %{"id" => "1"},
log_offset: LogOffset.new(lsn, 0)
},
%Changes.NewRecord{
relation: {"public", "table_b"},
record: %{"id" => "1"},
log_offset: LogOffset.new(lsn, 1)
}
])

assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id)
assert_receive {Support.TransactionConsumer, {:alive, _}, [_]}
assert_receive {Support.TransactionConsumer, {:doomed, _}, [_]}

# Kill the doomed consumer out-of-band
{_, doomed_pid} = List.keyfind!(ctx.consumers, :doomed, 0)
kill_consumer(doomed_pid, :kill)

# Alive flushes — partial advance to 8
ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn, 1))
assert_receive {:flush_boundary_updated, 8}, 100

# Now a NEW transaction comes that affects BOTH tables.
# ConsumerRegistry.broadcast detects the dead doomed consumer → undeliverable.
# SLC calls FlushTracker.handle_shape_removed for undeliverable shapes.
lsn2 = Lsn.from_integer(20)

txn2 =
transaction(200, lsn2, [
%Changes.NewRecord{
relation: {"public", "table_a"},
record: %{"id" => "2"},
log_offset: LogOffset.new(lsn2, 0)
},
%Changes.NewRecord{
relation: {"public", "table_b"},
record: %{"id" => "2"},
log_offset: LogOffset.new(lsn2, 1)
}
])

log =
ExUnit.CaptureLog.capture_log(fn ->
assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id)
end)

assert log =~ "Consumer processes crashed or missing during broadcast"

assert_receive {Support.TransactionConsumer, {:alive, _}, [_]}

# The stale entry was removed by the undeliverable detection in publish/1.
# The alive consumer flushes the second transaction.
ShapeLogCollector.notify_flushed(ctx.stack_id, "shape-alive", LogOffset.new(lsn2, 1))

expected_lsn = Lsn.to_integer(lsn2)
assert_receive {:flush_boundary_updated, ^expected_lsn}, 100
end
end

defp transaction(xid, lsn, changes) do
last_log_offset =
case Enum.reverse(changes) do