Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Jan 22, 2025
1 parent d99e3ae commit b8d3186
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 61 deletions.
86 changes: 38 additions & 48 deletions deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,29 +108,22 @@ maybe_init() ->
%% node, even if the configuration changed in between.
persistent_term:put(?PT_PEER_DISC_BACKEND, Backend),

_ = code:ensure_loaded(Backend),
case erlang:function_exported(Backend, init, 0) of
true ->
?LOG_DEBUG(
"Peer discovery: backend supports initialisation",
case catch Backend:init() of
ok ->
?LOG_INFO(
"Peer discovery: backend initialisation succeeded",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case Backend:init() of
ok ->
?LOG_DEBUG(
"Peer discovery: backend initialisation succeeded",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
{error, _Reason} = Error ->
?LOG_WARNING(
"Peer discovery: backend initialisation failed: ~tp.",
[Error],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end;
false ->
ok;
{error, _Reason} = Error ->
?LOG_ERROR(
"Peer discovery: backend initialisation failed: ~tp",
[Error],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
{'EXIT', {undef, _}} ->
?LOG_DEBUG(
"Peer discovery: backend does not support initialisation",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end.

Expand Down Expand Up @@ -159,7 +152,7 @@ sync_desired_cluster() ->

-spec sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when
Backend :: backend(),
RetriesLeft :: non_neg_integer() | infinity,
RetriesLeft :: non_neg_integer() | unlimited,
RetryDelay :: non_neg_integer().
%% @private

Expand Down Expand Up @@ -240,18 +233,18 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->

-spec retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> ok when
Backend :: backend(),
RetriesLeft :: non_neg_integer() | infinity,
RetriesLeft :: non_neg_integer() | unlimited,
RetryDelay :: non_neg_integer().
%% @private

retry_sync_desired_cluster(Backend, infinity, RetryDelay) ->
retry_sync_desired_cluster(Backend, unlimited, RetryDelay) ->
?LOG_DEBUG(
"Peer discovery: retrying to create/sync cluster in ~b ms "
"(will retry forever)",
[RetryDelay],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
timer:sleep(RetryDelay),
sync_desired_cluster(Backend, infinity, RetryDelay);
sync_desired_cluster(Backend, unlimited, RetryDelay);
retry_sync_desired_cluster(Backend, RetriesLeft, RetryDelay)
when RetriesLeft > 0 ->
RetriesLeft1 = RetriesLeft - 1,
Expand Down Expand Up @@ -1017,33 +1010,30 @@ maybe_unregister() ->

-spec discovery_retries(Backend) -> {Retries, RetryDelay} when
Backend :: backend(),
Retries :: non_neg_integer() | infinity,
Retries :: non_neg_integer() | unlimited,
RetryDelay :: non_neg_integer().

discovery_retries(Backend) ->
_ = code:ensure_loaded(Backend),
{Retries0, Interval} = case application:get_env(rabbit, cluster_formation) of
{ok, Proplist} ->
Retries1 = proplists:get_value(
discovery_retry_limit,
Proplist,
?DEFAULT_DISCOVERY_RETRY_COUNT),
Interval1 = proplists:get_value(
discovery_retry_interval,
Proplist,
?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS),
{Retries1, Interval1};
undefined ->
{?DEFAULT_DISCOVERY_RETRY_COUNT, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS}
end,
Retries = case erlang:function_exported(Backend, retry_forever, 0)
andalso Backend:retry_forever() of
true ->
infinity;
false ->
Retries0
end,
{Retries, Interval}.
{_Retries, RetryDelay} = RetryConfig = discovery_retries_from_config(),
case catch Backend:retry_strategy() of
unlimited ->
{unlimited, RetryDelay};
_ ->
RetryConfig
end.

-spec discovery_retries_from_config() -> {Retries, RetryDelay} when
Retries :: non_neg_integer(),
RetryDelay :: non_neg_integer().
discovery_retries_from_config() ->
case application:get_env(rabbit, cluster_formation) of
{ok, Proplist} ->
Retries = proplists:get_value(discovery_retry_limit, Proplist, ?DEFAULT_DISCOVERY_RETRY_COUNT),
Interval = proplists:get_value(discovery_retry_interval, Proplist, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS),
{Retries, Interval};
undefined ->
{?DEFAULT_DISCOVERY_RETRY_COUNT, ?DEFAULT_DISCOVERY_RETRY_INTERVAL_MS}
end.

-spec register(Backend) -> ok when
Backend :: backend().
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit_common/src/rabbit_peer_discovery_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@

-callback unlock(Data :: term()) -> ok.

-callback retry_forever() -> boolean().
-callback retry_strategy() -> limited | unlimited.

-optional_callbacks([init/0, retry_forever/0]).
-optional_callbacks([init/0, retry_strategy/0]).

-export([api_version/0]).

Expand Down
6 changes: 3 additions & 3 deletions deps/rabbitmq_peer_discovery_k8s/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

## Overview

This is an implementation of RabbitMQ peer discovery interface for for Kubernetes. This is a completely new implementation (version 2) that has little to do with the original design but is backwards compatible
This is an implementation of RabbitMQ peer discovery interface for Kubernetes. This is a completely new implementation (version 2) that has little to do with the original design but is backwards compatible
(all the configuration options of version 1 are accepted but ignored).

### Version 1 vs Version 2

The original implementation of this plugin performed peer discovery using Kubernetes API as the source of data on running cluster pods. It queried the Kubernetes API for the list of endpoints serving as the backends of a Kubernetes Service.

However, RabbitMQ should be deployed using a StatefulSet and pods of a StatefulSet have consistent names - Kubernetes always creates the pods with the StatefulSet name and an ID suffix, starting with 0. For example, a StatefulSet with 3 replicas named `foobar` will have pods named `foobar-0`, `foobar-1`, and `foobar-2`. It is therefore not necessary to query the Kubernetes API to discover peers. Version 2 doesn't perform any API queries and insteard checks the suffix of the local node and:
* if the suffix is `-0`, it forms a new cluster
However, RabbitMQ should be deployed using a StatefulSet and pods of a StatefulSet have consistent names - Kubernetes always creates the pods with the StatefulSet name and an ID suffix, starting with 0. For example, a StatefulSet with 3 replicas named `foobar` will have pods named `foobar-0`, `foobar-1`, and `foobar-2`. It is therefore not necessary to query the Kubernetes API to discover peers. Version 2 doesn't perform any API queries and instead checks the suffix of the local node and:
* if the suffix is `-0`, it starts normally (effectively forming a new single-node cluster)
* if the suffix is different, it never forms a new cluster and will always join the node with the `-0` suffix

This avoids any race conditions that could lead to the cluster being formed incorrectly (Version 1 was prone to this problem in some environments).
Expand Down
24 changes: 20 additions & 4 deletions deps/rabbitmq_peer_discovery_k8s/src/rabbit_peer_discovery_k8s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

-export([init/0, list_nodes/0, supports_registration/0, register/0,
unregister/0, post_registration/0, lock/1, unlock/1, node/0,
retry_forever/0]).
retry_strategy/0]).

-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
Expand All @@ -19,14 +19,31 @@
-compile([node/0]).
-endif.

init() ->
Formation = application:get_env(rabbit, cluster_formation, []),
case proplists:get_value(peer_discovery_k8s, Formation, undefined) of
undefined -> ok;
_ -> ?LOG_WARNING("Peer discovery: ignoring deprecated cluster_formation.k8s.* configuration options",
[], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end,
case proplists:get_value(discovery_retry_limit, Formation, undefined) of
undefined -> ok;
_ -> ?LOG_WARNING("Peer discovery: ignoring cluster_formation.discovery_retry_limit option "
"(will retry forever)",
[], #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok
end,
ok.

-spec list_nodes() -> {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}.

list_nodes() ->
Nodename = atom_to_list(?MODULE:node()),
try
[[], Prefix, StatefulSetName, MyPodId, Domain] = re:split(
Nodename,
"([^@]+@)([^.]*-)([0-9]+)",
"^([^@]+@)([^.]*-)([0-9]+)",
[{return, list}]),
_ = list_to_integer(MyPodId),
SeedNode = list_to_atom(lists:flatten(Prefix ++ StatefulSetName ++ "0" ++ Domain)),
Expand All @@ -44,10 +61,9 @@ node() ->
erlang:node().

supports_registration() -> false.
init() -> ok.
register() -> ok.
unregister() -> ok.
post_registration() -> ok.
lock(_) -> not_supported.
unlock(_) -> ok.
retry_forever() -> true.
retry_strategy() -> unlimited.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
-behaviour(rabbit_peer_discovery_backend).

-export([init/0, list_nodes/0, supports_registration/0, register/0, unregister/0,
post_registration/0, lock/1, unlock/1, retry_forever/0]).
post_registration/0, lock/1, unlock/1, retry_strategy/0]).

-define(DELEGATE, rabbit_peer_discovery_k8s).

Expand Down Expand Up @@ -53,7 +53,7 @@ lock(Node) ->
unlock(Data) ->
?DELEGATE:unlock(Data).

-spec retry_forever() -> boolean().
retry_forever() ->
?DELEGATE:retry_forever().
-spec retry_strategy() -> limited | unlimited.
retry_strategy() ->
?DELEGATE:retry_strategy().

0 comments on commit b8d3186

Please sign in to comment.