Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums #40663

Closed
wants to merge 5 commits into from

Conversation

eejbyfeldt
Copy link
Contributor

@eejbyfeldt eejbyfeldt commented Apr 4, 2023

What changes were proposed in this pull request?

This PR fixes a data race around concurrent access to TaskMetrics.externalAccums. The race occurs between the executor-heartbeater thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 due this change scala/scala#9258 (@LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in the executor-heartbeater thread, which means that the executor will eventually be terminated due to missing hearbeats.

This fix of using of using CopyOnWriteArrayList is cherry picked from #37206 where is was suggested as a fix by @LuciferYang since TaskMetrics.externalAccums is also accessed from outside the class TaskMetrics. The old PR was closed because at that point there was no clear understanding of the race condition. @JoshRosen commented here #37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here:

task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
and therefore no writes should occur while the hearbeat thread will read the accumulators. But my understanding is that is incorrect as accumulators will also be deserialized as part of the taskBinary here:
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
which will happen while the heartbeater thread is potentially reading the accumulators. This can both due to user code using accumulators (see the new test case) but also when using the Dataframe/Dataset API as sql metrics will also be externalAccums. One way metrics will be sent as part of the taskBinary is when the dep is a ShuffleDependency:
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
with a ShuffleWriteProcessor that comes from
/**
* Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
* with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
*/
def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
new ShuffleWriteProcessor {
override protected def createMetricsReporter(
context: TaskContext): ShuffleWriteMetricsReporter = {
new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
}
}
}

Why are the changes needed?

The current code has a data race.

Does this PR introduce any user-facing change?

It will fix an uncaught exception in the executor-hearbeater thread when using scala 2.13.

How was this patch tested?

This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13.

@github-actions github-actions bot added the CORE label Apr 4, 2023
@eejbyfeldt eejbyfeldt marked this pull request as ready for review April 5, 2023 12:03
@eejbyfeldt eejbyfeldt changed the title [SPARK-39696][CORE][WIP] Test case showing race in access to TaskMetrics.externalAccums [SPARK-39696][CORE] Test case showing race in access to TaskMetrics.externalAccums Apr 5, 2023
@eejbyfeldt
Copy link
Contributor Author

Ping @LuciferYang @JoshRosen that previously discussed this issue in #37206 let me know if would be a better approach to reopen the old PR rather then creating a new one. And/or if my description of the data race does not sounds accurate to you.

@eejbyfeldt eejbyfeldt changed the title [SPARK-39696][CORE] Test case showing race in access to TaskMetrics.externalAccums [SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums Apr 5, 2023
@LuciferYang
Copy link
Contributor

It's ok to me to further discussion in this one

@LuciferYang
Copy link
Contributor

I found that before Scala 2.13.6(include) seems no this issue and the new test will failed after 2.13.7.

@eejbyfeldt I am not sure if this is caused by change of scala/scala#9258, as it has been added to Scala 2.13.4.

also cc @srowen @mridulm and @xinrong-meng

@eejbyfeldt
Copy link
Contributor Author

eejbyfeldt commented Apr 5, 2023

I found that before Scala 2.13.6(include) seems no this issue and the new test will failed after 2.13.7.

@eejbyfeldt I am not sure if this is caused by change of scala/scala#9258, as it has been added to Scala 2.13.4.

Thanks for looking in to that closer. Should probably formulated myself more clearly that was only a guess on my part and not something I had verified. But now that you narrowed it down to 2.13.7 another possible candidate change could be scala/scala#9786 that changed how that mutations are tracked in ArrayBuffer.

EDIT: Even if scala/scala#9258 was tagged milestone 2.13.4 at some point it looks to me like it actually landed in 2.13.7 with this commit scala/scala@5f25002

@mridulm
Copy link
Contributor

mridulm commented Apr 6, 2023

This makes sense.
The TaskRunner is visible to heartbeater since it gets added to runningTasks before the task binary is deserialized.
During TaskRunner.run, we are registering the accumulators as part of the deserialization - which can result in race with the heartbearter reading the metrics.

So this is essentially a bug waiting to happen for two reasons:

  • externalAccums is getting used in an MT-unsafe way by the task thread and the reporter thread.
  • This is broken in 2.12 as well - we were just not aware of it.
    • There always was potential visibility issues in heartbeat for external accumulators.

Note, the task itself is visible only after the deserialization is complete, since it is volatile - but the registration is not covered by it.

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a couple of minor nits, nice job chasing this down @eejbyfeldt !

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LuciferYang
Copy link
Contributor

@eejbyfeldt unused-imports check failed, please fix it

[error] /home/runner/work/spark/spark/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala:48:41: Unused import
[error] import org.apache.spark.internal.config.Network
[error]                                         ^
[error] one error found

@LuciferYang
Copy link
Contributor

I found that before Scala 2.13.6(include) seems no this issue and the new test will failed after 2.13.7.

@eejbyfeldt I am not sure if this is caused by change of scala/scala#9258, as it has been added to Scala 2.13.4.

also cc @srowen @mridulm and @xinrong-meng

also cc @dongjoon-hyun due to Spark 3.2.x also use Scala 2.13.8 and maybe Spark 3.2.4 should include this one

Copy link
Contributor

@LuciferYang LuciferYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM , pending ci

@HyukjinKwon
Copy link
Member

Merged to master and branch-3.4.

HyukjinKwon pushed a commit that referenced this pull request Apr 7, 2023
…cums

### What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 ~due this change scala/scala#9258 (LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing hearbeats.

This fix of using of using `CopyOnWriteArrayList` is cherry picked from #37206 where is was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here #37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the hearbeat thread will read the accumulators. But my understanding is that is incorrect as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can both due to user code using accumulators (see the new test case) but also when using the Dataframe/Dataset API as  sql metrics will also be `externalAccums`. One way metrics will be sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a ShuffleWriteProcessor that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

### Why are the changes needed?
The current code has a data race.

### Does this PR introduce _any_ user-facing change?
It will fix an uncaught exception in the `executor-hearbeater` thread when using scala 2.13.

### How was this patch tested?
This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13.

Closes #40663 from eejbyfeldt/SPARK-39696.

Lead-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Co-authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 6ce0822)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@LuciferYang
Copy link
Contributor

