Skip to content

Commit

Permalink
Add logging in Exceptions.throwIf[Jvm]Fatal, add isFatal methods (#3122)
Browse files Browse the repository at this point in the history
This commit improves the discoverability of exceptions that are "fatal"
to Reactor by:
 - adding `Exceptions.isFatal` and `Exceptions.isJvmFatal` methods
 - adding logging to `Exceptions.throwIfFatal` and
 `Exceptions.throwIfJvmFatal` just before a fatal exception is thrown

This should at least help pinpointing such occurrences from the logs,
in cases where the thrown exception bubble all the way up the stack of
a thread with no uncaughtExceptionHandler for example.

Fixes #3111.
  • Loading branch information
simonbasle committed Aug 1, 2022
1 parent 6a89d1f commit 50bb983
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 30 deletions.
95 changes: 82 additions & 13 deletions reactor-core/src/main/java/reactor/core/Exceptions.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import reactor.core.publisher.Flux;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

Expand All @@ -36,6 +38,8 @@
*/
public abstract class Exceptions {

private static final Logger LOGGER = Loggers.getLogger(Exceptions.class);

/**
* A common error message used when a reactive streams source doesn't seem to respect
* backpressure signals, resulting in an operator's internal queue to be full.
Expand Down Expand Up @@ -412,6 +416,68 @@ public static <T> Throwable terminate(AtomicReferenceFieldUpdater<T, Throwable>
return current;
}

/**
* Check if a {@link Throwable} is considered by Reactor as Jvm Fatal and would be thrown
* by both {@link #throwIfFatal(Throwable)} and {@link #throwIfJvmFatal(Throwable)}.
* This is a subset of {@link #isFatal(Throwable)}, namely:
* <ul>
* <li>{@link VirtualMachineError}</li>
* <li>{@link ThreadDeath}</li>
* <li>{@link LinkageError}</li>
* </ul>
* <p>
* Unless wrapped explicitly, such exceptions would always be thrown by operators instead of
* propagation through onError, potentially interrupting progress of Flux/Mono sequences.
* When they occur, the JVM itself is assumed to be in an unrecoverable state, and so is Reactor.
*
* @see #throwIfFatal(Throwable)
* @see #throwIfJvmFatal(Throwable)
* @see #isFatal(Throwable)
* @param t the {@link Throwable} to check
* @return true if the throwable is considered Jvm Fatal
*/
public static boolean isJvmFatal(@Nullable Throwable t) {
return t instanceof VirtualMachineError ||
t instanceof ThreadDeath ||
t instanceof LinkageError;
}

/**
* Check if a {@link Throwable} is considered by Reactor as Fatal and would be thrown by
* {@link #throwIfFatal(Throwable)}.
* <ul>
* <li>{@code BubblingException} (as detectable by {@link #isBubbling(Throwable)})</li>
* <li>{@code ErrorCallbackNotImplemented} (as detectable by {@link #isErrorCallbackNotImplemented(Throwable)})</li>
* <li> {@link #isJvmFatal(Throwable) Jvm Fatal exceptions}
* <ul>
* <li>{@link VirtualMachineError}</li>
* <li>{@link ThreadDeath}</li>
* <li>{@link LinkageError}</li>
* </ul>
* </li>
* </ul>
* <p>
* Unless wrapped explicitly, such exceptions would always be thrown by operators instead of
* propagation through onError, potentially interrupting progress of Flux/Mono sequences.
* When they occur, the assumption is that Reactor is in an unrecoverable state (notably
* because the JVM itself might be in an unrecoverable state).
*
* @see #throwIfFatal(Throwable)
* @see #isJvmFatal(Throwable)
* @param t the {@link Throwable} to check
* @return true if the throwable is considered fatal
*/
public static boolean isFatal(@Nullable Throwable t) {
return isFatalButNotJvmFatal(t) || isJvmFatal(t);
}

/**
* Internal intermediate test that only detect Fatal but not Jvm Fatal exceptions.
*/
static boolean isFatalButNotJvmFatal(@Nullable Throwable t) {
return t instanceof BubblingException || t instanceof ErrorCallbackNotImplemented;
}

/**
* Throws a particular {@code Throwable} only if it belongs to a set of "fatal" error
* varieties. These varieties are as follows: <ul>
Expand All @@ -422,13 +488,17 @@ public static <T> Throwable terminate(AtomicReferenceFieldUpdater<T, Throwable>
* @param t the exception to evaluate
*/
public static void throwIfFatal(@Nullable Throwable t) {
if (t instanceof BubblingException) {
throw (BubblingException) t;
if (t == null) {
return;
}
if (t instanceof ErrorCallbackNotImplemented) {
throw (ErrorCallbackNotImplemented) t;
if (isFatalButNotJvmFatal(t)) {
LOGGER.warn("throwIfFatal detected a fatal exception, which is thrown and logged below:", t);
throw (RuntimeException) t;
}
if (isJvmFatal(t)) {
LOGGER.warn("throwIfFatal detected a jvm fatal exception, which is thrown and logged below:", t);
throw (Error) t;
}
throwIfJvmFatal(t);
}

/**
Expand All @@ -440,14 +510,13 @@ public static void throwIfFatal(@Nullable Throwable t) {
* @param t the exception to evaluate
*/
public static void throwIfJvmFatal(@Nullable Throwable t) {
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
}
if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
if (t == null) {
return;
}
if (t instanceof LinkageError) {
throw (LinkageError) t;
if (isJvmFatal(t)) {
LOGGER.warn("throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below:", t);
assert t instanceof Error;
throw (Error) t;
}
}

Expand Down
160 changes: 145 additions & 15 deletions reactor-core/src/test/java/reactor/core/ExceptionsTest.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,10 +23,12 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.test.util.RaceTestUtils;
import reactor.test.util.TestLogger;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.*;
Expand All @@ -43,6 +45,32 @@ public class ExceptionsTest {
static final AtomicReferenceFieldUpdater<ExceptionsTest, Throwable> ADD_THROWABLE =
AtomicReferenceFieldUpdater.newUpdater(ExceptionsTest.class, Throwable.class, "addThrowable");

static VirtualMachineError JVM_FATAL_VIRTUAL_MACHINE_ERROR = new VirtualMachineError("expected to be logged") {
@Override
public String toString() {
return "custom VirtualMachineError: expected to be logged";
}
};

static final ThreadDeath JVM_FATAL_THREAD_DEATH = new ThreadDeath() {
@Override
public String getMessage() {
return "expected to be logged";
}

@Override
public String toString() {
return "custom ThreadDeath: expected to be logged";
}
};

static final LinkageError JVM_FATAL_LINKAGE_ERROR = new LinkageError("expected to be logged") {
@Override
public String toString() {
return "custom LinkageError: expected to be logged";
}
};

@Test
public void bubble() throws Exception {
Throwable t = new Exception("test");
Expand Down Expand Up @@ -157,44 +185,146 @@ public void propagateDoesntWrapRuntimeException() {
//TODO test terminate

@Test
public void throwIfFatalThrowsBubbling() {
BubblingException expected = new BubblingException("expected");
void bubblingExceptionIsFatalButNotJvmFatal() {
Throwable exception = new BubblingException("expected");
assertThat(Exceptions.isFatal(exception))
.as("isFatal(bubbling)")
.isTrue();
assertThat(Exceptions.isJvmFatal(exception))
.as("isJvmFatal(bubbling)")
.isFalse();
}

@Test
void errorCallbackNotImplementedIsFatalButNotJvmFatal() {
Throwable exception = new ErrorCallbackNotImplemented(new IllegalStateException("expected cause"));
assertThat(Exceptions.isFatal(exception))
.as("isFatal(ErrorCallbackNotImplemented)")
.isTrue();
assertThat(Exceptions.isJvmFatal(exception))
.as("isJvmFatal(ErrorCallbackNotImplemented)")
.isFalse();
}

@Test
void virtualMachineErrorIsFatalAndJvmFatal() {
assertThat(Exceptions.isFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.as("isFatal(VirtualMachineError)")
.isTrue();
assertThat(Exceptions.isJvmFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.as("isJvmFatal(VirtualMachineError)")
.isTrue();
}

@Test
void linkageErrorIsFatalAndJvmFatal() {
assertThat(Exceptions.isFatal(JVM_FATAL_LINKAGE_ERROR))
.as("isFatal(LinkageError)")
.isTrue();
assertThat(Exceptions.isJvmFatal(JVM_FATAL_LINKAGE_ERROR))
.as("isJvmFatal(LinkageError)")
.isTrue();
}

@Test
void threadDeathIsFatalAndJvmFatal() {
assertThat(Exceptions.isFatal(JVM_FATAL_THREAD_DEATH))
.as("isFatal(ThreadDeath)")
.isTrue();
assertThat(Exceptions.isJvmFatal(JVM_FATAL_THREAD_DEATH))
.as("isJvmFatal(ThreadDeath)")
.isTrue();
}

@Test
@TestLoggerExtension.Redirect
void throwIfFatalThrowsAndLogsBubbling(TestLogger testLogger) {
BubblingException expected = new BubblingException("expected to be logged");

assertThatExceptionOfType(BubblingException.class)
.isThrownBy(() -> Exceptions.throwIfFatal(expected))
.isSameAs(expected);

assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfFatal detected a fatal exception, which is thrown and logged below: - reactor.core.Exceptions$BubblingException: expected to be logged");
}

@Test
public void throwIfFatalThrowsErrorCallbackNotImplemented() {
ErrorCallbackNotImplemented expected = new ErrorCallbackNotImplemented(new IllegalStateException("expected cause"));
@TestLoggerExtension.Redirect
void throwIfFatalThrowsAndLogsErrorCallbackNotImplemented(TestLogger testLogger) {
ErrorCallbackNotImplemented expected = new ErrorCallbackNotImplemented(new IllegalStateException("expected to be logged"));

assertThatExceptionOfType(ErrorCallbackNotImplemented.class)
.isThrownBy(() -> Exceptions.throwIfFatal(expected))
.isSameAs(expected)
.withCause(expected.getCause());

assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfFatal detected a fatal exception, which is thrown and logged below: - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: expected to be logged");
}

@Test
public void throwIfJvmFatal() {
VirtualMachineError fatal1 = new VirtualMachineError() {};
ThreadDeath fatal2 = new ThreadDeath();
LinkageError fatal3 = new LinkageError();
@TestLoggerExtension.Redirect
void throwIfFatalWithJvmFatalErrorsDoesThrowAndLog(TestLogger testLogger) {
SoftAssertions.assertSoftly(softly -> {
softly.assertThatExceptionOfType(VirtualMachineError.class)
.as("VirtualMachineError")
.isThrownBy(() -> Exceptions.throwIfFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.isSameAs(JVM_FATAL_VIRTUAL_MACHINE_ERROR);

softly.assertThatExceptionOfType(ThreadDeath.class)
.as("ThreadDeath")
.isThrownBy(() -> Exceptions.throwIfFatal(JVM_FATAL_THREAD_DEATH))
.isSameAs(JVM_FATAL_THREAD_DEATH);

softly.assertThatExceptionOfType(LinkageError.class)
.as("LinkageError")
.isThrownBy(() -> Exceptions.throwIfFatal(JVM_FATAL_LINKAGE_ERROR))
.isSameAs(JVM_FATAL_LINKAGE_ERROR);

softly.assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - custom VirtualMachineError: expected to be logged")
.contains("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - custom ThreadDeath: expected to be logged")
.contains("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - custom LinkageError: expected to be logged");
});
}

@Test
@TestLoggerExtension.Redirect
void throwIfJvmFatalDoesThrowAndLog(TestLogger testLogger) {
assertThatExceptionOfType(VirtualMachineError.class)
.as("VirtualMachineError")
.isThrownBy(() -> Exceptions.throwIfJvmFatal(fatal1))
.isSameAs(fatal1);
.isThrownBy(() -> Exceptions.throwIfJvmFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.isSameAs(JVM_FATAL_VIRTUAL_MACHINE_ERROR);

assertThatExceptionOfType(ThreadDeath.class)
.as("ThreadDeath")
.isThrownBy(() -> Exceptions.throwIfJvmFatal(fatal2))
.isSameAs(fatal2);
.isThrownBy(() -> Exceptions.throwIfJvmFatal(JVM_FATAL_THREAD_DEATH))
.isSameAs(JVM_FATAL_THREAD_DEATH);

assertThatExceptionOfType(LinkageError.class)
.as("LinkageError")
.isThrownBy(() -> Exceptions.throwIfJvmFatal(fatal3))
.isSameAs(fatal3);
.isThrownBy(() -> Exceptions.throwIfJvmFatal(JVM_FATAL_LINKAGE_ERROR))
.isSameAs(JVM_FATAL_LINKAGE_ERROR);

assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below: - custom VirtualMachineError: expected to be logged")
.contains("[ WARN] throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below: - custom ThreadDeath: expected to be logged")
.contains("[ WARN] throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below: - custom LinkageError: expected to be logged");
};

@Test
@TestLoggerExtension.Redirect
void throwIfJvmFatalDoesntThrowNorLogsOnSimplyFatalExceptions(TestLogger testLogger) {
SoftAssertions.assertSoftly(softly -> {
softly.assertThatCode(() -> Exceptions.throwIfJvmFatal(new BubblingException("not thrown")))
.doesNotThrowAnyException();

softly.assertThatCode(() -> Exceptions.throwIfJvmFatal(new ErrorCallbackNotImplemented(new RuntimeException("not thrown"))))
.doesNotThrowAnyException();
});

assertThat(testLogger.getErrContent()).isEmpty();
}

@Test
Expand Down
Expand Up @@ -209,9 +209,9 @@ void onNextConsumerFatalDoesntTriggerCancellation(TestLogger testLogger) {
TestSubscription testSubscription = new TestSubscription();
tested.onSubscribe(testSubscription);

//the error is expected to be thrown as it is fatal, so it doesn't go through onErrorDropped
//the error is expected to be thrown as it is fatal, so it doesn't go through onErrorDropped. However, throwIfJvmFatal now logs it.
assertThatExceptionOfType(OutOfMemoryError.class).isThrownBy(() -> tested.onNext("foo"));
Assertions.assertThat(testLogger.getErrContent()).isEmpty();
Assertions.assertThat(testLogger.getErrContent()).startsWith("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - java.lang.OutOfMemoryError");

assertThat(testSubscription.isCancelled).as("subscription isCancelled")
.isFalse();
Expand Down

0 comments on commit 50bb983

Please sign in to comment.