Skip to content

Commit

Permalink
Don't add journal edits to cache until after persisted
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Jan 27, 2025
1 parent b6916eb commit e609064
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,16 +429,14 @@ synchronized void journal(RequestInfo reqInfo,
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
" ; journal id: " + journalId);
}
if (cache != null) {
cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion);
}

// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
// "catching up" with the rest. Hence we do not need to fsync.
boolean isLagging = lastTxnId <= committedTxnId.get();
boolean shouldFsync = !isLagging;

JournalFaultInjector.get().writeEdits();
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
StopWatch sw = new StopWatch();
Expand Down Expand Up @@ -469,6 +467,12 @@ synchronized void journal(RequestInfo reqInfo,

updateHighestWrittenTxId(lastTxnId);
nextTxId = lastTxnId + 1;

// Only cache the edits if we successfully persisted them
if (cache != null) {
cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion);
}

lastJournalTimestamp = Time.now();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ public static JournalFaultInjector get() {

public void beforePersistPaxosData() throws IOException {}
public void afterPersistPaxosData() throws IOException {}
public void writeEdits() throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -507,6 +508,29 @@ public void testReadFromCache() throws Exception {
assertJournaledEditsTxnCountAndContents(16, 10, 20, newLayoutVersion);
}

@Test
public void testOnlyReadPersistedEditsFromCache() throws Exception {
JournalFaultInjector faultInjector =
JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class);

journal.newEpoch(FAKE_NSINFO, 1);
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5));
assertJournaledEditsTxnCountAndContents(1, 10, 5,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);

Mockito.doThrow(new IOException("Injected")).when(faultInjector)
.writeEdits();
assertThrows(IOException.class, () ->
journal.journal(makeRI(3), 1, 6, 5, QJMTestUtil.createTxnData(6, 5)));
Mockito.reset(faultInjector);

// We should not see the edits that failed to persist to storage
assertJournaledEditsTxnCountAndContents(1, 10, 5,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
}

private void assertJournaledEditsTxnCountAndContents(int startTxn,
int requestedMaxTxns, int expectedEndTxn, int layoutVersion)
throws Exception {
Expand Down

0 comments on commit e609064

Please sign in to comment.