Commit 2dd1b7a

xy2953396112 authored and RexXiong committed
[CELEBORN-2210] When a flushBuffer consolidation OOM exception occurs, support setting the Buffer for fileInfo.

### What changes were proposed in this pull request?

When a flushBuffer consolidation OOM exception occurs, support setting the Buffer for fileInfo.

### Why are the changes needed?

When a flushBuffer consolidation OOM exception occurs, the current logic does not allow setting the Buffer for fileInfo.

### Does this PR resolve a correctness bug?

NO

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3547 from xy2953396112/CELEBORN-2210.

Authored-by: xxx <[email protected]>
Signed-off-by: Shuang <[email protected]>

1 parent 783a7cc commit 2dd1b7a

1 file changed: +12 -2 lines changed

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala

Lines changed: 12 additions & 2 deletions
@@ -341,8 +341,18 @@ class MemoryTierWriter(
   }
 
   override def closeStreams(): Unit = {
-    flushBuffer.consolidate()
-    fileInfo.setBuffer(flushBuffer)
+    try {
+      flushBuffer.consolidate()
+    } catch {
+      case oom: OutOfMemoryError =>
+        logError(
+          s"MemoryTierWriter shuffleKey:${partitionDataWriterContext.getShuffleKey}, " +
+            s"partitionId:${partitionDataWriterContext.getPartitionLocation.getFileName} " +
+            s"failed to consolidate flush buffer due to OutOfMemoryError.",
+          oom)
+    } finally {
+      fileInfo.setBuffer(flushBuffer)
+    }
   }
 
   override def takeBufferInternal(): CompositeByteBuf = {
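
The control flow of the new `closeStreams()` can be exercised in isolation. The following is a minimal sketch only: `SketchBuffer`, `SketchFileInfo`, `CloseStreamsSketch`, and the `failWithOom` flag are hypothetical stand-ins invented for illustration and are not Celeborn or Netty classes. It shows that when consolidation throws `OutOfMemoryError`, the error is logged and swallowed, and the `finally` branch still hands the unconsolidated buffer to the file info.

```scala
// Hypothetical stand-in for the flush buffer; not a Celeborn or Netty class.
final class SketchBuffer {
  var consolidated: Boolean = false

  // Simulates consolidation; failWithOom forces the failure path (assumption for the demo).
  def consolidate(failWithOom: Boolean): Unit = {
    if (failWithOom) throw new OutOfMemoryError("simulated consolidation failure")
    consolidated = true
  }
}

// Hypothetical stand-in for the file info that receives the buffer.
final class SketchFileInfo {
  var buffer: Option[SketchBuffer] = None
  def setBuffer(b: SketchBuffer): Unit = buffer = Some(b)
}

object CloseStreamsSketch {
  // Same try/catch/finally shape as the diff above.
  def closeStreams(buf: SketchBuffer, info: SketchFileInfo, failWithOom: Boolean): Unit = {
    try {
      buf.consolidate(failWithOom)
    } catch {
      case oom: OutOfMemoryError =>
        // The error is logged and not rethrown.
        System.err.println(s"failed to consolidate flush buffer: ${oom.getMessage}")
    } finally {
      // Runs on both the success path and the failure path.
      info.setBuffer(buf)
    }
  }

  def main(args: Array[String]): Unit = {
    val info = new SketchFileInfo
    val buf = new SketchBuffer
    closeStreams(buf, info, failWithOom = true)
    println(info.buffer.isDefined) // buffer was set despite the error
    println(buf.consolidated)      // consolidation itself did not complete
  }
}
```

In this sketch the buffer reaches the file info whether or not consolidation succeeds, which mirrors the stated intent of the commit. Whether downstream readers tolerate an unconsolidated buffer is not covered by the commit message.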
