diff --git a/rebar.config b/rebar.config index 6aa3fef..b037471 100644 --- a/rebar.config +++ b/rebar.config @@ -19,7 +19,10 @@ warn_untyped_record, debug_info ]}. {deps, [ - {'lager', ".*", { - git, "git://github.com/basho/lager.git", "2.0.0"} - } + {'lager', ".*", {git, "git://github.com/basho/lager.git", "2.0.0"}}, + {emysql, "0.*", {git, "git@github.com:Eonblast/Emysql.git", "master"}}, + {emongo, ".*", {git, "git@github.com:JacobVorreuter/emongo.git", "HEAD"}}, + {eredis, ".*", {git, "git@github.com:wooga/eredis.git", "HEAD"}}, + {'sqlite3', ".*", {git, "git@github.com:alexeyr/erlang-sqlite3.git", "HEAD"}}, + {worker_pool, ".*", {git, "git@github.com:inaka/worker_pool.git", "master"}} ]}. diff --git a/src/sumo_backend_mysql.erl b/src/sumo_backend_mysql.erl new file mode 100644 index 0000000..01f8c4b --- /dev/null +++ b/src/sumo_backend_mysql.erl @@ -0,0 +1,98 @@ +%%% @doc Main interface for repositories. +%%% +%%% Copyright 2012 Marcelo Gornstein <marcelog@@gmail.com> +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% @end +%%% @copyright Marcelo Gornstein +%%% @author Marcelo Gornstein +%%% +-module(sumo_backend_mysql). +-license("Apache License 2.0"). + +-behaviour(gen_server). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Exports. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%%% Public API. + +%%% Exports for gen_server +-export( + [ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). +-export( + [ get_pool/1 + ]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Types. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-record(state, { + pool = undefined :: atom() +}). +-type state() :: #state{}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% External API. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec get_pool(atom() | pid()) -> atom(). +get_pool(Name) -> + gen_server:call(Name, get_pool). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% gen_server stuff. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec init([term()]) -> {ok, #state{}}. +init(Options) -> + PoolSize = proplists:get_value(poolsize, Options), + Pool = list_to_atom(erlang:ref_to_list(make_ref())), + emysql:add_pool( + Pool, + PoolSize, + proplists:get_value(username, Options), + proplists:get_value(password, Options), + proplists:get_value(host, Options, "localhost"), + proplists:get_value(port, Options, 3306), + proplists:get_value(database, Options), + proplists:get_value(encoding, Options, utf8) + ), + {ok, #state{pool=Pool}}. + +-spec handle_call(term(), term(), state()) -> {reply, term(), #state{}}. +handle_call(get_pool, _From, State = #state{pool=Pool}) -> + {reply, Pool, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Unused Callbacks +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec handle_cast(term(), #state{}) -> {noreply, #state{}}. +handle_cast(_Msg, State) -> {noreply, State}. + +-spec handle_info(term(), #state{}) -> {noreply, #state{}}. +handle_info(_Msg, State) -> {noreply, State}. + +-spec terminate(term(), #state{}) -> ok. +terminate(_Reason, _State) -> ok. + +-spec code_change(term(), #state{}, term()) -> {ok, #state{}}. +code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/sumo_backend_sup.erl b/src/sumo_backend_sup.erl new file mode 100644 index 0000000..33283cd --- /dev/null +++ b/src/sumo_backend_sup.erl @@ -0,0 +1,49 @@ +%%% @doc Repository supervisor. +%%% +%%% Copyright 2012 Marcelo Gornstein <marcelog@@gmail.com> +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% @end +%%% @copyright Marcelo Gornstein +%%% @author Marcelo Gornstein +%%% +-module(sumo_backend_sup). +-license("Apache License 2.0"). + +-define(CLD(Name, Module, Options), { + Module, + {gen_server, start_link, [{local, Name}, Module, Options, []]}, + permanent, 5000, worker, [Module] +}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Exports. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-export([start_link/0]). +-export([init/1]). + +-behaviour(supervisor). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Code starts here. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, Backends} = application:get_env(sumo_db, storage_backends), + Children = lists:map( + fun({Name, Module, Options}) -> ?CLD(Name, Module, Options) end, + Backends + ), + {ok, { {one_for_one, 5, 10}, Children} }. diff --git a/src/sumo_db.app.src b/src/sumo_db.app.src index 087fa24..61ddd04 100644 --- a/src/sumo_db.app.src +++ b/src/sumo_db.app.src @@ -8,7 +8,8 @@ stdlib, sasl, lager, - crypto + crypto, + worker_pool ]}, {mod, { sumo_app, []}}, {env, []} diff --git a/src/sumo_repo.erl b/src/sumo_repo.erl index 0f94916..8af670f 100644 --- a/src/sumo_repo.erl +++ b/src/sumo_repo.erl @@ -33,13 +33,14 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Exports. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %%% Public API. +-export([start_link/3]). -export([create_schema/2]). -export([persist/2]). --export([find_all/2, find_all/5, find_by/3, find_by/5]). -export([delete/3, delete_by/3, delete_all/2]). +-export([find_all/2, find_all/5, find_by/3, find_by/5]). -export([call/4]). --export([start_link/3]). %%% Exports for gen_server -export([ @@ -50,13 +51,16 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + -record(state, { handler = undefined:: module(), handler_state = undefined:: any() }). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Code starts here. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% @doc Returns all behavior callbacks. -spec behaviour_info(callbacks) -> proplists:proplist()|undefined. behaviour_info(callbacks) -> @@ -71,34 +75,51 @@ behaviour_info(_Other) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% External API. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% @doc Starts and links a new process for the given repo implementation. +-spec start_link(atom(), module(), [term()]) -> {ok, pid()}. +start_link(Name, Module, Options) -> + Poolsize = proplists:get_value(workers, Options, 100), + WPoolOptions = [ {overrun_warning, infinity} + , {overrun_handler, {error_logger, warning_report}} + , {workers, Poolsize} + , {worker, {?MODULE, [Module, Options]}} + ], + wpool:start_pool(Name, WPoolOptions). + +%% @doc Creates the schema of the given docs in the given repository name. +-spec create_schema(atom(), #sumo_schema{}) -> ok. +create_schema(Name, #sumo_schema{}=Schema) -> + wpool:call(Name, {create_schema, Schema}). + %% @doc Persist the given doc with the given repository name. -spec persist(atom(), #sumo_doc{}) -> #sumo_doc{}. persist(Name, #sumo_doc{}=Doc) -> - gen_server:call(Name, {persist, Doc}). + wpool:call(Name, {persist, Doc}). %% @doc Deletes the doc identified by id in the given repository name. -spec delete(atom(), sumo_schema_name(), term()) -> ok. delete(Name, DocName, Id) -> - gen_server:call(Name, {delete, DocName, Id}). + wpool:call(Name, {delete, DocName, Id}). %% @doc Deletes the docs identified by the given conditions. -spec delete_by(atom(), sumo_schema_name(), proplists:proplist()) -> ok. delete_by(Name, DocName, Conditions) -> - gen_server:call(Name, {delete_by, DocName, Conditions}). + wpool:call(Name, {delete_by, DocName, Conditions}). %% @doc Deletes all docs in the given repository name. -spec delete_all(atom(), sumo_schema_name()) -> ok. delete_all(Name, DocName) -> - gen_server:call(Name, {delete_all, DocName}). + wpool:call(Name, {delete_all, DocName}). %% @doc Returns all docs from the given repositoru name. find_all(Name, DocName) -> - gen_server:call(Name, {find_all, DocName}). + wpool:call(Name, {find_all, DocName}). -%% @doc Returns Limit docs starting at Offset from the given repository name, ordered by OrderField. -%% OrderField may be 'undefined'. +%% @doc Returns Limit docs starting at Offset from the given repository name, +%% ordered by OrderField. OrderField may be 'undefined'. find_all(Name, DocName, OrderField, Limit, Offset) -> - gen_server:call(Name, {find_all, DocName, OrderField, Limit, Offset}). + wpool:call(Name, {find_all, DocName, OrderField, Limit, Offset}). %% @doc Finds documents that match the given conditions in the given %% repository name. @@ -107,7 +128,7 @@ find_all(Name, DocName, OrderField, Limit, Offset) -> pos_integer(), pos_integer() ) -> [#sumo_doc{}]. find_by(Name, DocName, Conditions, Limit, Offset) -> - gen_server:call(Name, {find_by, DocName, Conditions, Limit, Offset}). + wpool:call(Name, {find_by, DocName, Conditions, Limit, Offset}). %% @doc Finds documents that match the given conditions in the given %% repository name. @@ -115,26 +136,17 @@ find_by(Name, DocName, Conditions, Limit, Offset) -> atom(), sumo_schema_name(), proplists:proplist() ) -> [#sumo_doc{}]. find_by(Name, DocName, Conditions) -> - gen_server:call(Name, {find_by, DocName, Conditions}). + wpool:call(Name, {find_by, DocName, Conditions}). %% @doc Calls a custom function in the given repository name. -spec call(atom(), sumo_schema_name(), atom(), [term()]) -> term(). call(Name, DocName, Function, Args) -> - gen_server:call(Name, {call, DocName, Function, Args}). - -%% @doc Creates the schema of the given docs in the given repository name. --spec create_schema(atom(), #sumo_schema{}) -> ok. -create_schema(Name, #sumo_schema{}=Schema) -> - gen_server:call(Name, {create_schema, Schema}). - -%% @doc Starts and links a new process for the given repo implementation. --spec start_link(atom(), module(), [term()]) -> {ok, pid()}. -start_link(Name, Module, Options) -> - gen_server:start_link({local, Name}, ?MODULE, [Module,Options], []). + wpool:call(Name, {call, DocName, Function, Args}). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% gen_server stuff. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% @doc Called by start_link. -spec init([term()]) -> {ok, #state{}}. init([Module, Options]) -> @@ -215,7 +227,7 @@ handle_call( {create_schema, #sumo_schema{name=Name}=Schema}, _From, #state{handler=Handler,handler_state=HState}=State ) -> - lager:info("Creating schema for: ~p", [Name]), + lager:info("creating schema for: ~p", [Name]), {Result, NewState} = case Handler:create_schema(Schema, HState) of {ok, NewState_} -> {ok, NewState_}; {error, Error, NewState_} -> {{error, Error}, NewState_} diff --git a/src/sumo_repo_mongo.erl b/src/sumo_repo_mongo.erl index c84c5bb..f5844df 100644 --- a/src/sumo_repo_mongo.erl +++ b/src/sumo_repo_mongo.erl @@ -72,7 +72,7 @@ delete_by(DocName, Conditions, #state{pool=Pool}=State) -> {ok, 0, State}. delete_all(DocName, #state{pool=Pool}=State) -> - lager:debug("Dropping collection: ~p", [DocName]), + lager:debug("dropping collection: ~p", [DocName]), ok = emongo:delete(Pool, atom_to_list(DocName)), {ok, unknown, State}. @@ -129,7 +129,7 @@ create_schema( [], Attrs ), - lager:debug("Creating index: ~p for ~p", [Name, SchemaName]), + lager:debug("creating index: ~p for ~p", [Name, SchemaName]), ok = emongo:ensure_index( Pool, atom_to_list(SchemaName), [{atom_to_list(Name), 1}] ) diff --git a/src/sumo_repo_mysql.erl b/src/sumo_repo_mysql.erl index 8fac45b..aab25c4 100644 --- a/src/sumo_repo_mysql.erl +++ b/src/sumo_repo_mysql.erl @@ -43,12 +43,21 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {pool:: pid()}). + +-record(state, {pool :: atom() | pid()}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% External API. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init(Options) -> + % The storage backend key in the options specifies the name of the process + % which creates and initializes the storage backend. + Backend = proplists:get_value(storage_backend, Options), + Pool = sumo_backend_mysql:get_pool(Backend), + {ok, #state{pool=Pool}}. + persist(#sumo_doc{name=DocName}=Doc, State) -> % Set the real id, replacing undefined by 0 so it is autogenerated IdField = sumo:field_name(sumo:get_id_field(DocName)), @@ -322,6 +331,18 @@ create_index(Name, index) -> create_index(_, _) -> none. +prepare(DocName, PreName, Fun) when is_atom(PreName), is_function(Fun) -> + Name = statement_name(DocName, PreName), + case emysql_statements:fetch(Name) of + undefined -> + Query = iolist_to_binary(Fun()), + log("Preparing query: ~p: ~p", [Name, Query]), + ok = emysql:prepare(Name, Query); + Q -> + log("Using already prepared query: ~p: ~p", [Name, Q]) + end, + Name. + %% @doc Call prepare/3 first, to get a well formed statement name. execute(Name, Args, #state{pool=Pool}) when is_atom(Name), is_list(Args) -> {Time, Value} = timer:tc( emysql, execute, [Pool, Name, Args] ), @@ -337,40 +358,15 @@ execute(PreQuery, #state{pool=Pool}) when is_list(PreQuery)-> log("Executed Query: ~s (~pms)", [Query, Time/1000]), Value. -prepare(DocName, PreName, Fun) when is_atom(PreName), is_function(Fun) -> - Name = statement_name(DocName, PreName), - case emysql_statements:fetch(Name) of - undefined -> - Query = iolist_to_binary(Fun()), - log("Preparing query: ~p: ~p", [Name, Query]), - ok = emysql:prepare(Name, Query); - Q -> - log("Using already prepared query: ~p: ~p", [Name, Q]) - end, - Name. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Private API. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @doc We can extend this to wrap around emysql records, so they don't end up %% leaking details in all the repo. evaluate_execute_result(#error_packet{status = Status, msg = Msg}, State) -> {error, <>, State}. -init(Options) -> - Pool = list_to_atom(erlang:ref_to_list(make_ref())), - emysql:add_pool( - Pool, - proplists:get_value(poolsize, Options, 1), - proplists:get_value(username, Options), - proplists:get_value(password, Options), - proplists:get_value(host, Options, "localhost"), - proplists:get_value(port, Options, 3306), - proplists:get_value(database, Options), - proplists:get_value(encoding, Options, utf8) - ), - {ok, #state{pool=Pool}}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Private API. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% escape(String) -> ["`", String, "`"]. @@ -381,6 +377,6 @@ statement_name(DocName, StatementName) -> log(Msg, Args) -> case application:get_env(sumo_db, log_queries) of - true -> lager:debug(Msg, Args); - _ -> ok + {ok, true} -> lager:debug(Msg, Args); + _ -> ok end. diff --git a/src/sumo_repo_sup.erl b/src/sumo_repo_sup.erl index be23804..5d912b9 100644 --- a/src/sumo_repo_sup.erl +++ b/src/sumo_repo_sup.erl @@ -24,7 +24,8 @@ -license("Apache License 2.0"). -define(CLD(Name, Module, Options), { - Module, {sumo_repo, start_link, [Name, Module, Options]}, + Module, + {sumo_repo, start_link, [Name, Module, Options]}, permanent, 5000, worker, [Module] }). @@ -45,10 +46,7 @@ start_link() -> init([]) -> {ok, Repositories} = application:get_env(sumo_db, repositories), Children = lists:map( - fun({Name, Module, Options}) -> - lager:debug("Starting repository: ~s (~s)", [Name, Module]), - ?CLD(Name, Module, Options) - end, + fun({Name, Module, Options}) -> ?CLD(Name, Module, Options) end, Repositories ), - {ok, { {one_for_one, 5, 10}, Children} }. \ No newline at end of file + {ok, { {one_for_one, 5, 10}, Children} }. diff --git a/src/sumo_sup.erl b/src/sumo_sup.erl index 6f77781..536c737 100644 --- a/src/sumo_sup.erl +++ b/src/sumo_sup.erl @@ -46,6 +46,9 @@ start_link() -> init([]) -> {ok, { - {one_for_one, 5, 10}, [?SUP(sumo_repo_sup)] + {one_for_one, 5, 10}, + [ ?SUP(sumo_backend_sup) + , ?SUP(sumo_repo_sup) + ] }}.