Skip to content

Commit ff9ad7f

Browse files
committed
[CELEBORN-2211] Avoid allocating additional buffers when HdfsFlushTask writes data
1 parent 8966c9b commit ff9ad7f

File tree

4 files changed

+44
-11
lines changed

4 files changed

+44
-11
lines changed

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
954954
def clientShuffleDynamicResourceFactor: Double = get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_FACTOR)
955955
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
956956
def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT)
957+
def enableReuseHdfsOuputSteam: Boolean = get(ENABLE_REUSE_HDFS_OUTPUT_STREAM)
957958
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
958959
def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED)
959960

@@ -2484,6 +2485,14 @@ object CelebornConf extends Logging {
24842485
.timeConf(TimeUnit.MILLISECONDS)
24852486
.createWithDefaultString("1h")
24862487

2488+
val ENABLE_REUSE_HDFS_OUTPUT_STREAM: ConfigEntry[Boolean] =
2489+
buildConf("celeborn.worker.hdfs.reuseOutputStream.enable")
2490+
.categories("worker")
2491+
.version("0.7.0")
2492+
.doc("Whether to enable reuse output stream on hdfs.")
2493+
.booleanConf
2494+
.createWithDefault(false)
2495+
24872496
val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
24882497
buildConf("celeborn.master.heartbeat.worker.timeout")
24892498
.withAlternative("celeborn.worker.heartbeat.timeout")

docs/configuration/worker.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ license: |
109109
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false | Whether to call sync method to save committed file infos into DB to handle OS crash. | 0.3.1 | |
110110
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's graceful shutdown timeout time. | 0.2.0 | |
111111
| celeborn.worker.hdfs.replication.factor | 2 | false | HDFS replication factor for shuffle files. | 0.7.0 | |
112+
| celeborn.worker.hdfs.reuseOutputStream.enable | false | false | Whether to enable reuse output stream on hdfs. | 0.7.0 | |
112113
| celeborn.worker.http.auth.administers | | false | A comma-separated list of users who have admin privileges, Note, when celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as administrator. | 0.6.0 | |
113114
| celeborn.worker.http.auth.basic.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined password authentication implementation of org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 | |
114115
| celeborn.worker.http.auth.bearer.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined token authentication implementation of org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | |

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, Closeable, IOException}
2121
import java.nio.channels.FileChannel
2222

2323
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
24-
import org.apache.hadoop.fs.Path
24+
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
2525

2626
import org.apache.celeborn.common.internal.Logging
2727
import org.apache.celeborn.common.metrics.source.AbstractSource
@@ -98,18 +98,28 @@ abstract private[worker] class DfsFlushTask(
9898

9999
private[worker] class HdfsFlushTask(
100100
buffer: CompositeByteBuf,
101+
hdfsStream: FSDataOutputStream,
101102
val path: Path,
102103
notifier: FlushNotifier,
103104
keepBuffer: Boolean,
104105
source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
105106
override def flush(copyBytes: Array[Byte]): Unit = {
106107
val readableBytes = buffer.readableBytes()
107-
val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
108-
val hdfsStream = hadoopFs.append(path, 256 * 1024)
109-
flush(hdfsStream) {
110-
hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
111-
source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
112-
source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
108+
if (hdfsStream != null) {
109+
// TODO : If the FSDataOutputStream supports concurrent writes, the lock can be removed.
110+
hdfsStream.synchronized {
111+
hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
112+
source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
113+
source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
114+
}
115+
} else {
116+
val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
117+
val hdfsStream = hadoopFs.append(path, 256 * 1024)
118+
flush(hdfsStream) {
119+
hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
120+
source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
121+
source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
122+
}
113123
}
114124
}
115125
}

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
2626
import scala.collection.JavaConverters.asScalaBufferConverter
2727

2828
import io.netty.buffer.{ByteBuf, CompositeByteBuf}
29-
import org.apache.hadoop.fs.FileSystem
29+
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream}
3030

3131
import org.apache.celeborn.common.CelebornConf
3232
import org.apache.celeborn.common.exception.AlreadyClosedException
@@ -522,6 +522,8 @@ class DfsTierWriter(
522522
private val flushWorkerIndex: Int = flusher.getWorkerIndex
523523
val hadoopFs: FileSystem = StorageManager.hadoopFs.get(storageType)
524524
var deleted = false
525+
private var hdfsStream: FSDataOutputStream = null
526+
private val enableReuseHdfsOutputStream = conf.enableReuseHdfsOuputSteam
525527
private var s3MultipartUploadHandler: MultipartUploadHandler = _
526528
private var ossMultipartUploadHandler: MultipartUploadHandler = _
527529
var partNumber: Int = 1
@@ -536,7 +538,11 @@ class DfsTierWriter(
536538
}
537539

538540
try {
539-
hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
541+
if (enableReuseHdfsOutputStream) {
542+
hdfsStream = hadoopFs.create(dfsFileInfo.getDfsPath, true)
543+
} else {
544+
hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
545+
}
540546
hadoopFs.setReplication(dfsFileInfo.getDfsPath, conf.workerDfsReplicationFactor.toShort);
541547
if (dfsFileInfo.isS3) {
542548
val uri = hadoopFs.getUri
@@ -580,7 +586,11 @@ class DfsTierWriter(
580586
case ex: InterruptedException =>
581587
throw new RuntimeException(ex)
582588
}
583-
hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
589+
if (enableReuseHdfsOutputStream) {
590+
hdfsStream = hadoopFs.create(dfsFileInfo.getDfsPath, true)
591+
} else {
592+
hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
593+
}
584594
}
585595

586596
storageManager.registerDiskFilePartitionWriter(
@@ -595,7 +605,7 @@ class DfsTierWriter(
595605
override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask = {
596606
notifier.numPendingFlushes.incrementAndGet()
597607
if (dfsFileInfo.isHdfs) {
598-
new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, source)
608+
new HdfsFlushTask(flushBuffer, hdfsStream, dfsFileInfo.getDfsPath(), notifier, true, source)
599609
} else if (dfsFileInfo.isOSS) {
600610
val flushTask = new OssFlushTask(
601611
flushBuffer,
@@ -649,6 +659,9 @@ class DfsTierWriter(
649659
}
650660

651661
override def closeStreams(): Unit = {
662+
if (hdfsStream != null) {
663+
hdfsStream.close()
664+
}
652665
if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) {
653666
hadoopFs.delete(dfsFileInfo.getDfsPath, false)
654667
deleted = true

0 commit comments

Comments
 (0)