Skip to content

Commit

Permalink
[#74] Use tirerl instead of erlastic_search.
Browse files Browse the repository at this point in the history
  • Loading branch information
jfacorro committed Oct 14, 2014
1 parent dbbf984 commit 48788ba
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 45 deletions.
12 changes: 6 additions & 6 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
warn_untyped_record, debug_info
]}.
{deps, [
{'lager', ".*", {git, "git://github.com/basho/lager.git", "2.0.3"}},
{emysql, "0.*", {git, "[email protected]:Eonblast/Emysql.git", "master"}},
{emongo, ".*", {git, "[email protected]:marcelog/emongo.git", "marcelog_login_for_2_2_and_higher"}},
{'sqlite3', ".*", {git, "[email protected]:alexeyr/erlang-sqlite3.git", "HEAD"}},
{'elasticsearch', ".*", {git, "[email protected]:assplecake/elasticsearch-erlang", "master"}},
{worker_pool, ".*", {git, "[email protected]:inaka/worker_pool.git", "master"}}
{'lager', ".*", {git, "git://github.com/basho/lager.git", "2.0.3"}},
{emysql, "0.*", {git, "[email protected]:Eonblast/Emysql.git", "master"}},
{emongo, ".*", {git, "[email protected]:marcelog/emongo.git", "marcelog_login_for_2_2_and_higher"}},
{'sqlite3', ".*", {git, "[email protected]:alexeyr/erlang-sqlite3.git", "HEAD"}},
{'tirerl', ".*", {git, "[email protected]:inaka/tirerl", "jfacorro.9.update.shotgun.version"}},
{worker_pool, ".*", {git, "[email protected]:inaka/worker_pool.git", "1.0"}}
]}.
{xref_warnings, true}.
{xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, deprecated_functions]}.
44 changes: 31 additions & 13 deletions src/sumo_backend_elasticsearch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@

%%% Public API.
-export(
[ get_index/1
[ get_index/1,
get_pool_name/1
]).

%%% Exports for sumo_backend
Expand All @@ -51,8 +52,7 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Types.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {index:: string()}).
-type state() :: #state{}.
-type state() :: #{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% External API.
Expand All @@ -65,31 +65,49 @@ start_link(Name, Options) ->
get_index(Name) ->
gen_server:call(Name, get_index).

