From f345d809a5c13f7c8a0e00c100fcc0ef6861f5aa Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 14 Jan 2025 15:19:37 +0000 Subject: [PATCH 1/4] Optimisation for riak stats Optimisations: - Take the timestamp to the cache after get_stats/0 has returned, so that if get_stats/0 takes > 1s any requests in the queue for riak_kv_http_cache will still use the cache. - refactor riak_kv_status:aliases/0 to use simple lists rather than orddict. - remove altogether the sys_monitor_count, it is simply too expensive. Available as a riak_kv_util module function instead for the experienced operator. --- src/riak_kv_http_cache.erl | 22 +++++++---- src/riak_kv_stat.erl | 1 - src/riak_kv_stat_bc.erl | 12 ------ src/riak_kv_status.erl | 79 +++++++++++++++++++++++++------------- src/riak_kv_util.erl | 22 ++++++++++- 5 files changed, 88 insertions(+), 48 deletions(-) diff --git a/src/riak_kv_http_cache.erl b/src/riak_kv_http_cache.erl index df3c65e100..975c54fb74 100644 --- a/src/riak_kv_http_cache.erl +++ b/src/riak_kv_http_cache.erl @@ -12,7 +12,12 @@ -define(SERVER, ?MODULE). --record(st, {ts, stats = []}). +-record(st, + { + ts :: undefined|erlang:timestamp(), + stats = [] + } +). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -40,14 +45,17 @@ code_change(_, S, _) -> {ok, S}. check_cache(#st{ts = undefined} = S) -> - S#st{ts = os:timestamp(), stats = do_get_stats()}; + Stats = do_get_stats(), + S#st{ts = os:timestamp(), stats = Stats}; check_cache(#st{ts = Then} = S) -> + CacheTime = application:get_env(riak_kv, http_stats_cache_seconds, 1), Now = os:timestamp(), - case timer:now_diff(Now, Then) < 1000000 of - true -> - S; - false -> - S#st{ts = Now, stats = do_get_stats()} + case timer:now_diff(Now, Then) < (CacheTime * 1000000) of + true -> + S; + false -> + Stats = do_get_stats(), + S#st{ts = os:timestamp(), stats = Stats} end. do_get_stats() -> diff --git a/src/riak_kv_stat.erl b/src/riak_kv_stat.erl index 0819ba2713..fc357f01a2 100644 --- a/src/riak_kv_stat.erl +++ b/src/riak_kv_stat.erl @@ -969,7 +969,6 @@ bc_stats(Pfx) -> {sys_global_heaps_size, ?MODULE, value, [deprecated]}, {sys_heap_type, erlang, system_info, [heap_type]}, {sys_logical_processors, erlang, system_info, [logical_processors]}, - {sys_monitor_count, riak_kv_stat_bc, sys_monitor_count, []}, {sys_otp_release, riak_kv_stat_bc, otp_release, []}, {sys_port_count, erlang, system_info, [port_count]}, {sys_process_count, erlang, system_info, [process_count]}, diff --git a/src/riak_kv_stat_bc.erl b/src/riak_kv_stat_bc.erl index 1353fc054b..5867295650 100644 --- a/src/riak_kv_stat_bc.erl +++ b/src/riak_kv_stat_bc.erl @@ -195,18 +195,6 @@ system_version() -> system_architecture() -> list_to_binary(erlang:system_info(system_architecture)). -%% Count up all monitors, unfortunately has to obtain process_info -%% from all processes to work it out. -sys_monitor_count() -> - lists:foldl(fun(Pid, Count) -> - case erlang:process_info(Pid, monitors) of - {monitors, Mons} -> - Count + length(Mons); - _ -> - Count - end - end, 0, processes()). - app_stats() -> [{list_to_atom(atom_to_list(A) ++ "_version"), list_to_binary(V)} || {A,_,V} <- application:which_applications()]. diff --git a/src/riak_kv_status.erl b/src/riak_kv_status.erl index 9f444761a8..e0c782eee1 100644 --- a/src/riak_kv_status.erl +++ b/src/riak_kv_status.erl @@ -91,33 +91,60 @@ get_stats(console) -> ++ riak_kv_stat_bc:disk_stats() ++ riak_kv_stat_bc:app_stats(). -aliases() -> - Grouped = exometer_alias:prefix_foldl( - <<>>, - fun(Alias, Entry, DP, Acc) -> - orddict:append(Entry, {DP, Alias}, Acc) - end, orddict:new()), - lists:keysort( - 1, - lists:foldl( - fun({K, DPs}, Acc) -> - case exometer:get_value(K, [D || {D,_} <- DPs]) of - {ok, Vs} when is_list(Vs) -> - lists:foldr(fun({D,V}, Acc1) -> - {_,N} = lists:keyfind(D,1,DPs), - [{N,V}|Acc1] - end, Acc, Vs); - Other -> - Val = case Other of - {ok, disabled} -> undefined; - _ -> 0 - end, - lists:foldr(fun({_,N}, Acc1) -> - [{N,Val}|Acc1] - end, Acc, DPs) - end - end, [], orddict:to_list(Grouped))). +aliases() -> + AllStats = + exometer_alias:prefix_foldl( + <<>>, + fun(Alias, Entry, DP, Acc) -> [{Entry, {DP, Alias}}|Acc] end, + [] + ), + case AllStats of + [] -> + []; + AllStats when is_list(AllStats) -> + {{FinalEntry, FinalDPMap}, AliasVals} = + lists:foldl( + fun({Entry, {DP, Alias}}, {{PrevEntry, DPmap}, Acc}) -> + case Entry of + Entry when Entry == PrevEntry -> + {{PrevEntry, maps:put(DP, Alias, DPmap)}, Acc}; + Entry when PrevEntry == none -> + {{Entry, maps:put(DP, Alias, DPmap)}, Acc}; + Entry -> + UpdAcc = get_exometer_values(PrevEntry, DPmap), + {{Entry, #{DP => Alias}}, UpdAcc ++ Acc} + end + end, + {{none, #{}}, []}, + AllStats + ), + lists:keysort( + 1, + get_exometer_values(FinalEntry, FinalDPMap) ++ AliasVals + ) + end. + +get_exometer_values(Entry, DPmap) -> + case exometer:get_value(Entry, maps:keys(DPmap)) of + {ok, Vs} when is_list(Vs) -> + lists:map( + fun({D, V}) -> + {maps:get(D, DPmap), V} + end, + Vs + ); + Other -> + DefaultValue = + case Other of + {ok, disabled} -> disabled; + _ -> 0 + end, + lists:map( + fun(A) -> {A, DefaultValue} end, + maps:values(DPmap) + ) + end. expand_disk_stats([{disk, Stats}]) -> [{disk, [{struct, [{id, list_to_binary(Id)}, {size, Size}, {used, Used}]} diff --git a/src/riak_kv_util.erl b/src/riak_kv_util.erl index a20f569ef8..7a8aebe346 100644 --- a/src/riak_kv_util.erl +++ b/src/riak_kv_util.erl @@ -52,7 +52,8 @@ is_modfun_allowed/2, shuffle_list/1, kv_ready/0, - ngr_initial_timeout/0 + ngr_initial_timeout/0, + sys_monitor_count/0 ]). -export([report_hashtree_tokens/0, reset_hashtree_tokens/2]). @@ -714,7 +715,24 @@ get_initial_call(P) -> _ -> undefined end. - + +%% @doc sys_monitor_count/0 +%% Count up all monitors, unfortunately has to obtain process_info +%% from all processes to work it out. +sys_monitor_count() -> + lists:foldl( + fun(Pid, Count) -> + case erlang:process_info(Pid, monitors) of + {monitors, Mons} -> + Count + length(Mons); + _ -> + Count + end + end, + 0, processes() + ). + + %% =================================================================== %% EUnit tests %% =================================================================== From b982e4a4aaa9a9f72a694d476bc8891d7edfad48 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 15 Jan 2025 16:35:00 +0000 Subject: [PATCH 2/4] Wait forever for call to respond It takes more than 5s on some systems at the moment - and this then dumps an unhelpful crashdump to the user. Make a longer default timeout, allow the timeout to be passed by the operator, and also return more operator-friendly error on timeout occurring. --- src/riak_kv_http_cache.erl | 18 ++++------ src/riak_kv_stat.erl | 14 ++------ src/riak_kv_wm_stats.erl | 71 +++++++++++++++++++++++++++++++------- 3 files changed, 67 insertions(+), 36 deletions(-) diff --git a/src/riak_kv_http_cache.erl b/src/riak_kv_http_cache.erl index 975c54fb74..98e0279b25 100644 --- a/src/riak_kv_http_cache.erl +++ b/src/riak_kv_http_cache.erl @@ -1,7 +1,7 @@ -module(riak_kv_http_cache). -export([start_link/0, - get_stats/0]). + get_stats/1]). -export([init/1, handle_call/3, @@ -10,8 +10,6 @@ terminate/2, code_change/3]). --define(SERVER, ?MODULE). - -record(st, { ts :: undefined|erlang:timestamp(), @@ -19,11 +17,12 @@ } ). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -get_stats() -> - gen_server:call(?MODULE, get_stats). +get_stats(Timeout) -> + gen_server:call(?MODULE, get_stats, Timeout). init(_) -> {ok, #st{}}. @@ -45,7 +44,7 @@ code_change(_, S, _) -> {ok, S}. check_cache(#st{ts = undefined} = S) -> - Stats = do_get_stats(), + Stats = riak_kv_status:get_stats(web), S#st{ts = os:timestamp(), stats = Stats}; check_cache(#st{ts = Then} = S) -> CacheTime = application:get_env(riak_kv, http_stats_cache_seconds, 1), @@ -54,9 +53,6 @@ check_cache(#st{ts = Then} = S) -> true -> S; false -> - Stats = do_get_stats(), + Stats = riak_kv_status:get_stats(web), S#st{ts = os:timestamp(), stats = Stats} - end. - -do_get_stats() -> - riak_kv_wm_stats:get_stats(). + end. \ No newline at end of file diff --git a/src/riak_kv_stat.erl b/src/riak_kv_stat.erl index fc357f01a2..8d04a08b5b 100644 --- a/src/riak_kv_stat.erl +++ b/src/riak_kv_stat.erl @@ -26,11 +26,6 @@ %% Update each stat with the exported function update/1. Add %% a new stat to the internal stats/0 func to register a new stat with %% folsom. -%% -%% Get the latest aggregation of stats with the exported function -%% get_stats/0. Or use folsom_metrics:get_metric_value/1, -%% or riak_core_stat_q:get_stats/1. -%% -module(riak_kv_stat). @@ -41,7 +36,7 @@ -endif. %% API --export([start_link/0, get_stats/0, +-export([start_link/0, update/1, perform_update/1, register_stats/0, unregister_vnode_stats/1, produce_stats/0, leveldb_read_block_errors/0, stat_update_error/3, stop/0]). -export([track_bucket/1, untrack_bucket/1]). @@ -71,11 +66,6 @@ unregister_vnode_stats(Index) -> unregister_per_index(heads, Index), unregister_per_index(puts, Index). -%% @spec get_stats() -> proplist() -%% @doc Get the current aggregation of stats. -get_stats() -> - riak_kv_wm_stats:get_stats(). - %% Creation of a dynamic stat _must_ be serialized. register_stat(Name, Type) -> @@ -1126,7 +1116,7 @@ create_or_update_histogram_test() -> Metric = [riak_kv,put_fsm,counter,time], ok = repeat_create_or_update(Metric, 1, histogram, 100), ?assertNotEqual(exometer:get_value(Metric), 0), - Stats = get_stats(), + Stats = riak_kv_status:get_stats(web), ?LOG_INFO("stats prop list ~s", [Stats]), ?assertNotEqual(proplists:get_value({node_put_fsm_counter_time_mean}, Stats), 0) after diff --git a/src/riak_kv_wm_stats.erl b/src/riak_kv_wm_stats.erl index 574c3ba679..468bcbdf60 100644 --- a/src/riak_kv_wm_stats.erl +++ b/src/riak_kv_wm_stats.erl @@ -29,14 +29,17 @@ content_types_provided/2, service_available/2, forbidden/2, + malformed_request/2, produce_body/2, pretty_print/2 ]). --export([get_stats/0]). + +-define(TIMEOUT, 30000). -include_lib("webmachine/include/webmachine.hrl"). +-include("riak_kv_wm_raw.hrl"). --record(ctx, {}). +-record(ctx, {timeout = ?TIMEOUT :: non_neg_integer()}). init(_) -> {ok, #ctx{}}. @@ -66,25 +69,67 @@ content_types_provided(ReqData, Context) -> {"text/plain", pretty_print}], ReqData, Context}. - service_available(ReqData, Ctx) -> {true, ReqData, Ctx}. +malformed_request(RD, Ctx) -> + case wrq:get_qs_value("timeout", RD) of + undefined -> + {false, RD, Ctx}; + TimeoutStr -> + try + case list_to_integer(TimeoutStr) of + Timeout when Timeout > 0 -> + {false, RD, Ctx#ctx{timeout=Timeout}} + end + catch + _:_ -> + {true, + wrq:append_to_resp_body( + io_lib:format( + "Bad timeout value ~0p", + [TimeoutStr] + ), + wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)), + Ctx} + end + end. + forbidden(RD, Ctx) -> {riak_kv_wm_utils:is_forbidden(RD), RD, Ctx}. -produce_body(ReqData, Ctx) -> - Stats= riak_kv_http_cache:get_stats(), - Body = mochijson2:encode({struct, Stats}), - {Body, ReqData, Ctx}. +produce_body(RD, Ctx) -> + try + Stats = riak_kv_http_cache:get_stats(Ctx#ctx.timeout), + Body = mochijson2:encode({struct, Stats}), + {Body, RD, Ctx} + catch + exit:{timeout, _} -> + { + {halt, 503}, + wrq:set_resp_header( + ?HEAD_CTYPE, + "text/plain", + wrq:append_to_response_body( + io_lib:format( + "Request timed out after ~w ms", + [Ctx#ctx.timeout] + ), + RD + ) + ), + Ctx + } + end. %% @spec pretty_print(webmachine:wrq(), context()) -> %% {string(), webmachine:wrq(), context()} %% @doc Format the respons JSON object is a "pretty-printed" style. -pretty_print(RD1, C1=#ctx{}) -> - {Json, RD2, C2} = produce_body(RD1, C1), - {json_pp:print(binary_to_list(list_to_binary(Json))), RD2, C2}. - +pretty_print(RD, Ctx) -> + case produce_body(RD, Ctx) of + {{halt, RepsonseCode}, UpdRD, UpdCtx} -> + {{halt, RepsonseCode}, UpdRD, UpdCtx}; + {Json, UpdRD, UpdCtx} -> + {json_pp:print(Json), UpdRD, UpdCtx} + end. -get_stats() -> - riak_kv_status:get_stats(web). From efacd053b77bf3f45af13f535b93c45127b6cf95 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 15 Jan 2025 17:24:10 +0000 Subject: [PATCH 3/4] Fix meck dep --- rebar.config | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index 018feb2e23..1bb9d45830 100644 --- a/rebar.config +++ b/rebar.config @@ -30,9 +30,8 @@ {provider_hooks, [ {pre, [{compile, {protobuf, compile}}]} ]}. - {profiles, [ - {test, [{deps, [meck]}]}, + {test, [{deps, [{meck, {git, "https://github.com/OpenRiak/meck.git", {branch, "openriak-3.2"}}}]}]}, {gha, [{erl_opts, [{d, 'GITHUBEXCLUDE'}]}]} ]}. From ea794ea6e4bea966de16ad6b9bc087bfb44ec7c4 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 20 Jan 2025 21:29:05 +0000 Subject: [PATCH 4/4] Update following review --- src/riak_kv_http_cache.erl | 6 ++++-- src/riak_kv_wm_stats.erl | 11 ++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/riak_kv_http_cache.erl b/src/riak_kv_http_cache.erl index 98e0279b25..4e003dab6f 100644 --- a/src/riak_kv_http_cache.erl +++ b/src/riak_kv_http_cache.erl @@ -21,6 +21,7 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec get_stats(Milliseconds :: pos_integer()) -> list({atom(), term()}). get_stats(Timeout) -> gen_server:call(?MODULE, get_stats, Timeout). @@ -47,9 +48,10 @@ check_cache(#st{ts = undefined} = S) -> Stats = riak_kv_status:get_stats(web), S#st{ts = os:timestamp(), stats = Stats}; check_cache(#st{ts = Then} = S) -> - CacheTime = application:get_env(riak_kv, http_stats_cache_seconds, 1), + CacheTime = + application:get_env(riak_kv, http_stats_cache_milliseconds, 1000), Now = os:timestamp(), - case timer:now_diff(Now, Then) < (CacheTime * 1000000) of + case timer:now_diff(Now, Then) < (CacheTime * 1000) of true -> S; false -> diff --git a/src/riak_kv_wm_stats.erl b/src/riak_kv_wm_stats.erl index 468bcbdf60..dbff12a10d 100644 --- a/src/riak_kv_wm_stats.erl +++ b/src/riak_kv_wm_stats.erl @@ -34,7 +34,7 @@ pretty_print/2 ]). --define(TIMEOUT, 30000). +-define(TIMEOUT, 30000). %% In milliseconds -include_lib("webmachine/include/webmachine.hrl"). -include("riak_kv_wm_raw.hrl"). @@ -87,7 +87,8 @@ malformed_request(RD, Ctx) -> {true, wrq:append_to_resp_body( io_lib:format( - "Bad timeout value ~0p", + "Bad timeout value ~0p " + "expected milliseconds > 0", [TimeoutStr] ), wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)), @@ -130,6 +131,10 @@ pretty_print(RD, Ctx) -> {{halt, RepsonseCode}, UpdRD, UpdCtx} -> {{halt, RepsonseCode}, UpdRD, UpdCtx}; {Json, UpdRD, UpdCtx} -> - {json_pp:print(Json), UpdRD, UpdCtx} + { + json_pp:print(binary_to_list(list_to_binary(Json))), + UpdRD, + UpdCtx + } end.