diff --git a/rebar.config b/rebar.config index eb4d810..2c1cbb8 100644 --- a/rebar.config +++ b/rebar.config @@ -19,12 +19,12 @@ warn_untyped_record, debug_info ]}. {deps, [ - {'lager', ".*", {git, "git://github.com/basho/lager.git", "2.0.3"}}, - {emysql, "0.*", {git, "git@github.com:Eonblast/Emysql.git", "master"}}, - {emongo, ".*", {git, "git@github.com:marcelog/emongo.git", "marcelog_login_for_2_2_and_higher"}}, - {'sqlite3', ".*", {git, "git@github.com:alexeyr/erlang-sqlite3.git", "HEAD"}}, - {'elasticsearch', ".*", {git, "git@github.com:assplecake/elasticsearch-erlang", "master"}}, - {worker_pool, ".*", {git, "git@github.com:inaka/worker_pool.git", "master"}} + {'lager', ".*", {git, "git://github.com/basho/lager.git", "2.0.3"}}, + {emysql, "0.*", {git, "git@github.com:Eonblast/Emysql.git", "master"}}, + {emongo, ".*", {git, "git@github.com:marcelog/emongo.git", "marcelog_login_for_2_2_and_higher"}}, + {'sqlite3', ".*", {git, "git@github.com:alexeyr/erlang-sqlite3.git", "HEAD"}}, + {'tirerl', ".*", {git, "git@github.com:inaka/tirerl", "jfacorro.9.update.shotgun.version"}}, + {worker_pool, ".*", {git, "git@github.com:inaka/worker_pool.git", "1.0"}} ]}. {xref_warnings, true}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, deprecated_functions]}. diff --git a/src/sumo_backend_elasticsearch.erl b/src/sumo_backend_elasticsearch.erl index 60df9b0..3f46565 100644 --- a/src/sumo_backend_elasticsearch.erl +++ b/src/sumo_backend_elasticsearch.erl @@ -30,7 +30,8 @@ %%% Public API. -export( - [ get_index/1 + [ get_index/1, + get_pool_name/1 ]). %%% Exports for sumo_backend @@ -51,8 +52,7 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {index:: string()}). --type state() :: #state{}. +-type state() :: #{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% External API. @@ -65,31 +65,49 @@ start_link(Name, Options) -> get_index(Name) -> gen_server:call(Name, get_index). +-spec get_pool_name(atom() | pid()) -> atom(). +get_pool_name(Name) -> + gen_server:call(Name, get_pool_name). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% gen_server stuff. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec init([term()]) -> {ok, #state{}}. +-spec init([term()]) -> {ok, state()}. init(Options) -> %% All calls are done through http so there no connection pool. - Index = proplists:get_value(database, Options), - {ok, #state{index = Index}}. + PoolName = list_to_atom(erlang:ref_to_list(make_ref())), + + Index = proplists:get_value(index, Options), + PoolSize = proplists:get_value(poolsize, Options), + Host = proplists:get_value(host, Options), + Port = proplists:get_value(port, Options), + + PoolOpts = [{workers, PoolSize}, + {host, Host}, + {port, Port}], + + {ok, _} = tirerl:start_pool(PoolName, PoolOpts), + + {ok, #{index => Index, pool_name => PoolName}}. --spec handle_call(term(), term(), state()) -> {reply, term(), #state{}}. -handle_call(get_index, _From, State = #state{index = Index}) -> - {reply, Index, State}. +-spec handle_call(term(), term(), state()) -> {reply, term(), state()}. +handle_call(get_index, _From, State = #{index := Index}) -> + {reply, Index, State}; +handle_call(get_pool_name, _From, State = #{pool_name := PoolName}) -> + {reply, PoolName, State}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Unused Callbacks %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec handle_cast(term(), #state{}) -> {noreply, #state{}}. +-spec handle_cast(term(), state()) -> {noreply, state()}. handle_cast(_Msg, State) -> {noreply, State}. --spec handle_info(term(), #state{}) -> {noreply, #state{}}. +-spec handle_info(term(), state()) -> {noreply, state()}. handle_info(_Msg, State) -> {noreply, State}. --spec terminate(term(), #state{}) -> ok. +-spec terminate(term(), state()) -> ok. terminate(_Reason, _State) -> ok. --spec code_change(term(), #state{}, term()) -> {ok, #state{}}. +-spec code_change(term(), state(), term()) -> {ok, state()}. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/sumo_repo_elasticsearch.erl b/src/sumo_repo_elasticsearch.erl index 4dfcf8a..5e3a16b 100644 --- a/src/sumo_repo_elasticsearch.erl +++ b/src/sumo_repo_elasticsearch.erl @@ -28,14 +28,16 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Public API. -export([ - init/1, create_schema/2, persist/2, find_by/3, find_by/5, + init/1, create_schema/2, persist/2, find_by/3, find_by/5, find_all/2, delete/3, delete_by/3, delete_all/2 ]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {index:: string()}). +-type state() :: + #{index => binary(), + pool_name => atom()}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% External API. @@ -45,17 +47,35 @@ init(Options) -> %% ElasticSearch client uses poolboy to handle its own pool of workers %% so no pool is required. Backend = proplists:get_value(storage_backend, Options), - Index = sumo_backend_elasticsearch:get_index(Backend), - {ok, #state{index = Index}}. + Index = case sumo_backend_elasticsearch:get_index(Backend) of + Idx when is_list(Idx) -> list_to_binary(Idx); + Idx when is_binary(Idx) -> Idx; + _ -> throw(invalid_index) + end, + PoolName = sumo_backend_elasticsearch:get_pool_name(Backend), -persist(Doc, #state{index = Index} = State) -> + {ok, #{index => Index, pool_name => PoolName}}. + +-spec persist(sumo:doc(), state()) -> sumo:user_doc(). +persist(Doc, #{index := Index, pool_name := PoolName} = State) -> DocName = sumo_internal:doc_name(Doc), + Type = atom_to_binary(DocName, utf8), + IdField = sumo_internal:id_field_name(DocName), Id = sumo_internal:get_field(IdField, Doc), + Fields = sumo_internal:doc_fields(Doc), - {ok, _} = elasticsearch:index(Index, atom_to_list(DocName), Id, Fields), + FieldsMap = maps:from_list(Fields), + + #{status := Status, body := Body} = + tirerl:insert_doc(PoolName, Index, Type, Id, FieldsMap), - {ok, Doc, State}. + io:format("~p~n", [Body]), + true = Status == 200 orelse Status == 201, + GenId = maps:get(<<"_id">>, Body), + Doc1 = sumo_internal:set_field(IdField, GenId, Doc), + + {ok, Doc1, State}. delete(DocName, Id, State) -> delete_by(DocName, [{id, Id}], State). @@ -65,44 +85,46 @@ delete_by(DocName, Conditions, State) -> lager:critical("Unimplemented function: ~p:delete_by(~p, ~p, ~p)", Args), {error, not_implemented, State}. -delete_all(DocName, #state{index = Index} = State) -> +delete_all(DocName, #{index := Index, pool_name := PoolName} = State) -> lager:debug("dropping type: ~p", [DocName]), - {ok, _} = elasticsearch:delete(Index, DocName, <<"">>), + Type = atom_to_binary(DocName, utf8), + MatchAll = #{query => #{match_all => #{}}}, + tirerl:delete_by_query(PoolName, Index, Type, MatchAll, []), {ok, unknown, State}. find_by(DocName, Conditions, Limit, Offset, - #state{index = Index} = State) -> + #{index := Index, pool_name := PoolName} = State) -> CondFun = fun ({Key, Value}) when is_list(Value) -> - [{term , [{Key, list_to_binary(Value)}]}]; + #{match => maps:from_list([{Key, list_to_binary(Value)}])}; (Cond) -> - [{term , [Cond]}] + #{match => maps:from_list([Cond])} end, QueryConditions = lists:map(CondFun, Conditions), - Query = [{query, [{bool, [{should, [QueryConditions]}]}]}], + Query = #{query => #{bool => #{must => QueryConditions}}}, Query1 = case Limit of 0 -> Query; - _ -> [{from, Offset}, {size, Limit} | Query] + _ -> Query#{from => Offset, + size => Limit} end, - {ok, Result} = elasticsearch:search(Index, atom_to_list(DocName), Query1), - Hits = proplists:get_value(<<"hits">>, Result), - Hits1 = proplists:get_value(<<"hits">>, Hits), - Fun = - fun - (Item) -> - Fields = proplists:get_value(<<"_source">>, Item), - sumo_internal:new_doc(DocName, Fields) - end, - Docs = lists:map(Fun ,Hits1), + Type = atom_to_binary(DocName, utf8), + #{body := #{<<"hits">> := #{<<"hits">> := Results}}} = + tirerl:search(PoolName, Index, Type, Query1), + + Fun = fun(Item) -> map_to_doc(DocName, Item) end, + Docs = lists:map(Fun, Results), {ok, Docs, State}. find_by(DocName, Conditions, State) -> find_by(DocName, Conditions, 0, 0, State). -create_schema(Schema, #state{index = Index} = State) -> +find_all(DocName, State) -> + find_by(DocName, [], State). + +create_schema(Schema, #{index := Index, pool_name := PoolName} = State) -> SchemaName = sumo_internal:schema_name(Schema), Fields = sumo_internal:schema_fields(Schema), Fun = @@ -114,5 +136,23 @@ create_schema(Schema, #state{index = Index} = State) -> end, Mappings = lists:foldl(Fun, #{}, Fields), lager:debug("creating type: ~p", [SchemaName]), - {ok, _} = elasticsearch:create_index(Index, [], Mappings), + Response = tirerl:create_index(PoolName, Index, Mappings), + io:format("~p~n", [Response]), {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Internal Functions. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +map_to_doc(DocName, Item) -> + Values = maps:get(<<"_source">>, Item), + IdField = sumo_internal:id_field_name(DocName), + + Fun = fun (Key, Doc) -> + FieldName = binary_to_atom(Key, utf8), + Value = maps:get(Key, Values), + sumo_internal:set_field(FieldName, Value, Doc) + end, + Keys = maps:keys(Values), + Doc = lists:foldl(Fun, sumo_internal:new_doc(DocName, []), Keys), + sumo_internal:set_field(IdField, maps:get(<<"_id">>, Item), Doc).