-spec get_pool_name(atom() | pid()) -> atom().
get_pool_name(Name) ->
gen_server:call(Name, get_pool_name).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server stuff.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec init([term()]) -> {ok, #state{}}.
-spec init([term()]) -> {ok, state()}.
init(Options) ->
%% All calls are done through http so there no connection pool.
Index = proplists:get_value(database, Options),
{ok, #state{index = Index}}.
PoolName = list_to_atom(erlang:ref_to_list(make_ref())),

Index = proplists:get_value(index, Options),
PoolSize = proplists:get_value(poolsize, Options),
Host = proplists:get_value(host, Options),
Port = proplists:get_value(port, Options),

PoolOpts = [{workers, PoolSize},
{host, Host},
{port, Port}],

{ok, _} = tirerl:start_pool(PoolName, PoolOpts),

{ok, #{index => Index, pool_name => PoolName}}.

-spec handle_call(term(), term(), state()) -> {reply, term(), #state{}}.
handle_call(get_index, _From, State = #state{index = Index}) ->
{reply, Index, State}.
-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
handle_call(get_index, _From, State = #{index := Index}) ->
{reply, Index, State};
handle_call(get_pool_name, _From, State = #{pool_name := PoolName}) ->
{reply, PoolName, State}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Unused Callbacks
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-spec handle_cast(term(), #state{}) -> {noreply, #state{}}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(_Msg, State) -> {noreply, State}.

-spec handle_info(term(), #state{}) -> {noreply, #state{}}.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(_Msg, State) -> {noreply, State}.

-spec terminate(term(), #state{}) -> ok.
-spec terminate(term(), state()) -> ok.
terminate(_Reason, _State) -> ok.

-spec code_change(term(), #state{}, term()) -> {ok, #state{}}.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
92 changes: 66 additions & 26 deletions src/sumo_repo_elasticsearch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Public API.
-export([
init/1, create_schema/2, persist/2, find_by/3, find_by/5,
init/1, create_schema/2, persist/2, find_by/3, find_by/5, find_all/2,
delete/3, delete_by/3, delete_all/2
]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Types.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {index:: string()}).
-type state() ::
#{index => binary(),
pool_name => atom()}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% External API.
Expand All @@ -45,17 +47,35 @@ init(Options) ->
%% ElasticSearch client uses poolboy to handle its own pool of workers
%% so no pool is required.
Backend = proplists:get_value(storage_backend, Options),
Index = sumo_backend_elasticsearch:get_index(Backend),
{ok, #state{index = Index}}.
Index = case sumo_backend_elasticsearch:get_index(Backend) of
Idx when is_list(Idx) -> list_to_binary(Idx);
Idx when is_binary(Idx) -> Idx;
_ -> throw(invalid_index)
end,
PoolName = sumo_backend_elasticsearch:get_pool_name(Backend),

persist(Doc, #state{index = Index} = State) ->
{ok, #{index => Index, pool_name => PoolName}}.

-spec persist(sumo:doc(), state()) -> sumo:user_doc().
persist(Doc, #{index := Index, pool_name := PoolName} = State) ->
DocName = sumo_internal:doc_name(Doc),
Type = atom_to_binary(DocName, utf8),

IdField = sumo_internal:id_field_name(DocName),
Id = sumo_internal:get_field(IdField, Doc),

Fields = sumo_internal:doc_fields(Doc),
{ok, _} = elasticsearch:index(Index, atom_to_list(DocName), Id, Fields),
FieldsMap = maps:from_list(Fields),

#{status := Status, body := Body} =
tirerl:insert_doc(PoolName, Index, Type, Id, FieldsMap),

{ok, Doc, State}.
io:format("~p~n", [Body]),
true = Status == 200 orelse Status == 201,
GenId = maps:get(<<"_id">>, Body),
Doc1 = sumo_internal:set_field(IdField, GenId, Doc),

{ok, Doc1, State}.

delete(DocName, Id, State) ->
delete_by(DocName, [{id, Id}], State).
Expand All @@ -65,44 +85,46 @@ delete_by(DocName, Conditions, State) ->
lager:critical("Unimplemented function: ~p:delete_by(~p, ~p, ~p)", Args),
{error, not_implemented, State}.

delete_all(DocName, #state{index = Index} = State) ->
delete_all(DocName, #{index := Index, pool_name := PoolName} = State) ->
lager:debug("dropping type: ~p", [DocName]),
{ok, _} = elasticsearch:delete(Index, DocName, <<"">>),
Type = atom_to_binary(DocName, utf8),
MatchAll = #{query => #{match_all => #{}}},
tirerl:delete_by_query(PoolName, Index, Type, MatchAll, []),
{ok, unknown, State}.

find_by(DocName, Conditions, Limit, Offset,
#state{index = Index} = State) ->
#{index := Index, pool_name := PoolName} = State) ->
CondFun =
fun
({Key, Value}) when is_list(Value) ->
[{term , [{Key, list_to_binary(Value)}]}];
#{match => maps:from_list([{Key, list_to_binary(Value)}])};
(Cond) ->
[{term , [Cond]}]
#{match => maps:from_list([Cond])}
end,
QueryConditions = lists:map(CondFun, Conditions),
Query = [{query, [{bool, [{should, [QueryConditions]}]}]}],
Query = #{query => #{bool => #{must => QueryConditions}}},
Query1 = case Limit of
0 -> Query;
_ -> [{from, Offset}, {size, Limit} | Query]
_ -> Query#{from => Offset,
size => Limit}
end,

{ok, Result} = elasticsearch:search(Index, atom_to_list(DocName), Query1),
Hits = proplists:get_value(<<"hits">>, Result),
Hits1 = proplists:get_value(<<"hits">>, Hits),
Fun =
fun
(Item) ->
Fields = proplists:get_value(<<"_source">>, Item),
sumo_internal:new_doc(DocName, Fields)
end,
Docs = lists:map(Fun ,Hits1),
Type = atom_to_binary(DocName, utf8),
#{body := #{<<"hits">> := #{<<"hits">> := Results}}} =
tirerl:search(PoolName, Index, Type, Query1),

Fun = fun(Item) -> map_to_doc(DocName, Item) end,
Docs = lists:map(Fun, Results),

{ok, Docs, State}.

find_by(DocName, Conditions, State) ->
find_by(DocName, Conditions, 0, 0, State).

create_schema(Schema, #state{index = Index} = State) ->
find_all(DocName, State) ->
find_by(DocName, [], State).

create_schema(Schema, #{index := Index, pool_name := PoolName} = State) ->
SchemaName = sumo_internal:schema_name(Schema),
Fields = sumo_internal:schema_fields(Schema),
Fun =
Expand All @@ -114,5 +136,23 @@ create_schema(Schema, #state{index = Index} = State) ->
end,
Mappings = lists:foldl(Fun, #{}, Fields),
lager:debug("creating type: ~p", [SchemaName]),
{ok, _} = elasticsearch:create_index(Index, [], Mappings),
Response = tirerl:create_index(PoolName, Index, Mappings),
io:format("~p~n", [Response]),
{ok, State}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Internal Functions.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

map_to_doc(DocName, Item) ->
Values = maps:get(<<"_source">>, Item),
IdField = sumo_internal:id_field_name(DocName),

Fun = fun (Key, Doc) ->
FieldName = binary_to_atom(Key, utf8),
Value = maps:get(Key, Values),
sumo_internal:set_field(FieldName, Value, Doc)
end,
Keys = maps:keys(Values),
Doc = lists:foldl(Fun, sumo_internal:new_doc(DocName, []), Keys),
sumo_internal:set_field(IdField, maps:get(<<"_id">>, Item), Doc).

0 comments on commit 48788ba

Please sign in to comment.