From 9063e612397a8fbb8362fade32f733595f65bc8d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 10 Feb 2026 08:08:04 -0700 Subject: [PATCH 1/2] Remove prefetch feature, V2 scan dead code, and deprecate BatchReader BatchReader is annotated @IcebergApi and kept for Iceberg compatibility, but Comet's own production code no longer uses it. The prefetch feature was entirely built on BatchReader and is dead code now that the native_iceberg_compat path uses NativeBatchReader. V2 Parquet scan acceleration (CometParquetScan) is also no longer active. This commit: - Marks BatchReader as @Deprecated (since 0.14.0) - Removes all prefetch internals from BatchReader (fields, methods, inner class) - Removes COMET_SCAN_PREFETCH_ENABLED and COMET_SCAN_PREFETCH_THREAD_NUM configs - Removes CometPrefetchThreadPool - Deletes CometParquetPartitionReaderFactory and CometParquetScan - Simplifies CometScanExec.prepareRDD to always use newFileScanRDD - Removes dead BatchReader code path from CometParquetFileFormat - Cleans up EliminateRedundantTransitions V2 dead code path - Removes prefetch tests, BatchReader-only tests, and BatchReader benchmark case - Cleans up CometParquetScan references in tests Co-Authored-By: Claude Opus 4.6 --- .../org/apache/comet/parquet/BatchReader.java | 131 +--------- .../parquet/IcebergCometBatchReader.java | 2 + .../scala/org/apache/comet/CometConf.scala | 17 -- .../comet/parquet/CometReaderThreadPool.scala | 5 - .../parquet/CometParquetFileFormat.scala | 119 +++------ .../CometParquetPartitionReaderFactory.scala | 233 ------------------ .../comet/parquet/CometParquetScan.scala | 94 ------- .../rules/EliminateRedundantTransitions.scala | 5 +- .../spark/sql/comet/CometNativeScanExec.scala | 4 +- .../spark/sql/comet/CometScanExec.scala | 49 +--- .../comet/parquet/ParquetReadSuite.scala | 208 +--------------- .../comet/rules/CometScanRuleSuite.scala | 6 - .../sql/benchmark/CometReadBenchmark.scala | 24 -- 13 files changed, 56 insertions(+), 841 deletions(-) delete mode 100644 spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala delete mode 100644 spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index d591454596..5a0bc9f6d3 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -24,10 +24,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import scala.Option; @@ -36,9 +32,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; @@ -87,7 +81,10 @@ * reader.close(); * } * + * + * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only. 
*/ +@Deprecated @IcebergApi public class BatchReader extends RecordReader implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(FileReader.class); @@ -110,8 +107,6 @@ public class BatchReader extends RecordReader implements Cl protected AbstractColumnReader[] columnReaders; private CometSchemaImporter importer; protected ColumnarBatch currentBatch; - private Future> prefetchTask; - private LinkedBlockingQueue> prefetchQueue; private FileReader fileReader; private boolean[] missingColumns; protected boolean isInitialized; @@ -363,26 +358,7 @@ public void init() throws URISyntaxException, IOException { } } - // Pre-fetching - boolean preFetchEnabled = - conf.getBoolean( - CometConf.COMET_SCAN_PREFETCH_ENABLED().key(), - (boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get()); - - if (preFetchEnabled) { - LOG.info("Prefetch enabled for BatchReader."); - this.prefetchQueue = new LinkedBlockingQueue<>(); - } - isInitialized = true; - synchronized (this) { - // if prefetch is enabled, `init()` is called in separate thread. When - // `BatchReader.nextBatch()` is called asynchronously, it is possibly that - // `init()` is not called or finished. We need to hold on `nextBatch` until - // initialization of `BatchReader` is done. Once we are close to finish - // initialization, we notify the waiting thread of `nextBatch` to continue. - notifyAll(); - } } /** @@ -436,51 +412,13 @@ public ColumnarBatch currentBatch() { return currentBatch; } - // Only for testing - public Future> getPrefetchTask() { - return this.prefetchTask; - } - - // Only for testing - public LinkedBlockingQueue> getPrefetchQueue() { - return this.prefetchQueue; - } - /** * Loads the next batch of rows. * * @return true if there are no more rows to read, false otherwise. */ public boolean nextBatch() throws IOException { - if (this.prefetchTask == null) { - Preconditions.checkState(isInitialized, "init() should be called first!"); - } else { - // If prefetch is enabled, this reader will be initialized asynchronously from a - // different thread. Wait until it is initialized - while (!isInitialized) { - synchronized (this) { - try { - // Wait until initialization of current `BatchReader` is finished (i.e., `init()`), - // is done. It is possibly that `init()` is done after entering this while loop, - // so a short timeout is given. - wait(100); - - // Checks if prefetch task is finished. If so, tries to get exception if any. - if (prefetchTask.isDone()) { - Option exception = prefetchTask.get(); - if (exception.isDefined()) { - throw exception.get(); - } - } - } catch (RuntimeException e) { - // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`. - throw e; - } catch (Throwable e) { - throw new IOException(e); - } - } - } - } + Preconditions.checkState(isInitialized, "init() should be called first!"); if (rowsRead >= totalRowCount) return false; boolean hasMore; @@ -547,7 +485,6 @@ public void close() throws IOException { } } - @SuppressWarnings("deprecation") private boolean loadNextRowGroupIfNecessary() throws Throwable { // More rows can be read from loaded row group. No need to load next one. if (rowsRead != totalRowsLoaded) return true; @@ -556,21 +493,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable { SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups"); long startNs = System.nanoTime(); - PageReadStore rowGroupReader = null; - if (prefetchTask != null && prefetchQueue != null) { - // Wait for pre-fetch task to finish. 
- Pair rowGroupReaderPair = prefetchQueue.take(); - rowGroupReader = rowGroupReaderPair.getLeft(); - - // Update incremental byte read metric. Because this metric in Spark is maintained - // by thread local variable, we need to manually update it. - // TODO: We may expose metrics from `FileReader` and get from it directly. - long incBytesRead = rowGroupReaderPair.getRight(); - FileSystem.getAllStatistics().stream() - .forEach(statistic -> statistic.incrementBytesRead(incBytesRead)); - } else { - rowGroupReader = fileReader.readNextRowGroup(); - } + PageReadStore rowGroupReader = fileReader.readNextRowGroup(); if (rowGroupTimeMetric != null) { rowGroupTimeMetric.add(System.nanoTime() - startNs); @@ -608,48 +531,4 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable { totalRowsLoaded += rowGroupReader.getRowCount(); return true; } - - // Submits a prefetch task for this reader. - public void submitPrefetchTask(ExecutorService threadPool) { - this.prefetchTask = threadPool.submit(new PrefetchTask()); - } - - // A task for prefetching parquet row groups. - private class PrefetchTask implements Callable> { - private long getBytesRead() { - return FileSystem.getAllStatistics().stream() - .mapToLong(s -> s.getThreadStatistics().getBytesRead()) - .sum(); - } - - @Override - public Option call() throws Exception { - // Gets the bytes read so far. - long baseline = getBytesRead(); - - try { - init(); - - while (true) { - PageReadStore rowGroupReader = fileReader.readNextRowGroup(); - - if (rowGroupReader == null) { - // Reaches the end of row groups. - return Option.empty(); - } else { - long incBytesRead = getBytesRead() - baseline; - - prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead)); - } - } - } catch (Throwable e) { - // Returns exception thrown from the reader. The reader will re-throw it. 
- return Option.apply(e); - } finally { - if (fileReader != null) { - fileReader.closeStream(); - } - } - } - } } diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java index d4bfa2b878..bd66f2deab 100644 --- a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java @@ -24,9 +24,11 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.comet.IcebergApi; import org.apache.comet.vector.CometVector; /** This class is a public interface used by Apache Iceberg to read batches using Comet */ +@IcebergApi public class IcebergCometBatchReader extends BatchReader { public IcebergCometBatchReader(int numColumns, StructType schema) { this.columnReaders = new AbstractColumnReader[numColumns]; diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 522ccbc94c..fc209ec462 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -651,23 +651,6 @@ object CometConf extends ShimCometConf { .doubleConf .createWithDefault(1.0) - val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] = - conf("spark.comet.scan.preFetch.enabled") - .category(CATEGORY_SCAN) - .doc("Whether to enable pre-fetching feature of CometScan.") - .booleanConf - .createWithDefault(false) - - val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] = - conf("spark.comet.scan.preFetch.threadNum") - .category(CATEGORY_SCAN) - .doc( - "The number of threads running pre-fetching for CometScan. Effective if " + - s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " + - "pre-fetching threads means more memory requirement to store pre-fetched row groups.") - .intConf - .createWithDefault(2) - val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] = conf("spark.comet.nativeLoadRequired") .category(CATEGORY_EXEC) .doc( diff --git a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala index ca13bba0c4..1759ea2765 100644 --- a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala +++ b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala @@ -54,11 +54,6 @@ abstract class CometReaderThreadPool { } -// A thread pool used for pre-fetching files. -object CometPrefetchThreadPool extends CometReaderThreadPool { - override def threadNamePrefix: String = "prefetch_thread" -} - // Thread pool used by the Parquet parallel reader object CometFileReaderThreadPool extends CometReaderThreadPool { override def threadNamePrefix: String = "file_reader_thread" diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala index e07d16d4dd..7874f3774d 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala @@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector * in [[org.apache.comet.CometSparkSessionExtensions]] * - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values. 
*/ -class CometParquetFileFormat(session: SparkSession, scanImpl: String) +class CometParquetFileFormat(session: SparkSession) extends ParquetFileFormat with MetricsSupport with ShimSQLConf { @@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String) // Comet specific configurations val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf) - val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT - (file: PartitionedFile) => { val sharedConf = broadcastedHadoopConf.value.value val footer = FooterReader.readFooter(sharedConf, file) @@ -135,85 +133,42 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String) isCaseSensitive, datetimeRebaseSpec) - val recordBatchReader = - if (nativeIcebergCompat) { - // We still need the predicate in the conf to allow us to generate row indexes based on - // the actual row groups read - val pushed = if (parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates - // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why - // a `flatMap` is used here. - .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p)) - val pushedNative = if (parquetFilterPushDown) { - parquetFilters.createNativeFilters(filters) - } else { - None - } - val batchReader = new NativeBatchReader( - sharedConf, - file, - footer, - pushedNative.orNull, - capacity, - requiredSchema, - dataSchema, - isCaseSensitive, - useFieldId, - ignoreMissingIds, - datetimeRebaseSpec.mode == CORRECTED, - partitionSchema, - file.partitionValues, - metrics.asJava, - CometMetricNode(metrics)) - try { - batchReader.init() - } catch { - case e: Throwable => - batchReader.close() - throw e - } - batchReader - } else { - val pushed = if (parquetFilterPushDown) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates - // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why - // a `flatMap` is used here. 
- .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p)) - - val batchReader = new BatchReader( - sharedConf, - file, - footer, - capacity, - requiredSchema, - isCaseSensitive, - useFieldId, - ignoreMissingIds, - datetimeRebaseSpec.mode == CORRECTED, - partitionSchema, - file.partitionValues, - metrics.asJava) - try { - batchReader.init() - } catch { - case e: Throwable => - batchReader.close() - throw e - } - batchReader - } + val pushed = if (parquetFilterPushDown) { + filters + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) + } else { + None + } + pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p)) + val pushedNative = if (parquetFilterPushDown) { + parquetFilters.createNativeFilters(filters) + } else { + None + } + val recordBatchReader = new NativeBatchReader( + sharedConf, + file, + footer, + pushedNative.orNull, + capacity, + requiredSchema, + dataSchema, + isCaseSensitive, + useFieldId, + ignoreMissingIds, + datetimeRebaseSpec.mode == CORRECTED, + partitionSchema, + file.partitionValues, + metrics.asJava, + CometMetricNode(metrics)) + try { + recordBatchReader.init() + } catch { + case e: Throwable => + recordBatchReader.close() + throw e + } val iter = new RecordReaderIterator(recordBatchReader) try { iter.asInstanceOf[Iterator[InternalRow]] diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala deleted file mode 100644 index 495054fc81..0000000000 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.comet.parquet - -import scala.collection.mutable -import scala.jdk.CollectionConverters._ - -import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} -import org.apache.parquet.hadoop.ParquetInputFormat -import org.apache.parquet.hadoop.metadata.ParquetMetadata -import org.apache.spark.TaskContext -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec -import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.connector.read.PartitionReader -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory -import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration - -import org.apache.comet.{CometConf, CometRuntimeException} -import org.apache.comet.shims.ShimSQLConf - -case class CometParquetPartitionReaderFactory( - usingDataFusionReader: Boolean, - @transient sqlConf: SQLConf, - broadcastedConf: Broadcast[SerializableConfiguration], - readDataSchema: StructType, - partitionSchema: StructType, - filters: Array[Filter], - options: ParquetOptions, - metrics: Map[String, SQLMetric]) - extends FilePartitionReaderFactory - with ShimSQLConf - with Logging { - - private val isCaseSensitive = sqlConf.caseSensitiveAnalysis - private val useFieldId = CometParquetUtils.readFieldId(sqlConf) - private val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf) - private val pushDownDate = sqlConf.parquetFilterPushDownDate - private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp - private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal - private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate - private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead - private val parquetFilterPushDown = sqlConf.parquetFilterPushDown - - // Comet specific configurations - private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf) - - // This is only called at executor on a Broadcast variable, so we don't want it to be - // materialized at driver. 
- @transient private lazy val preFetchEnabled = { - val conf = broadcastedConf.value.value - - conf.getBoolean( - CometConf.COMET_SCAN_PREFETCH_ENABLED.key, - CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) && - !usingDataFusionReader // Turn off prefetch if native_iceberg_compat is enabled - } - - private var cometReaders: Iterator[BatchReader] = _ - private val cometReaderExceptionMap = new mutable.HashMap[PartitionedFile, Throwable]() - - // TODO: we may want to revisit this as we're going to only support flat types at the beginning - override def supportColumnarReads(partition: InputPartition): Boolean = true - - override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = { - if (preFetchEnabled) { - val filePartition = partition.asInstanceOf[FilePartition] - val conf = broadcastedConf.value.value - - val threadNum = conf.getInt( - CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.key, - CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.defaultValue.get) - val prefetchThreadPool = CometPrefetchThreadPool.getOrCreateThreadPool(threadNum) - - this.cometReaders = filePartition.files - .map { file => - // `init()` call is deferred to when the prefetch task begins. - // Otherwise we will hold too many resources for readers which are not ready - // to prefetch. - val cometReader = buildCometReader(file) - if (cometReader != null) { - cometReader.submitPrefetchTask(prefetchThreadPool) - } - - cometReader - } - .toSeq - .toIterator - } - - super.createColumnarReader(partition) - } - - override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = - throw new UnsupportedOperationException("Comet doesn't support 'buildReader'") - - private def buildCometReader(file: PartitionedFile): BatchReader = { - val conf = broadcastedConf.value.value - - try { - val (datetimeRebaseSpec, footer, filters) = getFilter(file) - filters.foreach(pushed => ParquetInputFormat.setFilterPredicate(conf, pushed)) - val cometReader = new BatchReader( - conf, - file, - footer, - batchSize, - readDataSchema, - isCaseSensitive, - useFieldId, - ignoreMissingIds, - datetimeRebaseSpec.mode == CORRECTED, - partitionSchema, - file.partitionValues, - metrics.asJava) - val taskContext = Option(TaskContext.get) - taskContext.foreach(_.addTaskCompletionListener[Unit](_ => cometReader.close())) - return cometReader - } catch { - case e: Throwable if preFetchEnabled => - // Keep original exception - cometReaderExceptionMap.put(file, e) - } - null - } - - override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { - val cometReader = if (!preFetchEnabled) { - // Prefetch is not enabled, create comet reader and initiate it. - val cometReader = buildCometReader(file) - cometReader.init() - - cometReader - } else { - // If prefetch is enabled, we already tried to access the file when in `buildCometReader`. - // It is possibly we got an exception like `FileNotFoundException` and we need to throw it - // now to let Spark handle it. 
- val reader = cometReaders.next() - val exception = cometReaderExceptionMap.get(file) - exception.foreach(e => throw e) - - if (reader == null) { - throw new CometRuntimeException(s"Cannot find comet file reader for $file") - } - reader - } - CometPartitionReader(cometReader) - } - - def getFilter(file: PartitionedFile): (RebaseSpec, ParquetMetadata, Option[FilterPredicate]) = { - val sharedConf = broadcastedConf.value.value - val footer = FooterReader.readFooter(sharedConf, file) - val footerFileMetaData = footer.getFileMetaData - val datetimeRebaseSpec = CometParquetFileFormat.getDatetimeRebaseSpec( - file, - readDataSchema, - sharedConf, - footerFileMetaData, - datetimeRebaseModeInRead) - - val pushed = if (parquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - readDataSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - (datetimeRebaseSpec, footer, pushed) - } - - override def createReader(inputPartition: InputPartition): PartitionReader[InternalRow] = - throw new UnsupportedOperationException("Only 'createColumnarReader' is supported.") - - /** - * A simple adapter on Comet's [[BatchReader]]. - */ - protected case class CometPartitionReader(reader: BatchReader) - extends PartitionReader[ColumnarBatch] { - - override def next(): Boolean = { - reader.nextBatch() - } - - override def get(): ColumnarBatch = { - reader.currentBatch() - } - - override def close(): Unit = { - reader.close() - } - } -} diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala deleted file mode 100644 index 3f50255761..0000000000 --- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.comet.parquet - -import scala.jdk.CollectionConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.comet.CometMetricNode -import org.apache.spark.sql.connector.read.PartitionReaderFactory -import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions -import org.apache.spark.sql.execution.datasources.v2.FileScan -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.spark.util.SerializableConfiguration - -import org.apache.comet.MetricsSupport - -// TODO: Consider creating a case class and patch SQL tests if needed, will make life easier. -// currently hacking around this by setting the metrics within the object's apply method. -trait CometParquetScan extends FileScan with MetricsSupport { - def sparkSession: SparkSession - def hadoopConf: Configuration - def readDataSchema: StructType - def readPartitionSchema: StructType - def pushedFilters: Array[Filter] - def options: CaseInsensitiveStringMap - - override def equals(obj: Any): Boolean = obj match { - case other: CometParquetScan => - super.equals(other) && readDataSchema == other.readDataSchema && - readPartitionSchema == other.readPartitionSchema && - equivalentFilters(pushedFilters, other.pushedFilters) - case _ => false - } - - override def hashCode(): Int = getClass.hashCode() - - override def createReaderFactory(): PartitionReaderFactory = { - val sqlConf = sparkSession.sessionState.conf - CometParquetFileFormat.populateConf(sqlConf, hadoopConf) - val broadcastedConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - CometParquetPartitionReaderFactory( - usingDataFusionReader = false, // this value is not used since this is v2 scan - sqlConf, - broadcastedConf, - readDataSchema, - readPartitionSchema, - pushedFilters, - new ParquetOptions(options.asScala.toMap, sqlConf), - metrics) - } -} - -object CometParquetScan { - def apply(session: SparkSession, scan: ParquetScan): CometParquetScan = { - val newScan = new ParquetScan( - scan.sparkSession, - scan.hadoopConf, - scan.fileIndex, - scan.dataSchema, - scan.readDataSchema, - scan.readPartitionSchema, - scan.pushedFilters, - scan.options, - partitionFilters = scan.partitionFilters, - dataFilters = scan.dataFilters) with CometParquetScan - - newScan.metrics = CometMetricNode.nativeScanMetrics(session.sparkContext) ++ CometMetricNode - .parquetScanMetrics(session.sparkContext) - - newScan - } -} diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala index ec33363525..ce57624b75 100644 --- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala +++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala @@ -22,14 +22,13 @@ package org.apache.comet.rules import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.sideBySide -import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec} +import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, 
CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec} import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.QueryStageExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.comet.CometConf -import org.apache.comet.parquet.CometParquetScan // This rule is responsible for eliminating redundant transitions between row-based and // columnar-based operators for Comet. Currently, three potential redundant transitions are: @@ -157,7 +156,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa * This includes: * - CometScanExec with native_iceberg_compat and partition columns - uses * ConstantColumnReader - * - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader */ private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = { op match { @@ -168,7 +166,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT && scan.relation.partitionSchema.nonEmpty - case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan] case _ => false } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 841bc21aa2..17c31c1485 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -36,7 +36,6 @@ import org.apache.spark.util.collection._ import com.google.common.base.Objects -import org.apache.comet.CometConf import org.apache.comet.parquet.CometParquetFileFormat import org.apache.comet.serde.OperatorOuterClass.Operator @@ -141,8 +140,7 @@ object CometNativeScanExec { // https://github.com/apache/arrow-datafusion-comet/issues/190 def transform(arg: Any): AnyRef = arg match { case _: HadoopFsRelation => - scanExec.relation.copy(fileFormat = - new CometParquetFileFormat(session, CometConf.SCAN_NATIVE_DATAFUSION))(session) + scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session) case other: AnyRef => other case null => null } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index e283f6b2cf..2707f0c040 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -37,15 +37,13 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} -import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection._ import org.apache.comet.{CometConf, MetricsSupport} -import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory} +import org.apache.comet.parquet.CometParquetFileFormat /** * Comet physical 
scan node for DataSource V1. Most of the code here follow Spark's @@ -476,43 +474,13 @@ case class CometScanExec( fsRelation: HadoopFsRelation, readFile: (PartitionedFile) => Iterator[InternalRow], partitions: Seq[FilePartition]): RDD[InternalRow] = { - val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options) - val usingDataFusionReader: Boolean = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT - - val prefetchEnabled = hadoopConf.getBoolean( - CometConf.COMET_SCAN_PREFETCH_ENABLED.key, - CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) && - !usingDataFusionReader - val sqlConf = fsRelation.sparkSession.sessionState.conf - if (prefetchEnabled) { - CometParquetFileFormat.populateConf(sqlConf, hadoopConf) - val broadcastedConf = - fsRelation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val partitionReaderFactory = CometParquetPartitionReaderFactory( - scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT, - sqlConf, - broadcastedConf, - requiredSchema, - relation.partitionSchema, - pushedDownFilters.toArray, - new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf), - metrics) - - new DataSourceRDD( - fsRelation.sparkSession.sparkContext, - partitions.map(Seq(_)), - partitionReaderFactory, - true, - Map.empty) - } else { - newFileScanRDD( - fsRelation, - readFile, - partitions, - new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), - new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf)) - } + newFileScanRDD( + fsRelation, + readFile, + partitions, + new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), + new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf)) } override def doCanonicalize(): CometScanExec = { @@ -556,8 +524,7 @@ object CometScanExec { // https://github.com/apache/arrow-datafusion-comet/issues/190 def transform(arg: Any): AnyRef = arg match { case _: HadoopFsRelation => - scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session, scanImpl))( - session) + scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session) case other: AnyRef => other case null => null } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 928e66b29b..1495eb34ef 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -19,7 +19,7 @@ package org.apache.comet.parquet -import java.io.{File, FileFilter} +import java.io.File import java.math.{BigDecimal, BigInteger} import java.time.{ZoneId, ZoneOffset} @@ -31,20 +31,17 @@ import scala.util.control.Breaks.breakable import org.scalactic.source.Position import org.scalatest.Tag -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.schema.MessageTypeParser import org.apache.spark.SparkException import org.apache.spark.sql.{CometTestBase, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String import com.google.common.primitives.UnsignedLong @@ -703,76 +700,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - test("partition column types") { - withTempPath { dir => - Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath) - - val dataTypes = - Seq( - StringType, - BooleanType, - ByteType, - BinaryType, - ShortType, - IntegerType, - LongType, - FloatType, - DoubleType, - DecimalType(25, 5), - DateType, - TimestampType) - - // TODO: support `NullType` here, after we add the support in `ColumnarBatchRow` - val constantValues = - Seq( - UTF8String.fromString("a string"), - true, - 1.toByte, - "Spark SQL".getBytes, - 2.toShort, - 3, - Long.MaxValue, - 0.25.toFloat, - 0.75d, - Decimal("1234.23456"), - DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")), - DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))) - - dataTypes.zip(constantValues).foreach { case (dt, v) => - val schema = StructType(StructField("pcol", dt) :: Nil) - val conf = SQLConf.get - val partitionValues = new GenericInternalRow(Array(v)) - val file = dir - .listFiles(new FileFilter { - override def accept(pathname: File): Boolean = - pathname.isFile && pathname.toString.endsWith("parquet") - }) - .head - val reader = new BatchReader( - file.toString, - CometConf.COMET_BATCH_SIZE.get(conf), - schema, - partitionValues) - reader.init() - - try { - reader.nextBatch() - val batch = reader.currentBatch() - val actual = batch.getRow(0).get(1, dt) - val expected = v - if (dt.isInstanceOf[BinaryType]) { - assert( - actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]]) - } else { - assert(actual == expected) - } - } finally { - reader.close() - } - } - } - } - test("partition columns - multiple batch") { withSQLConf( CometConf.COMET_BATCH_SIZE.key -> 7.toString, @@ -1535,116 +1462,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - test("test pre-fetching multiple files") { - def makeRawParquetFile( - path: Path, - dictionaryEnabled: Boolean, - n: Int, - pageSize: Int): Seq[Option[Int]] = { - val schemaStr = - """ - |message root { - | optional boolean _1; - | optional int32 _2(INT_8); - | optional int32 _3(INT_16); - | optional int32 _4; - | optional int64 _5; - | optional float _6; - | optional double _7; - | optional binary _8(UTF8); - | optional int32 _9(UINT_8); - | optional int32 _10(UINT_16); - | optional int32 _11(UINT_32); - | optional int64 _12(UINT_64); - | optional binary _13(ENUM); - |} - """.stripMargin - - val schema = MessageTypeParser.parseMessageType(schemaStr) - val writer = createParquetWriter( - schema, - path, - dictionaryEnabled = dictionaryEnabled, - pageSize = pageSize, - dictionaryPageSize = pageSize) - - val rand = new scala.util.Random(42) - val expected = (0 until n).map { i => - if (rand.nextBoolean()) { - None - } else { - Some(i) - } - } - expected.foreach { opt => - val record = new SimpleGroup(schema) - opt match { - case Some(i) => - record.add(0, i % 2 == 0) - record.add(1, i.toByte) - record.add(2, i.toShort) - record.add(3, i) - record.add(4, i.toLong) - record.add(5, i.toFloat) - record.add(6, i.toDouble) - record.add(7, i.toString * 48) - record.add(8, (-i).toByte) - record.add(9, (-i).toShort) - record.add(10, -i) - record.add(11, (-i).toLong) - record.add(12, i.toString) - case _ => - } - writer.write(record) - } - - writer.close() - expected - } - - val conf = new Configuration() - 
conf.set("spark.comet.scan.preFetch.enabled", "true"); - conf.set("spark.comet.scan.preFetch.threadNum", "4"); - - withTempDir { dir => - val threadPool = CometPrefetchThreadPool.getOrCreateThreadPool(2) - - val readers = (0 to 10).map { idx => - val path = new Path(dir.toURI.toString, s"part-r-$idx.parquet") - makeRawParquetFile(path, dictionaryEnabled = false, 10000, 500) - - val reader = new BatchReader(conf, path.toString, 1000, null, null) - reader.submitPrefetchTask(threadPool) - - reader - } - - // Wait for all pre-fetch tasks - readers.foreach { reader => - val task = reader.getPrefetchTask() - task.get() - } - - val totolRows = readers.map { reader => - val queue = reader.getPrefetchQueue() - var rowCount = 0L - - while (!queue.isEmpty) { - val rowGroup = queue.take().getLeft - rowCount += rowGroup.getRowCount - } - - reader.close() - - rowCount - }.sum - - readParquetFile(dir.toString) { df => - assert(df.count() == totolRows) - } - } - } - test("test merge scan range") { def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = { val dictionaryPageSize = 1024 @@ -1753,23 +1570,6 @@ abstract class ParquetReadSuite extends CometTestBase { } } - override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit - pos: Position): Unit = { - Seq(true, false).foreach { prefetch => - val cometTestName = if (prefetch) { - testName + " (prefetch enabled)" - } else { - testName - } - - super.test(cometTestName, testTags: _*) { - withSQLConf(CometConf.COMET_SCAN_PREFETCH_ENABLED.key -> prefetch.toString) { - testFun - } - } - } - } - private def withId(id: Int) = new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build() @@ -2036,11 +1836,7 @@ class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { val scans = collect(r.filter(f).queryExecution.executedPlan) { case p: CometBatchScanExec => p.scan } - if (CometConf.COMET_ENABLED.get()) { - assert(scans.nonEmpty && scans.forall(_.isInstanceOf[CometParquetScan])) - } else { - assert(!scans.exists(_.isInstanceOf[CometParquetScan])) - } + assert(scans.isEmpty) } } diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala index a349ab2b93..18dec68171 100644 --- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.comet.CometConf -import org.apache.comet.parquet.CometParquetScan import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} /** @@ -127,11 +126,6 @@ class CometScanRuleSuite extends CometTestBase { if (cometEnabled) { assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 0) assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 1) - - // CometScanRule should have replaced the underlying scan - val scan = transformedPlan.collect { case scan: CometBatchScanExec => scan }.head - assert(scan.wrapped.scan.isInstanceOf[CometParquetScan]) - } else { assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 1) assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 0) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala index a2f196a4fc..5b0371b277 100644 --- 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector import org.apache.comet.{CometConf, WithHdfsCluster} -import org.apache.comet.parquet.BatchReader /** * Benchmark to measure Comet read performance. To run this benchmark: @@ -179,29 +178,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase { } } - sqlBenchmark.addCase("ParquetReader Comet") { _ => - files.map(_.asInstanceOf[String]).foreach { p => - val reader = new BatchReader(p, vectorizedReaderBatchSize) - reader.init() - try { - var totalNumRows = 0 - while (reader.nextBatch()) { - val batch = reader.currentBatch() - val column = batch.column(0) - val numRows = batch.numRows() - var i = 0 - while (i < numRows) { - if (!column.isNullAt(i)) aggregateValue(column, i) - i += 1 - } - totalNumRows += batch.numRows() - } - } finally { - reader.close() - } - } - } - sqlBenchmark.run() } } From c565e4e3beaf68c5bc2d7f2425d6876130f92adc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 11 Feb 2026 13:15:19 -0700 Subject: [PATCH 2/2] trigger CI
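
For Iceberg-facing code that still constructs the deprecated BatchReader directly, the synchronous calling pattern is what remains after this change: construct the reader, call init(), then loop over nextBatch()/currentBatch(). The sketch below is illustrative only, not part of the applied diff; it assumes the two-argument (file path, batch size) constructor that the removed benchmark case used, and the file path and batch size are placeholder values.

import org.apache.comet.parquet.BatchReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class BatchReaderSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder inputs for illustration only.
    String parquetFile = "/tmp/example.parquet";
    int batchSize = 8192;

    // BatchReader is deprecated since 0.14.0 and kept for Iceberg compatibility.
    BatchReader reader = new BatchReader(parquetFile, batchSize);
    reader.init(); // must run before nextBatch(); the async prefetch init path is gone
    try {
      long totalRows = 0;
      while (reader.nextBatch()) {               // true while another batch was loaded
        ColumnarBatch batch = reader.currentBatch();
        totalRows += batch.numRows();
      }
      System.out.println("rows read: " + totalRows);
    } finally {
      reader.close();
    }
  }
}

With the prefetch internals removed, nextBatch() simply asserts that init() has already been called, so callers no longer need to account for asynchronous initialization or poll a prefetch task for errors.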