Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inaki.worker pool #31

Merged
merged 6 commits into from
Nov 8, 2013
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
warn_untyped_record, debug_info
]}.
{deps, [
{'lager', ".*", {
git, "git://github.com/basho/lager.git", "2.0.0"}
}
{'lager', ".*", {git, "git://github.com/basho/lager.git", "2.0.0"}},
{emysql, "0.*", {git, "[email protected]:Eonblast/Emysql.git", "master"}},
{emongo, ".*", {git, "[email protected]:JacobVorreuter/emongo.git", "HEAD"}},
{eredis, ".*", {git, "[email protected]:wooga/eredis.git", "HEAD"}},
{'sqlite3', ".*", {git, "[email protected]:alexeyr/erlang-sqlite3.git", "HEAD"}},
{worker_pool, ".*", {git, "[email protected]:inaka/worker_pool.git", "master"}}
]}.
101 changes: 101 additions & 0 deletions src/sumo_backend_mysql.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
%%% @doc Main interface for repositories.
%%%
%%% Copyright 2012 Marcelo Gornstein <marcelog@@gmail.com>
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%% @end
%%% @copyright Marcelo Gornstein <[email protected]>
%%% @author Marcelo Gornstein <[email protected]>
%%%
-module(sumo_backend_mysql).
-author("Marcelo Gornstein <[email protected]>").
-github("https://github.com/marcelog").
-homepage("http://marcelog.github.com/").
-license("Apache License 2.0").

-behaviour(gen_server).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Exports.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%%% Public API.

