Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaoyi committed Jun 4, 2021
1 parent 36e218f commit a0e2fbc
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions src/library/scala/concurrent/Future.scala
Expand Up @@ -413,8 +413,29 @@ trait Future[+T] extends Awaitable[T] {
* @return a `Future` with the result of the application of `f` to the results of `this` and `that`
* @group Transformations
*/
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic)
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = {
val buffer = new AtomicReference[Either[T, U]]()
val outPromise = Promise[R]()
this.onComplete{
case scala.util.Success(v) =>
val isFirst = buffer.compareAndSet(null, Left(v))
if (!isFirst) {
outPromise.success(f(v, buffer.get.getOrElse(throw new Exception("CANT HAPPEN"))))
}

case scala.util.Failure(e) => outPromise.failure(e)
}
that.onComplete{
case scala.util.Success(v) =>
val isFirst = buffer.compareAndSet(null, Right(v))
if (!isFirst) {
outPromise.success(f(buffer.get.swap.getOrElse(throw new Exception("CANT HAPPEN")), v))
}

case scala.util.Failure(e) => outPromise.failure(e)
}
outPromise.future
}

/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
Expand Down

0 comments on commit a0e2fbc

Please sign in to comment.