[SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums

### What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. It is not known to cause issues on Scala 2.12, but on 2.13 it leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing heartbeats. LuciferYang bisected the failures to first appear in Scala 2.13.7; one possible reason is scala/scala#9786 (see also scala/scala#9258).
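As an illustration (not part of this PR), here is a minimal sketch of the conflicting access pattern: one thread appends to an `ArrayBuffer` while another iterates it. On Scala 2.13.7+ this can surface in the reading thread as a `ConcurrentModificationException`, while on 2.12 the same race goes unnoticed:

```scala
import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the race, not Spark code: the "writer" plays the role
// of the task thread registering accumulators, and the "reader" plays the role
// of the executor-heartbeater thread iterating TaskMetrics.externalAccums.
object AccumRaceSketch {
  def main(args: Array[String]): Unit = {
    val accums = ArrayBuffer[Int]()

    val writer = new Thread(() => {
      var i = 0
      while (i < 1000000) { accums += i; i += 1 }
    })
    val reader = new Thread(() => {
      var sum = 0L
      // On Scala 2.13.7+ the mutation checks added to ArrayBuffer can make
      // this traversal throw ConcurrentModificationException.
      (1 to 1000).foreach(_ => accums.foreach(sum += _))
    })

    writer.start(); reader.start()
    writer.join(); reader.join()
  }
}
```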

The fix of using `CopyOnWriteArrayList` is cherry-picked from #37206, where it was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented in #37206 (comment) that there should be no such race, because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the heartbeat thread reads the accumulators.

But my understanding is that this is incorrect: accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 and that happens while the heartbeater thread is potentially reading the accumulators. This can occur both due to user code using accumulators (see the new test case) and when using the DataFrame/Dataset API, as SQL metrics are also part of `externalAccums`. One way metrics end up in the taskBinary is when the dependency is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a `ShuffleWriteProcessor` that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422
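For reference (again not part of this PR), a minimal sketch of the property the fix relies on: `CopyOnWriteArrayList` copies its backing array on every mutation, so a traversal that is already in flight keeps walking a consistent snapshot, and concurrent `add` calls cannot break it. That is what `externalAccums` needs when the heartbeater reads it while the task thread is still registering accumulators:

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._

// Standalone sketch, not Spark code: same writer/reader roles as above, but
// with the thread-safe collection used by the fix. Each add copies the
// backing array, and the iterator obtained by the reader sees a stable
// snapshot, so no ConcurrentModificationException can occur.
object CopyOnWriteSketch {
  def main(args: Array[String]): Unit = {
    val accums = new CopyOnWriteArrayList[Int]()

    val writer = new Thread(() => (1 to 100000).foreach(i => accums.add(i)))
    val reader = new Thread(() => {
      var sum = 0L
      // Mirrors the new `externalAccums` accessor, which exposes the list
      // as a Scala collection via .asScala.
      (1 to 1000).foreach(_ => accums.asScala.foreach(sum += _))
    })

    writer.start(); reader.start()
    writer.join(); reader.join()
  }
}
```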

### Why are the changes needed?
The current code has a data race.

### Does this PR introduce _any_ user-facing change?
It will fix an uncaught exception in the `executor-heartbeater` thread when using Scala 2.13.

### How was this patch tested?
This patch adds a new test case which, before the fix was applied, consistently produced the uncaught exception in the heartbeater thread when using Scala 2.13.

Closes #40663 from eejbyfeldt/SPARK-39696.

Lead-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Co-authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2 people authored and HyukjinKwon committed Apr 7, 2023
1 parent ad013d3 commit 6ce0822
Showing 2 changed files with 29 additions and 3 deletions.
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.executor

import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}

@@ -262,10 +264,12 @@ class TaskMetrics private[spark] () extends Serializable {
/**
* External accumulators registered with this task.
*/
@transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
@transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]

private[spark] def externalAccums = _externalAccums.asScala

private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
externalAccums += a
_externalAccums.add(a)
}

private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
@@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging {
tmAcc.metadata = acc.metadata
tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
} else {
tm.externalAccums += acc
tm._externalAccums.add(acc)
}
}
tm
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Map}
import scala.concurrent.duration._

import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.logging.log4j._
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{inOrder, verify, when}
@@ -270,6 +271,27 @@ class ExecutorSuite extends SparkFunSuite
heartbeatZeroAccumulatorUpdateTest(false)
}

test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
sc = new SparkContext(conf)

val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator$i"))
val input = sc.parallelize(1 to 10, 10)
var testRdd = input.map(i => (i, i))
(0 to 10).foreach( i =>
testRdd = testRdd.map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) }).reduceByKey(_ + _)
)

val logAppender = new LogAppender("heartbeat thread should not die")
withLogAppender(logAppender, level = Some(Level.ERROR)) {
val _ = testRdd.count()
}
val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
.filter(_.contains("Uncaught exception in thread executor-heartbeater"))
assert(logs.isEmpty)
}

private def withMockHeartbeatReceiverRef(executor: Executor)
(func: RpcEndpointRef => Unit): Unit = {
val executorClass = classOf[Executor]
