Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to defer fetching a value in the fold - but without requiring Journal check #472

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 57 additions & 41 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -946,15 +946,22 @@ book_objectfold(Pid, Tag, Bucket, Limiter, FoldAccT, SnapPreFold) ->
%% `Acc'. The ProxyObject is an object that only contains the
%% head/metadata, and no object data from the journal. The `Acc' in
%% the first call is that provided as the second element of `FoldAccT'
%% and thereafter the return of the previous all to the fold fun. If
%% `JournalCheck' is `true' then the journal is checked to see if the
%% object in the ledger is present, which means a snapshot of the
%% whole store is required, if `false', then no such check is
%% performed, and onlt ledger need be snapshotted. `SnapPreFold' is a
%% boolean that determines if the snapshot is taken when the folder is
%% requested `true', or when when run `false'. `SegmentList' can be
%% `false' meaning, all heads, or a list of integers that designate
%% segments in a TicTac Tree.
%% and thereafter the return of the previous call to the fold fun.
%%
%% If `JournalCheck' is `true' then the journal is checked to see if the
%% object in the ledger is present, which means a snapshot of the whole store
%% is required. If `false', then no such check is performed, and only the
%% ledger need be snapshotted. However, if the intention is to defer fetching
%% the value without incurring the cost of checking the Journal during the
%% fold (e.g. as any exception will be handled later), then the `defer'
%% option can be used. This will snapshot the Journal, but not check for
%% presence. Note that the fetch must still be made within the timeframe of
%% the fold (as the snapshot will expire with the fold).
%%
%% `SnapPreFold' is a boolean that determines if the snapshot is taken when
%% the folder is requested `true', or when run `false'. `SegmentList' can
%% be `false' meaning, all heads, or a list of integers that designate segments
%% in a TicTac Tree.
-spec book_headfold(pid(), Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
Expand All @@ -964,7 +971,7 @@ book_objectfold(Pid, Tag, Bucket, Limiter, FoldAccT, SnapPreFold) ->
Bucket :: term(),
Key :: term(),
Value :: term(),
JournalCheck :: boolean(),
JournalCheck :: boolean()|defer,
SnapPreFold :: boolean(),
SegmentList :: false | list(integer()),
Runner :: fun(() -> Acc).
Expand Down Expand Up @@ -999,7 +1006,7 @@ book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
Bucket :: term(),
Key :: term(),
Value :: term(),
JournalCheck :: boolean(),
JournalCheck :: boolean()|defer,
SnapPreFold :: boolean(),
SegmentList :: false | list(integer()),
Runner :: fun(() -> Acc).
Expand Down Expand Up @@ -1032,7 +1039,7 @@ book_headfold(Pid, Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentLis
Bucket :: term(),
Key :: term(),
Value :: term(),
JournalCheck :: boolean(),
JournalCheck :: boolean()|defer,
SnapPreFold :: boolean(),
SegmentList :: false | list(integer()),
LastModRange :: false | leveled_codec:lastmod_range(),
Expand Down Expand Up @@ -1989,7 +1996,7 @@ return_snapfun(
fun() -> {ok, LS, JS, fun() -> ok end} end
end.

-spec snaptype_by_presence(boolean()) -> store|ledger.
-spec snaptype_by_presence(boolean()|defer) -> store|ledger.
%% @doc
%% Folds that traverse over object heads, may also either require to return
%% the object, or at least confirm the object is present in the Ledger. This
Expand All @@ -1998,6 +2005,8 @@ return_snapfun(
%% rather than just the ledger.
%% Maps the JournalCheck option of a head fold to the snapshot type requested.
%% `true' maps to a snapshot of the whole store.
snaptype_by_presence(true) ->
store;
%% `defer' maps to the same snapshot type as `true' (the whole store).
snaptype_by_presence(defer) ->
store;
%% `false' maps to a ledger-only snapshot.
snaptype_by_presence(false) ->
ledger.

Expand Down Expand Up @@ -2030,9 +2039,8 @@ get_runner(State, {keylist, Tag, Bucket, FoldAccT}) ->
leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, FoldAccT);
get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT, TermRegex}) ->
SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
leveled_runner:bucketkey_query(SnapFun,
Tag, Bucket, KeyRange,
FoldAccT, TermRegex);
leveled_runner:bucketkey_query(
SnapFun, Tag, Bucket, KeyRange, FoldAccT, TermRegex);
%% Set of runners for object or metadata folds
get_runner(State,
{foldheads_allkeys,
Expand All @@ -2041,10 +2049,15 @@ get_runner(State,
LastModRange, MaxObjectCount}) ->
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_allkeys(SnapFun,
Tag, FoldFun,
JournalCheck, SegmentList,
LastModRange, MaxObjectCount);
leveled_runner:foldheads_allkeys(
SnapFun,
Tag,
FoldFun,
JournalCheck,
SegmentList,
LastModRange,
MaxObjectCount
);
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
get_runner(State,
Expand All @@ -2071,13 +2084,16 @@ get_runner(State,
end,
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
leveled_runner:foldheads_bybucket(
SnapFun,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck,
SegmentList,
LastModRange,
MaxObjectCount
);
get_runner(State,
{foldheads_bybucket,
Tag,
Expand All @@ -2088,33 +2104,33 @@ get_runner(State,
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
Tag,
[{StartKey, EndKey}],
FoldFun,
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
leveled_runner:foldheads_bybucket(
SnapFun,
Tag,
[{StartKey, EndKey}],
FoldFun,
JournalCheck,
SegmentList,
LastModRange,
MaxObjectCount
);
get_runner(State,
{foldobjects_bybucket,
Tag, Bucket, KeyRange,
FoldFun,
SnapPreFold}) ->
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold),
leveled_runner:foldobjects_bybucket(SnapFun,
Tag,
[{StartKey, EndKey}],
FoldFun);
leveled_runner:foldobjects_bybucket(
SnapFun, Tag, [{StartKey, EndKey}], FoldFun);
get_runner(State,
{foldobjects_byindex,
Tag, Bucket, {Field, FromTerm, ToTerm},
FoldObjectsFun,
SnapPreFold}) ->
SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
leveled_runner:foldobjects_byindex(SnapFun,
{Tag, Bucket, Field, FromTerm, ToTerm},
FoldObjectsFun);
leveled_runner:foldobjects_byindex(
SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm},FoldObjectsFun);
get_runner(State, {bucket_list, Tag, FoldAccT}) ->
{FoldBucketsFun, Acc} = FoldAccT,
SnapFun = return_snapfun(State, ledger, no_lookup, false, false),
Expand Down
2 changes: 1 addition & 1 deletion src/leveled_pclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ do_merge(

add_entry(empty, FileName, _TS1, Additions) ->
leveled_log:log(pc013, [FileName]),
{[], [], Additions};
{Additions, [], []};
add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
Entry =
Expand Down
28 changes: 18 additions & 10 deletions src/leveled_runner.erl
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,14 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
end,
{async, Runner}.

-spec foldheads_allkeys(snap_fun(), leveled_codec:tag(),
fold_objects_fun()|{fold_objects_fun(), foldacc()},
boolean(), false|list(integer()),
false|leveled_codec:lastmod_range(),
false|pos_integer()) -> {async, runner_fun()}.
-spec foldheads_allkeys(
snap_fun(),
leveled_codec:tag(),
fold_objects_fun()|{fold_objects_fun(), foldacc()},
boolean()|defer,
false|list(integer()),
false|leveled_codec:lastmod_range(),
false|pos_integer()) -> {async, runner_fun()}.
%% @doc
%% Fold over all heads in the store for a given tag - applying the passed
%% function to each proxy object
Expand Down Expand Up @@ -412,7 +415,7 @@ foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
leveled_codec:tag(),
list(key_range()),
fold_objects_fun()|{fold_objects_fun(), foldacc()},
boolean(),
boolean()|defer,
false|list(integer()),
false|leveled_codec:lastmod_range(),
false|pos_integer())
Expand Down Expand Up @@ -501,7 +504,7 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->

-spec foldobjects(snap_fun(), atom(), list(),
fold_objects_fun()|{fold_objects_fun(), foldacc()},
false|{true, boolean()},
false|{true, boolean()|defer},
false|list(integer()),
false|leveled_codec:lastmod_range(),
false|pos_integer()) -> {async, runner_fun()}.
Expand Down Expand Up @@ -609,9 +612,14 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
AccFun.

-spec accumulate_objects
(fold_objects_fun(), pid(), leveled_head:object_tag(), false|{true, boolean()})
(fold_objects_fun(),
pid(),
leveled_head:object_tag(),
false|{true, boolean()|defer})
-> objectacc_fun();
(fold_objects_fun(), null, leveled_head:headonly_tag(), {true, false})
(fold_objects_fun(),
null, leveled_head:headonly_tag(),
{true, false})
-> objectacc_fun().
accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
AccFun =
Expand Down Expand Up @@ -652,7 +660,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
missing ->
Acc
end;
{false, _} ->
_ ->
FoldObjectsFun(B, K, ProxyObj, Acc)
end;
false ->
Expand Down
Loading
Loading