Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cabol.122.riak backend config enhancement #124

Merged
merged 4 commits into from
Mar 12, 2015
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ First, let's create and activate a bucket type simply called maps that is set up
to store Riak maps:

$ riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'
$ riak-admin bucket-type activate mapsdmin bucket-type activate maps
$ riak-admin bucket-type activate maps

Now, let's create a search index called `sumo_test_index` using the default
schema:
Expand Down
109 changes: 76 additions & 33 deletions src/sumo_backend_riak.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,31 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%%% Public API.
-export(
[ get_connection/1,
get_state/1
]).
-export([get_connection/1, checkin_conn/2, checkout_conn/1]).

%%% Exports for sumo_backend
-export(
[ start_link/2
]).
-export([start_link/2]).

%%% Exports for gen_server
-export(
[ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Types.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-record(state, {conn :: pid(), bucket :: binary(), index :: binary()}).
-type r_host() :: iolist() | string().
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A string() is an iolist()

%% TCP port handed to riakc_pb_socket:start_link/3 (init/1 defaults it to 8087).
-type r_port() :: non_neg_integer().
%% Option list handed to riakc_pb_socket:start_link/3 as its third argument.
-type r_opts() :: [term()].
%% Pids of the Riak connections currently available for checkout.
-type r_pool() :: [pid()].

%% Server state:
%%   conn_args - {Host, Port, Opts} used whenever a new Riak connection has to
%%               be opened.
%%   conn_pool - connections available for checkout; the server pushes and
%%               pops at the head of this list.
-record(state, {conn_args :: {r_host(), r_port(), r_opts()},
conn_pool = [] :: r_pool()}).
-type state() :: #state{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand All @@ -68,9 +68,13 @@ start_link(Name, Options) ->
get_connection(Name) ->
gen_server:call(Name, get_connection).

-spec get_state(atom() | pid()) -> state().
get_state(Name) ->
gen_server:call(Name, get_state).
%% @doc Hands the connection `Conn' back to the backend server `Name' so it
%% can be reused by a later `checkout_conn/1'. The call is synchronous and the
%% server always replies `ok'.
-spec checkin_conn(atom() | pid(), pid()) -> ok.
checkin_conn(Name, Conn) ->
  gen_server:call(Name, {checkin_conn, Conn}).

%% @doc Takes a Riak connection from the backend server `Name'. The reply is
%% the pid of the connection process (not an atom, as the previous spec
%% claimed). Callers are expected to give it back with `checkin_conn/2' when
%% they are done with it.
-spec checkout_conn(atom() | pid()) -> pid().
checkout_conn(Name) ->
  gen_server:call(Name, checkout_conn).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server stuff.
Expand All @@ -81,24 +85,63 @@ init(Options) ->
%% Get connection parameters
Host = proplists:get_value(host, Options, "127.0.0.1"),
Port = proplists:get_value(port, Options, 8087),
PoolSize = proplists:get_value(poolsize, Options, 10),
Opts = riak_opts(Options),
%% Get DB parameters
BucketType = iolist_to_binary(
proplists:get_value(bucket_type, Options)),
Bucket = iolist_to_binary(
proplists:get_value(bucket, Options, <<"sumo_test">>)),
Index = iolist_to_binary(
proplists:get_value(index, Options, <<"sumo_test_index">>)),
%% Place Riak connection
{ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
%% Create Riak connection pool
F = fun(_E, Acc) ->
{ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
[Conn | Acc]
end,
ConnPool = lists:foldl(F, [], lists:seq(1, PoolSize)),
%% Initial state
{ok, #state{conn = Conn, bucket = {BucketType, Bucket}, index = Index}}.

{ok, #state{conn_args = {Host, Port, Opts}, conn_pool = ConnPool}}.

%% @todo: These are workarounds; a real connection pool needs to be added.
%% Workaround 1: when the store calls 'get_connection', a new Riak connection
%% is returned - a one-to-one model (between connection and store).
%% Workaround 2: a simple list (LIFO) was added to hold a set of connections
%% that are created in the 'init' function (see code above). When the store
%% needs a connection, it must call 'checkout_conn' to get the Pid, and when
%% it finishes, it must call 'checkin_conn'. These operations have some
%% problems: for instance, they don't consider overflow, so the number of new
%% connections is not limited, and under high concurrency there is a risk of
%% too many connections hitting the DB and causing contention.
-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
handle_call(get_connection, _From, State = #state{conn = Conn}) ->
handle_call(get_connection,
_From,
State = #state{conn_args = {Host, Port, Opts}}) ->
{ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
{reply, Conn, State};
handle_call(get_state, _From, State) ->
{reply, State, State}.
%% Check-in: push the connection back on top of the pool. A connection that is
%% no longer alive is replaced by a freshly opened one so the pool keeps its
%% size.
handle_call({checkin_conn, Conn},
            _From,
            State = #state{conn_args = ConnArgs, conn_pool = ConnPool}) ->
  {reply, ok, State#state{conn_pool = [live_conn(Conn, ConnArgs) | ConnPool]}};
%% Check-out with an empty pool: open a connection on demand and hand it to
%% the caller. It is deliberately NOT stored in the pool as well; doing so
%% would let the next check-out give the very same connection to a second
%% caller while the first one is still using it.
handle_call(checkout_conn,
            _From,
            State = #state{conn_args = {Host, Port, Opts}, conn_pool = []}) ->
  {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
  {reply, Conn, State};
%% Check-out with available connections: pop the head of the pool (LIFO). The
%% tail is taken directly from the pattern match instead of being recomputed
%% with `--'.
handle_call(checkout_conn,
            _From,
            State = #state{conn_args = ConnArgs, conn_pool = [Conn | Rest]}) ->
  {reply, live_conn(Conn, ConnArgs), State#state{conn_pool = Rest}}.

%% @private
%% Returns `Conn' if its process is alive, otherwise opens and returns a new
%% connection built from the stored connection arguments.
%% NOTE: this is only a best-effort check; the process may still die right
%% after is_process_alive/1 returns true, so callers must cope with a dead
%% connection.
-spec live_conn(pid(), {r_host(), r_port(), r_opts()}) -> pid().
live_conn(Conn, {Host, Port, Opts}) ->
  case is_process_alive(Conn) of
    true ->
      Conn;
    false ->
      {ok, NewConn} = riakc_pb_socket:start_link(Host, Port, Opts),
      NewConn
  end.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is extremely inefficient and actually doesn't work really well.

First, the `--` should be avoided: you already have _T, which is the same list you get with ConnPool -- [Conn]. Second, you trust is_process_alive too much; that's not how it should be used, since the process could die immediately after you make the call.


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Unused Callbacks
Expand Down
Loading