diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 67ab2a2ab4..40509a7aeb 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -123,7 +123,7 @@ jobs: docker exec rss-spark-master-1 /bin/bash -c "cat /example.scala | /opt/spark/bin/spark-shell \ --master spark://rss-spark-master-1:7077 \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ - --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \ + --conf spark.shuffle.manager=org.apache.uniffle.spark.shuffle.RssShuffleManager \ --conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \ --conf spark.rss.storage.type=MEMORY_LOCALFILE \ --conf spark.task.maxFailures=4 \ diff --git a/README.md b/README.md index 26733eaba6..3caaffa94d 100644 --- a/README.md +++ b/README.md @@ -252,7 +252,7 @@ Deploy Steps: # Uniffle transmits serialized shuffle data over network, therefore a serializer that supports relocation of # serialized object should be used. spark.serializer org.apache.spark.serializer.KryoSerializer # this could also be in the spark-defaults.conf - spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager + spark.shuffle.manager org.apache.uniffle.spark.shuffle.RssShuffleManager spark.rss.coordinator.quorum :19999,:19999 # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE. ``` @@ -269,7 +269,7 @@ After apply the patch and rebuild spark, add following configuration in spark co ``` For spark3.5 or above just add one more configuration: ``` - spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo + spark.shuffle.sort.io.plugin.class org.apache.uniffle.spark.shuffle.RssShuffleDataIo ``` ### Deploy MapReduce Client diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index 6a281db2e2..33effde367 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -44,18 +44,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; import org.apache.spark.SparkException; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.RssSparkShuffleUtils; -import org.apache.spark.shuffle.RssStageInfo; -import org.apache.spark.shuffle.RssStageResubmitManager; -import org.apache.spark.shuffle.ShuffleHandleInfoManager; import org.apache.spark.shuffle.ShuffleManager; -import org.apache.spark.shuffle.SparkVersionUtils; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; -import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,6 +73,17 @@ import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.RetryUtils; import org.apache.uniffle.shuffle.BlockIdManager; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.RssSparkShuffleUtils; +import org.apache.uniffle.spark.shuffle.RssStageInfo; +import org.apache.uniffle.spark.shuffle.RssStageResubmitManager; +import org.apache.uniffle.spark.shuffle.ShuffleHandleInfoManager; +import org.apache.uniffle.spark.shuffle.SparkVersionUtils; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.StageAttemptShuffleHandleInfo; import static org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX; import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED; diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java index 77379efb5f..125bf3c2b7 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java @@ -21,11 +21,11 @@ import java.util.Map; import org.apache.spark.SparkException; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; import org.apache.uniffle.common.ReceivingFailureServer; import org.apache.uniffle.shuffle.BlockIdManager; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; /** * This is a proxy interface that mainly delegates the un-registration of shuffles to the diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java index b9828408c0..8ed8f63d53 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java @@ -29,8 +29,6 @@ import com.google.protobuf.UnsafeByteOperations; import io.grpc.stub.StreamObserver; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +40,8 @@ import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase; import org.apache.uniffle.shuffle.BlockIdManager; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.StageAttemptShuffleHandleInfo; public class ShuffleManagerGrpcService extends ShuffleManagerImplBase { private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class); diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleHandle.java similarity index 92% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleHandle.java index acf6815874..d67e09fc13 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssShuffleHandle.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleHandle.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.List; import java.util.Map; import org.apache.spark.ShuffleDependency; import org.apache.spark.broadcast.Broadcast; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; +import org.apache.spark.shuffle.ShuffleHandle; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; public class RssShuffleHandle extends ShuffleHandle { diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssSparkConfig.java similarity index 99% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssSparkConfig.java index ba5e414cc3..e0cb558700 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssSparkConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.Set; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssSparkShuffleUtils.java similarity index 98% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssSparkShuffleUtils.java index b3763df32a..2ca23a9bd9 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssSparkShuffleUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.io.IOException; import java.lang.reflect.Constructor; @@ -35,7 +35,8 @@ import org.apache.spark.SparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.deploy.SparkHadoopUtil; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; +import org.apache.spark.shuffle.FetchFailedException; +import org.apache.spark.shuffle.ShuffleManager; import org.apache.spark.storage.BlockManagerId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +56,10 @@ import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.exception.RssFetchFailedException; import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; import static org.apache.uniffle.common.util.Constants.DRIVER_HOST; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; public class RssSparkShuffleUtils { diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageInfo.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssStageInfo.java similarity index 97% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageInfo.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssStageInfo.java index c8168d6c43..110ca6e9b5 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageInfo.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssStageInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; public class RssStageInfo { private String stageAttemptIdAndNumber; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssStageResubmitManager.java similarity index 98% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssStageResubmitManager.java index 028622f922..4e8672f499 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/RssStageResubmitManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.Map; import java.util.Set; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfoManager.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/ShuffleHandleInfoManager.java similarity index 93% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfoManager.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/ShuffleHandleInfoManager.java index cc3d3b4ce9..29c87f935e 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfoManager.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/ShuffleHandleInfoManager.java @@ -15,15 +15,14 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.io.Closeable; import java.io.IOException; import java.util.Map; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; - import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; public class ShuffleHandleInfoManager implements Closeable { private Map shuffleIdToShuffleHandleInfo; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/SparkVersionUtils.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/SparkVersionUtils.java similarity index 97% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/SparkVersionUtils.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/SparkVersionUtils.java index a55f0aa7d4..e7fa4ff31e 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/SparkVersionUtils.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/SparkVersionUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import org.apache.spark.package$; import org.apache.spark.util.VersionUtils; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/MutableShuffleHandleInfo.java similarity index 99% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/MutableShuffleHandleInfo.java index 1e3b29019f..debcf08db2 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/MutableShuffleHandleInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.handle; +package org.apache.uniffle.spark.shuffle.handle; import java.util.ArrayList; import java.util.Collections; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/ShuffleHandleInfo.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/ShuffleHandleInfo.java similarity index 97% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/handle/ShuffleHandleInfo.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/ShuffleHandleInfo.java index 99f7a7421b..d057d989e0 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/ShuffleHandleInfo.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/ShuffleHandleInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.handle; +package org.apache.uniffle.spark.shuffle.handle; import java.util.List; import java.util.Map; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/ShuffleHandleInfoBase.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/ShuffleHandleInfoBase.java similarity index 96% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/handle/ShuffleHandleInfoBase.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/ShuffleHandleInfoBase.java index f24bd0fa89..4e5345830d 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/ShuffleHandleInfoBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/ShuffleHandleInfoBase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.handle; +package org.apache.uniffle.spark.shuffle.handle; import java.io.Serializable; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/SimpleShuffleHandleInfo.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/SimpleShuffleHandleInfo.java similarity index 98% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/handle/SimpleShuffleHandleInfo.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/SimpleShuffleHandleInfo.java index 60cb6f27aa..4919cc5366 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/SimpleShuffleHandleInfo.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/SimpleShuffleHandleInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.handle; +package org.apache.uniffle.spark.shuffle.handle; import java.io.Serializable; import java.util.List; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java similarity index 99% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java index 8fd9642ac1..32806cc7ab 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.handle; +package org.apache.uniffle.spark.shuffle.handle; import java.util.LinkedList; import java.util.List; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssFetchFailedIterator.java similarity index 97% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssFetchFailedIterator.java index c394f510bb..c568ecf124 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssFetchFailedIterator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.io.IOException; import java.util.Objects; @@ -25,7 +25,6 @@ import scala.collection.Iterator; import org.apache.spark.shuffle.FetchFailedException; -import org.apache.spark.shuffle.RssSparkShuffleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +35,7 @@ import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.exception.RssFetchFailedException; +import org.apache.uniffle.spark.shuffle.RssSparkShuffleUtils; public class RssFetchFailedIterator extends AbstractIterator> { private static final Logger LOG = LoggerFactory.getLogger(RssFetchFailedIterator.class); diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleDataIterator.java similarity index 98% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleDataIterator.java index 88b2d22d89..37c0f51aef 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleDataIterator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.io.IOException; import java.nio.ByteBuffer; @@ -33,7 +33,6 @@ import org.apache.spark.serializer.DeserializationStream; import org.apache.spark.serializer.Serializer; import org.apache.spark.serializer.SerializerInstance; -import org.apache.spark.shuffle.RssSparkConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +41,7 @@ import org.apache.uniffle.common.compression.Codec; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; public class RssShuffleDataIterator extends AbstractIterator> { diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/AddBlockEvent.java similarity index 97% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/AddBlockEvent.java index f989fdb0b1..42177ef160 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/AddBlockEvent.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.util.ArrayList; import java.util.List; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BlockFailureCallback.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BlockFailureCallback.java similarity index 95% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BlockFailureCallback.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BlockFailureCallback.java index 116d1945de..33cafc18d5 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BlockFailureCallback.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BlockFailureCallback.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import org.apache.uniffle.common.ShuffleBlockInfo; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BlockSuccessCallback.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BlockSuccessCallback.java similarity index 95% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BlockSuccessCallback.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BlockSuccessCallback.java index 2b5dc0d09f..f481d819b6 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BlockSuccessCallback.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BlockSuccessCallback.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import org.apache.uniffle.common.ShuffleBlockInfo; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BufferManagerOptions.java similarity index 97% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BufferManagerOptions.java index 839e684a93..cd401a9bc1 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/BufferManagerOptions.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; public class BufferManagerOptions { diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/DataPusher.java similarity index 99% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/DataPusher.java index bdf0cf8496..bc28318f4b 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/DataPusher.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.io.Closeable; import java.io.IOException; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/TaskAttemptAssignment.java similarity index 93% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/TaskAttemptAssignment.java index 0044ba20d4..a9186ad3a2 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/TaskAttemptAssignment.java @@ -15,15 +15,14 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.util.List; import java.util.Map; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; - import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; /** This class is to get the partition assignment for ShuffleWriter. */ public class TaskAttemptAssignment { diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WrappedByteArrayOutputStream.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WrappedByteArrayOutputStream.java similarity index 95% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WrappedByteArrayOutputStream.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WrappedByteArrayOutputStream.java index 2a77325708..134448d11c 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WrappedByteArrayOutputStream.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WrappedByteArrayOutputStream.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.io.ByteArrayOutputStream; diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferManager.java similarity index 99% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferManager.java index bfd9297773..ab5c1505f9 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.util.ArrayList; import java.util.Collections; @@ -41,7 +41,6 @@ import org.apache.spark.serializer.SerializationStream; import org.apache.spark.serializer.Serializer; import org.apache.spark.serializer.SerializerInstance; -import org.apache.spark.shuffle.RssSparkConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +51,7 @@ import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.ChecksumUtils; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; public class WriteBufferManager extends MemoryConsumer { diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WriterBuffer.java similarity index 98% rename from client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java rename to client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WriterBuffer.java index ac6ac9e271..9caf630ca2 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriterBuffer.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/spark/shuffle/writer/WriterBuffer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.util.List; diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java index 317d0cd9ea..d0244e1b5e 100644 --- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java @@ -22,11 +22,10 @@ import java.util.Map; import java.util.Set; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; - import org.apache.uniffle.common.ReceivingFailureServer; import org.apache.uniffle.shuffle.BlockIdManager; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; public class DummyRssShuffleManager implements RssShuffleManagerInterface { public Set unregisteredShuffleIds = new LinkedHashSet<>(); diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java index 610b42c8c3..ca0ed14304 100644 --- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.params.ParameterizedTest; @@ -42,6 +41,7 @@ import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import static org.apache.uniffle.common.rpc.StatusCode.INVALID_REQUEST; import static org.apache.uniffle.common.rpc.StatusCode.SUCCESS; diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/RssSparkShuffleUtilsTest.java similarity index 99% rename from client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java rename to client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/RssSparkShuffleUtilsTest.java index fe3c8d9f08..7099ab2f3f 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/RssSparkShuffleUtilsTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.Arrays; import java.util.Map; diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfoTest.java b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/handle/MutableShuffleHandleInfoTest.java similarity index 99% rename from client-spark/common/src/test/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfoTest.java rename to client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/handle/MutableShuffleHandleInfoTest.java index e861923584..6daafc61de 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfoTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/handle/MutableShuffleHandleInfoTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.handle; +package org.apache.uniffle.spark.shuffle.handle; import java.util.ArrayList; import java.util.Arrays; diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/reader/AbstractRssReaderTest.java similarity index 99% rename from client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java rename to client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/reader/AbstractRssReaderTest.java index f761c6ea65..849bc7af9b 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/reader/AbstractRssReaderTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.util.List; import java.util.Map; diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleDataIteratorTest.java similarity index 99% rename from client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java rename to client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleDataIteratorTest.java index 3f6993c826..9197fd533e 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleDataIteratorTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.nio.ByteBuffer; import java.util.List; @@ -31,7 +31,6 @@ import org.apache.spark.executor.ShuffleReadMetrics; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -49,6 +48,7 @@ import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.ChecksumUtils; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler; import org.apache.uniffle.storage.util.StorageType; diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/DataPusherTest.java similarity index 99% rename from client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java rename to client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/DataPusherTest.java index 080ba1e33f..cab7a42032 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/DataPusherTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.util.Arrays; import java.util.HashSet; diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferManagerTest.java similarity index 99% rename from client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java rename to client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferManagerTest.java index 49ebeef25f..bb5a850c92 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferManagerTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.io.IOException; import java.util.ArrayList; @@ -35,7 +35,6 @@ import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.RssSparkConfig; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -49,6 +48,7 @@ import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.BlockIdLayout; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferTest.java b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferTest.java similarity index 98% rename from client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferTest.java rename to client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferTest.java index d4533efaf6..6e4e08b995 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/spark/shuffle/writer/WriteBufferTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import scala.reflect.ClassTag$; diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManager.java similarity index 96% rename from client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java rename to client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManager.java index 7987077f7f..1423a7f339 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.List; import java.util.Map; @@ -28,6 +28,11 @@ import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; +import org.apache.spark.shuffle.ShuffleBlockResolver; +import org.apache.spark.shuffle.ShuffleHandle; +import org.apache.spark.shuffle.ShuffleManager; +import org.apache.spark.shuffle.ShuffleReader; +import org.apache.spark.shuffle.ShuffleWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleManager.java similarity index 96% rename from client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java rename to client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleManager.java index 1e5bb49418..ea076ee74b 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.io.IOException; import java.util.ArrayList; @@ -42,14 +42,10 @@ import org.apache.spark.TaskContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; -import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo; -import org.apache.spark.shuffle.reader.RssShuffleReader; -import org.apache.spark.shuffle.writer.AddBlockEvent; -import org.apache.spark.shuffle.writer.DataPusher; -import org.apache.spark.shuffle.writer.RssShuffleWriter; +import org.apache.spark.shuffle.ShuffleBlockResolver; +import org.apache.spark.shuffle.ShuffleHandle; +import org.apache.spark.shuffle.ShuffleReader; +import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.storage.BlockId; import org.apache.spark.storage.BlockManagerId; import org.roaringbitmap.longlong.Roaring64NavigableMap; @@ -73,12 +69,20 @@ import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase; import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService; import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.StageAttemptShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.reader.RssShuffleReader; +import org.apache.uniffle.spark.shuffle.writer.AddBlockEvent; +import org.apache.uniffle.spark.shuffle.writer.DataPusher; +import org.apache.uniffle.spark.shuffle.writer.RssShuffleWriter; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT; import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; public class RssShuffleManager extends RssShuffleManagerBase { diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReader.java similarity index 98% rename from client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java rename to client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReader.java index 3bf5840e8d..a82d0d144a 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java +++ b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReader.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.util.List; import java.util.Map; @@ -37,7 +37,6 @@ import org.apache.spark.executor.ShuffleReadMetrics; import org.apache.spark.executor.TempShuffleReadMetrics; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.RssShuffleHandle; import org.apache.spark.shuffle.ShuffleReader; import org.apache.spark.util.CompletionIterator; import org.apache.spark.util.CompletionIterator$; @@ -53,8 +52,9 @@ import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; public class RssShuffleReader implements ShuffleReader { diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriter.java similarity index 97% rename from client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java rename to client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriter.java index 5ac6a7e9ec..702751a2b9 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark2/src/main/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.io.IOException; import java.util.ArrayList; @@ -51,13 +51,7 @@ import org.apache.spark.scheduler.MapStatus; import org.apache.spark.scheduler.MapStatus$; import org.apache.spark.shuffle.FetchFailedException; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.RssSparkShuffleUtils; import org.apache.spark.shuffle.ShuffleWriter; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; import org.apache.spark.storage.BlockManagerId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,9 +72,15 @@ import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.exception.RssSendFailedException; import org.apache.uniffle.common.exception.RssWaitFailedException; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.RssSparkShuffleUtils; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; import org.apache.uniffle.storage.util.StorageType; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; public class RssShuffleWriter extends ShuffleWriter { diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManagerTest.java similarity index 99% rename from client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java rename to client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManagerTest.java index 8f24c5e5c9..74582b2fb4 100644 --- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java +++ b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManagerTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.List; import java.util.NoSuchElementException; diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/RssSpark2ShuffleUtilsTest.java b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/RssSpark2ShuffleUtilsTest.java similarity index 93% rename from client-spark/spark2/src/test/java/org/apache/spark/shuffle/RssSpark2ShuffleUtilsTest.java rename to client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/RssSpark2ShuffleUtilsTest.java index dac10d3db9..572e376a14 100644 --- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/RssSpark2ShuffleUtilsTest.java +++ b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/RssSpark2ShuffleUtilsTest.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; +import org.apache.spark.shuffle.FetchFailedException; import org.junit.jupiter.api.Test; import org.apache.uniffle.common.exception.RssFetchFailedException; diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/SparkVersionUtilsTest.java similarity index 96% rename from client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java rename to client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/SparkVersionUtilsTest.java index 58c1fe0eae..7fd847a0cc 100644 --- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java +++ b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/SparkVersionUtilsTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import org.junit.jupiter.api.Test; diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReaderTest.java similarity index 97% rename from client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java rename to client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReaderTest.java index f09223b1c3..ed83db106a 100644 --- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java +++ b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReaderTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.util.HashMap; import java.util.List; @@ -31,13 +31,13 @@ import org.apache.spark.executor.TaskMetrics; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.RssShuffleHandle; import org.junit.jupiter.api.Test; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler; import org.apache.uniffle.storage.util.StorageType; diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriterTest.java similarity index 98% rename from client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java rename to client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriterTest.java index e039ad9d5d..9f26951ef9 100644 --- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java +++ b/client-spark/spark2/src/test/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriterTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.util.Arrays; import java.util.List; @@ -42,10 +42,6 @@ import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; import org.junit.jupiter.api.Test; import org.apache.uniffle.client.api.ShuffleWriteClient; @@ -54,6 +50,10 @@ import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManager.java similarity index 96% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManager.java index c99f9b0578..6ceb5efe60 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.List; import java.util.Map; @@ -28,6 +28,13 @@ import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; +import org.apache.spark.shuffle.ShuffleBlockResolver; +import org.apache.spark.shuffle.ShuffleHandle; +import org.apache.spark.shuffle.ShuffleManager; +import org.apache.spark.shuffle.ShuffleReadMetricsReporter; +import org.apache.spark.shuffle.ShuffleReader; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; +import org.apache.spark.shuffle.ShuffleWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/FunctionUtils.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/FunctionUtils.java similarity index 96% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/FunctionUtils.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/FunctionUtils.java index 9ed95e01d2..e2c104f498 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/FunctionUtils.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/FunctionUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import scala.Function0; diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleDataIo.java similarity index 97% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleDataIo.java index edbe25c9f9..a7895f6503 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleDataIo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import org.apache.spark.SparkConf; import org.apache.spark.shuffle.api.ShuffleDataIO; diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleDriverComponents.java similarity index 97% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleDriverComponents.java index 893b2e8f4d..032bfe1446 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleDriverComponents.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import org.apache.spark.SparkConf; import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDriverComponents; diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleManager.java similarity index 96% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleManager.java index bf42bf3610..0f9b62da79 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/RssShuffleManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.io.IOException; import java.util.ArrayList; @@ -48,14 +48,12 @@ import org.apache.spark.broadcast.Broadcast; import org.apache.spark.executor.ShuffleReadMetrics; import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; -import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo; -import org.apache.spark.shuffle.reader.RssShuffleReader; -import org.apache.spark.shuffle.writer.AddBlockEvent; -import org.apache.spark.shuffle.writer.DataPusher; -import org.apache.spark.shuffle.writer.RssShuffleWriter; +import org.apache.spark.shuffle.ShuffleBlockResolver; +import org.apache.spark.shuffle.ShuffleHandle; +import org.apache.spark.shuffle.ShuffleReadMetricsReporter; +import org.apache.spark.shuffle.ShuffleReader; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; +import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.storage.BlockId; import org.apache.spark.storage.BlockManagerId; @@ -82,13 +80,21 @@ import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase; import org.apache.uniffle.shuffle.manager.ShuffleManagerGrpcService; import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.StageAttemptShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.reader.RssShuffleReader; +import org.apache.uniffle.spark.shuffle.writer.AddBlockEvent; +import org.apache.uniffle.spark.shuffle.writer.DataPusher; +import org.apache.uniffle.spark.shuffle.writer.RssShuffleWriter; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT; import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; public class RssShuffleManager extends RssShuffleManagerBase { private static final Logger LOG = LoggerFactory.getLogger(RssShuffleManager.class); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/Spark3VersionUtils.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/Spark3VersionUtils.java similarity index 96% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/Spark3VersionUtils.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/Spark3VersionUtils.java index 76f464eec3..118ab830d8 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/Spark3VersionUtils.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/Spark3VersionUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import org.apache.spark.package$; diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReader.java similarity index 97% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReader.java index bf47ced6be..cc84063b74 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReader.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.util.List; import java.util.Map; @@ -39,8 +39,6 @@ import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleReadMetrics; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.FunctionUtils; -import org.apache.spark.shuffle.RssShuffleHandle; import org.apache.spark.shuffle.ShuffleReader; import org.apache.spark.util.CompletionIterator; import org.apache.spark.util.CompletionIterator$; @@ -56,9 +54,11 @@ import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.spark.shuffle.FunctionUtils; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; import static org.apache.uniffle.common.util.Constants.DRIVER_HOST; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; public class RssShuffleReader implements ShuffleReader { private static final Logger LOG = LoggerFactory.getLogger(RssShuffleReader.class); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriter.java similarity index 97% rename from client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java rename to client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriter.java index 6660a5e7b7..dcb358af72 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark3/src/main/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.io.IOException; import java.util.ArrayList; @@ -58,13 +58,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.shuffle.FetchFailedException; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.RssSparkShuffleUtils; import org.apache.spark.shuffle.ShuffleWriter; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; import org.apache.spark.storage.BlockManagerId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,11 +84,17 @@ import org.apache.uniffle.common.exception.RssSendFailedException; import org.apache.uniffle.common.exception.RssWaitFailedException; import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.RssSparkShuffleUtils; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; import org.apache.uniffle.storage.util.StorageType; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_CLIENT_MAP_SIDE_COMBINE_ENABLED; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_CLIENT_MAP_SIDE_COMBINE_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED; public class RssShuffleWriter extends ShuffleWriter { diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManagerTest.java similarity index 99% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManagerTest.java index c7011f27eb..99249826e4 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/DelegationRssShuffleManagerTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.NoSuchElementException; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/FunctionUtilsTests.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/FunctionUtilsTests.java similarity index 96% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/FunctionUtilsTests.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/FunctionUtilsTests.java index 7640f504e8..361f872e28 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/FunctionUtilsTests.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/FunctionUtilsTests.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.concurrent.atomic.AtomicInteger; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssShuffleManagerTest.java similarity index 97% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssShuffleManagerTest.java index 2a92b6ed52..58e6327dac 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssShuffleManagerTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import org.apache.spark.Partitioner; import org.apache.spark.ShuffleDependency; @@ -34,8 +34,8 @@ import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.storage.util.StorageType; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssShuffleManagerTestBase.java similarity index 98% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssShuffleManagerTestBase.java index fc04819424..faf63d8347 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssShuffleManagerTestBase.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.List; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssSpark3ShuffleUtilsTest.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssSpark3ShuffleUtilsTest.java similarity index 93% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssSpark3ShuffleUtilsTest.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssSpark3ShuffleUtilsTest.java index adbd575885..c0cc0411ad 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssSpark3ShuffleUtilsTest.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/RssSpark3ShuffleUtilsTest.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; +import org.apache.spark.shuffle.FetchFailedException; import org.junit.jupiter.api.Test; import org.apache.uniffle.common.exception.RssFetchFailedException; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/SparkVersionUtilsTest.java similarity index 97% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/SparkVersionUtilsTest.java index 64742c998e..288e5122c2 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/SparkVersionUtilsTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import org.apache.spark.package$; import org.junit.jupiter.api.Test; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/TestUtils.java similarity index 94% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/TestUtils.java index cb2a7f909f..39e4920601 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/TestUtils.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package org.apache.spark.shuffle; +package org.apache.uniffle.spark.shuffle; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.SystemUtils; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.writer.DataPusher; import org.apache.uniffle.client.impl.FailedBlockSendTracker; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.spark.shuffle.writer.DataPusher; public class TestUtils { diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReaderTest.java similarity index 98% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReaderTest.java index aaff4cb8e0..5518d765c3 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/reader/RssShuffleReaderTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.reader; +package org.apache.uniffle.spark.shuffle.reader; import java.util.HashMap; import java.util.List; @@ -32,7 +32,6 @@ import org.apache.spark.executor.TaskMetrics; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.RssShuffleHandle; import org.junit.jupiter.api.Test; import org.roaringbitmap.longlong.Roaring64NavigableMap; @@ -40,6 +39,7 @@ import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler; import org.apache.uniffle.storage.util.StorageType; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriterTest.java similarity index 99% rename from client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java rename to client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriterTest.java index 53a8e71437..92eaecf0dc 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java +++ b/client-spark/spark3/src/test/java/org/apache/uniffle/spark/shuffle/writer/RssShuffleWriterTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.shuffle.writer; +package org.apache.uniffle.spark.shuffle.writer; import java.time.Duration; import java.util.ArrayList; @@ -46,12 +46,6 @@ import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.KryoSerializer; import org.apache.spark.serializer.Serializer; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.TestUtils; -import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo; -import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; @@ -62,6 +56,12 @@ import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.TestUtils; +import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo; +import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/deploy/docker/README.md b/deploy/docker/README.md index 909b00fafb..1adf6b7d0a 100644 --- a/deploy/docker/README.md +++ b/deploy/docker/README.md @@ -82,7 +82,7 @@ Start a Spark shell on the cluster: docker exec -it rss-spark-master-1 /opt/spark/bin/spark-shell \ --master spark://rss-spark-master-1:7077 \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ - --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \ + --conf spark.shuffle.manager=org.apache.uniffle.spark.shuffle.RssShuffleManager \ --conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \ --conf spark.rss.storage.type=MEMORY_LOCALFILE \ --conf spark.speculation=true diff --git a/docs/benchmark.md b/docs/benchmark.md index d5c518bd17..0d5f39fccb 100644 --- a/docs/benchmark.md +++ b/docs/benchmark.md @@ -31,7 +31,7 @@ spark.executor.cores 4 spark.executor.memory 9g spark.executor.memoryOverhead 1024 - spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager + spark.shuffle.manager org.apache.uniffle.spark.shuffle.RssShuffleManager spark.rss.storage.type MEMORY_LOCALFILE ```` Shuffle Server's configuration diff --git a/docs/benchmark_netty_case_report.md b/docs/benchmark_netty_case_report.md index 7d60b5fc67..b03d9d1a55 100644 --- a/docs/benchmark_netty_case_report.md +++ b/docs/benchmark_netty_case_report.md @@ -44,7 +44,7 @@ Spark's configuration: spark.executor.cores 2 spark.executor.memory 20g spark.executor.memoryOverhead 1024 - spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager + spark.shuffle.manager org.apache.uniffle.spark.shuffle.RssShuffleManager spark.sql.shuffle.partitions 20000 spark.sql.files.maxPartitionBytes 107374182 spark.rss.storage.type MEMORY_LOCALFILE diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md index e7d57141ef..93ad7f6544 100644 --- a/docs/client_guide/spark_client_guide.md +++ b/docs/client_guide/spark_client_guide.md @@ -34,7 +34,7 @@ license: | # Uniffle transmits serialized shuffle data over network, therefore a serializer that supports relocation of # serialized object should be used. spark.serializer org.apache.spark.serializer.KryoSerializer # this could also be in the spark-defaults.conf - spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager + spark.shuffle.manager org.apache.uniffle.spark.shuffle.RssShuffleManager spark.rss.coordinator.quorum :19999,:19999 # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE. ``` @@ -138,7 +138,7 @@ To select build-in shuffle or remote shuffle in a smart manner, Uniffle support The client should use `DelegationRssShuffleManager` and provide its unique so that the coordinator could distinguish whether it should enable remote shuffle. ``` -spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager +spark.shuffle.manager org.apache.uniffle.spark.shuffle.DelegationRssShuffleManager spark.rss.access.id= ``` diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java index b8d31c5b64..c47f76eea1 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java @@ -25,9 +25,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.DelegationRssShuffleManager; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.shuffle.ShuffleManager; import org.apache.spark.shuffle.sort.SortShuffleManager; import org.junit.jupiter.api.Test; @@ -35,6 +32,9 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.DelegationRssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static java.lang.Thread.sleep; @@ -48,7 +48,8 @@ public class AutoAccessTest extends IntegrationTestBase { @Test public void test() throws Exception { SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.DelegationRssShuffleManager"); + sparkConf.set( + "spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.DelegationRssShuffleManager"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.set("spark.mock.2", "no-overwrite-conf"); sparkConf.set(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), "overwrite-path"); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java index 3bf18a5d85..8f72678be3 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java @@ -25,11 +25,11 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.Test; import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -41,7 +41,7 @@ public class DynamicFetchClientConfTest extends IntegrationTestBase { @Test public void test() throws Exception { SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set("spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.RssShuffleManager"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.set("spark.mock.2", "no-overwrite-conf"); sparkConf.set("spark.shuffle.service.enabled", "true"); @@ -94,7 +94,7 @@ public void test() throws Exception { fs.delete(path, true); shutdownServers(); sparkConf = new SparkConf(); - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set("spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.RssShuffleManager"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.set("spark.mock.2", "no-overwrite-conf"); sparkConf.set("spark.shuffle.service.enabled", "true"); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java index a10382a790..2b2deb47b1 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.Maps; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.MapPartitionsFunction; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.Column; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; @@ -34,6 +33,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; // This test has all tasks fail twice, the third attempt succeeds. diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java index a9ae33a42e..7d2cbcf323 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.BeforeAll; @@ -37,6 +36,7 @@ import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.MockedGrpcServer; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class RSSStageDynamicServerReWriteTest extends SparkTaskFailureIntegrationTestBase { diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java index 774ba572bb..f787190c75 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.BeforeAll; @@ -34,6 +33,7 @@ import org.apache.uniffle.server.MockedGrpcServer; import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class RSSStageResubmitTest extends SparkTaskFailureIntegrationTestBase { diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java index 64626237a5..cbe9bbb345 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; @@ -34,6 +33,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class RepartitionWithHadoopHybridStorageRssTest extends RepartitionTest { diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java index 4f30c49974..928a1273cc 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; @@ -34,6 +33,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE; diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java index 02545b46f7..e900fc0af0 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java @@ -23,13 +23,13 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class RepartitionWithMemoryHybridStorageRssTest extends RepartitionTest { diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java index 3444a1ceef..15f37b0248 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -31,6 +30,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class RepartitionWithMemoryRssTest extends RepartitionTest { diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java index 9589293e04..6bbf1a6b37 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java @@ -33,9 +33,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.RssSparkShuffleUtils; -import org.apache.spark.shuffle.handle.ShuffleHandleInfo; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -57,6 +54,9 @@ import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.RssSparkShuffleUtils; +import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java index dff36b63e4..3b2962caf7 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java @@ -27,7 +27,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -35,6 +34,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java index fbaabf7038..90e97039ef 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java @@ -27,7 +27,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -36,6 +35,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java index 08084ecdb6..21069ff7c6 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java @@ -21,12 +21,12 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public abstract class SimpleTestBase extends SparkIntegrationTestBase { diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java index 71395f281f..c6a72192db 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java @@ -24,12 +24,12 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.uniffle.common.ClientType; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -112,9 +112,9 @@ protected SparkConf createSparkConf() { } public void updateSparkConfWithRssGrpc(SparkConf sparkConf) { - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set("spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.RssShuffleManager"); sparkConf.set( - "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo"); + "spark.shuffle.sort.io.plugin.class", "org.apache.uniffle.spark.shuffle.RssShuffleDataIo"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "4m"); sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "32m"); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java index dda999b37c..0dd1cfe79e 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java @@ -26,13 +26,13 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class SparkSQLWithDelegationShuffleManagerFallbackTest extends SparkSQLTest { @@ -75,7 +75,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception { @Override public void updateRssStorage(SparkConf sparkConf) { sparkConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "wrong_id"); - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.DelegationRssShuffleManager"); + sparkConf.set( + "spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.DelegationRssShuffleManager"); } @Override diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java index 5b8d9f15ee..4ceb163790 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java @@ -26,13 +26,13 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class SparkSQLWithDelegationShuffleManagerTest extends SparkSQLTest { @@ -88,7 +88,8 @@ private static ShuffleServerConf buildShuffleServerConf(ServerType serverType, S @Override public void updateRssStorage(SparkConf sparkConf) { sparkConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "test_access_id"); - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.DelegationRssShuffleManager"); + sparkConf.set( + "spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.DelegationRssShuffleManager"); } @Override diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java index 927b7d5a17..6560830295 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java @@ -22,13 +22,13 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java index 59e3415694..c545ac4c2d 100644 --- a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java +++ b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java @@ -33,10 +33,6 @@ import org.apache.spark.TaskContextImpl; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.reader.RssShuffleReader; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.Test; @@ -45,6 +41,10 @@ import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.reader.RssShuffleReader; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -56,7 +56,7 @@ public class GetReaderTest extends IntegrationTestBase { @Test public void test() throws Exception { SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set("spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.RssShuffleManager"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.setMaster("local[4]"); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java index 5603dbfb37..b5ab908a43 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; @@ -37,6 +36,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java index 09860fccd1..957f9b5be7 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -38,6 +37,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java index aff3ff3e24..1a26e539b2 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java @@ -18,10 +18,10 @@ package org.apache.uniffle.test; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.config.RssClientConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; public class AQESkewedJoinWithLocalOrderTest extends AQESkewedJoinTest { diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java index 24b9d70b58..5bbeda960c 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -46,9 +45,10 @@ import org.apache.uniffle.server.MockedShuffleServerGrpcService; import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java index 986416e845..bce1e55fd3 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java @@ -43,10 +43,6 @@ import org.apache.spark.metrics.source.Source; import org.apache.spark.resource.ResourceInformation; import org.apache.spark.shuffle.FetchFailedException; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; -import org.apache.spark.shuffle.reader.RssShuffleReader; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.AccumulatorV2; import org.apache.spark.util.TaskCompletionListener; @@ -58,6 +54,10 @@ import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; +import org.apache.uniffle.spark.shuffle.reader.RssShuffleReader; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -69,9 +69,9 @@ public class GetReaderTest extends IntegrationTestBase { @Test public void test() throws Exception { SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); + sparkConf.set("spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.RssShuffleManager"); sparkConf.set( - "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo"); + "spark.shuffle.sort.io.plugin.class", "org.apache.uniffle.spark.shuffle.RssShuffleDataIo"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM); sparkConf.setMaster("local[4]"); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java index cd5510e48d..8868a8f5dc 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java @@ -27,9 +27,6 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; -import org.apache.spark.shuffle.RssShuffleHandle; -import org.apache.spark.shuffle.RssShuffleManager; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.shuffle.ShuffleHandle; import org.apache.spark.shuffle.ShuffleReadMetricsReporter; import org.apache.spark.shuffle.ShuffleReader; @@ -54,9 +51,12 @@ import org.apache.uniffle.server.MockedShuffleServerGrpcService; import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssShuffleHandle; +import org.apache.uniffle.spark.shuffle.RssShuffleManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java index e2aac274ac..eed6b3c4d0 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java @@ -29,7 +29,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.shuffle.RssSparkConfig; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -38,6 +37,7 @@ import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.test.listener.WriteAndReadMetricsSparkListener; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java index 4dd2bab8ea..06f3edcea4 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; @@ -33,6 +32,7 @@ import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.buffer.ShuffleBufferManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER; diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java index a01b695e33..b7ce24d04d 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.Maps; import org.apache.spark.SparkConf; -import org.apache.spark.shuffle.RssSparkConfig; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.io.TempDir; @@ -33,13 +32,14 @@ import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.buffer.ShuffleBufferManager; +import org.apache.uniffle.spark.shuffle.RssSparkConfig; import org.apache.uniffle.storage.util.StorageType; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX; import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED; import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; /** This class is to test the partition reassign mechanism of multiple retries. */ public class PartitionBlockDataReassignMultiTimesTest extends PartitionBlockDataReassignBasicTest { diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java index 663a176c87..27232aab82 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java @@ -23,11 +23,11 @@ import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.buffer.ShuffleBufferManager; -import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX; import static org.apache.uniffle.client.util.RssClientConfig.RSS_RESUBMIT_STAGE; import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED; +import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; /** * This class is to test the compatibility of reassign and stage retry mechanism that were enabled diff --git a/rust/experimental/server/README.md b/rust/experimental/server/README.md index 7c83cc9243..6a0c09b91a 100644 --- a/rust/experimental/server/README.md +++ b/rust/experimental/server/README.md @@ -36,7 +36,7 @@ spark's conf spark.executor.instances 400 spark.executor.cores 1 spark.executor.memory 2g -spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager +spark.shuffle.manager org.apache.uniffle.spark.shuffle.RssShuffleManager spark.rss.storage.type MEMORY_LOCALFILE ```