Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix asymmetric failure behavior of Future#{zip,zipWith,traverse,sequence} by making them fail fast regardless of ordering #9655

Merged
merged 1 commit into from Jun 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions project/MimaFilters.scala
Expand Up @@ -33,6 +33,9 @@ object MimaFilters extends AutoPlugin {

// #8835
ProblemFilters.exclude[ReversedMissingMethodProblem]("scala.reflect.runtime.SynchronizedOps#SynchronizedBaseTypeSeq.scala$reflect$runtime$SynchronizedOps$SynchronizedBaseTypeSeq$$super$maxDepthOfElems"),

// this is an internal class and adding a final override here should not be a problem
ProblemFilters.exclude[FinalMethodProblem]("scala.concurrent.impl.Promise#DefaultPromise.zipWith"),
)

override val buildSettings = Seq(
Expand Down
28 changes: 17 additions & 11 deletions src/library/scala/concurrent/Future.scala
Expand Up @@ -383,10 +383,11 @@ trait Future[+T] extends Awaitable[T] {
/** Zips the values of `this` and `that` future, and creates
* a new future holding the tuple of their results.
*
* If `this` future fails, the resulting future is failed
* with the throwable stored in `this`.
* Otherwise, if `that` future fails, the resulting future is failed
* with the throwable stored in `that`.
* If either input future fails, the resulting future is failed with the same
* throwable, without waiting for the other input future to complete.
*
* If the application of `f` throws a non-fatal throwable, the resulting future
* is failed with that throwable.
*
* @tparam U the type of the other `Future`
* @param that the other `Future`
Expand All @@ -399,12 +400,11 @@ trait Future[+T] extends Awaitable[T] {
/** Zips the values of `this` and `that` future using a function `f`,
* and creates a new future holding the result.
*
* If `this` future fails, the resulting future is failed
* with the throwable stored in `this`.
* Otherwise, if `that` future fails, the resulting future is failed
* with the throwable stored in `that`.
* If the application of `f` throws a throwable, the resulting future
* is failed with that throwable if it is non-fatal.
* If either input future fails, the resulting future is failed with the same
* throwable, without waiting for the other input future to complete.
*
* If the application of `f` throws a non-fatal throwable, the resulting future
* is failed with that throwable.
*
* @tparam U the type of the other `Future`
* @tparam R the type of the resulting `Future`
Expand All @@ -413,8 +413,14 @@ trait Future[+T] extends Awaitable[T] {
* @return a `Future` with the result of the application of `f` to the results of `this` and `that`
* @group Transformations
*/
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
viktorklang marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Scaladocs should be updated to reflect the relaxed specification. Also on Future.zip.

// This is typically overriden by the implementation in DefaultPromise, which provides
// symmetric fail-fast behavior regardless of which future fails first.
//
// TODO: remove this implementation and make Future#zipWith abstract
// when we're next willing to make a binary incompatible change
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)
}
viktorklang marked this conversation as resolved.
Show resolved Hide resolved

/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
Expand Down
36 changes: 36 additions & 0 deletions src/library/scala/concurrent/impl/Promise.scala
Expand Up @@ -130,6 +130,42 @@ private[concurrent] object Promise {
override final def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] =
dispatchOrAddCallbacks(get(), new Transformation[T, S](Xform_transformWith, f, executor))

override final def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
val state = get()
if (state.isInstanceOf[Try[T]]) {
if (state.asInstanceOf[Try[T]].isFailure) this.asInstanceOf[Future[R]]
else {
val l = state.asInstanceOf[Success[T]].get
that.map(r => f(l, r))
}
} else {
val buffer = new AtomicReference[Success[Any]]()
val zipped = new DefaultPromise[R]()

val thisF: Try[T] => Unit = {
case left: Success[T] =>
val right = buffer.getAndSet(left).asInstanceOf[Success[U]]
if (right ne null)
zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) })
case f => // Can only be Failure
zipped.tryComplete(f.asInstanceOf[Failure[R]])
}

val thatF: Try[U] => Unit = {
case right: Success[U] =>
val left = buffer.getAndSet(right).asInstanceOf[Success[T]]
if (left ne null)
zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) })
case f => // Can only be Failure
zipped.tryComplete(f.asInstanceOf[Failure[R]])
}
// Cheaper than this.onComplete since we already polled the state
this.dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_onComplete, thisF, executor))
that.onComplete(thatF)
zipped.future
}
}
viktorklang marked this conversation as resolved.
Show resolved Hide resolved

override final def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = {
val state = get()
if (!state.isInstanceOf[Failure[T]]) dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_foreach, f, executor))
Expand Down
1 change: 1 addition & 0 deletions test/files/jvm/future-spec/FutureTests.scala
Expand Up @@ -147,6 +147,7 @@ class FutureTests extends MinimalScalaTest {
assert( ECNotUsed(ec => f.filter(_ => fail("filter should not have been called"))(ec)) eq f)
assert( ECNotUsed(ec => f.collect({ case _ => fail("collect should not have been called")})(ec)) eq f)
assert( ECNotUsed(ec => f.zipWith(f)({ (_,_) => fail("zipWith should not have been called")})(ec)) eq f)
viktorklang marked this conversation as resolved.
Show resolved Hide resolved

}
}

Expand Down
38 changes: 38 additions & 0 deletions test/junit/scala/concurrent/FutureTest.scala
Expand Up @@ -6,8 +6,46 @@ import org.junit.Test

import scala.tools.testkit.AssertUtil._
import scala.util.Try
import duration.Duration.Inf

class FutureTest {
@Test
def testZipWithFailFastBothWays(): Unit = {
import ExecutionContext.Implicits.global

val p1 = Promise[Int]()
val p2 = Promise[Int]()

// Make sure that the combined future fails early, after the earlier failure occurs, and does not
// wait for the later failure regardless of which one is on the left and which is on the right
p1.failure(new Exception("Boom Early"))
val f1 = p1.future
val f2 = p2.future

val scala.util.Failure(fa) = Try(Await.result(f1.zip(f2), Inf))
val scala.util.Failure(fb) = Try(Await.result(f2.zip(f1), Inf))

val scala.util.Failure(fc) = Try(Await.result(f1.zipWith(f2)((_, _)), Inf))
val scala.util.Failure(fd) = Try(Await.result(f2.zipWith(f1)((_, _)), Inf))

val scala.util.Failure(fe) = Try(Await.result(Future.sequence(Seq(f1, f2)), Inf))
val scala.util.Failure(ff) = Try(Await.result(Future.sequence(Seq(f2, f1)), Inf))

val scala.util.Failure(fg) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))
val scala.util.Failure(fh) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))

// Make sure the early failure is always reported, regardless of whether it's on
// the left or right of the zip/zipWith/sequence/traverse
assert(fa.getMessage == "Boom Early")
assert(fb.getMessage == "Boom Early")
assert(fc.getMessage == "Boom Early")
assert(fd.getMessage == "Boom Early")
assert(fe.getMessage == "Boom Early")
assert(ff.getMessage == "Boom Early")
assert(fg.getMessage == "Boom Early")
assert(fh.getMessage == "Boom Early")
}

@Test
def `bug/issues#10513 firstCompletedOf must not leak references`(): Unit = {
val unfulfilled = Promise[AnyRef]()
Expand Down