Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-evaluate stream SAC group after connection down event #13657

Merged

Conversation

acogoluegnes
Copy link
Contributor

@acogoluegnes acogoluegnes commented Mar 31, 2025

The same connection can contain several consumers belonging to a SAC group (group key = vhost + stream + consumer name). The whole new group must be re-evaluated to select a new active consumer after the consumers of the down connection are removed from it.

The previous behavior would not re-evaluate the new group and could select a consumer from the down connection, letting the group with only inactive consumers, as the selected active consumer would never receive the activation message from the stream SAC coordinator.

This commit fixes this problem by removing the consumers of the down down connection from the affected groups and then performing the appropriate operations for the groups to keep on consuming (e.g. notifying an active consumer that it needs to step down).

References #13372

The same connection can contain several consumers belonging to a SAC
group (group key = vhost + stream + consumer name). The whole new group
must be re-evaluated to select a new active consumer after the consumers
of the down connection are removed from it.

The previous behavior would not re-evaluate the new group and could
select a consumer from the down connection, letting the group with only
inactive consumers, as the selected active consumer would never receive
the activation message from the stream SAC coordinator.

This commit fixes this problem by removing the consumers of the down
down connection from the affected groups and then performing the
appropriate operations for the groups to keep on consuming (e.g.
notifying an active consumer that it needs to step down).

References #13372
@acogoluegnes
Copy link
Contributor Author

acogoluegnes commented Mar 31, 2025

Acceptance steps

Get the branch and run the broker with the stream plugin:

cd /tmp
git clone [email protected]:rabbitmq/rabbitmq-server.git
cd rabbitmq-server
git checkout stream-sac-re-evaluate-group-after-connection-down
make run-broker PLUGINS="rabbitmq_stream"

Get Stream PerfTest and run a first instance with 2 consumers on a 3-partition super stream:

cd /tmp
wget https://github.com/rabbitmq/rabbitmq-java-tools-binaries-dev/releases/download/v-stream-perf-test-latest/stream-perf-test-latest.jar
java -jar stream-perf-test-latest.jar --producers 0 --consumers 2 \
    --stream-count 1 --super-streams --super-stream-partitions 3 \
    --single-active-consumer --consumer-names my-app \
    --uris rabbitmq-stream://$(hostname):5552 \
    --consumers-by-connection 100

Start another identical instance:

cd /tmp
java -jar stream-perf-test-latest.jar --producers 0 --consumers 2 \
    --stream-count 1 --super-streams --super-stream-partitions 3 \
    --single-active-consumer --consumer-names my-app \
    --uris rabbitmq-stream://$(hostname):5552 \
    --consumers-by-connection 100

List the registered consumers for the group on the first stream-0 partition:

cd /tmp/rabbitmq-server
sbin/rabbitmqctl list_stream_group_consumers --reference my-app --stream stream-0

There should be 1 active consumer and 3 inactive consumers:

Listing group consumers ...
┌─────────────────┬───────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                   │ state    │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 2               │ 127.0.0.1:57649 -> 127.0.0.1:5552 │ active   │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 5               │ 127.0.0.1:57649 -> 127.0.0.1:5552 │ inactive │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 2               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 5               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
└─────────────────┴───────────────────────────────────┴──────────┘

Do the same thing for the stream-1 partition:

sbin/rabbitmqctl list_stream_group_consumers --reference my-app --stream stream-1
Listing group consumers ...
┌─────────────────┬───────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                   │ state    │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 1               │ 127.0.0.1:57649 -> 127.0.0.1:5552 │ inactive │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 4               │ 127.0.0.1:57649 -> 127.0.0.1:5552 │ active   │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 1               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 4               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
└─────────────────┴───────────────────────────────────┴──────────┘

And for the stream-2 partition:

sbin/rabbitmqctl list_stream_group_consumers --reference my-app --stream stream-2
Listing group consumers ...
┌─────────────────┬───────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                   │ state    │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 0               │ 127.0.0.1:57649 -> 127.0.0.1:5552 │ inactive │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 3               │ 127.0.0.1:57649 -> 127.0.0.1:5552 │ inactive │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 0               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ active   │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 3               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
└─────────────────┴───────────────────────────────────┴──────────┘

List now the Java processes:

jps
11025 Jps
9812 stream-perf-test-latest.jar
9721 stream-perf-test-latest.jar

Pick the PID of one of the stream-perf-test-latest.jar instances and kill it:

kill -9 9721

List the consumers of the group for each partition. There should be 1 active consumer and 1 inactive consumer for each partition (no partition should have 2 inactive consumers).

sbin/rabbitmqctl list_stream_group_consumers --reference my-app --stream stream-0
Listing group consumers ...
┌─────────────────┬───────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                   │ state    │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 2               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ active   │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 5               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
└─────────────────┴───────────────────────────────────┴──────────┘
sbin/rabbitmqctl list_stream_group_consumers --reference my-app --stream stream-1
Listing group consumers ...
┌─────────────────┬───────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                   │ state    │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 1               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 4               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ active   │
└─────────────────┴───────────────────────────────────┴──────────┘
sbin/rabbitmqctl list_stream_group_consumers --reference my-app --stream stream-2
Listing group consumers ...
┌─────────────────┬───────────────────────────────────┬──────────┐
│ subscription_id │ connection_name                   │ state    │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 0               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ active   │
├─────────────────┼───────────────────────────────────┼──────────┤
│ 3               │ 127.0.0.1:57652 -> 127.0.0.1:5552 │ inactive │
└─────────────────┴───────────────────────────────────┴──────────┘

@acogoluegnes acogoluegnes marked this pull request as ready for review March 31, 2025 14:26
@michaelklishin michaelklishin added this to the 4.1.0 milestone Mar 31, 2025
@michaelklishin michaelklishin merged commit a1ec795 into main Mar 31, 2025
273 checks passed
@michaelklishin michaelklishin deleted the stream-sac-re-evaluate-group-after-connection-down branch March 31, 2025 16:33
@acogoluegnes acogoluegnes removed this from the 4.1.0 milestone Mar 31, 2025
@michaelklishin michaelklishin added this to the 4.1.0 milestone Mar 31, 2025
michaelklishin added a commit that referenced this pull request Mar 31, 2025
Re-evaluate stream SAC group after connection down event (backport #13657)
acogoluegnes added a commit that referenced this pull request Mar 31, 2025
Re-evaluate stream SAC group after connection down event (backport #13657) (backport #13659)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants