Skip to content

Commit

Permalink
Redesigned k8s peer discovery
Browse files Browse the repository at this point in the history
Rather than querying the Kubernetes API, just check the local node name
and try to connect to the pod with ID `0` (`-0` suffix). Only the pod
with ID 0 can form a new cluster - all other pods will wait forever.
This should prevent any race conditions and incorrectly formed clusters.
  • Loading branch information
mkuratczyk committed Jan 20, 2025
1 parent 36d1921 commit d153aaf
Show file tree
Hide file tree
Showing 21 changed files with 148 additions and 853 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/oci-make.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
make package-generic-unix PROJECT_VERSION=4.1.0-alpha.1
- name: Upload package-generic-unix
if: steps.authorized.outputs.authorized == 'true'
uses: actions/upload-artifact@v4.3.1
uses: actions/upload-artifact@v4
with:
name: package-generic-unix
path: PACKAGES/rabbitmq-server-*.tar.xz
Expand Down
52 changes: 38 additions & 14 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ sync_desired_cluster() ->

%% We handle retries at the top level: steps are followed sequentially and
%% if one of them fails, we retry the whole process.
{Retries, RetryDelay} = discovery_retries(),
{Retries, RetryDelay} = discovery_retries(Backend),

sync_desired_cluster(Backend, Retries, RetryDelay).

-spec sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when
Backend :: backend(),
RetriesLeft :: non_neg_integer(),
RetriesLeft :: non_neg_integer() | infinity,
RetryDelay :: non_neg_integer().
%% @private

Expand Down Expand Up @@ -240,10 +240,18 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->

-spec retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when
Backend :: backend(),
RetriesLeft :: non_neg_integer(),
RetriesLeft :: non_neg_integer() | infinity,
RetryDelay :: non_neg_integer().
%% @private

retry_sync_desired_cluster(Backend, infinity, RetryDelay) ->
?LOG_DEBUG(
"Peer discovery: retrying to create/sync cluster in ~b ms "
"(will retry forever)",
[RetryDelay],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
timer:sleep(RetryDelay),
sync_desired_cluster(Backend, infinity, RetryDelay);
retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay)
when RetriesLeft > 0 ->
RetriesLeft1 = RetriesLeft - 1,
Expand Down Expand Up @@ -1007,19 +1015,35 @@ maybe_unregister() ->
ok
end.

-spec discovery_retries() -> {Retries, RetryDelay} when
Retries :: non_neg_integer(),
-spec discovery_retries(Backend) -> {Retries, RetryDelay} when
Backend :: backend(),
Retries :: non_neg_integer() | infinity,
RetryDelay :: non_neg_integer().

discovery_retries() ->
case application:get_env(rabbit, cluster_formation) of
{ok, Proplist} ->
Retries = proplists:get_value(discovery_retry_limit, Proplist, ?DEFAULT_DISCOVERY_RETRY_COUNT),
Interval = proplists:get_value(discovery_retry_interval, Proplist, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS),
{Retries, Interval};
undefined ->
{?DEFAULT_DISCOVERY_RETRY_COUNT, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS}
end.
discovery_retries(Backend) ->
_ = code:ensure_loaded(Backend),
{Retries0, Interval} = case application:get_env(rabbit, cluster_formation) of
{ok, Proplist} ->
Retries1 = proplists:get_value(
discovery_retry_limit,
Proplist,
?DEFAULT_DISCOVERY_RETRY_COUNT),
Interval1 = proplists:get_value(
discovery_retry_interval,
Proplist,
?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS),
{Retries1, Interval1};
undefined ->
{?DEFAULT_DISCOVERY_RETRY_COUNT, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS}
end,
Retries = case erlang:function_exported(Backend, retry_forever, 0)
andalso Backend:retry_forever() of
true ->
infinity;
false ->
Retries0
end,
{Retries, Interval}.

-spec register(Backend) -> ok when
Backend :: backend().
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit_common/src/rabbit_peer_discovery_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@

-callback unlock(Data :: term()) -> ok.

-optional_callbacks([init/0]).
-callback retry_forever() -> boolean().

-optional_callbacks([init/0, retry_forever/0]).

-export([api_version/0]).

Expand Down
3 changes: 1 addition & 2 deletions deps/rabbitmq_peer_discovery_k8s/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ PROJECT_DESCRIPTION = Kubernetes-based RabbitMQ peer discovery backend
PROJECT_MOD = rabbitmq_peer_discovery_k8s_app

DEPS = rabbit_common rabbitmq_peer_discovery_common rabbit
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers ct_helper meck
dep_ct_helper = git https://github.com/extend/ct_helper.git master
TEST_DEPS = meck

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
Expand Down
49 changes: 21 additions & 28 deletions deps/rabbitmq_peer_discovery_k8s/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,53 @@

## Overview

