Skip to content

Commit

Permalink
Fix deleting and cleanup of shovels with old ids
Browse files Browse the repository at this point in the history
  • Loading branch information
gomoripeti committed Nov 17, 2023
1 parent 806c9f5 commit 76f4ef1
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,18 @@ stop_child({VHost, ShovelName} = Name) ->
case get({shovel_worker_autodelete, Name}) of
true -> ok; %% [1]
_ ->
ok = mirrored_supervisor:terminate_child(?SUPERVISOR, id(Name)),
ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name)),
case mirrored_supervisor:terminate_child(?SUPERVISOR, id(Name)) of
ok ->
ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name));
{error, not_found} ->
%% try older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
case mirrored_supervisor:terminate_child(?SUPERVISOR, old_id(Name)) of
ok ->
ok = mirrored_supervisor:delete_child(?SUPERVISOR, old_id(Name));
{error, not_found} ->
ok
end
end,
rabbit_shovel_status:remove(Name)
end,
rabbit_shovel_locks:unlock(LockId),
Expand All @@ -90,23 +100,26 @@ stop_child({VHost, ShovelName} = Name) ->
cleanup_specs() ->
Children = mirrored_supervisor:which_children(?SUPERVISOR),

%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
OldStyleSpecsSet = sets:from_list([element(1, S) || S <- Children]),
NewStyleSpecsSet = sets:from_list([element(2, element(1, S)) || S <- Children]),
ParamsSet = sets:from_list([ {proplists:get_value(vhost, S), proplists:get_value(name, S)}
|| S <- rabbit_runtime_parameters:list_component(<<"shovel">>) ]),
F = fun(Name, ok) ->
SupIdSet = sets:from_list([element(1, S) || S <- Children]),
ParamsSet = sets:from_list(
lists:flatmap(
fun(S) ->
Name = {proplists:get_value(vhost, S), proplists:get_value(name, S)},
%% Supervisor Id format was different pre 3.13.0 and 3.12.8.
%% Try both formats to cover the transitionary mixed version cluster period.
[id(Name), old_id(Name)]
end,
rabbit_runtime_parameters:list_component(<<"shovel">>))),
F = fun(SupId, ok) ->
try
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name))
_ = mirrored_supervisor:delete_child(?SUPERVISOR, SupId)
catch _:_:_Stacktrace ->
ok
end,
ok
end,
%% Try both formats to cover the transitionary mixed version cluster period.
AllSpecs = sets:union(NewStyleSpecsSet, OldStyleSpecsSet),
%% Delete any supervisor children that do not have their respective runtime parameters in the database.
SetToCleanUp = sets:subtract(AllSpecs, ParamsSet),
SetToCleanUp = sets:subtract(SupIdSet, ParamsSet),
ok = sets:fold(F, ok, SetToCleanUp).

%%----------------------------------------------------------------------------
Expand All @@ -115,7 +128,8 @@ init([]) ->
{ok, {{one_for_one, 3, 10}, []}}.

id({V, S} = Name) ->
{[V, S], Name};
{[V, S], Name}.

%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
id(Other) ->
Other.
old_id({_V, _S} = Name) ->
Name.

0 comments on commit 76f4ef1

Please sign in to comment.