New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix asymmetric failure behavior of Future#{zip,zipWith,traverse,sequence}
by making them fail fast regardless of ordering
#9655
Conversation
7145170
to
90c91a4
Compare
/rebuild |
case scala.util.Success(v) => | ||
val isFirst = buffer.compareAndSet(null, Left(v)) | ||
if (!isFirst) { | ||
outPromise.success(f(v, buffer.get.getOrElse(throw new Exception("CANT HAPPEN")))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if f
throws here then that exception will not propagate to outPromise
.
case scala.util.Success(v) => | ||
val isFirst = buffer.compareAndSet(null, Right(v)) | ||
if (!isFirst) { | ||
outPromise.success(f(buffer.get.swap.getOrElse(throw new Exception("CANT HAPPEN")), v)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if f
throws here then that exception will not propagate to outPromise
.
dd0ba1d
to
1c3db52
Compare
@viktorklang I wrapped the calls to I also converted the |
Relates to scala/bug#8994 |
@viktorklang I ran Numbers on this PR:
Numbers on 2.13.x
Not quite sure what to make of this, or what the various |
post = Futures are completed post-operation (pool) is what thread pool is being used. fjp = ForkJoinPool, fix = a fixed-size ThreadPool, fie = Future.InternalExecutionContext (parasitic), gbl = ExecutionContext.global TBH those performance degradations are substantial, some of them are almost 30% of the original performance numbers. |
Here's my latest benchmarks:
Better than 2.13.x on some benchmarks, slower than on others. I'm not sure how much faster I'll be able to micro-optimize it, but if anyone has more suggestions happy to hear them. This is after unboxing the Otherwise, it is what it is. IMO the question of symmetry v.s. asymmetry of error reporting behavior is a correctness issue that takes precedence over micro-optimization overheads, in which case this is more a question of "how fast can we make it" rather than "should we do it or not" |
@lihaoyi From a semantics PoV, the scaladoc says:
Now, changing so that As for performance, I think it should be possible to implement some optimizations in impl/Promise.scala (DefaultPromise) in an override. |
That's a good point; the scaladoc does hint at the left future taking precedence for fail-fast behavior, but it's not really explicit about it. Agreed that this is a behavioral change, as I mentioned in the PR description. It's up to the maintainers to decide what to do with it: get it in now, keep it for some future major version bump where such a behavioral change may be more palatable, or not do it at all. |
I think this makes sense since
|
Ah, no this is not true. The So this LGTM, but I'd like for more people to think about it. |
I'm not sure if this is the way I would put it.
Yeah, the fact these operators complete as soon as the left-most input fails is what I find surprising. I expect most people would expect it to be symmetric, so either "fail as soon as first future fails (time-wise)" or "complete after all futures complete" like how the success behavior works. I don't expect most people were aware that the left-most inputs are fail-fast while the right-most inputs are fail-slow; I know I didn't, it seems @lrytz didn't either, and I suspect if we poll people who've been using Scala for years they mostly won't know. @sjrd in the gitter channel said he would expect the new behavior. @viktorklang is right that this is a behavioral change, so we should tread carefully; people could be depending on this behavior accidentally whether they were aware of it or not. Despite that, I do think that the new behavior is sufficiently more "correct" that it's worth considering, and that changing this behavior has as much potential to fix old bugs as it does to create new ones. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m in favor of this behavioral change.
val buffer = new AtomicReference[AnyRef](sentinel) | ||
val outPromise = Promise[R]() | ||
|
||
val myExecutor = if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lihaoyi This is wrong—it would make f
evaluated on the current thread if this
or that
has already been completed when the callback is added. It must be executed on the passed in EC. If you look at the default implementation of zipWith
(which this PR is replacing) you can see that the choice of doing if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic
only applies to the flatMap
and not the map
: https://github.com/scala/scala/pull/9655/files#diff-1d7e8b347eefe51392356ff23a3a480317df5d42fc10d12336c344bd1407dc75L417
val prevValue = buffer.getAndSet(v.asInstanceOf[AnyRef]) | ||
if (sentinel ne prevValue) { | ||
val oldValue = prevValue.asInstanceOf[U] | ||
outPromise.complete(try Success(f(v, oldValue)) catch{case e: Exception => Failure(e)}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need case e if NonFatal(e) =>
.
val prevValue = buffer.getAndSet(v.asInstanceOf[AnyRef]) | ||
if (sentinel ne prevValue) { | ||
val oldValue = prevValue.asInstanceOf[T] | ||
outPromise.complete(try Success(f(oldValue, v)) catch{case e: Exception => Failure(e)}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need case e if NonFatal(e) =>
.
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = | ||
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) | ||
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { | ||
val sentinel = new Object() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sentinel is not needed if you use the logic I proposed in https://github.com/scala/scala/pull/9655/files#r645659226 and thus saves an allocation.
Updated in response to feedback. Also removed two asserts from |
val prevValue = buffer.getAndSet(s) | ||
if (prevValue ne null) { | ||
val oldValue = prevValue.get.asInstanceOf[U] | ||
outPromise.complete(try Success(f(s.get, oldValue)) catch{case NonFatal(e) => Failure(e)}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the extractor for NonFatal is not optimal for performance, so use the guard: catch { case e if NonFatal(e) => Failure(e) }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it, I changed it to a guard
I purposefully didn’t do that because we cannot make assumptions w.r.t. the
performance of calling ’value’ on a generic Future, in the DefaultPromuse
implementation it needs to allocate a Some. Allocating a Tuple2 and making
additional type tests and extractors doesn’t make much sense given that
we’ll call ’map’ if we are a Success already, and if you look at the
implementation of ’map’ for DefaultPromuse you can see that it immediate
returns itself if it is already Failure which will be much faster that
allocating Tuples and running extractors. For a generic Future it must be
presumed that ’map’ is a very common operation.--
Cheers,
√
|
ok, happy to go with your code then |
Updated the PR to use @viktorklang's suggested implementation, and made the test suite use |
@lihaoyi I think you need to add the following to make MIMA happy: diff --git project/MimaFilters.scala project/MimaFilters.scala
index 0b35213fff..34ba4e3d3c 100644
--- project/MimaFilters.scala
+++ project/MimaFilters.scala
@@ -39,6 +39,9 @@ object MimaFilters extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("scala.Predef#SeqCharSequence.isEmpty"),
ProblemFilters.exclude[DirectMissingMethodProblem]("scala.Predef#ArrayCharSequence.isEmpty"),
ProblemFilters.exclude[DirectMissingMethodProblem]("scala.runtime.ArrayCharSequence.isEmpty"),
+
+ // this is an internal class and adding a final override here should not be a problem
+ ProblemFilters.exclude[FinalMethodProblem]("scala.concurrent.impl.Promise#DefaultPromise.zipWith")
)
override val buildSettings = Seq( |
Cool lemme try that |
The timing-based test case is a bit clunky, but I suppose we are testing timing/ordering-based things, so hopefully it'll be stable enough. The other alternative is to rewrite the test using raw @@ -13,9 +13,14 @@ class FutureTest {
def testZipWithFailFastBothWays(): Unit = {
import ExecutionContext.Implicits.global
- val start = System.nanoTime()
- val f1 = Future{Thread.sleep(1000); throw new Exception("Boom Late"); 123}
- val f2 = Future{Thread.sleep(200); throw new Exception("Boom Early"); 456}
+ val p1 = Promise[Int]()
+ val p2 = Promise[Int]()
+
+ // Make sure that the combined future fails early, after the earlier failure occurs, and does not
+ // wait for the later failure regardless of which one is on the left and which is on the right
+ p1.failure(new Exception("Boom Early"))
+ val f1 = p1.future
+ val f2 = p2.future
val scala.util.Failure(fa) = Try(Await.result(f1.zip(f2), Inf))
val scala.util.Failure(fb) = Try(Await.result(f2.zip(f1), Inf))
@@ -29,7 +34,6 @@ class FutureTest {
val scala.util.Failure(fg) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))
val scala.util.Failure(fh) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf))
- val end = System.nanoTime()
// Make sure the early failure is always reported, regardless of whether it's on
// the left or right of the zip/zipWith/sequence/traverse
assert(fa.getMessage == "Boom Early")
@@ -40,11 +44,6 @@ class FutureTest {
assert(ff.getMessage == "Boom Early")
assert(fg.getMessage == "Boom Early")
assert(fh.getMessage == "Boom Early")
-
- // Make sure that it fails early, after the earlier failure occurs, and does not wait
- // for the later failure regardless of which one is on the left and which is on the right
- assert(end - start > 150 * 1000 * 1000, (end, start))
- assert(end - start < 950 * 1000 * 1000, (end, start))
}
@Test @viktorklang what do you think of this modified test case? I'd be happy to go with either |
@lihaoyi I think it's great to avoid the timing component and focus on the order-component as it is going to make for a much more robust test case. Great work! |
44eccc3
to
136c821
Compare
Ok, I swapped in the promise-based test case, squashed everything, and copied the PR description into the commit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM otherwise
@@ -413,8 +413,14 @@ trait Future[+T] extends Awaitable[T] { | |||
* @return a `Future` with the result of the application of `f` to the results of `this` and `that` | |||
* @group Transformations | |||
*/ | |||
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = | |||
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Scaladocs should be updated to reflect the relaxed specification. Also on Future.zip
.
@lrytz I have amended the two scaladocs with the following text:
|
* If either input future fails, the resulting future is failed with the same | ||
* throwable, without waiting for the other input future to complete. | ||
* | ||
* If the application of `f` throws a throwable, the resulting future |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* If the application of `f` throws a throwable, the resulting future | |
* If the application of `f` throws a non-fatal throwable, the resulting future |
@viktorklang seems not, I put it back in and things pass, though I cant say I'm that familiar with what those tests are asserting |
@lihaoyi That test verifies that it does not try to schedule anything on the EC if the originating Future is already failed. |
I think this should be ready for merge unless anyone has any other comments |
…nce} by making them fail fast regardless of ordering Currently, given the following setup: ```scala val f1 = Future{Thread.sleep(10000)} val f2 = Future{Thread.sleep(2000); throw new Exception("Boom")} ``` The following two snippets exhibit different failure behavior: ```scala val fa = Await.result(f1.zip(f2). Duration.Inf) ``` ```scala val fb = Await.result(f2.zip(f1). Duration.Inf) ``` `fa` fails after 10000ms, while `fb` fails after 2000ms. Both fail with `java.lang.Exception: boom`. When zipping two `Future`s together, if the left `Future` fails early, the zipped `Future` fails early. But if the right `Future` fails early, the zipped `Future` waits until the right `Future` completes before failing. `traverse` and `sequence` are similarly implemented with `zipWith` and should exhibit the same behavior. This all arises because `zipWith` is implemented using `flatMap`, which by definition asymmetric due to waiting fo the left `Future` to complete before even considering the right `Future`. The current behavior makes the failure behavior of `Future`s most unpredictable; in general nobody pays attention to the order of `Future`s when zipping them together, and thus whether a `zipWith`ed/`zip`ed/`traverse`d/`sequence`d `Future` fails early or not is entirely arbitrary. This PR replaces the implementation of `zipWith`, turning it from `flatMap`-based to `Promise`-based, so that when a `Future` fails early, regardless of whether it's the left or right `Future`, the resultant `Future` will fail immediately. Implementation-wise I'm using an `AtomicReference` and `compareAndSet`, which should give us the behavior we want without any locking. It may well be possible to achieve with even less overhead, e.g. using only `volatile`s or even using no concurrency controls at all, but I couldn't come up with anything better. If anyone has a better solution I'll happily include it. This fix would apply to all of `zip`/`zipWith`/`traverse`/`sequence`, since they all are implemented on top of `zipWith` While it is possible that someone could be relying on the left-biased nature of current `zip`/`zipWith`/`traverse`/`sequence` implementation, but it seems like something that's unlikely to be reliable enough to depend upon. In my experience people generally aren't aware that `zipWith`/`zip`/`traverse`/`sequence`, and they don't generally know the total ordering of how long their Futures take to run. That means status quo behavior would just result in some `Future` fails mysterious taking longer to report for no clear reason. Notably, the biased nature of these operators is not documented in any of their scaladoc comments. While there is a non-zero chance that somebody could be intentionally or unintentionally depending on the biased nature of these combinators, there is a much greater chance that someone unaware of the current bias would be puzzled why their highly-concurrent system seems to be taking longer than expected in certain scenarios. It seems likely that this PR would fix more bugs than it would introduce Note that this does not fix the left-biased fail-fast behavior of `flatMap` chains, or their equivalent `for`-comprehensions, as `flatMap`'s API is inherently left-biased. But anyone who wants fail-fast behavior can convert sections of their `flatMap` chains into `.zip`s where possible, and where not possible that's generally because there is some true data dependency between the `flatMap`s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea, and great work, Li!
Future#{zip,zipWith,traverse,sequence}
by making them fail fast regardless of ordering
Problem Statement
Currently, given the following setup:
The following two snippets exhibit different failure behavior:
fa
fails after 10000ms, whilefb
fails after 2000ms. Both fail withjava.lang.Exception: boom
.When zipping two
Future
s together, if the leftFuture
fails early, the zippedFuture
fails early. But if the rightFuture
fails early, the zippedFuture
waits until the rightFuture
completes before failing.traverse
andsequence
are similarly implemented withzipWith
and should exhibit the same behavior. This all arises becausezipWith
is implemented usingflatMap
, which by definition asymmetric due to waiting fo the leftFuture
to complete before even considering the rightFuture
.The current behavior makes the failure behavior of
Future
s most unpredictable; in general nobody pays attention to the order ofFuture
s when zipping them together, and thus whether azipWith
ed/zip
ed/traverse
d/sequence
dFuture
fails early or not is entirely arbitrary.Fix
This PR replaces the implementation of
zipWith
, turning it fromflatMap
-based toPromise
-based, so that when aFuture
fails early, regardless of whether it's the left or rightFuture
, the resultantFuture
will fail immediately.Implementation-wise I'm using an
AtomicReference
andcompareAndSet
, which should give us the behavior we want without any locking. It may well be possible to achieve with even less overhead, e.g. using onlyvolatile
s or even using no concurrency controls at all, but I couldn't come up with anything better. If anyone has a better solution I'll happily include it.This fix would apply to all of
zip
/zipWith
/traverse
/sequence
, since they all are implemented on top ofzipWith
Considerations
While it is possible that someone could be relying on the left-biased nature of current
zip
/zipWith
/traverse
/sequence
implementation, but it seems like something that's unlikely to be reliable enough to depend upon. In my experience people generally aren't aware thatzipWith
/zip
/traverse
/sequence
, and they don't generally know the total ordering of how long their Futures take to run. That means status quo behavior would just result in someFuture
fails mysterious taking longer to report for no clear reason.Notably, the biased nature of these operators is not documented in any of their scaladoc comments.
While there is a non-zero chance that somebody could be intentionally or unintentionally depending on the biased nature of these combinators, there is a much greater chance that someone unaware of the current bias would be puzzled why their highly-concurrent system seems to be taking longer than expected in certain scenarios. It seems likely that this PR would fix more bugs than it would introduce
Note that this does not fix the left-biased fail-fast behavior of
flatMap
chains, or their equivalentfor
-comprehensions, asflatMap
's API is inherently left-biased. But anyone who wants fail-fast behavior can convert sections of theirflatMap
chains into.zip
s where possible, and where not possible that's generally because there is some true data dependency between theflatMap
s