%%% Exports for gen_server
-export(
[ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export(
[ get_pool/1
]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Types.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {
pool = undefined :: atom()
}).
-type state() :: #state{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% External API.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-spec get_pool(atom() | pid()) -> atom().
get_pool(Name) ->
gen_server:call(Name, get_pool).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server stuff.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-spec init([term()]) -> {ok, #state{}}.
init(Options) ->
PoolSize = proplists:get_value(poolsize, Options),
Pool = list_to_atom(erlang:ref_to_list(make_ref())),
emysql:add_pool(
Pool,
PoolSize,
proplists:get_value(username, Options),
proplists:get_value(password, Options),
proplists:get_value(host, Options, "localhost"),
proplists:get_value(port, Options, 3306),
proplists:get_value(database, Options),
proplists:get_value(encoding, Options, utf8)
),
{ok, #state{pool=Pool}}.

-spec handle_call(term(), term(), state()) -> {reply, term(), #state{}}.
handle_call(get_pool, _From, State = #state{pool=Pool}) ->
{reply, Pool, State}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Unused Callbacks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-spec handle_cast(term(), #state{}) -> {noreply, #state{}}.
handle_cast(_Msg, State) -> {noreply, State}.

-spec handle_info(term(), #state{}) -> {noreply, #state{}}.
handle_info(_Msg, State) -> {noreply, State}.

-spec terminate(term(), #state{}) -> ok.
terminate(_Reason, _State) -> ok.

-spec code_change(term(), #state{}, term()) -> {ok, #state{}}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
55 changes: 55 additions & 0 deletions src/sumo_backend_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
%%% @doc Repository supervisor.
%%%
%%% Copyright 2012 Marcelo Gornstein &lt;marcelog@@gmail.com&gt;
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%% @end
%%% @copyright Marcelo Gornstein <[email protected]>
%%% @author Marcelo Gornstein <[email protected]>
%%%
-module(sumo_backend_sup).
-author("Marcelo Gornstein <[email protected]>").
-github("https://github.com/marcelog").
-homepage("http://marcelog.github.com/").
-license("Apache License 2.0").

-define(CLD(Name, Module, Options), {
Module,
{gen_server, start_link, [{local, Name}, Module, Options, []]},
permanent, 5000, worker, [Module]
}).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Exports.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-export([start_link/0]).
-export([init/1]).

-behaviour(supervisor).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Code starts here.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
{ok, Backends} = application:get_env(sumo_db, storage_backends),
Children = lists:map(
fun({Name, Module, Options}) ->
lager:debug("starting backend: ~s (~s)", [Name, Module]),
?CLD(Name, Module, Options)
end,
Backends
),
{ok, { {one_for_one, 5, 10}, Children} }.
3 changes: 2 additions & 1 deletion src/sumo_db.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
stdlib,
sasl,
lager,
crypto
crypto,
worker_pool
]},
{mod, { sumo_app, []}},
{env, []}
Expand Down
60 changes: 36 additions & 24 deletions src/sumo_repo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Exports.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%%% Public API.
-export([start_link/3]).
-export([create_schema/2]).
-export([persist/2]).
-export([find_all/2, find_all/5, find_by/3, find_by/5]).
-export([delete/3, delete_by/3, delete_all/2]).
-export([find_all/2, find_all/5, find_by/3, find_by/5]).
-export([call/4]).
-export([start_link/3]).

%%% Exports for gen_server
-export([
Expand All @@ -50,13 +51,16 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Types.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-record(state, {
handler = undefined:: module(),
handler_state = undefined:: any()
}).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Code starts here.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Returns all behavior callbacks.
-spec behaviour_info(callbacks) -> proplists:proplist()|undefined.
behaviour_info(callbacks) ->
Expand All @@ -71,34 +75,51 @@ behaviour_info(_Other) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% External API.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Starts and links a new process for the given repo implementation.
-spec start_link(atom(), module(), [term()]) -> {ok, pid()}.
start_link(Name, Module, Options) ->
Poolsize = proplists:get_value(workers, Options, 100),
WPoolOptions = [ {overrun_warning, infinity}
, {overrun_handler, {error_logger, warning_report}}
, {workers, Poolsize}
, {worker, {?MODULE, [Module, Options]}}
],
wpool:start_pool(Name, WPoolOptions).

%% @doc Creates the schema of the given docs in the given repository name.
-spec create_schema(atom(), #sumo_schema{}) -> ok.
create_schema(Name, #sumo_schema{}=Schema) ->
wpool:call(Name, {create_schema, Schema}).

%% @doc Persist the given doc with the given repository name.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved start_link and create_schema from below so that it matches the export order.

-spec persist(atom(), #sumo_doc{}) -> #sumo_doc{}.
persist(Name, #sumo_doc{}=Doc) ->
gen_server:call(Name, {persist, Doc}).
wpool:call(Name, {persist, Doc}).

%% @doc Deletes the doc identified by id in the given repository name.
-spec delete(atom(), sumo_schema_name(), term()) -> ok.
delete(Name, DocName, Id) ->
gen_server:call(Name, {delete, DocName, Id}).
wpool:call(Name, {delete, DocName, Id}).

%% @doc Deletes the docs identified by the given conditions.
-spec delete_by(atom(), sumo_schema_name(), proplists:proplist()) -> ok.
delete_by(Name, DocName, Conditions) ->
gen_server:call(Name, {delete_by, DocName, Conditions}).
wpool:call(Name, {delete_by, DocName, Conditions}).

%% @doc Deletes all docs in the given repository name.
-spec delete_all(atom(), sumo_schema_name()) -> ok.
delete_all(Name, DocName) ->
gen_server:call(Name, {delete_all, DocName}).
wpool:call(Name, {delete_all, DocName}).

%% @doc Returns all docs from the given repositoru name.
find_all(Name, DocName) ->
gen_server:call(Name, {find_all, DocName}).
wpool:call(Name, {find_all, DocName}).

%% @doc Returns Limit docs starting at Offset from the given repository name, ordered by OrderField.
%% OrderField may be 'undefined'.
%% @doc Returns Limit docs starting at Offset from the given repository name,
%% ordered by OrderField. OrderField may be 'undefined'.
find_all(Name, DocName, OrderField, Limit, Offset) ->
gen_server:call(Name, {find_all, DocName, OrderField, Limit, Offset}).
wpool:call(Name, {find_all, DocName, OrderField, Limit, Offset}).

%% @doc Finds documents that match the given conditions in the given
%% repository name.
Expand All @@ -107,34 +128,25 @@ find_all(Name, DocName, OrderField, Limit, Offset) ->
pos_integer(), pos_integer()
) -> [#sumo_doc{}].
find_by(Name, DocName, Conditions, Limit, Offset) ->
gen_server:call(Name, {find_by, DocName, Conditions, Limit, Offset}).
wpool:call(Name, {find_by, DocName, Conditions, Limit, Offset}).

%% @doc Finds documents that match the given conditions in the given
%% repository name.
-spec find_by(
atom(), sumo_schema_name(), proplists:proplist()
) -> [#sumo_doc{}].
find_by(Name, DocName, Conditions) ->
gen_server:call(Name, {find_by, DocName, Conditions}).
wpool:call(Name, {find_by, DocName, Conditions}).

%% @doc Calls a custom function in the given repository name.
-spec call(atom(), sumo_schema_name(), atom(), [term()]) -> term().
call(Name, DocName, Function, Args) ->
gen_server:call(Name, {call, DocName, Function, Args}).

%% @doc Creates the schema of the given docs in the given repository name.
-spec create_schema(atom(), #sumo_schema{}) -> ok.
create_schema(Name, #sumo_schema{}=Schema) ->
gen_server:call(Name, {create_schema, Schema}).

%% @doc Starts and links a new process for the given repo implementation.
-spec start_link(atom(), module(), [term()]) -> {ok, pid()}.
start_link(Name, Module, Options) ->
gen_server:start_link({local, Name}, ?MODULE, [Module,Options], []).
wpool:call(Name, {call, DocName, Function, Args}).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server stuff.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Called by start_link.
-spec init([term()]) -> {ok, #state{}}.
init([Module, Options]) ->
Expand Down Expand Up @@ -215,7 +227,7 @@ handle_call(
{create_schema, #sumo_schema{name=Name}=Schema}, _From,
#state{handler=Handler,handler_state=HState}=State
) ->
lager:info("Creating schema for: ~p", [Name]),
lager:info("creating schema for: ~p", [Name]),
{Result, NewState} = case Handler:create_schema(Schema, HState) of
{ok, NewState_} -> {ok, NewState_};
{error, Error, NewState_} -> {{error, Error}, NewState_}
Expand Down
2 changes: 1 addition & 1 deletion src/sumo_repo_mongo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ delete_by(DocName, Conditions, #state{pool=Pool}=State) ->
{ok, 0, State}.

delete_all(DocName, #state{pool=Pool}=State) ->
lager:debug("Dropping collection: ~p", [DocName]),
lager:debug("dropping collection: ~p", [DocName]),
ok = emongo:delete(Pool, atom_to_list(DocName)),
{ok, unknown, State}.

Expand Down
Loading