Fix read handling of corrupted journal segments (#9378)
Previously the journal reader got stuck waiting when reading
from an offset that returned an empty result.

In that case the journal reader assumes that it has reached the
end of the journal and waits for more messages to come in.
Once new messages have been written to the journal, the
reader tries again with the same offset.

With a corrupted log segment, reading from the same offset
returns an empty result again, even though there are valid
messages in later segments after the corrupted one.

This put the journal reader into an infinite loop until the
corrupted journal segment was deleted manually or users
deleted the complete journal.

The code now checks whether there are more messages after
the current read offset whenever the read operation returns
an empty result. In that case it increases the offset one
by one and retries the read until the result is no longer
empty or the end of the log has been reached.
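
For illustration, a condensed sketch of that retry loop (the
complete `readNext` implementation is in the diff below;
`read` and `getLogEndOffset` are the existing journal methods):

    // Condensed sketch only; see the full readNext() in the diff below.
    final long logEndOffset = getLogEndOffset();   // capture once so concurrent writes don't move the bound
    List<JournalReadEntry> messages = read(startOffset, requestedMaximumCount);
    long failedReadOffset = startOffset;
    // An empty result while the log still has later messages means the current
    // offset is unreadable (e.g. a truncated segment), so skip one offset at a
    // time and retry.
    while (messages.isEmpty() && failedReadOffset < (logEndOffset - 1)) {
        messages = read(failedReadOffset + 1, requestedMaximumCount);
        failedReadOffset++;
    }
    return messages;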

We reproduced this issue with a journal where the first
segment was truncated at the end. Later log segments
contained valid messages.

Rename the fault-tolerant read method to `readNext` so that
the JournalDecode command keeps using the old method.
Otherwise, explicitly decoding a faulty offset with
`journal decode` would decode a message at the next
non-faulty offset, which is probably not expected.
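
For illustration, a hypothetical call-site contrast under the
new naming (assuming a `LocalKafkaJournal` instance `journal`
and an offset `faultyOffset` inside a corrupted segment;
neither variable is part of this commit):

    // Strict read(): an unreadable offset yields an empty result, so an explicit
    // `journal decode` can surface the fault instead of decoding a different message.
    List<Journal.JournalReadEntry> strict = journal.read(faultyOffset, 1);

    // Fault-tolerant readNext(): skips past the unreadable offset, which is what the
    // regular processing path wants but not what an explicit decode should do.
    List<Journal.JournalReadEntry> skipping = journal.readNext(faultyOffset, 1);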

Add test to simulate truncated segment.

Fixes #9305

Co-authored-by: Bernd Ahlers <bernd@graylog.com>
Co-authored-by: Othello Maurer <othello@graylog.com>
3 people committed Jul 9, 2021
1 parent 4e94563 commit 7a0b955
Showing 2 changed files with 133 additions and 1 deletion.
@@ -591,9 +591,60 @@ public long write(byte[] idBytes, byte[] messageBytes) {

@Override
public List<JournalReadEntry> read(long requestedMaximumCount) {
-       return read(nextReadOffset, requestedMaximumCount);
+       return readNext(nextReadOffset, requestedMaximumCount);
}

/**
* Read next messages from the journal, starting at the given offset. If the underlying journal implementation
* returns an empty list of entries, but we know there are more entries in the journal, we'll try to skip the
* problematic offset(s) until we find entries again.
*
* @param startOffset Offset to start reading at
* @param requestedMaximumCount Maximum number of entries to return.
* @return A list of entries
*/
public List<JournalReadEntry> readNext(long startOffset, long requestedMaximumCount) {
// Capture the log end offset early for the failure handling below. The end offset will change during the
// runtime of the retry loop because new messages are written to the journal. If we used the changing
// end offset in the error handling while loop, we would skip valid messages.
final long logEndOffset = getLogEndOffset();

List<JournalReadEntry> messages = read(startOffset, requestedMaximumCount);

if (messages.isEmpty()) {
// If we got an empty result BUT we know that there are more messages in the log, we bump the readOffset
// by 1 and try to read again. We continue until we either get a non-empty result or we reach the
// end of the log.
// This can happen when a log segment is truncated at the end but later segments have valid messages again.
long failedReadOffset = startOffset;
long retryReadOffset = failedReadOffset + 1;

while (messages.isEmpty() && failedReadOffset < (logEndOffset - 1)) {
LOG.warn(
"Couldn't read any messages from offset <{}> but journal has more messages. Skipping and " +
"trying to read from offset <{}>",
failedReadOffset, retryReadOffset);

// Retry the read with an increased offset to skip corrupt segments
messages = read(retryReadOffset, requestedMaximumCount);

// Bump offsets in case we still read an empty result
failedReadOffset++;
retryReadOffset++;
}
}

return messages;
}

/**
* Read from the journal, starting at the given offset. If the underlying journal implementation returns an empty
* list of entries, it will be returned even if we know there are more entries in the journal.
*
* @param readOffset Offset to start reading at
* @param requestedMaximumCount Maximum number of entries to return.
* @return A list of entries
*/
public List<JournalReadEntry> read(long readOffset, long requestedMaximumCount) {
// Always read at least one!
final long maximumCount = Math.max(1, requestedMaximumCount);
@@ -39,6 +39,7 @@
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -47,13 +48,18 @@

import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.IntStream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.io.filefilter.FileFilterUtils.and;
@@ -493,4 +499,79 @@ public void serverStatusUnthrottledIfJournalUtilizationIsLowerThanThreshold() th
journal.cleanupLogs();
assertThat(serverStatus.getLifecycle()).isEqualTo(Lifecycle.RUNNING);
}

@Test
public void truncatedSegment() throws Exception {
final Size segmentSize = Size.kilobytes(1L);
final LocalKafkaJournal journal = new LocalKafkaJournal(journalDirectory.toPath(),
scheduler,
segmentSize,
Duration.standardHours(1),
Size.kilobytes(10L),
Duration.standardDays(1),
1_000_000,
Duration.standardMinutes(1),
100,
new MetricRegistry(),
serverStatus);

// this will create two segments, each containing 25 messages
createBulkChunks(journal, segmentSize, 2);

final Path firstSegmentPath =
Paths.get(journalDirectory.getAbsolutePath(), "messagejournal-0", "00000000000000000000.log");

assertThat(firstSegmentPath).isRegularFile();

// truncate the first segment so that the last message is cut off
final File firstSegment = firstSegmentPath.toFile();
try (FileChannel channel = new FileOutputStream(firstSegment, true).getChannel()) {
channel.truncate(firstSegment.length() - 1);
}

final List<Journal.JournalReadEntry> entriesFromFirstSegment = journal.read(25);
assertThat(entriesFromFirstSegment).hasSize(24);
final List<Journal.JournalReadEntry> entriesFromSecondSegment = journal.read(25);
assertThat(entriesFromSecondSegment).hasSize(25);
}

/**
* Test a race condition between reading and writing:
*
* We get an empty result back from the journal but at the time we check the last offset, more entries have been
* written to the journal. In that case we can't assume that the journal is corrupted, which we did with the
* first implementation.
*
* This is a manual test. It will always pass, but you will see warnings if the bug is triggered. You might have
* to run the test multiple times because, as it's a race condition, it can't be triggered reliably.
*/
@Test
@Ignore
public void readNext() throws Exception {
final LocalKafkaJournal journal = new LocalKafkaJournal(journalDirectory.toPath(),
scheduler, Size.kilobytes(1L),
Duration.standardHours(1),
Size.kilobytes(10L),
Duration.standardDays(1),
1_000_000,
Duration.standardMinutes(1),
100,
new MetricRegistry(),
serverStatus);

final byte[] idBytes = "id".getBytes(UTF_8);
final byte[] messageBytes = "message".getBytes(UTF_8);

// High frequency reading with occasional writing.
// This doesn't trigger the bug reliably every time though.
IntStream.range(0, 1_000_000).parallel().forEach(
(i) -> {
if (i % 1000 == 0) {
journal.write(idBytes, messageBytes);
} else {
journal.read(1L).forEach(e -> journal.markJournalOffsetCommitted(e.getOffset()));
}
}
);
}
}