@HyukjinKwon branch-3.3 and branch-3.2 may also require this one, they also use Scala 2.13.8, need @eejbyfeldt to submit an independent PRs?

dongjoon-hyun pushed a commit that referenced this pull request Apr 7, 2023
…cums

### What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 ~due this change scala/scala#9258 (LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing hearbeats.

This fix of using of using `CopyOnWriteArrayList` is cherry picked from #37206 where is was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here #37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the hearbeat thread will read the accumulators. But my understanding is that is incorrect as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can both due to user code using accumulators (see the new test case) but also when using the Dataframe/Dataset API as  sql metrics will also be `externalAccums`. One way metrics will be sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a ShuffleWriteProcessor that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

### Why are the changes needed?
The current code has a data race.

### Does this PR introduce _any_ user-facing change?
It will fix an uncaught exception in the `executor-hearbeater` thread when using scala 2.13.

### How was this patch tested?
This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13.

Closes #40663 from eejbyfeldt/SPARK-39696.

Lead-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Co-authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 6ce0822)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit b2ff4c4)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun
Copy link
Member

Thank you, @eejbyfeldt , @mridulm , @HyukjinKwon , and @LuciferYang .
I also backported to branch-3.3.

However, we need a new PR for branch-3.2 due to the compilation error, @LuciferYang and @eejbyfeldt .

[error] /Users/dongjoon/APACHE/spark-merge/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala:33:19: object logging is not a member of package org.apache
[error] import org.apache.logging.log4j._

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Apr 7, 2023

To @LuciferYang and all.

After double-checking, I found that Apache Spark 3.2.x is not affected because it uses Scala 2.13.5.

spark/pom.xml

Line 3389 in 7773740

<scala.version>2.13.5</scala.version>

SPARK-35496 (Scala 2.13.7) landed at Apache Spark 3.3.0+. We don't need to backport this to branch-3.2.

@dongjoon-hyun
Copy link
Member

Please let me know if this is still valid in branch-3.2.

branch-3.3 and branch-3.2 may also require this one, they also use Scala 2.13.8

@LuciferYang
Copy link
Contributor

@dongjoon-hyun Scala 2.13.5 does not require this fix. I apologize for providing incorrect information earlier

@dongjoon-hyun
Copy link
Member

No problem at all. Thank you always, @LuciferYang !

@mridulm
Copy link
Contributor

mridulm commented Apr 8, 2023

Thanks for checking @dongjoon-hyun and @LuciferYang !
Great to finally have this issue fixed :-)

snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
…cums

### What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 ~due this change scala/scala#9258 (LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing hearbeats.

This fix of using of using `CopyOnWriteArrayList` is cherry picked from apache#37206 where is was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here apache#37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the hearbeat thread will read the accumulators. But my understanding is that is incorrect as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can both due to user code using accumulators (see the new test case) but also when using the Dataframe/Dataset API as  sql metrics will also be `externalAccums`. One way metrics will be sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a ShuffleWriteProcessor that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

### Why are the changes needed?
The current code has a data race.

### Does this PR introduce _any_ user-facing change?
It will fix an uncaught exception in the `executor-hearbeater` thread when using scala 2.13.

### How was this patch tested?
This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13.

Closes apache#40663 from eejbyfeldt/SPARK-39696.

Lead-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Co-authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 6ce0822)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants