Skip to content

Commit

Permalink
Merge pull request #25 from mononym/fix-deadloop
Browse files Browse the repository at this point in the history
fix: fix deadloop when connecting to single server with cluster config
  • Loading branch information
mononym authored Mar 8, 2022
2 parents 7c6d6fc + 504d5dd commit 25da12e
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 71 deletions.
10 changes: 1 addition & 9 deletions src/mongoc/mc_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ next_loop(Pid) ->
%% @private
loop(State = #state{type = Type, host = Host, port = Port, topology = Topology, server = Server,
connect_to = Timeout, heartbeatF = HB_MS, minHeartbeatF = MinHB_MS, counter = Counter, worker_opts = WOpts}) ->
ConnectArgs = form_args(Host, Port, Timeout, WOpts),
ConnectArgs = mc_util:form_connect_args(Host, Port, Timeout, WOpts),
try check(ConnectArgs, Server) of
Res ->
gen_server:cast(Topology, Res),
Expand Down Expand Up @@ -172,11 +172,3 @@ do_timeout(Pid, _TO) ->
%% @private
send_stop(undefined) -> ok;
send_stop(PausePid) -> PausePid ! stop.

%% @private
form_args(Host, Port, Timeout, WorkerArgs) ->
case mc_utils:get_value(ssl, WorkerArgs, false) of
true -> [{host, Host}, {port, Port}, {timeout, Timeout}, {ssl, true},
{ssl_opts, mc_utils:get_value(ssl_opts, WorkerArgs, [])}];
false -> [{host, Host}, {port, Port}, {timeout, Timeout}]
end.
13 changes: 1 addition & 12 deletions src/mongoc/mc_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ start(Topology, HostPort, Topts, Wopts) ->

init([Topology, Addr, TopologyOptions, Wopts]) ->
process_flag(trap_exit, true),
{Host, Port} = parse_seed(Addr),
{Host, Port} = mc_util:parse_seed(Addr),
PoolConf = form_pool_conf(TopologyOptions),
ConnectTimeoutMS = mc_utils:get_value(connectTimeoutMS, TopologyOptions, 20000),
SocketTimeoutMS = mc_utils:get_value(socketTimeoutMS, TopologyOptions, 100),
Expand Down Expand Up @@ -173,17 +173,6 @@ init_pool(#state{host = Host, port = Port, pool_conf = Conf, worker_opts = Wopts
link(Child),
Child.

%% @private
parse_seed(Addr) when is_binary(Addr) ->
parse_seed(binary_to_list(Addr));
parse_seed(Addr) when is_list(Addr) ->
[Host0, Port] = string:split(Addr, ":", trailing),
Host = case inet:parse_address(Host0) of
{ok, H} -> H;
_ -> Host0
end,
{Host, list_to_integer(Port)}.

%% @private
form_pool_conf(TopologyOptions) ->
Size = mc_utils:get_value(pool_size, TopologyOptions, 10),
Expand Down
48 changes: 23 additions & 25 deletions src/mongoc/mc_topology.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,22 @@ init([SeedsList, TopologyOptions, WorkerOptions]) ->
get_pool_timeout = GetPoolTimeout
},
gen_server:cast(self(), init_seeds),
{ok, State}.
{Host, Port} =
case Seeds of
[Seed|_] ->
mc_util:parse_seed(Seed);
Seed ->
mc_util:parse_seed(Seed)
end,
ConnectTimeoutMS = proplists:get_value(connectTimeoutMS, TopologyOptions, 20000),
ConnectArgs = mc_util:form_connect_args(Host, Port, ConnectTimeoutMS, WorkerOptions),
case mc_topology_logics:validate_server_and_config(ConnectArgs, Type, SetName) of
ok ->
{ok, State};

Error ->
{stop, Error}
end.

