Skip to content

Commit

Permalink
Fix asymmetric failure behavior of Future#{zip,zipWith,traverse,seque…
Browse files Browse the repository at this point in the history
…nce} by making them fail fast regardless of ordering

Currently, given the following setup:

```scala
val f1 = Future{Thread.sleep(10000)}
val f2 = Future{Thread.sleep(2000); throw new Exception("Boom")}
```

The following two snippets exhibit different failure behavior:

```scala
val fa = Await.result(f1.zip(f2). Duration.Inf)
```

```scala
val fb = Await.result(f2.zip(f1). Duration.Inf)
```

`fa` fails after 10000ms, while `fb` fails after 2000ms. Both fail with `java.lang.Exception: boom`.

When zipping two `Future`s together, if the left `Future` fails early, the zipped `Future` fails early. But if the right `Future` fails early, the zipped `Future` waits until the right `Future` completes before failing.

`traverse` and `sequence` are similarly implemented with `zipWith` and should exhibit the same behavior. This all arises because `zipWith` is implemented using `flatMap`, which by definition asymmetric due to waiting fo the left `Future` to complete before even considering the right `Future`.

The current behavior makes the failure behavior of `Future`s most unpredictable; in general nobody pays attention to the order of `Future`s when zipping them together, and thus whether a `zipWith`ed/`zip`ed/`traverse`d/`sequence`d `Future` fails early or not is entirely arbitrary.

This PR replaces the implementation of `zipWith`, turning it from `flatMap`-based to `Promise`-based, so that when a `Future` fails early, regardless of whether it's the left or right `Future`, the resultant `Future` will fail immediately.

Implementation-wise I'm using an `AtomicReference` and `compareAndSet`, which should give us the behavior we want without any locking. It may well be possible to achieve with even less overhead, e.g. using only `volatile`s or even using no concurrency controls at all, but I couldn't come up with anything better. If anyone has a better solution I'll happily include it.

This fix would apply to all of `zip`/`zipWith`/`traverse`/`sequence`, since they all are implemented on top of `zipWith`

While it is possible that someone could be relying on the left-biased nature of current `zip`/`zipWith`/`traverse`/`sequence` implementation, but it seems like something that's unlikely to be reliable enough to depend upon. In my experience people generally aren't aware that `zipWith`/`zip`/`traverse`/`sequence`, and they don't generally know the total ordering of how long their Futures take to run. That means status quo behavior would just result in some `Future` fails mysterious taking longer to report for no clear reason.

Notably, the biased nature of these operators is not documented in any of their scaladoc comments.

While there is a non-zero chance that somebody could be intentionally or unintentionally depending on the biased nature of these combinators, there is a much greater chance that someone unaware of the current bias would be puzzled why their highly-concurrent system seems to be taking longer than expected in certain scenarios. It seems likely that this PR would fix more bugs than it would introduce

Note that this does not fix the left-biased fail-fast behavior of `flatMap` chains, or their equivalent `for`-comprehensions, as `flatMap`'s API is inherently left-biased. But anyone who wants fail-fast behavior can convert sections of their `flatMap` chains into `.zip`s where possible, and where not possible that's generally because there is some true data dependency between the `flatMap`s
  • Loading branch information
lihaoyi committed Jun 7, 2021
1 parent 36e218f commit 136c821
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 2 deletions.
3 changes: 3 additions & 0 deletions project/MimaFilters.scala
Expand Up @@ -33,6 +33,9 @@ object MimaFilters extends AutoPlugin {

// #8835
ProblemFilters.exclude[ReversedMissingMethodProblem]("scala.reflect.runtime.SynchronizedOps#SynchronizedBaseTypeSeq.scala$reflect$runtime$SynchronizedOps$SynchronizedBaseTypeSeq$$super$maxDepthOfElems"),

// this is an internal class and adding a final override here should not be a problem
ProblemFilters.exclude[FinalMethodProblem]("scala.concurrent.impl.Promise#DefaultPromise.zipWith"),
)

override val buildSettings = Seq(
Expand Down
8 changes: 7 additions & 1 deletion src/library/scala/concurrent/Future.scala
Expand Up @@ -413,8 +413,14 @@ trait Future[+T] extends Awaitable[T] {
* @return a `Future` with the result of the application of `f` to the results of `this` and `that`
* @group Transformations
*/
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
// This is typically overriden by the implementation in DefaultPromise, which provides
// symmetric fail-fast behavior regardless of which future fails first.
//
// TODO: remove this implementation and make Future#zipWith abstract
// when we're next willing to make a binary incompatible change
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)
}

