Skip to content
This repository has been archived by the owner on Jan 20, 2023. It is now read-only.

Commit

Permalink
Merge pull request #41 from jtmoulia/fix/19-close-messages
Browse files Browse the repository at this point in the history
Fix for 19: Role closed messages
  • Loading branch information
jtmoulia committed Jan 25, 2016
2 parents 8e1ce65 + fdb318b commit daba475
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 35 deletions.
6 changes: 2 additions & 4 deletions examples/rpc.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ defmodule RPC do
end

defp loop(state) do


# `Spell.cast_call` performs an asyncronous call to the remote procedure, the result will
# be intercepted and parsed by the block `Spell.receive_result`
{:ok, call_id} = Spell.cast_call(state.caller, state.procedure, state.params)
Logger.info("<Caller: #{inspect(state.caller)}> send params: #{inspect(state.params)}")

case Spell.receive_result(state.caller, call_id) do
{:ok, result} -> IO.inspect handle_result(state, result)
{:error, reason} -> {:error, reason}
{:ok, result} -> handle_result(state, result)
{:closed, reason} -> :ok
end

:timer.sleep(1000)
Expand Down
24 changes: 24 additions & 0 deletions lib/crossbar.ex
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ defmodule Crossbar do
end
end

@doc """
Generate a WAMP URI using the provided prefix and a random suffix.
"""
def create_uri(prefix, length \\ 10) do
reseed()
"#{prefix}.#{rand_bin(length)}"
end


# GenEvent Callbacks

@doc """
Expand Down Expand Up @@ -306,4 +315,19 @@ defmodule Crossbar do
port when is_binary(port) -> String.to_integer(port)
end
end

defp reseed() do
<<a :: 32, b :: 32, c :: 32>> = :crypto.rand_bytes(12)
:random.seed(a, b, c)
end

defp rand_bin(length, range \\ Enum.into(48..122, []), acc \\ [])
defp rand_bin(0, _, acc) do
:erlang.list_to_binary(acc)
end
defp rand_bin(length, range, acc) do
n = length(range) |> :random.uniform
rand_bin(length - 1, range, [Enum.at(range, n - 1) | acc])
end

end
22 changes: 13 additions & 9 deletions lib/spell/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ defmodule Spell.Message do
clause for their respective WAMP message from `peer` given `type`.
This macro is meant to be a convenience -- feel free to drop down to the
undelrying `receive`.
underlying `receive`.
## Example
Expand All @@ -93,12 +93,9 @@ defmodule Spell.Message do
args: [^subscribe_id, subscription]}} ->
{:ok, subscription}
{Peer, ^peer, %Message{type: :error, args: [33, _, reason | _]}} ->
case Message.normalize_error(error) do
{:ok, reason} ->
{:error, reason}
{:error, _, reason} ->
{:error, reason}
end
{:error, reason}
{Peer, ^peer, {:closed, reason}} ->
{:closed, reason}
after
@timeout -> {:error, :timeout}
end
Expand All @@ -107,6 +104,10 @@ defmodule Spell.Message do
"""
defmacro receive_message(peer, type, body) do
code = get_code_for_type(type)
closed_branch = quote do
{Spell.Peer, ^unquote(peer), {:closed, _} = closed} -> closed
end

branches = (body[:do] || [])
|> Enum.map(fn
{:->, _, [[match], branch_body]} ->
Expand All @@ -130,9 +131,12 @@ defmodule Spell.Message do
when unquote(guards) -> unquote(branch_body)
end
end
end) |> Enum.map(fn [branch] -> branch end)
end)
|> Enum.map(fn [branch] -> branch end)
|> Enum.concat(closed_branch)

quote do
receive do unquote(branches) after 1000 -> {:error, :timeout} end
receive do unquote(branches) after 5000 -> {:error, :timeout} end
end
end

Expand Down
12 changes: 8 additions & 4 deletions lib/spell/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ defmodule Spell.Peer do
end

@doc """
Stop the `peer` process.
Stop the `peer` process. Roles are responsible for notifying owners
of open commands that the command is being terminated via a message
like
{Spell.Peer, peer, {:closed, command}}
"""
def stop(peer) do
GenServer.cast(peer, :stop)
Expand Down Expand Up @@ -143,8 +148,6 @@ defmodule Spell.Peer do

