Skip to content
This repository has been archived by the owner on May 27, 2019. It is now read-only.

Support oauth #7

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
{deps,
[
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "0.8.5"}}},
{meck, ".*", {git, "https://github.com/horkhe/meck.git", {branch, "feature/wait"}}}
{meck, ".*", {git, "https://github.com/horkhe/meck.git", {branch, "feature/wait"}}},
{oauth, ".*", {git, "https://github.com/tim/erlang-oauth.git", {tag, "v1.4.0"}}}
]}.
26 changes: 15 additions & 11 deletions src/twerl_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,31 @@

-spec connect(string(), list(), string(), fun()) -> ok | {error, reason}.
connect({post, Url}, Auth, Params, Callback) ->
Headers = twerl_util:headers_for_auth(Auth, {post, Url}, Params),
case catch httpc:request(post, {Url, Headers, ?CONTENT_TYPE, Params}, [], [{sync, false}, {stream, self}]) of
%% TODO look into moving this into headers_for_auth
{Headers, Body} = case twerl_util:headers_for_auth(Auth, {post, Url}, Params) of
L when is_list(L) ->
{L, Params};
{H, L2} ->
{H, L2}
end,
case catch httpc:request(post, {Url, Headers, ?CONTENT_TYPE, Body}, [], [{sync, false}, {stream, self}]) of
{ok, RequestId} ->
?MODULE:handle_connection(Callback, RequestId);

{error, Reason} ->
{error, {http_error, Reason}}
end;

connect({get, BaseUrl}, Auth, Params, Callback) ->
Headers = twerl_util:headers_for_auth(Auth, {get, BaseUrl}, Params),
Url = case Params of
"" ->
BaseUrl;
_ ->
BaseUrl ++ "?" ++ Params
end,
{Headers, UrlParams} = case twerl_util:headers_for_auth(Auth, {get, BaseUrl}, Params) of
L when is_list(L) ->
{L, Params};
{H, L2} ->
{H, L2}
end,
Url = BaseUrl ++ "?" ++ UrlParams,
case catch httpc:request(get, {Url, Headers}, [], [{sync, false}, {stream, self}]) of
{ok, RequestId} ->
?MODULE:handle_connection(Callback, RequestId);

{error, Reason} ->
{error, {http_error, Reason}}
end.
Expand Down
131 changes: 60 additions & 71 deletions src/twerl_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,60 +85,56 @@ handle_call(stop, _From, State) ->
ok = client_shutdown(State),
{stop, normal, stopped, State};

