Skip to content

Commit

Permalink
KAZOO-859: use a singleton timer to ensure same broker instead of mes…
Browse files Browse the repository at this point in the history
…sage timeout which is continuously triggered
  • Loading branch information
k-anderson committed May 8, 2013
1 parent 1606b54 commit d64e81a
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions lib/whistle_amqp-1.0.0/src/wh_amqp_connections.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@

-include("amqp_util.hrl").

-record(state, {exchanges=dict:new()}).
-record(state, {exchanges=dict:new()
,ensure_tref}).

-define(TAB, ?MODULE).
-define(ENSURE_TIME, 5000).
Expand Down Expand Up @@ -235,11 +236,11 @@ init([]) ->
%% @end
%%--------------------------------------------------------------------
handle_call(exchanges, _, #state{exchanges=Exchanges}=State) ->
{reply, [V || {_, V} <- dict:to_list(Exchanges)], State, ?ENSURE_TIME};
{reply, [V || {_, V} <- dict:to_list(Exchanges)], State};
handle_call({new, #wh_amqp_connection{uri=URI}=Connection}, _, State) ->
case ets:insert_new(?TAB, Connection) of
true -> {reply, Connection, State, ?ENSURE_TIME};
false -> {reply, find(URI), State, ?ENSURE_TIME}
true -> {reply, Connection, State};
false -> {reply, find(URI), State}
end;
handle_call({connected, #wh_amqp_connection{uri=URI, connection=Pid
,connection_ref=Ref
Expand All @@ -251,7 +252,7 @@ handle_call({connected, #wh_amqp_connection{uri=URI, connection=Pid
,{#wh_amqp_connection.control_channel, CtrlPid}
],
ets:update_element(?TAB, URI, Updates),
{reply, C#wh_amqp_connection{available=true}, State, ?ENSURE_TIME};
{reply, C#wh_amqp_connection{available=true}, ensure_same_connection(State)};
handle_call({disconnected, #wh_amqp_connection{uri=URI}=C}, _, State) ->
Updates = [{#wh_amqp_connection.connection, undefined}
,{#wh_amqp_connection.connection_ref, undefined}
Expand All @@ -260,9 +261,9 @@ handle_call({disconnected, #wh_amqp_connection{uri=URI}=C}, _, State) ->
,{#wh_amqp_connection.control_channel, undefined}
],
ets:update_element(?TAB, URI, Updates),
{reply, C#wh_amqp_connection{available=false}, State, ?ENSURE_TIME};
{reply, C#wh_amqp_connection{available=false}, ensure_same_connection(State)};
handle_call(_Request, _From, State) ->
{reply, {error, not_implemented}, State, ?ENSURE_TIME}.
{reply, {error, not_implemented}, State}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -276,10 +277,10 @@ handle_call(_Request, _From, State) ->
%%--------------------------------------------------------------------
handle_cast({force_reconnect, Connection}, State) ->
wh_amqp_channels:force_reconnect(Connection),
{noreply, State, ?ENSURE_TIME};
{noreply, State};
handle_cast({add_exchange, #'exchange.declare'{exchange=Name}=Command}
,#state{exchanges=Exchanges}=State) ->
{noreply, State#state{exchanges=dict:store(Name, Command, Exchanges)}, ?ENSURE_TIME};
{noreply, State#state{exchanges=dict:store(Name, Command, Exchanges)}};
handle_cast({add_exchange, URI, #'exchange.declare'{exchange=Name}=Command}, State) ->
Exchanges = [Command
|lists:filter(fun(#'exchange.declare'{exchange=N}) ->
Expand All @@ -288,9 +289,9 @@ handle_cast({add_exchange, URI, #'exchange.declare'{exchange=Name}=Command}, Sta
end, exchanges(URI))
],
ets:update_element(?TAB, URI, {#wh_amqp_connection.exchanges, Exchanges}),
{noreply, State, ?ENSURE_TIME};
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State, ?ENSURE_TIME}.
{noreply, State}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -302,15 +303,15 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info('timeout', State) ->
handle_info('$ensure_same_connection', State) ->
_ = case current() of
{'ok', #wh_amqp_connection{}=Connection} ->
wh_amqp_channels:force_reconnect(Connection);
{'error', _} -> 'ok'
end,
{noreply, State};
handle_info(_Info, State) ->
{noreply, State, ?ENSURE_TIME}.
{noreply, State}.

%%--------------------------------------------------------------------
%% @private
Expand Down Expand Up @@ -359,3 +360,10 @@ redeclare_exchanges([], _) -> ok;
redeclare_exchanges([Command|Commands], Connection)->
_ = declare_exchange([Connection], Command),
redeclare_exchanges(Commands, Connection).

-spec ensure_same_connection(#state{}) -> #state{}.
ensure_same_connection(#state{ensure_tref='undefined'}=State) ->
State#state{ensure_tref=erlang:send_after(?ENSURE_TIME, self(), '$ensure_same_connection')};
ensure_same_connection(#state{ensure_tref=TRef}=State) ->
_ = erlang:cancel_timer(TRef),
ensure_same_connection(State#state{ensure_tref='undefined'}).

0 comments on commit d64e81a

Please sign in to comment.