Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S2S/out rework #4479

Merged
merged 15 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 141 additions & 84 deletions big_tests/tests/s2s_SUITE.erl

Large diffs are not rendered by default.

40 changes: 29 additions & 11 deletions big_tests/tests/s2s_helper.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-module(s2s_helper).

-export([init_s2s/1, end_s2s/1, configure_s2s/2, has_inet_errors/2, has_xmpp_server/2]).
-export([init_s2s/1, end_s2s/1, configure_s2s/2, has_inet_errors/2, has_xmpp_server/3,
reset_s2s_connections/0]).

-import(distributed_helper, [rpc_spec/1, rpc/4]).

Expand All @@ -23,21 +24,21 @@ configure_s2s(Group, Config) ->

has_inet_errors(History, Server) ->
Inet = lists:any(
fun({_, {inet, getaddr, [Server1, inet]}, {error, nxdomain}})
when Server1 == Server -> true;
fun({_, {inet_res, lookup, [Server1, in, a, _, _]}, []})
when Server1 =:= Server -> true;
(_) -> false
end, History),
Inet6 = lists:any(
fun({_, {inet, getaddr, [Server1, inet6]}, {error, nxdomain}})
when Server1 == Server -> true;
fun({_, {inet_res, lookup, [Server1, in, aaaa, _, _]}, []})
when Server1 =:= Server -> true;
(_) -> false
end, History),
Inet andalso Inet6.

