Skip to content

Commit

Permalink
Skip message bodies larger than 4MB when scanning rdq files
Browse files Browse the repository at this point in the history
When a message (much) larger than 4MB is encountered in an rdq file by
`rabbit_msg_store:scan/6`, seek past the whole message body instead of
reading it, as the message payload is not used at all. Reading the
message in 4MB chunks and appending the chunks to each other leaves a
lot of garbage and memory fragmentation behind.
  • Loading branch information
gomoripeti committed May 7, 2024
1 parent a5e2add commit d5b8e19
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,15 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
[{MsgId, TotalSize, Offset}|Acc])
end;
%% This might be the start of a message.
%% Large-message clause: the buffer already holds the 64-bit size and the
%% 128-bit message id, but fewer than `Size - 16 + 1` further bytes, i.e.
%% the rest of the entry (presumably the body plus the one-byte 255 end
%% marker that skip_body/9 looks for) has not been read yet. The second
%% guard only accepts sizes that are smaller than what is left of the file
%% from Offset onwards.
scan_data(<<Size:64, MsgIdInt:128, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size - 16 + 1, Size < FileSize - Offset ->
%% Number of body bytes that are still missing from the buffer.
RemainingBodySize = Size - 16 - byte_size(Rest),
%% If the missing part (plus one more byte) does not fit into a single
%% scan block, seek over it instead of reading it; the body itself is
%% not inspected by skip_body/9.
case RemainingBodySize + 1 > ?SCAN_BLOCK_SIZE of
true ->
skip_body(Size, MsgIdInt, RemainingBodySize, Fd, Offset, FileSize, MsgIdsFound, Acc, Data);
false ->
%% Small enough: hand the buffer back to scan/6 (defined outside this
%% view; presumably it reads more data and appends it to Data).
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
end;
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
Expand All @@ -1456,6 +1465,26 @@ scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).

%% Skips the not-yet-read part of a large message body and verifies that the
%% entry is terminated by the 255 end marker.
%%
%% Size              - value of the 64-bit size field at the start of Data
%% MsgIdInt          - the 128-bit message id that follows the size field
%% RemainingBodySize - number of body bytes that are not in Data yet
%% Fd                - file being scanned; this function assumes its position
%%                     is right after the last byte of Data
%% Offset            - offset that Data starts at, as tracked by scan_data/6
%% Data              - the buffer that made scan_data/6 suspect a message
%%
%% When the marker is found the message is recorded in Acc and scanning
%% continues with the bytes read after the marker. Otherwise the file position
%% is restored to the end of Data and scanning resumes one byte into Data.
skip_body(Size, MsgIdInt, RemainingBodySize, Fd, Offset, FileSize, MsgIdsFound, Acc, Data) ->
    {ok, _} = file:position(Fd, {cur, RemainingBodySize}),
    case file:read(Fd, ?SCAN_BLOCK_SIZE) of
        {ok, <<255, Rest/bits>>} ->
            %% Avoid sub-binary construction.
            MsgId = <<MsgIdInt:128>>,
            %% 8 bytes size field + Size bytes + 1 byte end marker.
            TotalSize = Size + 9,
            scan_data(Rest, Fd, Offset + TotalSize, FileSize,
                      MsgIdsFound#{MsgIdInt => true},
                      [{MsgId, TotalSize, Offset}|Acc]);
        {ok, Peeked} ->
            %% No end marker, so this was not a message. The read above moved
            %% the file position as well, so rewind over the skipped body AND
            %% the bytes that were just read.
            skip_body_rewind(RemainingBodySize + byte_size(Peeked),
                             Fd, Offset, FileSize, MsgIdsFound, Acc, Data);
        eof ->
            %% The caller only guarantees Size < FileSize - Offset, which does
            %% not include the size field and the end marker, so the marker
            %% position can lie at or beyond the end of the file. Nothing was
            %% read; only the seek has to be undone.
            skip_body_rewind(RemainingBodySize,
                             Fd, Offset, FileSize, MsgIdsFound, Acc, Data)
    end.

%% Moves the file position back by Distance bytes (to the end of Data) and
%% resumes scanning one byte further into Data.
skip_body_rewind(Distance, Fd, Offset, FileSize, MsgIdsFound, Acc, Data) ->
    {ok, _} = file:position(Fd, {cur, -Distance}),
    <<_, Rest/bits>> = Data,
    scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).

%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
Expand Down

0 comments on commit d5b8e19

Please sign in to comment.