handle_call(start_stream, _From, State = #state{client_pid = Pid}) ->
case Pid of
undefined ->
% not started, start client
NewPid = client_connect(State);
_ ->
% alrady started, ignore
NewPid = Pid
end,
{reply, ok, State#state{ client_pid = NewPid, status = connected }};
handle_call(start_stream, _From, State=#state{client_pid=Pid}) ->
NewPid = case Pid of
undefined ->
%% not started, start client
client_connect(State);
_ ->
%% alrady started, ignore
Pid
end,
{reply, ok, State#state{client_pid=NewPid, status=connected }};

handle_call(stop_stream, _From, State) ->
ok = client_shutdown(State),
% we leave the state as is, we will get a message when the client ends
% and set the pid / status there
{reply, ok, State};

handle_call({set_params, Params}, _From, State = #state{client_pid = Pid, params = OldParams}) ->
case Params of
OldParams ->
% same, don't do anything
{reply, ok, State};
_ ->
% different, change and see if we need to restart the client
case Pid of
undefined ->
% not started, nothing to do
NewPid = Pid;
_ ->
% already started, restart
ok = client_shutdown(State),
NewPid = client_connect(State#state{ params = Params })
end,
{reply, ok, State#state{ params = Params, client_pid = NewPid }}
end;

handle_call({set_auth, Auth}, _From, State = #state{client_pid = Pid, auth = OldAuth}) ->
case Auth of
OldAuth ->
% same, don't do anything
{reply, ok, State};
_ ->
% different, change and see if we need to restart the client
case Pid of
undefined ->
handle_call({set_params, OldParams}, _From, State=#state{params=OldParams}) ->
%% same, don't do anything
{reply, ok, State};

handle_call({set_params, Params}, _From, State = #state{client_pid=Pid}) ->
%% change and see if we need to restart the client
NewPid = case Pid of
undefined ->
%% not started, nothing to do
undefined;
_ ->
%% already started, restart
ok = client_shutdown(State),
client_connect(State#state{ params = Params })
end,
{reply, ok, State#state{ params = Params, client_pid = NewPid }};

handle_call({set_auth, OldAuth}, _From, State=#state{auth=OldAuth}) ->
%% same, don't do anything
{reply, ok, State};

handle_call({set_auth, Auth}, _From, State = #state{client_pid = Pid}) ->
%% different, change and see if we need to restart the client
NewPid = case Pid of
undefined ->
% not started, nothing to do
NewPid = Pid;
undefined;
_ ->
% already started, restart
ok = client_shutdown(State),
NewPid = client_connect(State#state{ auth = Auth })
client_connect(State#state{ auth = Auth })
end,
{reply, ok, State#state{ auth = Auth, client_pid = NewPid }}
end;
{reply, ok, State#state{ auth = Auth, client_pid = NewPid }};

handle_call({set_callback, Callback}, _From, State) ->
{reply, ok, State#state{ callback = Callback }};
Expand Down Expand Up @@ -178,25 +174,21 @@ handle_cast(_Msg, State) ->

% we only care about messages from the current client, not old shutdown message
handle_info({Pid, client_exit, Message}, State) when Pid == State#state.client_pid ->
case Message of
% Handle messages from client process terminating
unauthorised ->
NewPid = undefined,
Status = {error, unauthorised};
stream_end ->
% TODO reconnect
NewPid = undefined,
Status = disconnected;
terminate ->
% We closed the connection
NewPid = undefined,
Status = disconnected;
Error ->
% TODO maybe try reconnecting?
NewPid = undefined,
Status = {error, Error}
end,
{noreply, State#state{status = Status, client_pid = NewPid}};
{NewPid, Status} = case Message of
%% Handle messages from client process terminating
unauthorised ->
{undefined, {error, unauthorised}};
stream_end ->
%% TODO reconnect
{undefined, disconnected};
terminate ->
%% We closed the connection
{undefined, disconnected};
Error ->
%% TODO maybe try reconnecting?
{undefined, {error, Error}}
end,
{noreply, State#state{status=Status, client_pid=NewPid}};

handle_info(_Info, State) ->
{noreply, State}.
Expand Down Expand Up @@ -251,13 +243,10 @@ client_connect(#state{auth = Auth, params = Params}) ->
end).

-spec client_shutdown(record()) -> ok.
client_shutdown(#state{client_pid = Pid}) ->
case Pid of
undefined ->
% not started, nothing to do
ok;
_ ->
% terminate the client
Pid ! terminate,
ok
end.
client_shutdown(#state{client_pid=undefined}) ->
%% not started, nothing to do
ok;
client_shutdown(#state{client_pid=Pid}) ->
%% terminate the client
Pid ! terminate,
ok.
18 changes: 14 additions & 4 deletions src/twerl_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
decode/1
]).

% TODO extend for oauth
-spec headers_for_auth(term(), term(), list()) -> list().
-spec headers_for_auth(term(), term(), list()) -> list() | {list(), string()}.
headers_for_auth({basic, [User, Pass]}, _Endpoint, _Params) ->
generate_auth_headers(User, Pass).
generate_auth_headers(User, Pass);

headers_for_auth({oauth, [ConsumerKey, ConsumerSecret, TokenKey, TokenSecret]}, _Endpoint, _Params) ->
{[], oauth_params(ConsumerKey, ConsumerSecret, TokenKey, TokenSecret, _Endpoint, _Params)}.

-spec generate_headers() -> list().
generate_headers() ->
Expand Down Expand Up @@ -58,7 +60,7 @@ args_to_params(Method, [Current | Remaining], Acc) ->

-spec filter_url() -> string().
filter_url() ->
"https://stream.twitter.com/1/statuses/filter.json".
"https://stream.twitter.com/1.1/statuses/filter.json".

-spec decode(binary()) -> list().
decode(Data) ->
Expand All @@ -69,3 +71,11 @@ decode(Data) ->
{Decoded} = jiffy:decode(Data),
Decoded
end.

oauth_params(ConsumerKey, ConsumerSecret, TokenKey, TokenSecret, {Method, URL}, Params) ->
MethodStr = case Method of
get -> "GET";
post -> "POST" end,
Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
SignedParams = oauth:sign(MethodStr, URL, Params, Consumer, TokenKey, TokenSecret),
oauth:uri_params_encode(SignedParams).