/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
Expand Down
36 changes: 36 additions & 0 deletions src/library/scala/concurrent/impl/Promise.scala
Expand Up @@ -130,6 +130,42 @@ private[concurrent] object Promise {
override final def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] =
dispatchOrAddCallbacks(get(), new Transformation[T, S](Xform_transformWith, f, executor))

override final def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
val state = get()
if (state.isInstanceOf[Try[T]]) {
if (state.asInstanceOf[Try[T]].isFailure) this.asInstanceOf[Future[R]]
else {
val l = state.asInstanceOf[Success[T]].get
that.map(r => f(l, r))
}
} else {
val buffer = new AtomicReference[Success[Any]]()
val zipped = new DefaultPromise[R]()

val thisF: Try[T] => Unit = {
case left: Success[T] =>
val right = buffer.getAndSet(left).asInstanceOf[Success[U]]
if (right ne null)
zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) })
case f => // Can only be Failure
zipped.tryComplete(f.asInstanceOf[Failure[R]])
}

val thatF: Try[U] => Unit = {
case right: Success[U] =>
val left = buffer.getAndSet(right).asInstanceOf[Success[T]]
if (left ne null)
zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) })
case f => // Can only be Failure
zipped.tryComplete(f.asInstanceOf[Failure[R]])
}
// Cheaper than this.onComplete since we already polled the state
this.dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_onComplete, thisF, executor))
that.onComplete(thatF)
zipped.future
}
}

override final def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = {
val state = get()
if (!state.isInstanceOf[Failure[T]]) dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_foreach, f, executor))
Expand Down
1 change: 0 additions & 1 deletion test/files/jvm/future-spec/FutureTests.scala
Expand Up @@ -146,7 +146,6 @@ class FutureTests extends MinimalScalaTest {
assert( ECNotUsed(ec => f.flatMap(_ => fail("flatMap should not have been called"))(ec)) eq f)
assert( ECNotUsed(ec => f.filter(_ => fail("filter should not have been called"))(ec)) eq f)
assert( ECNotUsed(ec => f.collect({ case _ => fail("collect should not have been called")})(ec)) eq f)
assert( ECNotUsed(ec => f.zipWith(f)({ (_,_) => fail("zipWith should not have been called")})(ec)) eq f)
}
}

Expand Down
38 changes: 38 additions & 0 deletions test/junit/scala/concurrent/FutureTest.scala
Expand Up @@ -6,8 +6,46 @@ import org.junit.Test

import scala.tools.testkit.AssertUtil._
import scala.util.Try
import duration.Duration.Inf

class FutureTest {
@Test
def testZipWithFailFastBothWays(): Unit = {
import ExecutionContext.Implicits.global

val p1 = Promise[Int]()
val p2 = Promise[Int]()

// Make sure that the combined future fails early, after the earlier failure occurs, and does not
// wait for the later failure regardless of which one is on the left and which is on the right
p1.failure(new Exception("Boom Early"))
val f1 = p1.future
val f2 = p2.future

val scala.util.Failure(fa) = Try(Await.result(f1.zip(f2), Inf))
val scala.util.Failure(fb) = Try(Await.result(f2.zip(f1), Inf))

val scala.util.Failure(fc) = Try(Await.result(f1.zipWith(f2)((_, _)), Inf))
val scala.util.Failure(fd) = Try(Await.result(f2.zipWith(f1)((_, _)), Inf))

val scala.util.Failure(fe) = Try(Await.result(Future.sequence(Seq(f1, f2)), Inf))
val scala.util.Failure(ff) = Try(Await.result(Future.sequence(Seq(f2, f1)), Inf))

val scala.util.Failure(fg) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))
val scala.util.Failure(fh) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))

// Make sure the early failure is always reported, regardless of whether it's on
// the left or right of the zip/zipWith/sequence/traverse
assert(fa.getMessage == "Boom Early")
assert(fb.getMessage == "Boom Early")
assert(fc.getMessage == "Boom Early")
assert(fd.getMessage == "Boom Early")
assert(fe.getMessage == "Boom Early")
assert(ff.getMessage == "Boom Early")
assert(fg.getMessage == "Boom Early")
assert(fh.getMessage == "Boom Early")
}

@Test
def `bug/issues#10513 firstCompletedOf must not leak references`(): Unit = {
val unfulfilled = Promise[AnyRef]()
Expand Down

0 comments on commit 136c821

Please sign in to comment.