has_xmpp_server(History, Server) ->
lists:all(
fun({_, _, {ok, {hostent, "_xmpp-server._tcp." ++ Server1, _, srv, _, _}}})
when Server1 == Server -> true;
has_xmpp_server(History, Server, DnsRrType) ->
lists:any(
fun({_Pid, {inet_res, lookup, [Server1, in, DnsRrType1, _, _]}, [_|_]})
when Server1 =:= Server, DnsRrType1 =:= DnsRrType -> true;
(_) -> false
end, History).

Expand All @@ -60,6 +61,8 @@ tls_config(required_trusted, #{tls := TlsOpts} = Opts, _) ->
Opts#{tls => TlsOpts#{mode => starttls_required, verify_mode => selfsigned_peer}};
tls_config(required, #{tls := TlsOpts} = Opts, _) ->
Opts#{tls => TlsOpts#{mode => starttls_required, verify_mode => none}};
tls_config(enforced, #{tls := TlsOpts} = Opts, _) ->
Opts#{tls => TlsOpts#{mode => tls, verify_mode => none}};
tls_config(optional, #{tls := TlsOpts} = Opts, _) ->
Opts#{tls => TlsOpts#{mode => starttls, verify_mode => none}};
tls_config(plain, Opts, _) ->
Expand All @@ -71,6 +74,8 @@ tls_preset(both_tls_optional) ->
#{mim => optional, fed => optional};
tls_preset(both_tls_required) ->
#{mim => required, fed => required};
tls_preset(both_tls_enforced) ->
#{mim => enforced, fed => enforced};
tls_preset(node1_tls_optional_node2_tls_required) ->
#{mim => optional, fed => required};
tls_preset(node1_tls_required_node2_tls_optional) ->
Expand All @@ -93,8 +98,8 @@ set_opt(Spec, Opt, Value) ->
rpc(Spec, mongoose_config, set_opt, [Opt, Value]).

restart_s2s(#{} = Spec, S2SListener) ->
Children = rpc(Spec, supervisor, which_children, [ejabberd_s2s_out_sup]),
[rpc(Spec, ejabberd_s2s_out, stop_connection, [Pid]) ||
Children = rpc(Spec, supervisor, which_children, [mongoose_s2s_out_sup]),
[rpc(Spec, mongoose_s2s_out, stop_connection, [Pid, <<"closing connection">>]) ||
{_, Pid, _, _} <- Children],

Children0 = rpc(Spec, supervisor, which_children, [mongoose_listener_sup]),
Expand All @@ -103,3 +108,16 @@ restart_s2s(#{} = Spec, S2SListener) ->
[rpc(Spec, erlang, exit, [Pid, kill]) || {_, Pid, _, _} <- ChildrenIn],

mongoose_helper:restart_listener(Spec, S2SListener).

reset_s2s_connections() ->
[ reset_s2s_connections(rpc_spec(NodeKey)) || NodeKey <- node_keys()].

reset_s2s_connections(Spec) ->
Children = rpc(Spec, supervisor, which_children, [mongoose_s2s_out_sup]),
[rpc(Spec, mongoose_s2s_out, stop_connection, [Pid, <<"closing connection">>]) ||
{_, Pid, _, _} <- Children],

Children0 = rpc(Spec, supervisor, which_children, [mongoose_listener_sup]),
Listeners = [Ref || {Ref, _, _, [mongoose_s2s_listener | _]} <- Children],
ChildrenIn = lists:flatten([ranch:procs(Ref, connections) || Ref <- Listeners]),
[rpc(Spec, erlang, exit, [Pid, kill]) || {_, Pid, _, _} <- ChildrenIn].
3 changes: 3 additions & 0 deletions rel/files/mongooseim.toml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@
]

[s2s]
{{#max_retry_delay}}
max_retry_delay = {{{max_retry_delay}}}
{{/max_retry_delay}}
{{#s2s_default_policy}}
default_policy = {{{s2s_default_policy}}}
{{/s2s_default_policy}}
Expand Down
1 change: 1 addition & 0 deletions rel/mim1.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{c2s_tls_port, 5223}.
{outgoing_s2s_port, 5299}.
{incoming_s2s_port, 5269}.
{max_retry_delay, 1}.
{http_port, 5280}.
{https_port, 5285}.
{component_port, 8888}.
Expand Down
4 changes: 2 additions & 2 deletions src/c2s/mongoose_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ init({Transport, Ref, LOpts}) ->
-spec handle_event(gen_statem:event_type(), term(), state(), data()) -> fsm_res().
handle_event(internal, {connect, {Transport, Ref, LOpts}}, connect, _) ->
#{shaper := ShaperName, max_stanza_size := MaxStanzaSize} = LOpts,
C2SSocket = mongoose_xmpp_socket:new(Transport, c2s, Ref, LOpts),
C2SSocket = mongoose_xmpp_socket:accept(Transport, c2s, Ref, LOpts),
verify_ip_is_not_blacklisted(C2SSocket),
{ok, Parser} = exml_stream:new_parser([{max_element_size, MaxStanzaSize}]),
Shaper = mongoose_shaper:new(ShaperName),
Expand Down Expand Up @@ -400,7 +400,7 @@ handle_starttls(StateData = #c2s_data{socket = TcpSocket,
parser = Parser,
listener_opts = LOpts = #{tls := _}}, El, SaslAcc, Retries) ->
send_xml(StateData, mongoose_c2s_stanzas:tls_proceed()), %% send last negotiation chunk via tcp
case mongoose_xmpp_socket:tcp_to_tls(TcpSocket, LOpts) of
case mongoose_xmpp_socket:tcp_to_tls(TcpSocket, LOpts, server) of
{ok, TlsSocket} ->
{ok, NewParser} = exml_stream:reset_parser(Parser),
NewStateData = StateData#c2s_data{socket = TlsSocket,
Expand Down
2 changes: 1 addition & 1 deletion src/component/mongoose_component_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ handle_event(internal, {connect, {Transport, Ref, LOpts}}, connect, _) ->
#{shaper := ShaperName, max_stanza_size := MaxStanzaSize} = LOpts,
{ok, Parser} = exml_stream:new_parser([{max_element_size, MaxStanzaSize}]),
Shaper = mongoose_shaper:new(ShaperName),
Socket = mongoose_xmpp_socket:new(Transport, component, Ref, LOpts),
Socket = mongoose_xmpp_socket:accept(Transport, component, Ref, LOpts),
StateData = #component_data{socket = Socket, parser = Parser, shaper = Shaper, listener_opts = LOpts},
{next_state, wait_for_stream, StateData, state_timeout(LOpts)};
handle_event(internal, #xmlstreamstart{attrs = Attrs}, wait_for_stream, StateData) ->
Expand Down
32 changes: 26 additions & 6 deletions src/config/mongoose_config_spec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -821,15 +821,28 @@
format_items = map},
<<"shared">> => #option{type = binary,
validate = non_empty},
<<"shaper">> => #option{type = atom,
validate = non_empty},
<<"state_timeout">> => #option{type = int_or_infinity,
validate = non_negative},
<<"stream_timeout">> => #option{type = int_or_infinity,
validate = non_negative},
<<"address">> => #list{items = s2s_address(),
format_items = map},
<<"max_retry_delay">> => #option{type = integer,
validate = positive},
<<"max_stanza_size">> => #option{type = int_or_infinity,
validate = positive,
process = fun ?MODULE:process_infinity_as_zero/1},
<<"outgoing">> => s2s_outgoing(),
<<"dns">> => s2s_dns(),
<<"tls">> => tls([client, xmpp])},
defaults = #{<<"default_policy">> => allow,
<<"max_retry_delay">> => 300},
<<"shaper">> => none,
<<"max_stanza_size">> => 0,
<<"max_retry_delay">> => 300,
<<"state_timeout">> => timer:seconds(5),
<<"stream_timeout">> => timer:minutes(10)},
wrap = host_config
}.

Expand Down Expand Up @@ -859,7 +872,7 @@
},
include = always,
defaults = #{<<"port">> => 5269,
<<"ip_versions">> => [4, 6],
<<"ip_versions">> => [4, 6], %% NOTE: we still prefer IPv4 first
<<"connection_timeout">> => 10000}
}.

