Skip to content

Commit

Permalink
CQ: Send confirms when flushing to disk in shared store
Browse files Browse the repository at this point in the history
Before they would only be sent periodically or when
rolling over to a new file.
  • Loading branch information
lhoguin committed Jun 9, 2023
1 parent 4779400 commit 5e1a83a
Showing 1 changed file with 20 additions and 16 deletions.
36 changes: 20 additions & 16 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@

-include_lib("rabbit_common/include/rabbit_msg_store.hrl").

-define(SYNC_INTERVAL, 25). %% milliseconds
%% We flush to disk when the write buffer gets above the max size,
%% or at an interval to make sure we don't keep the data in memory
%% too long. Confirms are sent after the data is flushed to disk.
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB.
-define(SYNC_INTERVAL, 200). %% Milliseconds.

-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").

Expand All @@ -37,8 +42,6 @@
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").

-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB

%% We keep track of flying messages for writes and removes. The idea is that
%% when a remove comes in before we could process the write, we skip the
%% write and send a publisher confirm immediately. We later skip the remove
Expand Down Expand Up @@ -1003,10 +1006,7 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, MsgIds} | NS]
end
end, [], CTM),
ok = case CGs of
[] -> ok;
_ -> writer_flush(CurHdl)
end,
writer_flush(CurHdl),
lists:foldl(fun ({CRef, MsgIds}, StateN) ->
client_confirm(CRef, MsgIds, written, StateN)
end, State1, CGs).
Expand Down Expand Up @@ -1150,11 +1150,15 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates }) ->
State#msstate{ gc_candidates = Candidates#{ File => true }}.

write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
file_summary_ets = FileSummaryEts }) ->
{ok, TotalSize} = writer_append(CurHdl, MsgId, Msg),
State0 = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
file_summary_ets = FileSummaryEts }) ->
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, Msg),
State = case MaybeFlush of
flush -> internal_sync(State0);
ok -> State0
end,
ok = index_insert(
#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize }, State),
Expand Down Expand Up @@ -1312,7 +1316,7 @@ writer_recover(Dir, Num, Offset) ->
ok = file:truncate(Fd),
{ok, #writer{fd = Fd, buffer = prim_buffer:new()}}.

writer_append(#writer{fd = Fd, buffer = Buffer}, MsgId, MsgBody) ->
writer_append(#writer{buffer = Buffer}, MsgId, MsgBody) ->
MsgBodyBin = term_to_binary(MsgBody),
MsgBodyBinSize = byte_size(MsgBodyBin),
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
Expand All @@ -1323,13 +1327,13 @@ writer_append(#writer{fd = Fd, buffer = Buffer}, MsgId, MsgBody) ->
MsgBodyBin,
<<255>> %% OK marker.
]),
case prim_buffer:size(Buffer) of
Res = case prim_buffer:size(Buffer) of
Size when Size >= ?MAX_BUFFER_SIZE ->
ok = file:write(Fd, prim_buffer:read_iovec(Buffer, Size));
flush;
_ ->
ok
end,
{ok, EntrySize + 9}. %% EntrySize + size field + OK marker.
{Res, EntrySize + 9}. %% EntrySize + size field + OK marker.

%% Note: the message store no longer fsyncs; it only flushes data
%% to disk. This is in line with classic queues v2 behavior.
Expand Down

0 comments on commit 5e1a83a

Please sign in to comment.