-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #topology_state{}) -> term()).
Expand Down Expand Up @@ -159,11 +174,14 @@ handle_cast(update_topology, State = #topology_state{servers = Tab}) ->
handle_cast(_Request, State) ->
{noreply, State}.

handle_info({'DOWN', MRef, _, _, _}, State = #topology_state{topology_opts = Topts, worker_opts = Wopts, servers = Tab}) ->
handle_info(init_seeds, State) ->
mc_topology_logics:init_seeds(State),
{noreply, State};
handle_info({'DOWN', MRef, _, _, _}, State = #topology_state{servers = Tab}) ->
case ets:match(Tab, #mc_server{pid = '$1', host = '$2', mref = MRef, _ = '_'}) of
[[Pid, Host]] ->
[[Pid, _Host]] ->
true = ets:delete(Tab, Pid),
mc_topology_logics:init_seeds([Host], Tab, Topts, Wopts),
erlang:send_after(1000, self(), init_seeds),
{noreply, State};
[] ->
{noreply, State}
Expand Down Expand Up @@ -199,7 +217,7 @@ handle_server_to_unknown(Server, #topology_state{servers = Tab} = State) ->

%% @private
parse_ismaster(Server, IsMaster, RTT, State = #topology_state{servers = Tab}) ->
SType = server_type(IsMaster),
SType = mc_topology_logics:server_type(IsMaster),
[Saved] = ets:select(Tab, [{#mc_server{pid = Server, _ = '_'}, [], ['$_']}]),
{OldRTT, NRTT} = parse_rtt(Saved#mc_server.old_rtt, Saved#mc_server.rtt, RTT),
ToUpdate = Saved#mc_server{
Expand Down Expand Up @@ -230,26 +248,6 @@ parse_rtt(OldRTT, CurRTT, RTT) ->
A = 0.2,
{CurRTT, A * RTT + (1 - A) * OldRTT / 1000}.

%% @private
server_type(#{<<"ismaster">> := true, <<"secondary">> := false, <<"setName">> := _}) ->
rsPrimary;
server_type(#{<<"ismaster">> := false, <<"secondary">> := true, <<"setName">> := _}) ->
rsSecondary;
server_type(#{<<"arbiterOnly">> := true, <<"setName">> := _}) ->
rsArbiter;
server_type(#{<<"hidden">> := true, <<"setName">> := _}) ->
rsOther;
server_type(#{<<"setName">> := _}) ->
rsOther;
server_type(#{<<"msg">> := <<"isdbgrid">>}) ->
mongos;
server_type(#{<<"isreplicaset">> := true}) ->
rsGhost;
server_type(#{<<"ok">> := _}) ->
unknown;
server_type(_) ->
standalone.

%% @private
drop_server(Topology, Server) ->
gen_server:cast(Topology, {drop_server, Server}).
Expand Down
97 changes: 80 additions & 17 deletions src/mongoc/mc_topology_logics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
-define(UNKN_GHST(ST), ST =:= unknown; ST =:= rsGhost).
-define(NOT_MAX(E, M), E =/= undefined, M =/= undefined, E < M).

-define(LOG_TOPOLOGY_ERROR(Configured, Actual),
logger:error("Configured mongo client topology does not match actual mongo install topology. Configured: ~p; Actual: ~p", [Configured, Actual])).

-define(LOG_SET_NAME_ERROR(Configured, Actual),
logger:error("Configured mongo set name does not match actual mongo install set name. Configured: ~p; Actual: ~p", [Configured, Actual])).

%% API
-export([update_topology_state/2, init_seeds/4, init_seeds/1]).
-export([update_topology_state/2, init_seeds/4, init_seeds/1, validate_server_and_config/3, server_type/1]).

update_topology_state(#mc_server{type = SType, pid = Pid}, State = #topology_state{type = sharded}) when ?NON_SHARDED(SType) -> %% SHARDED
?LOG_TOPOLOGY_ERROR(sharded, SType),
exit(Pid, kill),
State;
update_topology_state(_, State = #topology_state{type = sharded}) ->
Expand All @@ -33,6 +40,7 @@ update_topology_state(#mc_server{type = rsGhost}, State = #topology_state{type =
update_topology_state(#mc_server{type = standalone}, State = #topology_state{type = unknown, seeds = Seeds}) when length(Seeds) =< 1 ->
State#topology_state{type = standalone};
update_topology_state(#mc_server{type = standalone, pid = Pid}, State = #topology_state{type = unknown}) ->
?LOG_TOPOLOGY_ERROR(unknown, standalone),
exit(Pid, kill),
State;
update_topology_state(#mc_server{type = mongos}, State = #topology_state{type = unknown}) ->
Expand All @@ -48,10 +56,12 @@ update_topology_state(
set_possible_primary(Tab, Primary),
State#topology_state{type = checkIfHasPrimary(Tab), setName = SetName};
update_topology_state(#mc_server{type = SType, pid = Pid}, State = #topology_state{type = unknown}) when ?SEC_ARB_OTH(SType) ->
?LOG_TOPOLOGY_ERROR(unknown, SType),
exit(Pid, kill),
State;
update_topology_state(#mc_server{type = SType, pid = Pid},
State = #topology_state{type = replicaSetNoPrimary}) when ?STAL_MONGS(SType) -> %% REPLICASETNOPRIMARY
?LOG_TOPOLOGY_ERROR(replicaSetNoPrimary, SType),
exit(Pid, kill),
State;
update_topology_state(Server = #mc_server{type = SType, setName = SetName},
Expand All @@ -64,14 +74,16 @@ update_topology_state(
init_seeds(lists:flatten([Hosts, Arbiters, Passives]), Tab, Topts, Wopts),
set_possible_primary(Tab, Primary),
State#topology_state{setName = SetName};
update_topology_state(#mc_server{type = SType, pid = Pid}, State = #topology_state{type = replicaSetNoPrimary}) when ?SEC_ARB_OTH(SType) ->
update_topology_state(#mc_server{type = SType, pid = Pid, setName = SSetName}, State = #topology_state{type = replicaSetNoPrimary, setName = TSetName}) when ?SEC_ARB_OTH(SType) ->
?LOG_SET_NAME_ERROR(TSetName, SSetName),
exit(Pid, kill),
State;
update_topology_state(#mc_server{type = SType},
State = #topology_state{type = replicaSetWithPrimary, servers = Tab}) when ?UNKN_GHST(SType) -> %% REPLICASETWITHPRIMARY
State#topology_state{type = checkIfHasPrimary(Tab)};
update_topology_state(#mc_server{type = SType, pid = Pid},
State = #topology_state{type = replicaSetWithPrimary, servers = Tab}) when ?STAL_MONGS(SType) ->
?LOG_TOPOLOGY_ERROR(replicaSetWithPrimary, SType),
exit(Pid, kill),
State#topology_state{type = checkIfHasPrimary(Tab)};
update_topology_state(
Expand All @@ -80,8 +92,9 @@ update_topology_state(
set_possible_primary(Tab, Primary),
State#topology_state{type = checkIfHasPrimary(Tab)};
update_topology_state(
#mc_server{type = SType, pid = Pid},
State = #topology_state{type = replicaSetWithPrimary, servers = Tab}) when ?SEC_ARB_OTH(SType) ->
#mc_server{type = SType, pid = Pid, setName = SSetName},
State = #topology_state{type = replicaSetWithPrimary, servers = Tab, setName = TSetName}) when ?SEC_ARB_OTH(SType) ->
?LOG_SET_NAME_ERROR(TSetName, SSetName),
exit(Pid, kill),
State#topology_state{type = checkIfHasPrimary(Tab)};
update_topology_state(Server = #mc_server{type = rsPrimary, setName = SetName}, State = #topology_state{setName = SetName}) -> %% REPLICASETWITHPRIMARY
Expand All @@ -100,6 +113,7 @@ update_topology_state(
update_topology_state(#mc_server{type = rsPrimary, pid = Pid, host = Host, setName = SSetName},
State = #topology_state{setName = CSetName, servers = Tab}) when SSetName =/= CSetName ->
ets:insert(Tab, #mc_server{pid = Pid, host = Host, type = deleted}),
?LOG_SET_NAME_ERROR(CSetName, SSetName),
exit(Pid, kill),
State#topology_state{type = checkIfHasPrimary(Tab)};
update_topology_state(_, State) ->
Expand All @@ -115,6 +129,68 @@ init_seeds([Addr | Seeds], Tab, Topts, Wopts) ->
start_seed(Saved, Host, Tab, Topts, Wopts),
init_seeds(Seeds, Tab, Topts, Wopts).

validate_server_and_config(ConnectArgs, TopologyType, TopologySetName) ->
{ok, Conn} = mc_worker_api:connect(ConnectArgs),
{true, IsMaster} = mc_worker_api:command(Conn, {isMaster, 1}),
mc_worker_api:disconnect(Conn),
ServerType = server_type(IsMaster),
ServerSetName = maps:get(<<"setName">>, IsMaster, undefined),

case TopologyType of
unknown when ?SEC_ARB_OTH(ServerType) ->
?LOG_TOPOLOGY_ERROR(unknown, ServerType),
{configured_mongo_type_mismatch, TopologyType, ServerType};

unknown when ServerType == standalone ->
?LOG_TOPOLOGY_ERROR(unknown, ServerType),
{configured_mongo_type_mismatch, TopologyType, ServerType};

sharded when ?NON_SHARDED(ServerType) ->
?LOG_TOPOLOGY_ERROR(sharded, ServerType),
{configured_mongo_type_mismatch, TopologyType, ServerType};

replicaSetNoPrimary when ?STAL_MONGS(ServerType) ->
?LOG_TOPOLOGY_ERROR(replicaSetNoPrimary, ServerType),
{configured_mongo_type_mismatch, TopologyType, ServerType};

replicaSetNoPrimary when ?SEC_ARB_OTH(ServerType), ServerSetName /= TopologySetName, TopologySetName /= undefined ->
?LOG_SET_NAME_ERROR(TopologySetName, ServerSetName),
{configured_mongo_set_name_mismatch, TopologySetName, ServerSetName};

replicaSetWithPrimary when ?STAL_MONGS(ServerType) ->
?LOG_TOPOLOGY_ERROR(replicaSetWithPrimary, ServerType),
{configured_mongo_type_mismatch, TopologyType, ServerType};

replicaSetWithPrimary when ?SEC_ARB_OTH(ServerType), ServerSetName /= TopologySetName ->
?LOG_SET_NAME_ERROR(TopologySetName, ServerSetName),
{configured_mongo_set_name_mismatch, TopologySetName, ServerSetName};

_ when ServerType == rsPrimary, ServerSetName /= TopologySetName ->
?LOG_SET_NAME_ERROR(TopologySetName, ServerSetName),
{configured_mongo_set_name_mismatch, TopologySetName, ServerSetName};

_ ->
ok
end.

server_type(#{<<"ismaster">> := true, <<"secondary">> := false, <<"setName">> := _}) ->
rsPrimary;
server_type(#{<<"ismaster">> := false, <<"secondary">> := true, <<"setName">> := _}) ->
rsSecondary;
server_type(#{<<"arbiterOnly">> := true, <<"setName">> := _}) ->
rsArbiter;
server_type(#{<<"hidden">> := true, <<"setName">> := _}) ->
rsOther;
server_type(#{<<"setName">> := _}) ->
rsOther;
server_type(#{<<"msg">> := <<"isdbgrid">>}) ->
mongos;
server_type(#{<<"isreplicaset">> := true}) ->
rsGhost;
server_type(#{<<"ok">> := _}) ->
unknown;
server_type(_) ->
standalone.

%% @private
start_seed([], Host, Tab, Topts, Wopts) ->
Expand Down Expand Up @@ -145,16 +221,3 @@ checkIfHasPrimary_Res([]) ->
replicaSetNoPrimary;
checkIfHasPrimary_Res(_) ->
replicaSetWithPrimary.

%stop_servers_not_in_list(HostsList, Tab) ->
% ets:foldl(
% fun(E, Acc) ->
% case lists:member(E#mc_server.host, HostsList) of
% false ->
% ets:insert(Tab, E#mc_server{type = deleted}),
% unlink(E#mc_server.pid),
% exit(E#mc_server.pid, kill),
% [E#mc_server.host | Acc];
% true -> Acc
% end
% end, [], Tab).
35 changes: 35 additions & 0 deletions src/mongoc/mc_util.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
%%%-------------------------------------------------------------------
%%% @author mononym
%%% @copyright (C) 2022, <COMPANY>
%%% @doc
%%% helper functions accessed from various modules
%%% @end
%%%-------------------------------------------------------------------
-module(mc_util).

-include("mongoc.hrl").

%% API
-export([form_connect_args/4, parse_seed/1]).


%%%===================================================================
%%% API
%%%===================================================================

form_connect_args(Host, Port, Timeout, WorkerArgs) ->
case mc_utils:get_value(ssl, WorkerArgs, false) of
true -> [{host, Host}, {port, Port}, {timeout, Timeout}, {ssl, true},
{ssl_opts, mc_utils:get_value(ssl_opts, WorkerArgs, [])}];
false -> [{host, Host}, {port, Port}, {timeout, Timeout}]
end.

parse_seed(Addr) when is_binary(Addr) ->
parse_seed(binary_to_list(Addr));
parse_seed(Addr) when is_list(Addr) ->
[Host0, Port] = string:split(Addr, ":", trailing),
Host = case inet:parse_address(Host0) of
{ok, H} -> H;
_ -> Host0
end,
{Host, list_to_integer(Port)}.
2 changes: 1 addition & 1 deletion src/mongodb.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% ex: ts=4 sw=4 noexpandtab syntax=erlang
{application, mongodb, [
{description, "Client interface to MongoDB, also known as the driver. See www.mongodb.org"},
{vsn, "3.0.11"},
{vsn, "3.0.12"},
{registered, []},
{applications, [kernel, stdlib, bson, crypto, poolboy, pbkdf2]},
{mod, {mongo_app, []}}
Expand Down
Loading

0 comments on commit 25da12e

Please sign in to comment.