Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logging in Exceptions.throwIf[Jvm]Fatal, add isFatal methods #3122

Merged
merged 7 commits into from Aug 1, 2022
95 changes: 82 additions & 13 deletions reactor-core/src/main/java/reactor/core/Exceptions.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import reactor.core.publisher.Flux;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

Expand All @@ -36,6 +38,8 @@
*/
public abstract class Exceptions {

private static final Logger LOGGER = Loggers.getLogger(Exceptions.class);

/**
* A common error message used when a reactive streams source doesn't seem to respect
* backpressure signals, resulting in an operator's internal queue to be full.
Expand Down Expand Up @@ -412,6 +416,68 @@ public static <T> Throwable terminate(AtomicReferenceFieldUpdater<T, Throwable>
return current;
}

/**
* Check if a {@link Throwable} is considered by Reactor as Jvm Fatal and would be thrown
* by both {@link #throwIfFatal(Throwable)} and {@link #throwIfJvmFatal(Throwable)}.
* This is a subset of {@link #isFatal(Throwable)}, namely:
* <ul>
* <li>{@link VirtualMachineError}</li>
* <li>{@link ThreadDeath}</li>
* <li>{@link LinkageError}</li>
* </ul>
* <p>
* Unless wrapped explicitly, such exceptions would always be thrown by operators instead of
* propagation through onError, potentially interrupting progress of Flux/Mono sequences.
* When they occur, the JVM itself is assumed to be in an unrecoverable state, and so is Reactor.
*
* @see #throwIfFatal(Throwable)
* @see #throwIfJvmFatal(Throwable)
* @see #isFatal(Throwable)
* @param t the {@link Throwable} to check
* @return true if the throwable is considered Jvm Fatal
*/
public static boolean isJvmFatal(@Nullable Throwable t) {
return t instanceof VirtualMachineError ||
t instanceof ThreadDeath ||
t instanceof LinkageError;
}

/**
* Check if a {@link Throwable} is considered by Reactor as Fatal and would be thrown by
* {@link #throwIfFatal(Throwable)}.
* <ul>
* <li>{@code BubblingException} (as detectable by {@link #isBubbling(Throwable)})</li>
* <li>{@code ErrorCallbackNotImplemented} (as detectable by {@link #isErrorCallbackNotImplemented(Throwable)})</li>
* <li> {@link #isJvmFatal(Throwable) Jvm Fatal exceptions}
* <ul>
* <li>{@link VirtualMachineError}</li>
* <li>{@link ThreadDeath}</li>
* <li>{@link LinkageError}</li>
* </ul>
* </li>
* </ul>
* <p>
* Unless wrapped explicitly, such exceptions would always be thrown by operators instead of
* propagation through onError, potentially interrupting progress of Flux/Mono sequences.
* When they occur, the assumption is that Reactor is in an unrecoverable state (notably
* because the JVM itself might be in an unrecoverable state).
*
* @see #throwIfFatal(Throwable)
* @see #isJvmFatal(Throwable)
* @param t the {@link Throwable} to check
* @return true if the throwable is considered fatal
*/
public static boolean isFatal(@Nullable Throwable t) {
return isFatalButNotJvmFatal(t) || isJvmFatal(t);
}

/**
* Internal intermediate test that only detect Fatal but not Jvm Fatal exceptions.
*/
static boolean isFatalButNotJvmFatal(@Nullable Throwable t) {
return t instanceof BubblingException || t instanceof ErrorCallbackNotImplemented;
}

/**
* Throws a particular {@code Throwable} only if it belongs to a set of "fatal" error
* varieties. These varieties are as follows: <ul>
Expand All @@ -422,13 +488,17 @@ public static <T> Throwable terminate(AtomicReferenceFieldUpdater<T, Throwable>
* @param t the exception to evaluate
*/
public static void throwIfFatal(@Nullable Throwable t) {
if (t instanceof BubblingException) {
throw (BubblingException) t;
if (t == null) {
return;
}
if (t instanceof ErrorCallbackNotImplemented) {
throw (ErrorCallbackNotImplemented) t;
if (isFatalButNotJvmFatal(t)) {
LOGGER.warn("throwIfFatal detected a fatal exception, which is thrown and logged below:", t);
throw (RuntimeException) t;
}
if (isJvmFatal(t)) {
LOGGER.warn("throwIfFatal detected a jvm fatal exception, which is thrown and logged below:", t);
throw (Error) t;
}
throwIfJvmFatal(t);
}

/**
Expand All @@ -440,14 +510,13 @@ public static void throwIfFatal(@Nullable Throwable t) {
* @param t the exception to evaluate
*/
public static void throwIfJvmFatal(@Nullable Throwable t) {
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
}
if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
if (t == null) {
return;
}
if (t instanceof LinkageError) {
throw (LinkageError) t;
if (isJvmFatal(t)) {
LOGGER.warn("throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below:", t);
assert t instanceof Error;
throw (Error) t;
}
}

Expand Down
160 changes: 145 additions & 15 deletions reactor-core/src/test/java/reactor/core/ExceptionsTest.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,10 +23,12 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.test.util.RaceTestUtils;
import reactor.test.util.TestLogger;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.*;
Expand All @@ -43,6 +45,32 @@ public class ExceptionsTest {
static final AtomicReferenceFieldUpdater<ExceptionsTest, Throwable> ADD_THROWABLE =
AtomicReferenceFieldUpdater.newUpdater(ExceptionsTest.class, Throwable.class, "addThrowable");

static VirtualMachineError JVM_FATAL_VIRTUAL_MACHINE_ERROR = new VirtualMachineError("expected to be logged") {
@Override
public String toString() {
return "custom VirtualMachineError: expected to be logged";
}
};

static final ThreadDeath JVM_FATAL_THREAD_DEATH = new ThreadDeath() {
@Override
public String getMessage() {
return "expected to be logged";
}

@Override
public String toString() {
return "custom ThreadDeath: expected to be logged";
}
};

static final LinkageError JVM_FATAL_LINKAGE_ERROR = new LinkageError("expected to be logged") {
@Override
public String toString() {
return "custom LinkageError: expected to be logged";
}
};

@Test
public void bubble() throws Exception {
Throwable t = new Exception("test");
Expand Down Expand Up @@ -157,44 +185,146 @@ public void propagateDoesntWrapRuntimeException() {
//TODO test terminate

@Test
public void throwIfFatalThrowsBubbling() {
BubblingException expected = new BubblingException("expected");
void bubblingExceptionIsFatalButNotJvmFatal() {
Throwable exception = new BubblingException("expected");
assertThat(Exceptions.isFatal(exception))
.as("isFatal(bubbling)")
.isTrue();
assertThat(Exceptions.isJvmFatal(exception))
.as("isJvmFatal(bubbling)")
.isFalse();
}

@Test
void errorCallbackNotImplementedIsFatalButNotJvmFatal() {
Throwable exception = new ErrorCallbackNotImplemented(new IllegalStateException("expected cause"));
assertThat(Exceptions.isFatal(exception))
.as("isFatal(ErrorCallbackNotImplemented)")
.isTrue();
assertThat(Exceptions.isJvmFatal(exception))
.as("isJvmFatal(ErrorCallbackNotImplemented)")
.isFalse();
}

@Test
void virtualMachineErrorIsFatalAndJvmFatal() {
assertThat(Exceptions.isFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.as("isFatal(VirtualMachineError)")
.isTrue();
assertThat(Exceptions.isJvmFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.as("isJvmFatal(VirtualMachineError)")
.isTrue();
}

@Test
void linkageErrorIsFatalAndJvmFatal() {
assertThat(Exceptions.isFatal(JVM_FATAL_LINKAGE_ERROR))
.as("isFatal(LinkageError)")
.isTrue();
assertThat(Exceptions.isJvmFatal(JVM_FATAL_LINKAGE_ERROR))
.as("isJvmFatal(LinkageError)")
.isTrue();
}

@Test
void threadDeathIsFatalAndJvmFatal() {
assertThat(Exceptions.isFatal(JVM_FATAL_THREAD_DEATH))
.as("isFatal(ThreadDeath)")
.isTrue();
assertThat(Exceptions.isJvmFatal(JVM_FATAL_THREAD_DEATH))
.as("isJvmFatal(ThreadDeath)")
.isTrue();
}

@Test
@TestLoggerExtension.Redirect
void throwIfFatalThrowsAndLogsBubbling(TestLogger testLogger) {
BubblingException expected = new BubblingException("expected to be logged");
chemicL marked this conversation as resolved.
Show resolved Hide resolved

assertThatExceptionOfType(BubblingException.class)
.isThrownBy(() -> Exceptions.throwIfFatal(expected))
.isSameAs(expected);

assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfFatal detected a fatal exception, which is thrown and logged below: - reactor.core.Exceptions$BubblingException: expected to be logged");
}

@Test
public void throwIfFatalThrowsErrorCallbackNotImplemented() {
ErrorCallbackNotImplemented expected = new ErrorCallbackNotImplemented(new IllegalStateException("expected cause"));
@TestLoggerExtension.Redirect
void throwIfFatalThrowsAndLogsErrorCallbackNotImplemented(TestLogger testLogger) {
ErrorCallbackNotImplemented expected = new ErrorCallbackNotImplemented(new IllegalStateException("expected to be logged"));

assertThatExceptionOfType(ErrorCallbackNotImplemented.class)
.isThrownBy(() -> Exceptions.throwIfFatal(expected))
.isSameAs(expected)
.withCause(expected.getCause());

assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfFatal detected a fatal exception, which is thrown and logged below: - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: expected to be logged");
}

@Test
public void throwIfJvmFatal() {
VirtualMachineError fatal1 = new VirtualMachineError() {};
ThreadDeath fatal2 = new ThreadDeath();
LinkageError fatal3 = new LinkageError();
@TestLoggerExtension.Redirect
void throwIfFatalWithJvmFatalErrorsDoesThrowAndLog(TestLogger testLogger) {
SoftAssertions.assertSoftly(softly -> {
softly.assertThatExceptionOfType(VirtualMachineError.class)
.as("VirtualMachineError")
.isThrownBy(() -> Exceptions.throwIfFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.isSameAs(JVM_FATAL_VIRTUAL_MACHINE_ERROR);

softly.assertThatExceptionOfType(ThreadDeath.class)
.as("ThreadDeath")
.isThrownBy(() -> Exceptions.throwIfFatal(JVM_FATAL_THREAD_DEATH))
.isSameAs(JVM_FATAL_THREAD_DEATH);

softly.assertThatExceptionOfType(LinkageError.class)
.as("LinkageError")
.isThrownBy(() -> Exceptions.throwIfFatal(JVM_FATAL_LINKAGE_ERROR))
.isSameAs(JVM_FATAL_LINKAGE_ERROR);

softly.assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - custom VirtualMachineError: expected to be logged")
.contains("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - custom ThreadDeath: expected to be logged")
.contains("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - custom LinkageError: expected to be logged");
});
}

@Test
@TestLoggerExtension.Redirect
void throwIfJvmFatalDoesThrowAndLog(TestLogger testLogger) {
assertThatExceptionOfType(VirtualMachineError.class)
.as("VirtualMachineError")
.isThrownBy(() -> Exceptions.throwIfJvmFatal(fatal1))
.isSameAs(fatal1);
.isThrownBy(() -> Exceptions.throwIfJvmFatal(JVM_FATAL_VIRTUAL_MACHINE_ERROR))
.isSameAs(JVM_FATAL_VIRTUAL_MACHINE_ERROR);

assertThatExceptionOfType(ThreadDeath.class)
.as("ThreadDeath")
.isThrownBy(() -> Exceptions.throwIfJvmFatal(fatal2))
.isSameAs(fatal2);
.isThrownBy(() -> Exceptions.throwIfJvmFatal(JVM_FATAL_THREAD_DEATH))
.isSameAs(JVM_FATAL_THREAD_DEATH);

assertThatExceptionOfType(LinkageError.class)
.as("LinkageError")
.isThrownBy(() -> Exceptions.throwIfJvmFatal(fatal3))
.isSameAs(fatal3);
.isThrownBy(() -> Exceptions.throwIfJvmFatal(JVM_FATAL_LINKAGE_ERROR))
.isSameAs(JVM_FATAL_LINKAGE_ERROR);

assertThat(testLogger.getErrContent())
.startsWith("[ WARN] throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below: - custom VirtualMachineError: expected to be logged")
.contains("[ WARN] throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below: - custom ThreadDeath: expected to be logged")
.contains("[ WARN] throwIfJvmFatal detected a jvm fatal exception, which is thrown and logged below: - custom LinkageError: expected to be logged");
};

@Test
@TestLoggerExtension.Redirect
void throwIfJvmFatalDoesntThrowNorLogsOnSimplyFatalExceptions(TestLogger testLogger) {
SoftAssertions.assertSoftly(softly -> {
softly.assertThatCode(() -> Exceptions.throwIfJvmFatal(new BubblingException("not thrown")))
.doesNotThrowAnyException();

softly.assertThatCode(() -> Exceptions.throwIfJvmFatal(new ErrorCallbackNotImplemented(new RuntimeException("not thrown"))))
.doesNotThrowAnyException();
});

assertThat(testLogger.getErrContent()).isEmpty();
}

@Test
Expand Down
Expand Up @@ -209,9 +209,9 @@ void onNextConsumerFatalDoesntTriggerCancellation(TestLogger testLogger) {
TestSubscription testSubscription = new TestSubscription();
tested.onSubscribe(testSubscription);

//the error is expected to be thrown as it is fatal, so it doesn't go through onErrorDropped
//the error is expected to be thrown as it is fatal, so it doesn't go through onErrorDropped. However, throwIfJvmFatal now logs it.
assertThatExceptionOfType(OutOfMemoryError.class).isThrownBy(() -> tested.onNext("foo"));
Assertions.assertThat(testLogger.getErrContent()).isEmpty();
Assertions.assertThat(testLogger.getErrContent()).startsWith("[ WARN] throwIfFatal detected a jvm fatal exception, which is thrown and logged below: - java.lang.OutOfMemoryError");

assertThat(testSubscription.isCancelled).as("subscription isCancelled")
.isFalse();
Expand Down