
[SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak #46131

Open · wants to merge 3 commits into master
Conversation

@JacobZheng0927 (Contributor) commented Apr 19, 2024

What changes were proposed in this pull request?

Close the underlying streams in DiskBlockObjectWriter.closeResources() to avoid a memory leak.

Why are the changes needed?

SPARK-34647 replaced ZstdInputStream with ZstdInputStreamNoFinalizer. As a result, every caller of CompressionCodec.compressedOutputStream must close the stream explicitly, since closing is no longer handled by the finalizer mechanism.
When zstd is used for shuffle-write compression and the write is interrupted for some reason (e.g., spark.sql.execution.interruptOnCancel is enabled and the job is cancelled), the native memory held by ZstdInputStreamNoFinalizer may never be freed, causing a memory leak.
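
Concretely, the fix comes down to a close pattern like the following (a minimal REPL-style sketch with simplified names, not the actual Spark code): closing the outer serialization stream normally closes the compression stream transitively, but if the outer stream is absent or its close() throws, the compression stream must still be closed directly so the native zstd buffers are released.

```scala
import java.io.OutputStream

// Sketch: `objOut` stands for the serialization stream that wraps `bs`,
// the codec's compressed stream (names borrowed from DiskBlockObjectWriter).
def closeStreams(objOut: OutputStream, bs: OutputStream): Unit = {
  var innerClosed = false
  try {
    if (objOut != null) {
      objOut.close() // closes bs transitively on the happy path
      innerClosed = true
    }
  } finally {
    // Safety net: if objOut was null or close() threw before reaching bs,
    // close the compression stream directly so native memory is freed.
    if (!innerClosed && bs != null) bs.close()
  }
}
```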

Does this PR introduce any user-facing change?

No

How was this patch tested?

Spark Shell Configuration

```shell
$> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"
$> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd
```
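
Note: -XX:+AlwaysPreTouch with -Xms1g makes the JVM commit its heap pages up front, so the RSS growth in the measurements below reflects native (off-heap) allocations, such as the buffers held by the zstd streams, rather than lazy heap paging.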

Test Script

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
(1 to 50).foreach { batch =>
  val jobA = Future {
    val df1 = spark.range(2000000).map { _ =>
      (Random.nextString(20), Random.nextInt(1000), Random.nextInt(1000), Random.nextInt(10))
    }.toDF("a", "b", "c", "d")
    val df2 = spark.range(2000000).map { _ =>
      (Random.nextString(20), Random.nextInt(1000), Random.nextInt(1000), Random.nextInt(10))
    }.toDF("a", "b", "c", "d")
    df1.join(df2, "b").show()
  }
  Thread.sleep(5000)
  sc.cancelJobGroup("jobA")
}
```
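
Each join triggers a shuffle, so the executors open zstd-compressed shuffle-write streams that the cancellation then interrupts mid-write; without the fix, those interrupted streams are never closed.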

Memory Monitor

$> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done;
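
Here `<PID>` is the process id of the spark-shell driver; the fourth field of pmap's "total kB" line is the resident set size (RSS) in kB.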

Results

Before
"2024-05-13 16:54:23",1332384
"2024-05-13 16:54:33",1417112
"2024-05-13 16:54:43",2211684
"2024-05-13 16:54:53",3060820
"2024-05-13 16:55:03",3850444
"2024-05-13 16:55:14",4631744
"2024-05-13 16:55:24",5317200
"2024-05-13 16:55:34",6019464
"2024-05-13 16:55:44",6489180
"2024-05-13 16:55:54",7255548
"2024-05-13 16:56:05",7718728
"2024-05-13 16:56:15",8388392
"2024-05-13 16:56:25",8927636
"2024-05-13 16:56:36",9473412
"2024-05-13 16:56:46",10000380
"2024-05-13 16:56:56",10344024
"2024-05-13 16:57:07",10734204
"2024-05-13 16:57:17",11211900
"2024-05-13 16:57:27",11665524
"2024-05-13 16:57:38",12268976
"2024-05-13 16:57:48",12896264
"2024-05-13 16:57:58",13572244
"2024-05-13 16:58:09",14252416
"2024-05-13 16:58:19",14915560
"2024-05-13 16:58:30",15484196
"2024-05-13 16:58:40",16170324

After
"2024-05-13 16:35:44",1355428
"2024-05-13 16:35:54",1391028
"2024-05-13 16:36:04",1673720
"2024-05-13 16:36:14",2103716
"2024-05-13 16:36:24",2129876
"2024-05-13 16:36:35",2166412
"2024-05-13 16:36:45",2177672
"2024-05-13 16:36:55",2188340
"2024-05-13 16:37:05",2190688
"2024-05-13 16:37:15",2195168
"2024-05-13 16:37:26",2199296
"2024-05-13 16:37:36",2228052
"2024-05-13 16:37:46",2238104
"2024-05-13 16:37:56",2260624
"2024-05-13 16:38:06",2307184
"2024-05-13 16:38:16",2331140
"2024-05-13 16:38:27",2323388
"2024-05-13 16:38:37",2357552
"2024-05-13 16:38:47",2352948
"2024-05-13 16:38:57",2364744
"2024-05-13 16:39:07",2368528
"2024-05-13 16:39:18",2385492
"2024-05-13 16:39:28",2389184
"2024-05-13 16:39:38",2388060
"2024-05-13 16:39:48",2388336
"2024-05-13 16:39:58",2386916

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Apr 19, 2024
@HyukjinKwon HyukjinKwon changed the title SPARK-47910; close stream when DiskBlockObjectWriter closeResources to avoid memory leak [SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak Apr 19, 2024
@JacobZheng0927 (Contributor, Author) commented:

cc @dongjoon-hyun
This is similar to #35613. Please take a look, thanks!

@dongjoon-hyun (Member) left a comment:

Thank you for making a PR, @JacobZheng0927.

However, your PR fails to compile. Please make GitHub Action CI green.

[error] (core / Compile / compileIncremental) Compilation failed

@JacobZheng0927 (Contributor, Author) replied:

> Thank you for making a PR, @JacobZheng0927.
> However, your PR fails to compile. Please make GitHub Action CI green.
> [error] (core / Compile / compileIncremental) Compilation failed

Done.

```scala
    }
  }
}

```
Review comment (Member):
nit. Redundant empty line. Please remove this.

@dongjoon-hyun (Member) left a comment:
Like #35613, do you think you can provide a way to validate your PR, @JacobZheng0927?

```scala
} catch {
  case e: IOException =>
    logError(log"Exception occurred while closing the output stream" +
      log"${MDC(ERROR, e.getMessage)}")
```
Review comment (Contributor):

error -> info, and including the exception stack trace would be useful.
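
One way to apply that feedback (a sketch of the revised hunk; Spark's Logging trait also has structured-logging overloads, so the exact call may differ) is to log at INFO and pass the Throwable so the full stack trace is emitted:

```scala
} catch {
  case e: IOException =>
    // INFO instead of ERROR, and pass `e` itself so the stack trace
    // is logged rather than only e.getMessage.
    logInfo("Exception occurred while closing the output stream", e)
}
```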

Comment on lines 180 to 183:

```scala
objOut = closeIfNonNull(objOut)
bs = null
} {
bs = closeIfNonNull(bs)
```

@mridulm (Contributor) commented Apr 23, 2024:

Suggested change:

```scala
if (null != objOut) objOut.close()
bs = null
} {
objOut = null
if (null != bs) bs.close()
bs = null
```

And remove closeIfNonNull.
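
For context, the bare `} {` in these hunks separates the two argument blocks of Spark's Utils.tryWithSafeFinally. Its contract, in a simplified REPL-style sketch (not the verbatim Spark source), is that the finally block always runs, and a failure there is suppressed rather than allowed to mask an earlier failure:

```scala
def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
  var original: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      original = t
      throw t
  } finally {
    try {
      finallyBlock
    } catch {
      // If both blocks throw, keep the first exception and attach the
      // second as suppressed instead of masking the original.
      case t: Throwable if original != null && (original ne t) =>
        original.addSuppressed(t)
    }
  }
}
```

Under that contract, the suggested code closes objOut in the first block (which closes bs transitively) and nulls bs, so the second block only closes bs directly when the first block failed before reaching that point.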

@JacobZheng0927 (Contributor, Author) replied:

OK, I'll fix it.

@JacobZheng0927 (Contributor, Author) replied:

> Like #35613, do you think you can provide a way to validate your PR, @JacobZheng0927?

OK, I'll try to reproduce the problem with a simple script.

@JacobZheng0927 (Contributor, Author) commented:

I apologize for the long delay in updating. I've just added the reproduction steps; please take a look. @dongjoon-hyun @mridulm

@mridulm (Contributor) commented May 13, 2024:

@JacobZheng0927, please add it as a unit test.

@JacobZheng0927 (Contributor, Author) replied:

> @JacobZheng0927, please add it as a unit test.

I'm not sure how to test for native memory leaks in unit tests. Is there a relevant example I can refer to?

@mridulm (Contributor) commented May 23, 2024:

One way I can quickly think of is to check whether objOut.close() or bs.close() is actually called. For example, adapt the "calling closeAndDelete() on a partial write file" test and use either a custom serializer or a custom compression codec to verify that close is invoked.

(Sorry for the delay in getting back to you; this PR unfortunately dropped off my todo list.)
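
A rough sketch of that idea (hypothetical names throughout; the real test would inject this through a custom CompressionCodec or serializer wired into DiskBlockObjectWriter):

```scala
import java.io.{FilterOutputStream, OutputStream}
import java.util.concurrent.atomic.AtomicBoolean

// Wraps the stream a codec hands out and records whether close() ran,
// so a test can assert on it after DiskBlockObjectWriter.closeResources().
class CloseTrackingOutputStream(out: OutputStream) extends FilterOutputStream(out) {
  val closed = new AtomicBoolean(false)
  override def close(): Unit = {
    closed.set(true)
    super.close()
  }
}
```

The test would start a write, abort it partway through, call closeResources(), and assert that closed.get() returns true.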
