[SPARK-47910][CORE] close stream when DiskBlockObjectWriter closeResources to avoid memory leak #46131
Conversation
cc @dongjoon-hyun
Thank you for making a PR, @JacobZheng0927 .
However, your PR fails to compile. Please make GitHub Action CI green.
[error] (core / Compile / compileIncremental) Compilation failed
Done.
```scala
      }
    }
  }

```
nit. Redundant empty line. Please remove this.
Like #35613, do you think you can provide a way to validate your PR, @JacobZheng0927 ?
```scala
} catch {
  case e: IOException =>
    logError(log"Exception occurred while closing the output stream" +
      log"${MDC(ERROR, e.getMessage)}")
```
error -> info
and the exception stack trace would be useful
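To illustrate the reviewer's point about keeping the stack trace: logging only `e.getMessage` discards the trace, while passing the `Throwable` itself to the logger preserves it. A minimal stand-alone sketch using `java.util.logging` (Spark's own `log"..."`/`MDC` interpolator is not reproduced here; `closeQuietly` is a hypothetical helper):

```scala
import java.io.IOException
import java.util.logging.{Level, Logger}

object LogDemo {
  private val logger = Logger.getLogger("DiskBlockObjectWriterDemo")

  // Run a close action, swallowing IOExceptions but logging them at INFO
  // level *with* the exception object, so the full stack trace is kept.
  def closeQuietly(close: () => Unit): Unit = {
    try {
      close()
    } catch {
      case e: IOException =>
        // Passing `e` (not just e.getMessage) preserves the stack trace.
        logger.log(Level.INFO, "Exception occurred while closing the output stream", e)
    }
  }
}
```

The key difference from the snippet above is the extra `Throwable` argument to the logging call, which is what makes the trace appear in the output.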
```scala
objOut = closeIfNonNull(objOut)
bs = null
} {
bs = closeIfNonNull(bs)
```
Suggested change:

```scala
if (null != objOut) objOut.close()
bs = null
} {
objOut = null
if (null != bs) bs.close()
bs = null
```
And remove `closeIfNonNull`.
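The two-block shape in the suggestion follows Spark's `Utils.tryWithSafeFinally` pattern: the second block always runs, and if both blocks throw, the cleanup-side exception is suppressed so it does not mask the original error. A simplified stand-alone reimplementation of that helper (a sketch, not Spark's exact code):

```scala
object SafeFinallyDemo {
  // Run `block`, always run `finallyBlock` afterwards. If both throw,
  // attach the finally-side exception as suppressed on the original one
  // instead of letting it replace the original error.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var originalThrowable: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        originalThrowable = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable if originalThrowable != null =>
          originalThrowable.addSuppressed(t)
      }
    }
  }
}
```

This is why the suggested `closeResources` can close `objOut` in the first block and `bs` in the second: even if closing `objOut` fails, `bs` still gets closed, and no exception is lost.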
ok. I'll fix it.
OK, I'll try to reproduce the problem with a simple script.
I apologize for the long delay in updating. I've just added the steps for reproduction; please take a look. @dongjoon-hyun @mridulm
@JacobZheng0927, please add it as a unit test.
I'm not sure how to test for native memory leak cases in unit tests. Is there a relevant example I can refer to?
One way I can quickly think of is to check if (Sorry for the delay in getting back to you - this PR dropped off my todo list unfortunately)
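One way to turn this into a unit test without measuring native memory directly is to assert that `close()` is actually invoked on the wrapped compression stream when resources are released. A hypothetical sketch (the `CloseTrackingStream` and `DemoWriter` classes are stand-ins for illustration, not Spark's actual test code):

```scala
import java.io.{ByteArrayOutputStream, FilterOutputStream}

// Wraps a stream and records whether close() was ever called,
// standing in for ZstdOutputStreamNoFinalizer in a test.
class CloseTrackingStream(out: ByteArrayOutputStream) extends FilterOutputStream(out) {
  var closed = false
  override def close(): Unit = {
    closed = true
    super.close()
  }
}

// Mimics the shape of DiskBlockObjectWriter.closeResources: cleanup
// must close the compression stream even if earlier steps fail.
class DemoWriter(bs: CloseTrackingStream) {
  def closeResources(): Unit = {
    try {
      // ... close the object-serialization stream here ...
    } finally {
      bs.close() // the fix: always close the compressed stream
    }
  }
}
```

A test can then construct the writer, simulate an interrupted write by calling `closeResources()`, and assert `bs.closed` is true; with the pre-fix code path the assertion would fail because the stream was never closed.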
What changes were proposed in this pull request?
Close the underlying streams in `DiskBlockObjectWriter.closeResources` to avoid a memory leak.
Why are the changes needed?
SPARK-34647 replaced the ZstdInputStream with ZstdInputStreamNoFinalizer. This means that all usages of CompressionCodec.compressedOutputStream must manually close the stream, as it is no longer closed by the finalizer mechanism.
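The consequence of dropping the finalizer can be shown with a small self-contained sketch: a stream whose "native" resource is released only by an explicit `close()`, never by garbage collection. `NoFinalizerStream` here is a hypothetical stand-in for the zstd `NoFinalizer` stream classes, not the real zstd-jni API:

```scala
import java.io.{ByteArrayOutputStream, FilterOutputStream, OutputStream}

// Stand-in for a NoFinalizer-style stream: the flag below models native
// memory that is freed only inside close(). With no finalizer, a stream
// that is never explicitly closed leaks that memory permanently.
class NoFinalizerStream(out: OutputStream) extends FilterOutputStream(out) {
  var nativeReleased = false
  override def close(): Unit = {
    super.close()
    nativeReleased = true // "native memory" is freed only here
  }
}

object CloseDemo {
  def main(args: Array[String]): Unit = {
    val s = new NoFinalizerStream(new ByteArrayOutputStream())
    try {
      s.write(Array[Byte](1, 2, 3))
    } finally {
      // Without this explicit close, nothing else would ever release
      // the resource -- which is exactly the leak this PR addresses.
      s.close()
    }
    assert(s.nativeReleased)
  }
}
```

If the `finally` block is skipped (as can happen when the writing task is interrupted before cleanup runs), `nativeReleased` stays false forever, which models the leak described below.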
When using zstd for shuffle write compression, if execution is interrupted for some reason (e.g., spark.sql.execution.interruptOnCancel is enabled and the job is cancelled), the memory used by ZstdInputStreamNoFinalizer may not be freed, causing a memory leak.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Spark Shell Configuration
Test Script
Memory Monitor
Results
Before
After
Was this patch authored or co-authored using generative AI tooling?
No