diff --git a/README.md b/README.md index 28a72e4..76cf018 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ First, let's create and activate a bucket type simply called maps that is set up to store Riak maps: $ riak-admin bucket-type create maps '{"props":{"datatype":"map"}}' - $ riak-admin bucket-type activate mapsdmin bucket-type activate maps + $ riak-admin bucket-type activate maps Now, let's create a search index called `sumo_test_index` using the default schema: diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl index 527a9be..a320004 100644 --- a/src/sumo_backend_riak.erl +++ b/src/sumo_backend_riak.erl @@ -29,31 +29,27 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Public API. --export( - [ get_connection/1, - get_state/1 - ]). +-export([get_connection/1]). %%% Exports for sumo_backend --export( - [ start_link/2 - ]). +-export([start_link/2]). %%% Exports for gen_server --export( - [ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {conn :: pid(), bucket :: binary(), index :: binary()}). +-record(state, {host :: string(), + port :: non_neg_integer(), + opts :: [term()]}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -68,10 +64,6 @@ start_link(Name, Options) -> get_connection(Name) -> gen_server:call(Name, get_connection). --spec get_state(atom() | pid()) -> state(). -get_state(Name) -> - gen_server:call(Name, get_state). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% gen_server stuff. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -82,23 +74,16 @@ init(Options) -> Host = proplists:get_value(host, Options, "127.0.0.1"), Port = proplists:get_value(port, Options, 8087), Opts = riak_opts(Options), - %% Get DB parameters - BucketType = iolist_to_binary( - proplists:get_value(bucket_type, Options)), - Bucket = iolist_to_binary( - proplists:get_value(bucket, Options, <<"sumo_test">>)), - Index = iolist_to_binary( - proplists:get_value(index, Options, <<"sumo_test_index">>)), - %% Place Riak connection - {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), - %% Initial state - {ok, #state{conn = Conn, bucket = {BucketType, Bucket}, index = Index}}. + {ok, #state{host = Host, port = Port, opts = Opts}}. +%% @todo: implement connection pool. +%% In other cases is a built-in feature of the client. -spec handle_call(term(), term(), state()) -> {reply, term(), state()}. -handle_call(get_connection, _From, State = #state{conn = Conn}) -> - {reply, Conn, State}; -handle_call(get_state, _From, State) -> - {reply, State, State}. +handle_call(get_connection, + _From, + State = #state{host = Host, port = Port, opts = Opts}) -> + {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), + {reply, Conn, State}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Unused Callbacks diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index b94df24..501271b 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -51,7 +51,21 @@ %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {conn :: pid(), bucket :: binary(), index :: binary()}). +%% Riak quorum parameters. +%% @see +-type r_param() :: r | pr | notfound_ok. +-type w_param() :: w | pw | dw | returnbody. + +%% conn: is the Pid of the gen_server that holds the connection with Riak +%% bucket: Riak bucket (per store) +%% index: Riak index to be used by Riak Search +%% read_quorum: Riak read quorum parameters. +%% write_quorum: Riak write quorum parameters. +-record(state, {conn :: pid(), + bucket :: binary(), + index :: binary(), + read_quorum :: [{r_param(), integer() | (true | false)}], + write_quorum :: [{w_param(), integer() | (true | false)}]}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -61,19 +75,33 @@ -spec init( term() ) -> {ok, term()}. -init(Options) -> +init(Opts) -> % The storage backend key in the options specifies the name of the process % which creates and initializes the storage backend. - Backend = proplists:get_value(storage_backend, Options), - State = sumo_backend_riak:get_state(Backend), + Backend = proplists:get_value(storage_backend, Opts), + Conn = sumo_backend_riak:get_connection(Backend), + BucketType = iolist_to_binary( + proplists:get_value(bucket_type, Opts, <<"maps">>)), + Bucket = iolist_to_binary( + proplists:get_value(bucket, Opts, <<"sumo">>)), + Index = iolist_to_binary( + proplists:get_value(index, Opts, <<"sumo_index">>)), + Rq = proplists:get_value(read_quorum, Opts, []), + Wq = proplists:get_value(write_quorum, Opts, []), + State = #state{conn = Conn, + bucket = {BucketType, Bucket}, + index = Index, + read_quorum = Rq, + write_quorum = Wq}, {ok, State}. -spec persist( sumo_internal:doc(), state() ) -> sumo_store:result(sumo_internal:doc(), state()). -persist(Doc, #state{conn = Conn, bucket = Bucket} = State) -> +persist(Doc, + #state{conn = Conn, bucket = Bucket, write_quorum = Wq} = State) -> {Id, NewDoc} = new_doc(Doc, State), - case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc)) of + case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), Wq) of {error, Error} -> {error, Error, State}; _ -> @@ -122,9 +150,10 @@ delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> -spec find_all( sumo:schema_name(), state() ) -> sumo_store:result([sumo_internal:doc()], state()). -find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) -> +find_all(DocName, + #state{conn = Conn, bucket = Bucket, read_quorum = Rq} = State) -> Get = fun({C, B, Kst}, Acc) -> - fetch_map_bulk(DocName, C, B, Kst) ++ Acc + fetch_map_bulk(DocName, C, B, Kst, Rq) ++ Acc end, case stream_keys(Conn, Bucket, Get, []) of {ok, Docs} -> {ok, Docs, State}; @@ -158,11 +187,14 @@ find_by(DocName, Conditions, Limit, Offset, - #state{conn = Conn, bucket = Bucket, index = Index} = State) -> + #state{conn = Conn, + bucket = Bucket, + index = Index, + read_quorum = Rq} = State) -> IdField = sumo_internal:id_field_name(DocName), case lists:keyfind(IdField, 1, Conditions) of {_K, Key} -> - case fetch_map(Conn, Bucket, iolist_to_binary(Key)) of + case fetch_map(Conn, Bucket, iolist_to_binary(Key), Rq) of {ok, RMap} -> Val = rmap_to_doc(DocName, RMap), {ok, [Val], State}; @@ -205,12 +237,12 @@ doc_id(Doc) -> sumo_internal:get_field(IdField, Doc). %% @private -new_doc(Doc, #state{conn = Conn, bucket = Bucket}) -> +new_doc(Doc, #state{conn = Conn, bucket = Bucket, write_quorum = Wq}) -> DocName = sumo_internal:doc_name(Doc), IdField = sumo_internal:id_field_name(DocName), Id = case sumo_internal:get_field(IdField, Doc) of undefined -> - case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc)) of + case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc), Wq) of {ok, RiakMapId} -> RiakMapId; {error, Error} -> throw(Error); _ -> throw(unexpected) @@ -281,13 +313,13 @@ normalize_doc_fields(Src) -> [{return, binary}, global]). %% @private -fetch_map(Conn, Bucket, Key) -> - riakc_pb_socket:fetch_type(Conn, Bucket, Key). +fetch_map(Conn, Bucket, Key, Opts) -> + riakc_pb_socket:fetch_type(Conn, Bucket, Key, Opts). %% @private -fetch_map_bulk(DocName, Conn, Bucket, Keys) -> +fetch_map_bulk(DocName, Conn, Bucket, Keys, Opts) -> Fun = fun(K, Acc) -> - case fetch_map(Conn, Bucket, K) of + case fetch_map(Conn, Bucket, K, Opts) of {ok, M} -> [rmap_to_doc(DocName, M) | Acc]; _ -> Acc end @@ -299,8 +331,8 @@ delete_map(Conn, Bucket, Key) -> riakc_pb_socket:delete(Conn, Bucket, Key). %% @private -update_map(Conn, Bucket, Key, Map) -> - riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map)). +update_map(Conn, Bucket, Key, Map, Opts) -> + riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map), Opts). %% @private search(Conn, Index, Query, 0, 0) -> diff --git a/test/test.config b/test/test.config index 0449b71..932bdfd 100644 --- a/test/test.config +++ b/test/test.config @@ -43,9 +43,7 @@ sumo_backend_riak, [{host, "127.0.0.1"}, {port, 8087}, - {bucket_type, "maps"}, - {bucket, "sumo_test"}, - {index, "sumo_test_index"}] + {poolsize, 10}] } ] }, @@ -73,7 +71,12 @@ {sumo_test_riak, sumo_store_riak, [{storage_backend, sumo_test_backend_riak}, - {workers, 10}] + {workers, 10}, + {bucket_type, "maps"}, + {bucket, "sumo_test"}, + {index, "sumo_test_index"}, + {w_args, [{w, 2}, {pw, 0}, {dw, 0}, {returnbody, false}]}, + {r_args, [{r, 2}, {pr, 0}]}] } ] },