This is an implementation of RabbitMQ [peer discovery interface](https://www.rabbitmq.com/blog/2018/02/12/peer-discovery-subsystem-in-rabbitmq-3-7/)
for Kubernetes.
This is an implementation of RabbitMQ peer discovery interface for for Kubernetes. This is a completely new implementation (version 2) that has little to do with the original design but is backwards compatible
(all the configuration options of version 1 are accepted but ignored).

This plugin only performs peer discovery using Kubernetes API as the source of data on running cluster pods.
Please get familiar with [RabbitMQ clustering fundamentals](https://rabbitmq.com/clustering.html) before attempting
to use it.
### Version 1 vs Version 2

Cluster provisioning and most of Day 2 operations such as [proper monitoring](https://rabbitmq.com/monitoring.html)
are not in scope for this plugin.
The original implementation of this plugin performed peer discovery using Kubernetes API as the source of data on running cluster pods. It queried the Kubernetes API for the list of endpoints serving as the backends of a Kubernetes Service.

For a more comprehensive open source RabbitMQ on Kubernetes deployment solution,
see the [RabbitMQ Cluster Operator for Kubernetes](https://www.rabbitmq.com/kubernetes/operator/operator-overview.html).
The Operator is developed [on GitHub](https://github.com/rabbitmq/cluster-operator/) and contains its
own [set of examples](https://github.com/rabbitmq/cluster-operator/tree/main/docs/examples).
However, RabbitMQ should be deployed using a StatefulSet and pods of a StatefulSet have consistent names - Kubernetes always creates the pods with the StatefulSet name and an ID suffix, starting with 0. For example, a StatefulSet with 3 replicas named `foobar` will have pods named `foobar-0`, `foobar-1`, and `foobar-2`. It is therefore not necessary to query the Kubernetes API to discover peers. Version 2 doesn't perform any API queries and insteard checks the suffix of the local node and:
* if the suffix is `-0`, it forms a new cluster
* if the suffix is different, it never forms a new cluster and will always join the node with the `-0` suffix

This avoids any race conditions that could lead to the cluster being formed incorrectly (Version 1 was prone to this problem in some environments).

If the node with `-0` suffix is not running for any reason, the other nodes will keep waiting for it to start. A cluster will not be formed until the node with `-0` suffix is running. This is different from the original implementation where the cluster could be formed without the node with `-0` suffix.

Peer discovery is only performed when a node starts for the first time. Once the cluster is formed, peer discovery is not performed and therefore the node with index 0 is not necessary for the cluster to work or for other nodes to be able to start (with the exception of the cluster being scaled out; for example if you want to add nodes to turn a 3-node cluster into a 5-node cluster, the new nodes will try to join node 0 and therefore it has to be running).

## Supported RabbitMQ Versions

This plugin ships with RabbitMQ 3.7.0 or later.
Version 2 was first included in RabbitMQ 4.1.

Version 1 of this plugin was included from RabbitMQ 3.7.0 until 4.0.

## Installation

This plugin ships with [supported RabbitMQ versions](https://www.rabbitmq.com/versions.html).
There is no need to install it separately.
If you are using RabbitMQ on Kubernetes, the recommended way is to use the [Cluster Operator](https://www.rabbitmq.com/kubernetes/operator/operator-overview). If you do so, the plugin is configured automatically and most of the time,
you don't need to do anything about peer discovery.

As with any [plugin](https://rabbitmq.com/plugins.html), it must be enabled before it
can be used. For peer discovery plugins it means they must be [enabled](https://rabbitmq.com//plugins.html#basics) or [preconfigured](https://rabbitmq.com//plugins.html#enabled-plugins-file)
before first node boot:

```
rabbitmq-plugins --offline enable rabbitmq_peer_discovery_k8s
```
Similarly, if you use the [Helm chart](https://github.com/bitnami/charts/tree/main/bitnami/rabbitmq),
this plugin is configured automatically.

## Documentation

Version 2 of this plugins deprecates and ignores all the configuration options of Version 1.

See [RabbitMQ Cluster Formation guide](https://www.rabbitmq.com/cluster-formation.html) for an overview
of the peer discovery subsystem, general and Kubernetes-specific configurable values and troubleshooting tips.

Example deployments that use this plugin can be found in an [RabbitMQ on Kubernetes examples repository](https://github.com/rabbitmq/diy-kubernetes-examples).
Note that they are just that, examples, and won't be optimal for every use case or cover a lot of important production
system concerns such as monitoring, persistent volume settings, access control, sizing, and so on.


## Contributing

See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview](https://www.rabbitmq.com/github.html).


## License

[Licensed under the MPL](LICENSE-MPL-RabbitMQ), same as RabbitMQ server.


## Copyright

(c) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
(c) 2007-2025 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
9 changes: 0 additions & 9 deletions deps/rabbitmq_peer_discovery_k8s/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ def all_beam_files(name = "all_beam_files"):
srcs = [
"src/rabbit_peer_discovery_k8s.erl",
"src/rabbitmq_peer_discovery_k8s.erl",
"src/rabbitmq_peer_discovery_k8s_app.erl",
"src/rabbitmq_peer_discovery_k8s_node_monitor.erl",
"src/rabbitmq_peer_discovery_k8s_sup.erl",
],
hdrs = [":public_and_private_hdrs"],
app_name = "rabbitmq_peer_discovery_k8s",
Expand All @@ -37,9 +34,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
srcs = [
"src/rabbit_peer_discovery_k8s.erl",
"src/rabbitmq_peer_discovery_k8s.erl",
"src/rabbitmq_peer_discovery_k8s_app.erl",
"src/rabbitmq_peer_discovery_k8s_node_monitor.erl",
"src/rabbitmq_peer_discovery_k8s_sup.erl",
],
hdrs = [":public_and_private_hdrs"],
app_name = "rabbitmq_peer_discovery_k8s",
Expand Down Expand Up @@ -73,9 +67,6 @@ def all_srcs(name = "all_srcs"):
srcs = [
"src/rabbit_peer_discovery_k8s.erl",
"src/rabbitmq_peer_discovery_k8s.erl",
"src/rabbitmq_peer_discovery_k8s_app.erl",
"src/rabbitmq_peer_discovery_k8s_node_monitor.erl",
"src/rabbitmq_peer_discovery_k8s_sup.erl",
],
)
filegroup(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%






%%
%%
%% All configuration options below are deprecated as of version 2 of this plugin's implementation (first shipped in RabbitMQ 4.1)
%%
%%







%% Kubernetes host

{mapping, "cluster_formation.k8s.host", "rabbit.cluster_formation.peer_discovery_k8s.k8s_host", [
Expand Down
Loading

0 comments on commit d153aaf

Please sign in to comment.