Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaoyi committed Jun 4, 2021
1 parent 36e218f commit 1c3db52
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/library/scala/concurrent/Future.scala
Expand Up @@ -413,8 +413,31 @@ trait Future[+T] extends Awaitable[T] {
* @return a `Future` with the result of the application of `f` to the results of `this` and `that`
* @group Transformations
*/
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
val buffer = new AtomicReference[Either[T, U]]()
val outPromise = Promise[R]()
this.onComplete{
case scala.util.Success(v) =>
val isFirst = buffer.compareAndSet(null, Left(v))
if (!isFirst) {
val oldValue = buffer.get.getOrElse(throw new Exception("CANT HAPPEN"))
outPromise.complete(Try(f(v, oldValue)))
}

case scala.util.Failure(e) => outPromise.tryFailure(e)
}
that.onComplete{
case scala.util.Success(v) =>
val isFirst = buffer.compareAndSet(null, Right(v))
if (!isFirst) {
val oldValue = buffer.get.swap.getOrElse(throw new Exception("CANT HAPPEN"))
outPromise.complete(Try(f(oldValue, v)))
}

case scala.util.Failure(e) => outPromise.tryFailure(e)
}
outPromise.future
}

/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
Expand Down

0 comments on commit 1c3db52

Please sign in to comment.