Skip to content

Commit

Permalink
Merge pull request #9655 from lihaoyi/zip-fast-fail
Browse files Browse the repository at this point in the history
Fix asymmetric failure behavior of Future#{zip,zipWith,traverse,sequence} by making them fail fast regardless of ordering
  • Loading branch information
lrytz committed Jun 7, 2021
2 parents 0bd34f8 + 0fc323b commit 5b56cd3
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 11 deletions.
3 changes: 3 additions & 0 deletions project/MimaFilters.scala
Expand Up @@ -33,6 +33,9 @@ object MimaFilters extends AutoPlugin {

// #8835
ProblemFilters.exclude[ReversedMissingMethodProblem]("scala.reflect.runtime.SynchronizedOps#SynchronizedBaseTypeSeq.scala$reflect$runtime$SynchronizedOps$SynchronizedBaseTypeSeq$$super$maxDepthOfElems"),

// this is an internal class and adding a final override here should not be a problem
ProblemFilters.exclude[FinalMethodProblem]("scala.concurrent.impl.Promise#DefaultPromise.zipWith"),
)

override val buildSettings = Seq(
Expand Down
28 changes: 17 additions & 11 deletions src/library/scala/concurrent/Future.scala
Expand Up @@ -383,10 +383,11 @@ trait Future[+T] extends Awaitable[T] {
/** Zips the values of `this` and `that` future, and creates
* a new future holding the tuple of their results.
*
* If `this` future fails, the resulting future is failed
* with the throwable stored in `this`.
* Otherwise, if `that` future fails, the resulting future is failed
* with the throwable stored in `that`.
* If either input future fails, the resulting future is failed with the same
* throwable, without waiting for the other input future to complete.
*
* If the application of `f` throws a non-fatal throwable, the resulting future
* is failed with that throwable.
*
* @tparam U the type of the other `Future`
* @param that the other `Future`
Expand All @@ -399,12 +400,11 @@ trait Future[+T] extends Awaitable[T] {
/** Zips the values of `this` and `that` future using a function `f`,
* and creates a new future holding the result.
*
* If `this` future fails, the resulting future is failed
* with the throwable stored in `this`.
* Otherwise, if `that` future fails, the resulting future is failed
* with the throwable stored in `that`.
* If the application of `f` throws a throwable, the resulting future
* is failed with that throwable if it is non-fatal.
* If either input future fails, the resulting future is failed with the same
* throwable, without waiting for the other input future to complete.
*
* If the application of `f` throws a non-fatal throwable, the resulting future
* is failed with that throwable.
*
* @tparam U the type of the other `Future`
* @tparam R the type of the resulting `Future`
Expand All @@ -413,8 +413,14 @@ trait Future[+T] extends Awaitable[T] {
* @return a `Future` with the result of the application of `f` to the results of `this` and `that`
* @group Transformations
*/
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
// This is typically overriden by the implementation in DefaultPromise, which provides
// symmetric fail-fast behavior regardless of which future fails first.
//
// TODO: remove this implementation and make Future#zipWith abstract
// when we're next willing to make a binary incompatible change
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)
}

/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
Expand Down
36 changes: 36 additions & 0 deletions src/library/scala/concurrent/impl/Promise.scala
Expand Up @@ -130,6 +130,42 @@ private[concurrent] object Promise {
override final def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] =
dispatchOrAddCallbacks(get(), new Transformation[T, S](Xform_transformWith, f, executor))

override final def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
val state = get()
if (state.isInstanceOf[Try[T]]) {
if (state.asInstanceOf[Try[T]].isFailure) this.asInstanceOf[Future[R]]
else {
val l = state.asInstanceOf[Success[T]].get
that.map(r => f(l, r))
}
} else {
val buffer = new AtomicReference[Success[Any]]()
val zipped = new DefaultPromise[R]()

val thisF: Try[T] => Unit = {
case left: Success[T] =>
val right = buffer.getAndSet(left).asInstanceOf[Success[U]]
if (right ne null)
zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) })
case f => // Can only be Failure
zipped.tryComplete(f.asInstanceOf[Failure[R]])
}

val thatF: Try[U] => Unit = {
case right: Success[U] =>
val left = buffer.getAndSet(right).asInstanceOf[Success[T]]
if (left ne null)
zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) })
case f => // Can only be Failure
zipped.tryComplete(f.asInstanceOf[Failure[R]])
}
// Cheaper than this.onComplete since we already polled the state
this.dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_onComplete, thisF, executor))
that.onComplete(thatF)
zipped.future
}
}

override final def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = {
val state = get()
if (!state.isInstanceOf[Failure[T]]) dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_foreach, f, executor))
Expand Down
1 change: 1 addition & 0 deletions test/files/jvm/future-spec/FutureTests.scala
Expand Up @@ -147,6 +147,7 @@ class FutureTests extends MinimalScalaTest {
assert( ECNotUsed(ec => f.filter(_ => fail("filter should not have been called"))(ec)) eq f)
assert( ECNotUsed(ec => f.collect({ case _ => fail("collect should not have been called")})(ec)) eq f)
assert( ECNotUsed(ec => f.zipWith(f)({ (_,_) => fail("zipWith should not have been called")})(ec)) eq f)

}
}

Expand Down
38 changes: 38 additions & 0 deletions test/junit/scala/concurrent/FutureTest.scala
Expand Up @@ -6,8 +6,46 @@ import org.junit.Test

import scala.tools.testkit.AssertUtil._
import scala.util.Try
import duration.Duration.Inf

class FutureTest {
@Test
def testZipWithFailFastBothWays(): Unit = {
import ExecutionContext.Implicits.global

val p1 = Promise[Int]()
val p2 = Promise[Int]()

// Make sure that the combined future fails early, after the earlier failure occurs, and does not
// wait for the later failure regardless of which one is on the left and which is on the right
p1.failure(new Exception("Boom Early"))
val f1 = p1.future
val f2 = p2.future

val scala.util.Failure(fa) = Try(Await.result(f1.zip(f2), Inf))
val scala.util.Failure(fb) = Try(Await.result(f2.zip(f1), Inf))

val scala.util.Failure(fc) = Try(Await.result(f1.zipWith(f2)((_, _)), Inf))
val scala.util.Failure(fd) = Try(Await.result(f2.zipWith(f1)((_, _)), Inf))

val scala.util.Failure(fe) = Try(Await.result(Future.sequence(Seq(f1, f2)), Inf))
val scala.util.Failure(ff) = Try(Await.result(Future.sequence(Seq(f2, f1)), Inf))

val scala.util.Failure(fg) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))
val scala.util.Failure(fh) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))

// Make sure the early failure is always reported, regardless of whether it's on
// the left or right of the zip/zipWith/sequence/traverse
assert(fa.getMessage == "Boom Early")
assert(fb.getMessage == "Boom Early")
assert(fc.getMessage == "Boom Early")
assert(fd.getMessage == "Boom Early")
assert(fe.getMessage == "Boom Early")
assert(ff.getMessage == "Boom Early")
assert(fg.getMessage == "Boom Early")
assert(fh.getMessage == "Boom Early")
}

@Test
def `bug/issues#10513 firstCompletedOf must not leak references`(): Unit = {
val unfulfilled = Promise[AnyRef]()
Expand Down

0 comments on commit 5b56cd3

Please sign in to comment.