@doc """
Send an Erlang message to the peer's owner.
TODO: Rename to `notify`
"""
@spec send_to_owner(pid, any) :: :ok
def send_to_owner(peer, term) do
Expand Down Expand Up @@ -281,7 +284,8 @@ defmodule Spell.Peer do
{:stop, {:transport, reason}, state}
end

def terminate(reason, _state) do
def terminate(reason, state) do
{:ok, _state} = Role.map_on_close(state.role.state, self())
Logger.debug(fn -> "Peer terminating due to: #{inspect(reason)}" end)
end

Expand Down
4 changes: 1 addition & 3 deletions lib/spell/role.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ defmodule Spell.Role do
quote do
@behaviour Spell.Role

# Helper Functions

# Default Role Callbacks

def get_features(_options), do: nil
Expand Down Expand Up @@ -218,7 +216,7 @@ defmodule Spell.Role do
{:ok, Enum.reverse(results)}
end
defp map([], _function, results, reasons) do
{:close, Enum.reverse(results), Enum.reverse(reasons)}
{:close, Enum.reverse(reasons), Enum.reverse(results)}
end

defp map([{role_module, _} = role | roles], function, results, reasons) do
Expand Down
18 changes: 18 additions & 0 deletions lib/spell/role/callee.ex
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,24 @@ defmodule Spell.Role.Callee do
end
end

@doc """
The `on_close` callback notifies the owners of all open `REGISTER` and
`UNREGISTER` commands that the connection is closed by sending
a `{Spell.Peer, pid, {:closed, command}}` message.
"""
def on_close(peer,
%{register_requests: register_requests,
unregister_requests: unregister_requests} = state) do
# TODO the command type and subscribe id should be included
for pid <- Dict.values(register_requests) do
:ok = Peer.notify(pid, {:closed, :register})
end
for {registration, pid} <- Dict.values(unregister_requests) do
:ok = Peer.notify(pid, {:closed, {:unregister, registration}})
end
super(peer, state)
end

# Private Functions

@spec new_register_message(Message.wamp_id, Keyword.t) ::
Expand Down
12 changes: 12 additions & 0 deletions lib/spell/role/caller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ defmodule Spell.Role.Caller do
{:ok, :ok, %{state | call_requests: call_requests}}
end

@doc """
The `on_close/2` callback notifies processes which own open `CALL` commands
that the peer is closing by sending a `{Spell.Peer, pid, {:closed, :call}}`
message.
"""
def on_close(peer, %{call_requests: call_requests} = state) do
for pid <- Dict.values(call_requests) do
:ok = Peer.notify(pid, {:closed, :call})
end
super(peer, state)
end

# Private Functions

defp new_call_message(procedure, options) do
Expand Down
9 changes: 1 addition & 8 deletions lib/spell/role/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,6 @@ defmodule Spell.Role.Publisher do
alias Spell.Message
alias Spell.Peer

defstruct [published: HashDict.new()]

# Type Specs

@type t :: %__MODULE__{
published: HashDict.t(String.t, Message.t)}

# Public Interface

