
Commit 103131f

[CELEBORN-2211] Avoid allocating additional buffers when HdfsFlushTask writes data
1 parent 2dd1b7a commit 103131f

4 files changed: +47 -11 lines changed

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

Lines changed: 9 additions & 0 deletions
@@ -955,6 +955,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientShuffleDynamicResourceFactor: Double = get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_FACTOR)
   def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
   def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT)
+  def reuseHdfsOuputSteamEnabled: Boolean = get(REUSE_HDFS_OUTPUT_STREAM_ENABLED)
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
   def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED)

@@ -2488,6 +2489,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1h")

+  val REUSE_HDFS_OUTPUT_STREAM_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.reuse.hdfs.outputStream.enabled")
+      .categories("worker")
+      .version("0.7.0")
+      .doc("Whether to enable reuse output stream on hdfs.")
+      .booleanConf
+      .createWithDefault(false)
+
   val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.master.heartbeat.worker.timeout")
       .withAlternative("celeborn.worker.heartbeat.timeout")
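
The new flag is exposed through the reuseHdfsOuputSteamEnabled accessor above and defaults to false. A minimal sketch of toggling it programmatically, assuming CelebornConf offers a SparkConf-style set(key, value) builder (the key string is the one registered in this commit):

import org.apache.celeborn.common.CelebornConf

// Assumption: CelebornConf#set(key, value) is available, as for other Celeborn settings.
val conf = new CelebornConf()
  .set("celeborn.worker.reuse.hdfs.outputStream.enabled", "true")

// The accessor added in this commit now returns true; without the setting it stays false.
val reuseEnabled: Boolean = conf.reuseHdfsOuputSteamEnabled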

docs/configuration/worker.md

Lines changed: 1 addition & 0 deletions
@@ -177,6 +177,7 @@ license: |
 | celeborn.worker.replicate.port | 0 | false | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | |
 | celeborn.worker.replicate.randomConnection.enabled | true | false | Whether worker will create random connection to peer when replicate data. When false, worker tend to reuse the same cached TransportClient to a specific replicate worker; when true, worker tend to use different cached TransportClient. Netty will use the same thread to serve the same connection, so with more connections replicate server can leverage more netty threads | 0.2.1 | |
 | celeborn.worker.replicate.threads | 64 | false | Thread number of worker to replicate shuffle data. | 0.2.0 | |
+| celeborn.worker.reuse.hdfs.outputStream.enabled | false | false | Whether to enable reuse output stream on hdfs. | 0.7.0 | |
 | celeborn.worker.rpc.port | 0 | false | Server port for Worker to receive RPC request. | 0.2.0 | |
 | celeborn.worker.shuffle.partitionSplit.enabled | true | false | enable the partition split on worker side | 0.3.0 | celeborn.worker.partition.split.enabled |
 | celeborn.worker.shuffle.partitionSplit.max | 2g | false | Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. | 0.3.0 | |

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala

Lines changed: 17 additions & 7 deletions
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, Closeable, IOException}
 import java.nio.channels.FileChannel

 import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FSDataOutputStream, Path}

 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.source.AbstractSource
@@ -98,18 +98,28 @@ abstract private[worker] class DfsFlushTask(

 private[worker] class HdfsFlushTask(
     buffer: CompositeByteBuf,
+    hdfsStream: FSDataOutputStream,
     val path: Path,
     notifier: FlushNotifier,
     keepBuffer: Boolean,
     source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
   override def flush(copyBytes: Array[Byte]): Unit = {
     val readableBytes = buffer.readableBytes()
-    val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
-    val hdfsStream = hadoopFs.append(path, 256 * 1024)
-    flush(hdfsStream) {
-      hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
-      source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
-      source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+    if (hdfsStream != null) {
+      // TODO : If the FSDataOutputStream supports concurrent writes, the lock can be removed.
+      hdfsStream.synchronized {
+        hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
+        source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
+        source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+      }
+    } else {
+      val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
+      val hdfsStream = hadoopFs.append(path, 256 * 1024)
+      flush(hdfsStream) {
+        hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
+        source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
+        source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+      }
     }
   }
 }
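
This change is the core of the commit: when a shared FSDataOutputStream is supplied, HdfsFlushTask writes through it under a lock instead of opening a fresh append stream for every flush. A simplified, self-contained sketch of that strategy, using illustrative names rather than the actual Celeborn helpers:

import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

// Hypothetical helper, not Celeborn API: write one flushed chunk to HDFS.
def writeChunk(
    sharedStream: FSDataOutputStream, // null when stream reuse is disabled
    fs: FileSystem,
    path: Path,
    bytes: Array[Byte]): Unit = {
  if (sharedStream != null) {
    // Reuse path: serialize writers on the long-lived stream. This mirrors the TODO in the
    // diff above: the lock stays because concurrent writes on the stream are not assumed safe.
    sharedStream.synchronized {
      sharedStream.write(bytes)
    }
  } else {
    // Fallback path, matching the previous behavior: one append stream per flush.
    val stream = fs.append(path, 256 * 1024)
    try stream.write(bytes)
    finally stream.close()
  }
}

The per-flush append in the else branch is exactly what the reuse path avoids; in the real code the stream handling and metrics counters are those shown in the diff.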

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala

Lines changed: 20 additions & 4 deletions
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters.asScalaBufferConverter

 import io.netty.buffer.{ByteBuf, CompositeByteBuf}
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream}

 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.AlreadyClosedException
@@ -532,6 +532,8 @@ class DfsTierWriter(
   private val flushWorkerIndex: Int = flusher.getWorkerIndex
   val hadoopFs: FileSystem = StorageManager.hadoopFs.get(storageType)
   var deleted = false
+  private var hdfsStream: FSDataOutputStream = null
+  private val reuseHdfsOutputStreamEnabled = conf.reuseHdfsOuputSteamEnabled
   private var s3MultipartUploadHandler: MultipartUploadHandler = _
   private var ossMultipartUploadHandler: MultipartUploadHandler = _
   var partNumber: Int = 1
@@ -546,7 +548,11 @@
     }

     try {
-      hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+      if (reuseHdfsOutputStreamEnabled) {
+        hdfsStream = hadoopFs.create(dfsFileInfo.getDfsPath, true)
+      } else {
+        hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+      }
       hadoopFs.setReplication(dfsFileInfo.getDfsPath, conf.workerDfsReplicationFactor.toShort);
       if (dfsFileInfo.isS3) {
         val uri = hadoopFs.getUri
@@ -590,7 +596,14 @@
        case ex: InterruptedException =>
          throw new RuntimeException(ex)
      }
-     hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+     if (reuseHdfsOutputStreamEnabled) {
+       if (hdfsStream != null) {
+         hdfsStream.close()
+       }
+       hdfsStream = hadoopFs.create(dfsFileInfo.getDfsPath, true)
+     } else {
+       hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+     }
    }

    storageManager.registerDiskFilePartitionWriter(
@@ -605,7 +618,7 @@
   override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask = {
     notifier.numPendingFlushes.incrementAndGet()
     if (dfsFileInfo.isHdfs) {
-      new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, source)
+      new HdfsFlushTask(flushBuffer, hdfsStream, dfsFileInfo.getDfsPath(), notifier, true, source)
     } else if (dfsFileInfo.isOSS) {
       val flushTask = new OssFlushTask(
         flushBuffer,
@@ -659,6 +672,9 @@
   }

   override def closeStreams(): Unit = {
+    if (hdfsStream != null) {
+      hdfsStream.close()
+    }
     if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) {
       hadoopFs.delete(dfsFileInfo.getDfsPath, false)
       deleted = true
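
DfsTierWriter owns the lifecycle of the reused stream: when the flag is on it keeps the stream from create() open instead of closing it immediately, reopens it if the path is recreated, hands it to every HdfsFlushTask it generates, and closes it in closeStreams. A condensed sketch of that contract (an illustrative wrapper, not the actual DfsTierWriter):

import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

// Hypothetical wrapper showing the open/handoff/close contract from the diff above.
class ReusableHdfsStream(fs: FileSystem, path: Path, reuseEnabled: Boolean) {
  private var hdfsStream: FSDataOutputStream = null

  def create(): Unit = {
    if (reuseEnabled) {
      // Keep the stream returned by create() open for the writer's lifetime.
      hdfsStream = fs.create(path, true)
    } else {
      // Old behavior: only materialize the file; flush tasks append to it later.
      fs.create(path, true).close()
    }
  }

  // Passed to each flush task; null means "open your own append stream per flush".
  def streamForFlush: FSDataOutputStream = hdfsStream

  def close(): Unit = {
    if (hdfsStream != null) {
      hdfsStream.close()
    }
  }
}

Because every flush task of a writer receives the same stream reference, HdfsFlushTask synchronizes on it before writing, as shown in FlushTask.scala above.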
