Skip to content

Commit

Permalink
handler behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
oberernst committed Mar 14, 2022
1 parent c450455 commit 78335b9
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 185 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ UNDER CONSTRUCTION! It's still a rough draft that I'm ripping to shreds constant
* CoAP binary request encoding and decoding (RFC 7253.3)
* Endpoint
* can receive and decode messages
* de-dups messages based on incoming IP/Port and `message_id`
* de-dups messages based on incoming IP/Port and `id`
* echoes requests
* can be started with arbitrary `handler` function for extending functionality

Expand Down
12 changes: 6 additions & 6 deletions lib/macrina/client.ex
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
defmodule Macrina.Client do
alias Macrina.{Endpoint, Connection, Message}
alias Macrina.{Endpoint, Connection.Server, Message}

defstruct [:conn]

def build(ip, port, endpoint \\ Endpoint) do
{:ok, socket} = Endpoint.socket(endpoint)

case Connection.start_link(ip: ip, port: port, socket: socket, type: :client) do
case Server.start_link(ip: ip, port: port, socket: socket, type: :client) do
{:ok, conn} -> %__MODULE__{conn: conn}
{:error, {:already_started, conn}} -> %__MODULE__{conn: conn}
end
end

def get(%__MODULE__{conn: pid}, url) do
message = Message.build(:get, options: parse_url(url), type: :con)
Connection.call(pid, message)
Server.call(pid, message)
end

def post(%__MODULE__{conn: pid}, url, payload \\ <<>>) do
message = Message.build(:post, options: parse_url(url), payload: payload)
Connection.call(pid, message)
Server.call(pid, message)
end

def put(%__MODULE__{conn: pid}, url, payload \\ <<>>) do
message = Message.build(:put, options: parse_url(url), payload: payload)
Connection.call(pid, message)
Server.call(pid, message)
end

def delete(%__MODULE__{conn: pid}, url) do
message = Message.build(:delete, options: parse_url(url))
Connection.call(pid, message)
Server.call(pid, message)
end

defp parse_url(url) do
Expand Down
140 changes: 0 additions & 140 deletions lib/macrina/connection.ex

This file was deleted.

44 changes: 44 additions & 0 deletions lib/macrina/connection/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Macrina.Connection do
alias Macrina.Message
require Logger

defstruct [:callers, :handler, :ids, :ip, :name, :port, :seen_ids, :socket, :tokens]

@type t :: %__MODULE__{
callers: [{binary(), tuple()}],
handler: module(),
ip: tuple(),
name: String.t(),
port: integer(),
seen_ids: [integer()],
socket: port()
}

def pop_caller(%__MODULE__{callers: callers} = state, caller) do
%__MODULE__{state | callers: List.delete(callers, caller)}
end

def pop_id(%__MODULE__{ids: ids} = state, %Message{id: id}) do
%__MODULE__{state | ids: List.delete(ids, id)}
end

def pop_token(%__MODULE__{tokens: tokens} = state, %Message{token: token}) do
%__MODULE__{state | ids: List.delete(tokens, token)}
end

def push_caller(%__MODULE__{callers: callers} = state, caller) do
%__MODULE__{state | callers: [caller | callers]}
end

def push_id(%__MODULE__{ids: ids} = state, %Message{id: id}) do
%__MODULE__{state | ids: [id | ids]}
end

def push_seen_id(%__MODULE__{seen_ids: seen_ids} = state, %Message{id: id}) do
%__MODULE__{state | seen_ids: [id | seen_ids]}
end

def push_token(%__MODULE__{tokens: tokens} = state, %Message{token: token}) do
%__MODULE__{state | tokens: [token | tokens]}
end
end
96 changes: 96 additions & 0 deletions lib/macrina/connection/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
defmodule Macrina.Connection.Server do
use GenServer
alias Macrina.{Connection, ConnectionRegistry, Handler.Echo, Message}
import Connection, only: :functions
require Logger

# ------------------------------------------- Client ------------------------------------------- #

def start_link(args) do
handler = Keyword.get(args, :handler, Echo)
ip = Keyword.fetch!(args, :ip)
port = Keyword.fetch!(args, :port)
socket = Keyword.fetch!(args, :socket)
name = {:via, Registry, {ConnectionRegistry, Macrina.conn_name(ip, port)}}

state = %Connection{
callers: [],
handler: handler,
ids: [],
ip: ip,
port: port,
tokens: [],
seen_ids: [],
socket: socket
}

GenServer.start_link(__MODULE__, state, name: name)
end

def call(pid, message, timeout \\ 2000) do
GenServer.call(pid, {:request, message}, timeout)
end

# ------------------------------------------- Server ------------------------------------------- #

def init(state) do
{:ok, state}
end

def handle_call({:request, %Message{} = message}, from, %Connection{port: port} = state) do
bin = Message.encode(message)
Logger.info("#{port} sending #{message.type} #{:binary.encode_hex(bin)}")
:gen_udp.send(state.socket, {state.ip, state.port}, bin)

{:noreply,
state
|> push_caller({message.token, from})
|> push_id(message)
|> push_token(message)}
end

def handle_info({:coap, packet}, %Connection{port: port} = state) do
case Message.decode(packet) do
{:ok, %Message{type: type} = message} when type in [:ack, :res] ->
Logger.info("#{port} received #{type} #{:binary.encode_hex(packet)}")

{:noreply,
state
|> handle(message)
|> reply_to_client(message)
|> pop_id(message)
|> pop_token(message)
|> push_seen_id(message)}

{:ok, %Message{type: type} = message} ->
Logger.info("#{port} received #{type} #{:binary.encode_hex(packet)}")

{:noreply,
state
|> handle(message)
|> reply_to_client(message)
|> push_seen_id(message)}
end
end

defp reply_to_client(%Connection{callers: callers} = state, message) do
caller = Enum.find(callers, fn {t, _from} -> t == message.token end)

unless is_nil(caller) do
{_, from} = caller
GenServer.reply(from, message)
end

pop_caller(state, message)
end

defp handle(%Connection{handler: handler, ip: ip, port: port, socket: socket} = state, message) do
if message.id in state.seen_ids do
state
else
{:ok, responses} = handler.call(state, message)
Enum.each(responses, fn bin -> :gen_udp.send(socket, {ip, port}, bin) end)
state
end
end
end
4 changes: 2 additions & 2 deletions lib/macrina/endpoint.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Macrina.Endpoint do
use GenServer
alias Macrina.{Connection, ConnectionRegistry, ConnectionSupervisor}
alias Macrina.{Connection.Server, ConnectionRegistry, ConnectionSupervisor}

defstruct [:socket]

Expand Down Expand Up @@ -31,7 +31,7 @@ defmodule Macrina.Endpoint do
send(pid, {:coap, packet})

_ ->
init_args = {Connection, ip: ip, name: conn_name, port: port, socket: socket}
init_args = {Server, ip: ip, name: conn_name, port: port, socket: socket}
{:ok, pid} = DynamicSupervisor.start_child(ConnectionSupervisor, init_args)
send(pid, {:coap, packet})
end
Expand Down
25 changes: 0 additions & 25 deletions lib/macrina/handler.ex

This file was deleted.

Loading

0 comments on commit 78335b9

Please sign in to comment.