@doc """
Expand Down Expand Up @@ -67,7 +60,7 @@ defmodule Spell.Role.Publisher do
def get_features(_options), do: {:publisher, %{}}

def init(_peer_options, _role_options) do
{:ok, %__MODULE__{}}
{:ok, nil}
end

def handle_message(%{type: :published,
Expand Down
12 changes: 12 additions & 0 deletions lib/spell/role/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ defmodule Spell.Role.Session do
{:ok, {:ok, message}, %{state | pid_goodbye: pid_goodbye}}
end

@doc """
The `on_close/2` callback notifies processes which own open `HELLO` or
`GOODBYE` commands that the peer is closing by sending a
`{Spell.Peer, pid, {:closed, command}}` message.
"""
def on_close(peer,
%{pid_hello: pid_hello, pid_goodbye: pid_goodbye} = state) do
if pid_hello, do: :ok = Peer.notify(pid_hello, {:closed, :hello})
if pid_goodbye, do: :ok = Peer.notify(pid_goodbye, {:closed, :goodbye})
super(peer, state)
end

# Private Functions

@spec new_hello(String.t, map) :: {:ok, Message.t} | {:error, any}
Expand Down
18 changes: 18 additions & 0 deletions lib/spell/role/subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,24 @@ defmodule Spell.Role.Subscriber do
end
end

@doc """
The `on_close` callback notifies the owners of all open `SUBSCRIBE` and
`UNSUBSCRIBE` commands that the connection is closed by sending
a `{Spell.Peer, pid, {:closed, command}}` message.
"""
def on_close(peer,
%{subscribe_requests: subscribe_requests,
unsubscribe_requests: unsubscribe_requests} = state) do
# TODO the command type and subscribe id should be included
for pid <- Dict.values(subscribe_requests) do
:ok = Peer.notify(pid, {:closed, :subscribe})
end
for {subscription, pid} <- Dict.values(unsubscribe_requests) do
:ok = Peer.notify(pid, {:closed, {:unsubscribe, subscription}})
end
super(peer, state)
end

# Private Functions

@spec new_subscribe_message(Message.wamp_uri, Keyword.t) ::
Expand Down
31 changes: 26 additions & 5 deletions test/integration/spell/callee_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ defmodule Spell.CalleeTest do
use ExUnit.Case

alias Spell.Role.Callee

@procedure "com.spell.test.callee.procedure"
alias Spell.Peer

setup do
{:ok, peer} = Crossbar.uri(Crossbar.get_config())
|> Spell.connect(roles: [Callee], realm: Crossbar.get_realm())
on_exit fn -> Spell.close(peer) end
on_exit fn -> if Process.alive?(peer), do: Spell.close(peer) end
{:ok, peer: peer}
end

test "cast_register/{2,3} receive_registered/2", %{peer: peer} do
{:ok, register_id} = Spell.cast_register(peer, @procedure)
{:ok, register_id} = Spell.cast_register(peer, create_uri())
{:ok, registration} = Spell.receive_registered(peer, register_id)
assert is_integer(registration)
end
Expand All @@ -35,8 +34,30 @@ defmodule Spell.CalleeTest do
end

test "call_register", %{peer: peer} do
{:ok, registration} = Spell.call_register(peer, @procedure)
{:ok, registration} = Spell.call_register(peer, create_uri())
assert is_integer(registration)
end

test "stop/1", %{peer: peer} do
:ok = Peer.stop(peer)
refute_receive(_)
end

test "stop/1 with open REGISTER", %{peer: peer} do
{:ok, _registration} = Spell.cast_register(peer, create_uri())
:ok = Peer.stop(peer)
assert_receive({Peer, ^peer, {:closed, :register}})
end

test "stop/1 with open UNREGISTER", %{peer: peer} do
{:ok, registration} = Spell.call_register(peer, create_uri())
{:ok, _unregister} = Spell.cast_unregister(peer, registration)
:ok = Peer.stop(peer)
assert_receive({Peer, ^peer, {:closed, {:unregister, ^registration}}})
end

# Private Functions

defp create_uri, do: Crossbar.create_uri("com.spell.test.callee")

end
22 changes: 22 additions & 0 deletions test/integration/spell/caller_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Spell.CallerTest do
use ExUnit.Case

alias Spell.Role.Caller
alias Spell.Peer

@procedure "com.spell.test.callee.procedure"

setup do
{:ok, peer} = Crossbar.uri(Crossbar.get_config())
|> Spell.connect(roles: [Caller], realm: Crossbar.get_realm())
on_exit fn -> if Process.alive?(peer), do: Spell.close(peer) end
{:ok, peer: peer}
end

test "stop/1 with open CALL", %{peer: peer} do
{:ok, _registration} = Caller.cast_call(peer, @procedure)
:ok = Peer.stop(peer)
assert_receive({Peer, ^peer, {:closed, :call}})
end

end
7 changes: 7 additions & 0 deletions test/integration/spell/session_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Spell.SessionTest do

alias Spell.Role.Session
alias Spell.Message
alias Spell.Peer

setup do
{:ok, peer} = Crossbar.uri(Crossbar.get_config())
Expand All @@ -14,4 +15,10 @@ defmodule Spell.SessionTest do
test "call_goodbye/1", %{peer: peer} do
assert {:ok, %Message{type: :goodbye}} = Session.call_goodbye(peer)
end

test "stop/1 with open GOODBYE", %{peer: peer} do
:ok = Session.cast_goodbye(peer)
:ok = Peer.stop(peer)
assert_receive({Peer, ^peer, {:closed, :goodbye}})
end
end
Loading

0 comments on commit daba475

Please sign in to comment.