Skip to content

Commit d6269f2

Browse files
lhoguinmergify[bot]
authored andcommitted
Fix CQ shared store files not deleted with large messages
We must consider whether the previous current file is empty (has data written, but was already removed) when writing large messages and opening a file specifically for the large message. If we don't, then the file will never get deleted as we only consider files for deletion when a message gets removed (and there are none). This is only an issue for large messages. Small messages write a message than roll over to a new file, so there is at least one valid message. Large messages close the current file first, regardless of there being a valid message. (cherry picked from commit 6cf69e2) (cherry picked from commit f98637c)
1 parent 5cbfa09 commit d6269f2

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

deps/rabbit/src/rabbit_msg_store.erl

+11-4
Original file line numberDiff line numberDiff line change
@@ -1274,19 +1274,26 @@ write_large_message(MsgId, MsgBodyBin,
12741274
ok = index_insert(IndexEts,
12751275
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
12761276
offset = 0, total_size = TotalSize }),
1277-
_ = case CurFile of
1277+
State1 = case CurFile of
12781278
%% We didn't open a new file. We must update the existing value.
12791279
LargeMsgFile ->
12801280
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
12811281
[{#file_summary.valid_total_size, TotalSize},
1282-
{#file_summary.file_size, TotalSize}]);
1282+
{#file_summary.file_size, TotalSize}]),
1283+
State0;
12831284
%% We opened a new file. We can insert it all at once.
1285+
%% We must also check whether we need to delete the previous
1286+
%% current file, because if there is no valid data this is
1287+
%% the only time we will consider it (outside recovery).
12841288
_ ->
12851289
true = ets:insert_new(FileSummaryEts, #file_summary {
12861290
file = LargeMsgFile,
12871291
valid_total_size = TotalSize,
12881292
file_size = TotalSize,
1289-
locked = false })
1293+
locked = false }),
1294+
delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl,
1295+
current_file = LargeMsgFile,
1296+
current_file_offset = TotalSize })
12901297
end,
12911298
%% Roll over to the next file.
12921299
NextFile = LargeMsgFile + 1,
@@ -1299,7 +1306,7 @@ write_large_message(MsgId, MsgBodyBin,
12991306
%% Delete messages from the cache that were written to disk.
13001307
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
13011308
%% Process confirms (this won't flush; we already did) and continue.
1302-
State = internal_sync(State0),
1309+
State = internal_sync(State1),
13031310
State #msstate { current_file_handle = NextHdl,
13041311
current_file = NextFile,
13051312
current_file_offset = 0 }.

0 commit comments

Comments
 (0)