Expand All @@ -883,10 +896,12 @@
<<"ip_address">> => #option{type = string,
validate = ip_address},
<<"port">> => #option{type = integer,
validate = port}
validate = port},
<<"tls">> => #option{type = boolean}
},
required = [<<"host">>, <<"ip_address">>],
process = fun ?MODULE:process_s2s_address/1
process = fun ?MODULE:process_s2s_address/1,
defaults = #{<<"tls">> => false}
}.

%% Callbacks for 'process'
Expand Down Expand Up @@ -1047,8 +1062,13 @@
process_s2s_host_policy(#{host := S2SHost, policy := Policy}) ->
{S2SHost, Policy}.

process_s2s_address(M) ->
maps:take(host, M).
process_s2s_address(#{ip_address := IPAddress} = M0) ->
{ok, IPTuple} = inet:parse_address(IPAddress),
M1 = M0#{ip_tuple => IPTuple, ip_version => ip_version(IPTuple)},
maps:take(host, M1).

ip_version(T) when tuple_size(T) =:= 4 -> inet;
ip_version(T) when tuple_size(T) =:= 8 -> inet6.

Check warning on line 1071 in src/config/mongoose_config_spec.erl

View check run for this annotation

Codecov / codecov/patch

src/config/mongoose_config_spec.erl#L1071

Added line #L1071 was not covered by tests

process_infinity_as_zero(infinity) -> 0;
process_infinity_as_zero(Num) -> Num.
2 changes: 1 addition & 1 deletion src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ init([]) ->
C2SSupervisor =
template_supervisor_spec(mongoose_c2s_sup, mongoose_c2s),
S2SOutSupervisor =
template_supervisor_spec(ejabberd_s2s_out_sup, ejabberd_s2s_out),
template_supervisor_spec(mongoose_s2s_out_sup, mongoose_s2s_out),
IQSupervisor =
template_supervisor_spec(ejabberd_iq_sup, mongoose_iq_worker),
{ok, {{one_for_one, 10, 1},
Expand Down
8 changes: 4 additions & 4 deletions src/just_tls.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
%% Other options should be supported if the implementing module supports it.
-type options() :: #{module => module(),
verify_mode := peer | selfsigned_peer | none,
mode => tls | starttls | starttls_required, % only ejabberd_s2s_out doesn't use it (yet)
mode => tls | starttls | starttls_required, % only mongoose_s2s_out doesn't use it (yet)
certfile => string(),
cacertfile => string(),
ciphers => string(),
Expand Down Expand Up @@ -45,7 +45,7 @@
%% APIs
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-spec tcp_to_tls(inet:socket(), options(), client | server) ->
-spec tcp_to_tls(inet:socket(), options(), mongoose_xmpp_socket:side()) ->
{ok, ssl:sslsocket()} | {error, any()}.
tcp_to_tls(Socket, Opts, client) ->
TlsOpts = format_opts(Opts, client),
Expand All @@ -57,12 +57,12 @@ tcp_to_tls(Socket, Opts, server) ->
ssl:handshake(Socket, TlsOpts, 5000).

%% @doc Prepare SSL options for direct use of ssl:connect/2 (client side)
-spec make_client_opts(options()) -> [ssl:tls_option()].
-spec make_client_opts(options()) -> [ssl:tls_client_option()].
make_client_opts(Opts) ->
format_opts(Opts, client).

%% @doc Prepare SSL options for direct use of ssl:handshake/2 (server side)
-spec make_server_opts(options()) -> [ssl:tls_option()].
-spec make_server_opts(options()) -> [ssl:tls_server_option()].
make_server_opts(Opts) ->
format_opts(Opts, server).

Expand Down
Loading