
Make ArrayBuffer's iterator fail fast when buffer is mutated #9258

Merged
merged 5 commits into scala:2.13.x on Aug 30, 2021

Conversation

NthPortal
Contributor

@NthPortal NthPortal commented Oct 22, 2020

Make ArrayBuffer's iterator fail-fast when the buffer is mutated after the iterator's creation.

Fix inserting an ArrayBuffer into itself (scala/bug#12121).

Partially addresses scala/bug#12121 and scala/bug#12009
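The fail-fast scheme described above can be sketched in a few lines: every structural change bumps a counter, and an iterator records the counter at creation and re-checks it on each access. The names below (`mutationCount`, `checkMutations`) are illustrative, not the exact identifiers used in this PR.

```scala
// Minimal sketch of a fail-fast iterator over a growable buffer.
final class TinyBuffer[A] {
  private var elems = Vector.empty[A]
  private var mutationCount = 0

  // Structural change: bump the counter so live iterators can detect it.
  def append(a: A): Unit = { elems = elems :+ a; mutationCount += 1 }

  def iterator: Iterator[A] = new Iterator[A] {
    private val expectedCount = mutationCount // counter at iterator creation
    private var i = 0
    private def checkMutations(): Unit =
      if (mutationCount != expectedCount)
        throw new java.util.ConcurrentModificationException(
          "mutation after iterator creation")
    def hasNext: Boolean = { checkMutations(); i < elems.length }
    def next(): A = { checkMutations(); val a = elems(i); i += 1; a }
  }
}

val buf = new TinyBuffer[Int]
buf.append(1); buf.append(2)
val it = buf.iterator
it.next()                 // ok: buffer unchanged so far
buf.append(3)             // mutate after the iterator was created
val failedFast =
  try { it.next(); false }
  catch { case _: java.util.ConcurrentModificationException => true }
println(failedFast) // prints true
```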

@NthPortal NthPortal added the library:collections PRs involving changes to the standard collection library label Oct 22, 2020
@scala-jenkins scala-jenkins added this to the 2.13.4 milestone Oct 22, 2020
@NthPortal
Contributor Author

I wonder if these commits from a month ago will Just Work™

@Ichoran
Contributor

Ichoran commented Oct 22, 2020

You didn't by any chance try any benchmarks with this, did you?

@NthPortal
Contributor Author

I don't have a good benchmarking setup, sorry

@SethTisue
Member

is there somewhere we've gathered together knowledge/advice on how to benchmark stuff in scala/scala?

I don't know because I also have left the benchmarking to others :-)

if no such single place exists, maybe at minimum we could start a scala-dev ticket as a place to collect knowledge

@NthPortal
Contributor Author

NthPortal commented Oct 22, 2020

My issue is that all I have is my laptop, with the following issues:

  • it's a laptop, so it's not the most reliable in the first place, due to thermal throttling
  • it tends to have lots of applications idle/running in the background
  • I can't use it while benchmarks are running

The readme under test/benchmarks is okay (not perfect, but serviceable)

@dwijnand dwijnand modified the milestones: 2.13.4, 2.13.5 Oct 23, 2020

@NthPortal
Contributor Author

Fixing `patchInPlace` for scala/bug#12121 is blocked by #9141, since it requires adding an override that returns `this.type`

@dwijnand dwijnand changed the title from "[bug#12009] Make ArrayBuffer's iterator fail-fast" to "Make ArrayBuffer's iterator fail-fast" Dec 17, 2020
@SethTisue
Member

@julienrf you don't think we need benchmarks before merging?

@julienrf
Contributor

@SethTisue Good point, yes benchmarks would be welcome.

@SethTisue SethTisue self-assigned this Jan 15, 2021

@NthPortal
Contributor Author

@SethTisue I just rearranged the commits (and did some squashing/message cleanup) so that the benchmarks came before the changes, to make benchmarking before the changes easier. I didn't actually make any meaningful changes. In retrospect, I could also have temporarily reverted the changes.

I also rebased, for cleanliness

@NthPortal
Contributor Author

NthPortal commented Aug 20, 2021

Run on Java 16, because it happens to be my default (in the table below, `-` rows are before this change and `+` rows are after):

 Benchmark                              (size)  Mode  Cnt       Score      Error  Units
-ArrayBufferBenchmark.addAll                10  avgt   30      97.711 ±    0.594  ns/op
+ArrayBufferBenchmark.addAll                10  avgt   30     100.307 ±    0.338  ns/op
-ArrayBufferBenchmark.addAll               100  avgt   30     266.845 ±    0.661  ns/op
+ArrayBufferBenchmark.addAll               100  avgt   30     265.249 ±    0.397  ns/op
-ArrayBufferBenchmark.addAll              1000  avgt   30    1690.149 ±    3.764  ns/op
+ArrayBufferBenchmark.addAll              1000  avgt   30    1712.475 ±    2.376  ns/op
-ArrayBufferBenchmark.addAll             10000  avgt   30   24878.508 ±   43.111  ns/op
+ArrayBufferBenchmark.addAll             10000  avgt   30   24931.691 ±   48.856  ns/op
-ArrayBufferBenchmark.filterInPlace         10  avgt   30      61.465 ±    0.099  ns/op
+ArrayBufferBenchmark.filterInPlace         10  avgt   30      61.529 ±    0.166  ns/op
-ArrayBufferBenchmark.filterInPlace        100  avgt   30     365.526 ±    0.842  ns/op
+ArrayBufferBenchmark.filterInPlace        100  avgt   30     377.592 ±    0.657  ns/op
-ArrayBufferBenchmark.filterInPlace       1000  avgt   30    3248.895 ±    5.395  ns/op
+ArrayBufferBenchmark.filterInPlace       1000  avgt   30    3054.808 ±   40.811  ns/op
-ArrayBufferBenchmark.filterInPlace      10000  avgt   30   37520.257 ±   62.193  ns/op
+ArrayBufferBenchmark.filterInPlace      10000  avgt   30   36584.331 ±   98.728  ns/op
-ArrayBufferBenchmark.flatMapInPlace1       10  avgt   30     292.213 ±    1.564  ns/op
+ArrayBufferBenchmark.flatMapInPlace1       10  avgt   30     293.366 ±    4.643  ns/op
-ArrayBufferBenchmark.flatMapInPlace1      100  avgt   30    2144.930 ±   17.724  ns/op
+ArrayBufferBenchmark.flatMapInPlace1      100  avgt   30    2194.724 ±    2.738  ns/op
-ArrayBufferBenchmark.flatMapInPlace1     1000  avgt   30   20215.275 ±  106.838  ns/op
+ArrayBufferBenchmark.flatMapInPlace1     1000  avgt   30   20318.600 ±   70.100  ns/op
-ArrayBufferBenchmark.flatMapInPlace1    10000  avgt   30  210170.468 ± 1116.711  ns/op
+ArrayBufferBenchmark.flatMapInPlace1    10000  avgt   30  210699.714 ±  389.451  ns/op
-ArrayBufferBenchmark.iteratorA             10  avgt   30      46.418 ±    0.087  ns/op
+ArrayBufferBenchmark.iteratorA             10  avgt   30      51.599 ±    0.116  ns/op
-ArrayBufferBenchmark.iteratorA            100  avgt   30     185.297 ±    0.470  ns/op
+ArrayBufferBenchmark.iteratorA            100  avgt   30     229.769 ±    0.669  ns/op
-ArrayBufferBenchmark.iteratorA           1000  avgt   30    1449.335 ±    3.177  ns/op
+ArrayBufferBenchmark.iteratorA           1000  avgt   30    1977.957 ±    4.647  ns/op
-ArrayBufferBenchmark.iteratorA          10000  avgt   30   17397.759 ±  151.373  ns/op
+ArrayBufferBenchmark.iteratorA          10000  avgt   30   22770.041 ±   37.288  ns/op
-ArrayBufferBenchmark.iteratorB             10  avgt   30      82.953 ±    0.259  ns/op
+ArrayBufferBenchmark.iteratorB             10  avgt   30      93.475 ±    0.242  ns/op
-ArrayBufferBenchmark.iteratorB            100  avgt   30     537.971 ±   10.782  ns/op
+ArrayBufferBenchmark.iteratorB            100  avgt   30     583.337 ±    1.681  ns/op
-ArrayBufferBenchmark.iteratorB           1000  avgt   30    4886.867 ±   31.022  ns/op
+ArrayBufferBenchmark.iteratorB           1000  avgt   30    5319.587 ±   96.622  ns/op
-ArrayBufferBenchmark.iteratorB          10000  avgt   30   48486.979 ±  153.746  ns/op
+ArrayBufferBenchmark.iteratorB          10000  avgt   30   52150.922 ±   75.137  ns/op
-ArrayBufferBenchmark.reverseIteratorA      10  avgt   30      44.654 ±    0.087  ns/op
+ArrayBufferBenchmark.reverseIteratorA      10  avgt   30      48.189 ±    0.166  ns/op
-ArrayBufferBenchmark.reverseIteratorA     100  avgt   30     170.628 ±    0.489  ns/op
+ArrayBufferBenchmark.reverseIteratorA     100  avgt   30     165.333 ±    0.456  ns/op
-ArrayBufferBenchmark.reverseIteratorA    1000  avgt   30    1332.711 ±    2.200  ns/op
+ArrayBufferBenchmark.reverseIteratorA    1000  avgt   30    1158.948 ±   11.021  ns/op
-ArrayBufferBenchmark.reverseIteratorA   10000  avgt   30   18947.001 ±  132.302  ns/op
+ArrayBufferBenchmark.reverseIteratorA   10000  avgt   30   16477.355 ±   86.881  ns/op
-ArrayBufferBenchmark.reverseIteratorB      10  avgt   30      89.263 ±    0.214  ns/op
+ArrayBufferBenchmark.reverseIteratorB      10  avgt   30      97.480 ±    0.247  ns/op
-ArrayBufferBenchmark.reverseIteratorB     100  avgt   30     504.658 ±   30.568  ns/op
+ArrayBufferBenchmark.reverseIteratorB     100  avgt   30     618.153 ±    9.257  ns/op
-ArrayBufferBenchmark.reverseIteratorB    1000  avgt   30    4033.524 ±  267.711  ns/op
+ArrayBufferBenchmark.reverseIteratorB    1000  avgt   30    5465.261 ±    7.082  ns/op
-ArrayBufferBenchmark.reverseIteratorB   10000  avgt   30   43521.916 ± 1915.392  ns/op
+ArrayBufferBenchmark.reverseIteratorB   10000  avgt   30   50848.591 ±  109.594  ns/op
-ArrayBufferBenchmark.update                10  avgt   30      41.986 ±    0.110  ns/op
+ArrayBufferBenchmark.update                10  avgt   30      42.774 ±    0.162  ns/op
-ArrayBufferBenchmark.update               100  avgt   30     192.961 ±    0.586  ns/op
+ArrayBufferBenchmark.update               100  avgt   30     209.066 ±    0.539  ns/op
-ArrayBufferBenchmark.update              1000  avgt   30    1592.343 ±    2.882  ns/op
+ArrayBufferBenchmark.update              1000  avgt   30    1736.486 ±    7.763  ns/op
-ArrayBufferBenchmark.update             10000  avgt   30   17729.414 ±   33.149  ns/op
+ArrayBufferBenchmark.update             10000  avgt   30   19053.384 ±   35.325  ns/op

for the record, I'm running on proper "desktop" hardware (rather than a laptop), and can mostly run benchmarks whenever needed. the server sits right behind me at my desk

@NthPortal
Contributor Author

My interpretation of the benchmarks is that there is a small penalty to iterators; a very small, probably negligible effect on single-change methods (`update`) that is only noticeable at all at huge scale; and no meaningful effect on bulk-change methods. I'm personally okay with the small penalty to iterators in exchange for better safety.

I still don't understand the reverseIteratorA results

@Ichoran
Contributor

Ichoran commented Aug 20, 2021

The desktop Java 16 numbers look more like I'd expect. Seems okay.

For the record, I do not think a ~5-10% speed penalty for traversal/indexing-heavy operations on a collection whose main purpose is to speed up traversal/indexing-heavy operations is a good tradeoff for the safety, mostly because I don't think the safety issue is remotely common. But reasonable people can disagree on this matter, and I do agree that the penalty is fairly small.

@NthPortal
Contributor Author

NthPortal commented Aug 21, 2021

There seemed to be less of a cost with ListBuffer than here. Any ideas about what we could do differently to reduce the performance cost? Should we make the iterator more bespoke, so it has direct access to the field (as Julien suggested at #9388 (comment))?
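For context, the "direct access to the field" idea amounts to defining the iterator as an inner class so it reads the buffer's fields with monomorphic field loads rather than through accessors. A rough sketch, where all names are hypothetical and not ArrayBuffer's actual internals:

```scala
// Sketch of a "bespoke" inner-class iterator with direct field access.
final class Buf(initialCapacity: Int) {
  private var array = new Array[Int](initialCapacity max 1)
  private var size0 = 0
  private var mutationCount = 0

  def append(v: Int): Unit = {
    if (size0 == array.length)
      array = java.util.Arrays.copyOf(array, array.length * 2)
    array(size0) = v
    size0 += 1
    mutationCount += 1            // structural change: bump the counter
  }

  final class BufIterator extends Iterator[Int] {
    private val expected = mutationCount  // direct field read at creation
    private var i = 0
    def hasNext: Boolean = i < size0      // direct field read, no accessor
    def next(): Int = {
      if (mutationCount != expected)      // direct field read on each step
        throw new java.util.ConcurrentModificationException("buffer mutated")
      val v = array(i); i += 1; v
    }
  }

  def iterator: Iterator[Int] = new BufIterator
}

val b = new Buf(4)
b.append(1); b.append(2)
val s = b.iterator.sum
println(s) // prints 3
```

Whether the JIT actually produces faster code from this shape is exactly the open question in this thread.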

@Ichoran
Contributor

Ichoran commented Aug 22, 2021

It seems a reasonable thing to try, but I have low confidence in any particular fix. Fairly often, improvements that seem like a clear win end up making the JIT compiler take a different optimization strategy, which kind of randomizes the performance. Not much to do but try and see.

I certainly have sped code up before by having direct monomorphic access to fields. That was one key trick in turning in good performance on some tasks in the Computer Language Benchmarks Game. But in my own code the results have been more mixed--yes, sometimes it works, but sometimes not. And I never feel motivated to dig into the details to the incredible depth that Aleksey Shipilëv does. I usually tell myself some story about "method now has more bytecode" to make myself feel better.

@NthPortal
Contributor Author

That change did not seem to be more performant (though I forgot to stop a service running on the server first, so who knows 🤷‍♀️). Regardless, it did not end up being binary compatible, so it's a moot point.

I'd be interested in getting someone else's thoughts on the performance costs. I'm obviously biased in that I wrote the code. I'm basically copying what Java did (or trying my best to), but I've no idea where to look to find the performance costs Java had from it, if they even published them.

@SethTisue
Member

I don't have a strong opinion, but I'm inclined to go ahead and merge at this point (I mean, after allowing until Friday for feedback, say), as I feel we've already reached roughly the same point as #9174 was at when we went ahead and merged it.

@SethTisue
Member

SethTisue commented Aug 23, 2021

@scala/collections anyone see anything here they're inclined to dig deeper into?

@SethTisue SethTisue removed their assignment Aug 25, 2021
@joroKr21
Member

Sorry, misclick on my phone - I'm commuting, hence bored and browsing GH notifications 😄

@joroKr21
Member

joroKr21 commented Aug 30, 2021

TIL that you can disable it from the phone (on the bottom) but not set it back on the top

@joroKr21 joroKr21 merged commit 5f25002 into scala:2.13.x Aug 30, 2021
@NthPortal NthPortal deleted the topic/safe-iterators-2/PR branch August 30, 2021 19:21
@SethTisue SethTisue changed the title from "Make ArrayBuffer's iterator fail-fast" to "Make ArrayBuffer's iterator fail fast when buffer is mutated" Oct 29, 2021
HyukjinKwon pushed a commit to apache/spark that referenced this pull request Apr 7, 2023
…cums

### What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12, but on 2.13, due to this change scala/scala#9258 (LuciferYang bisected this to first cause failures in Scala 2.13.7; one possible reason could be scala/scala#9786), it leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing heartbeats.

This fix of using `CopyOnWriteArrayList` is cherry-picked from #37206, where it was suggested as a fix by LuciferYang, since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here #37206 (comment) saying that there should be no such race, because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the heartbeat thread reads the accumulators. But my understanding is that this is incorrect, as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can occur both due to user code using accumulators (see the new test case) and when using the Dataframe/Dataset API, as SQL metrics will also be `externalAccums`. One way metrics will be sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a `ShuffleWriteProcessor` that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

### Why are the changes needed?
The current code has a data race.

### Does this PR introduce _any_ user-facing change?
It will fix an uncaught exception in the `executor-heartbeater` thread when using Scala 2.13.

### How was this patch tested?
This patch adds a new test case that, before the fix was applied, consistently produces the uncaught exception in the heartbeater thread when using Scala 2.13.

Closes #40663 from eejbyfeldt/SPARK-39696.

Lead-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Co-authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
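The shape of the Spark fix is easy to demonstrate: where an `ArrayBuffer` iterator would now fail fast on a concurrent append, `CopyOnWriteArrayList` hands out snapshot iterators instead. A minimal sketch (not Spark's actual code):

```scala
import java.util.concurrent.CopyOnWriteArrayList

// CopyOnWriteArrayList iterators walk a snapshot of the backing array taken
// when the iterator is created, so a later add never throws
// ConcurrentModificationException (unlike ArrayBuffer's fail-fast iterator).
val accums = new CopyOnWriteArrayList[Int]()
accums.add(1)
accums.add(2)
val it = accums.iterator()   // snapshot taken here
accums.add(3)                // later mutation is invisible to `it`, no CME
var sum = 0
while (it.hasNext) sum += it.next()
println(sum) // prints 3 (the snapshot saw only 1 and 2)
```

The tradeoff is that every write copies the whole backing array, which is acceptable here because `externalAccums` is written rarely and read often.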
HyukjinKwon pushed a commit to apache/spark that referenced this pull request Apr 7, 2023
dongjoon-hyun pushed a commit to apache/spark that referenced this pull request Apr 7, 2023
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
Labels
library:collections PRs involving changes to the standard collection library release-notes worth highlighting in next release notes
9 participants