131 changes: 5 additions & 126 deletions common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -24,10 +24,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import scala.Option;

@@ -36,9 +32,7 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -87,7 +81,10 @@
* reader.close();
* }
* </pre>
*
* @deprecated since 0.14.0. This class is kept for Iceberg compatibility only.
*/
@Deprecated
@IcebergApi
public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
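With the prefetch path removed, callers drive the deprecated BatchReader synchronously through init(), nextBatch(), currentBatch(), and close(). A minimal caller-side sketch in Scala (not code from this PR; it assumes a reader already constructed with a Hadoop conf, file split, footer, batch size, and schema, as CometParquetFileFormat does further down):

import org.apache.comet.parquet.BatchReader
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper: drains an already-constructed BatchReader and returns the row count.
def consumeAll(reader: BatchReader): Long = {
  var rows = 0L
  reader.init()                  // must complete before the first nextBatch() call
  try {
    while (reader.nextBatch()) { // returns false once all row groups are exhausted
      val batch: ColumnarBatch = reader.currentBatch()
      rows += batch.numRows()
    }
  } finally {
    reader.close()
  }
  rows
}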
@@ -110,8 +107,6 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
protected AbstractColumnReader[] columnReaders;
private CometSchemaImporter importer;
protected ColumnarBatch currentBatch;
private Future<Option<Throwable>> prefetchTask;
private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
private FileReader fileReader;
private boolean[] missingColumns;
protected boolean isInitialized;
@@ -363,26 +358,7 @@ public void init() throws URISyntaxException, IOException {
}
}

// Pre-fetching
boolean preFetchEnabled =
conf.getBoolean(
CometConf.COMET_SCAN_PREFETCH_ENABLED().key(),
(boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get());

if (preFetchEnabled) {
LOG.info("Prefetch enabled for BatchReader.");
this.prefetchQueue = new LinkedBlockingQueue<>();
}

isInitialized = true;
synchronized (this) {
// if prefetch is enabled, `init()` is called in separate thread. When
// `BatchReader.nextBatch()` is called asynchronously, it is possibly that
// `init()` is not called or finished. We need to hold on `nextBatch` until
// initialization of `BatchReader` is done. Once we are close to finish
// initialization, we notify the waiting thread of `nextBatch` to continue.
notifyAll();
}
}

/**
@@ -436,51 +412,13 @@ public ColumnarBatch currentBatch() {
return currentBatch;
}

// Only for testing
public Future<Option<Throwable>> getPrefetchTask() {
return this.prefetchTask;
}

// Only for testing
public LinkedBlockingQueue<Pair<PageReadStore, Long>> getPrefetchQueue() {
return this.prefetchQueue;
}

/**
* Loads the next batch of rows.
*
* @return true if a batch was loaded, false if there are no more rows to read.
*/
public boolean nextBatch() throws IOException {
if (this.prefetchTask == null) {
Preconditions.checkState(isInitialized, "init() should be called first!");
} else {
// If prefetch is enabled, this reader will be initialized asynchronously from a
// different thread. Wait until it is initialized
while (!isInitialized) {
synchronized (this) {
try {
// Wait until initialization of current `BatchReader` is finished (i.e., `init()`),
// is done. It is possibly that `init()` is done after entering this while loop,
// so a short timeout is given.
wait(100);

// Checks if prefetch task is finished. If so, tries to get exception if any.
if (prefetchTask.isDone()) {
Option<Throwable> exception = prefetchTask.get();
if (exception.isDefined()) {
throw exception.get();
}
}
} catch (RuntimeException e) {
// Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`.
throw e;
} catch (Throwable e) {
throw new IOException(e);
}
}
}
}
Preconditions.checkState(isInitialized, "init() should be called first!");

if (rowsRead >= totalRowCount) return false;
boolean hasMore;
@@ -547,7 +485,6 @@ public void close() throws IOException {
}
}

@SuppressWarnings("deprecation")
private boolean loadNextRowGroupIfNecessary() throws Throwable {
// More rows can be read from loaded row group. No need to load next one.
if (rowsRead != totalRowsLoaded) return true;
@@ -556,21 +493,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups");
long startNs = System.nanoTime();

PageReadStore rowGroupReader = null;
if (prefetchTask != null && prefetchQueue != null) {
// Wait for pre-fetch task to finish.
Pair<PageReadStore, Long> rowGroupReaderPair = prefetchQueue.take();
rowGroupReader = rowGroupReaderPair.getLeft();

// Update incremental byte read metric. Because this metric in Spark is maintained
// by thread local variable, we need to manually update it.
// TODO: We may expose metrics from `FileReader` and get from it directly.
long incBytesRead = rowGroupReaderPair.getRight();
FileSystem.getAllStatistics().stream()
.forEach(statistic -> statistic.incrementBytesRead(incBytesRead));
} else {
rowGroupReader = fileReader.readNextRowGroup();
}
PageReadStore rowGroupReader = fileReader.readNextRowGroup();

if (rowGroupTimeMetric != null) {
rowGroupTimeMetric.add(System.nanoTime() - startNs);
@@ -608,48 +531,4 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
totalRowsLoaded += rowGroupReader.getRowCount();
return true;
}

// Submits a prefetch task for this reader.
public void submitPrefetchTask(ExecutorService threadPool) {
this.prefetchTask = threadPool.submit(new PrefetchTask());
}

// A task for prefetching parquet row groups.
private class PrefetchTask implements Callable<Option<Throwable>> {
private long getBytesRead() {
return FileSystem.getAllStatistics().stream()
.mapToLong(s -> s.getThreadStatistics().getBytesRead())
.sum();
}

@Override
public Option<Throwable> call() throws Exception {
// Gets the bytes read so far.
long baseline = getBytesRead();

try {
init();

while (true) {
PageReadStore rowGroupReader = fileReader.readNextRowGroup();

if (rowGroupReader == null) {
// Reaches the end of row groups.
return Option.empty();
} else {
long incBytesRead = getBytesRead() - baseline;

prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead));
}
}
} catch (Throwable e) {
// Returns exception thrown from the reader. The reader will re-throw it.
return Option.apply(e);
} finally {
if (fileReader != null) {
fileReader.closeStream();
}
}
}
}
}
IcebergCometBatchReader.java
@@ -24,9 +24,11 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometVector;

/** This class is a public interface used by Apache Iceberg to read batches using Comet */
@IcebergApi
public class IcebergCometBatchReader extends BatchReader {
public IcebergCometBatchReader(int numColumns, StructType schema) {
this.columnReaders = new AbstractColumnReader[numColumns];
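For context, the constructor shown above only sizes the column-reader array from a Spark schema; the Iceberg integration supplies the actual column readers and row-group wiring. A hedged Scala sketch of constructing it (the schema and field names are invented for illustration):

import org.apache.comet.parquet.IcebergCometBatchReader
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical read schema; real callers pass the Iceberg schema converted to Spark types.
val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)
// One column-reader slot per top-level field, as in the constructor above.
val reader = new IcebergCometBatchReader(schema.size, schema)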
17 changes: 0 additions & 17 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -651,23 +651,6 @@ object CometConf extends ShimCometConf {
.doubleConf
.createWithDefault(1.0)

val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.preFetch.enabled")
.category(CATEGORY_SCAN)
.doc("Whether to enable pre-fetching feature of CometScan.")
.booleanConf
.createWithDefault(false)

val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.scan.preFetch.threadNum")
.category(CATEGORY_SCAN)
.doc(
"The number of threads running pre-fetching for CometScan. Effective if " +
s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
"pre-fetching threads means more memory requirement to store pre-fetched row groups.")
.intConf
.createWithDefault(2)

val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] = conf("spark.comet.nativeLoadRequired")
.category(CATEGORY_EXEC)
.doc(
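The deleted entries above follow CometConf's ConfigEntry builder DSL. Purely to illustrate that pattern, a sketch of a new scan flag as it would be declared inside object CometConf (the key below is hypothetical, not a real Comet config) and read the same way CometConf.COMET_BATCH_SIZE.get(sqlConf) is read elsewhere in this PR:

// Hypothetical entry, shown only to illustrate the builder DSL used by the removed configs.
val COMET_SCAN_EXAMPLE_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.scan.example.enabled")
    .category(CATEGORY_SCAN)
    .doc("Example flag illustrating the ConfigEntry builder DSL.")
    .booleanConf
    .createWithDefault(false)

// Reading it against a SQLConf:
// val enabled = COMET_SCAN_EXAMPLE_ENABLED.get(sqlConf)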
CometReaderThreadPool.scala
@@ -54,11 +54,6 @@ abstract class CometReaderThreadPool {

}

// A thread pool used for pre-fetching files.
object CometPrefetchThreadPool extends CometReaderThreadPool {
override def threadNamePrefix: String = "prefetch_thread"
}

// Thread pool used by the Parquet parallel reader
object CometFileReaderThreadPool extends CometReaderThreadPool {
override def threadNamePrefix: String = "file_reader_thread"
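The surviving CometFileReaderThreadPool shows that a concrete pool only overrides threadNamePrefix; CometReaderThreadPool provides the shared executor. A hypothetical sketch of the same pattern (the object and prefix below are illustrative only):

// Hypothetical pool following the pattern above.
object ExampleReaderThreadPool extends CometReaderThreadPool {
  override def threadNamePrefix: String = "example_reader_thread"
}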
CometParquetFileFormat.scala
@@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector
* in [[org.apache.comet.CometSparkSessionExtensions]]
* - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
*/
class CometParquetFileFormat(session: SparkSession, scanImpl: String)
class CometParquetFileFormat(session: SparkSession)
extends ParquetFileFormat
with MetricsSupport
with ShimSQLConf {
@@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)

val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT

(file: PartitionedFile) => {
val sharedConf = broadcastedHadoopConf.value.value
val footer = FooterReader.readFooter(sharedConf, file)
@@ -135,85 +133,42 @@
isCaseSensitive,
datetimeRebaseSpec)

val recordBatchReader =
if (nativeIcebergCompat) {
// We still need the predicate in the conf to allow us to generate row indexes based on
// the actual row groups read
val pushed = if (parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates
// can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
// a `flatMap` is used here.
.flatMap(parquetFilters.createFilter)
.reduceOption(FilterApi.and)
} else {
None
}
pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
val pushedNative = if (parquetFilterPushDown) {
parquetFilters.createNativeFilters(filters)
} else {
None
}
val batchReader = new NativeBatchReader(
sharedConf,
file,
footer,
pushedNative.orNull,
capacity,
requiredSchema,
dataSchema,
isCaseSensitive,
useFieldId,
ignoreMissingIds,
datetimeRebaseSpec.mode == CORRECTED,
partitionSchema,
file.partitionValues,
metrics.asJava,
CometMetricNode(metrics))
try {
batchReader.init()
} catch {
case e: Throwable =>
batchReader.close()
throw e
}
batchReader
} else {
val pushed = if (parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates
// can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
// a `flatMap` is used here.
.flatMap(parquetFilters.createFilter)
.reduceOption(FilterApi.and)
} else {
None
}
pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))

val batchReader = new BatchReader(
sharedConf,
file,
footer,
capacity,
requiredSchema,
isCaseSensitive,
useFieldId,
ignoreMissingIds,
datetimeRebaseSpec.mode == CORRECTED,
partitionSchema,
file.partitionValues,
metrics.asJava)
try {
batchReader.init()
} catch {
case e: Throwable =>
batchReader.close()
throw e
}
batchReader
}
val pushed = if (parquetFilterPushDown) {
filters
.flatMap(parquetFilters.createFilter)
.reduceOption(FilterApi.and)
} else {
None
}
pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
val pushedNative = if (parquetFilterPushDown) {
parquetFilters.createNativeFilters(filters)
} else {
None
}
val recordBatchReader = new NativeBatchReader(
sharedConf,
file,
footer,
pushedNative.orNull,
capacity,
requiredSchema,
dataSchema,
isCaseSensitive,
useFieldId,
ignoreMissingIds,
datetimeRebaseSpec.mode == CORRECTED,
partitionSchema,
file.partitionValues,
metrics.asJava,
CometMetricNode(metrics))
try {
recordBatchReader.init()
} catch {
case e: Throwable =>
recordBatchReader.close()
throw e
}
val iter = new RecordReaderIterator(recordBatchReader)
try {
iter.asInstanceOf[Iterator[InternalRow]]
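With the scanImpl branch gone, buildReaderWithPartitionValues always builds a NativeBatchReader: it converts whatever Spark Filters the Parquet converter supports, ANDs them into a single predicate in the Hadoop conf, and also passes a native filter. A standalone Scala sketch of the classic pushdown half of that pattern, where createParquetFilter stands in for the parquetFilters.createFilter helper built earlier in the method:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.sources.Filter

// Sketch of the pushdown step above; not every Spark Filter is convertible, hence flatMap.
def pushDownFilters(
    filters: Seq[Filter],
    createParquetFilter: Filter => Option[FilterPredicate],
    sharedConf: Configuration): Unit = {
  val pushed = filters
    .flatMap(f => createParquetFilter(f)) // drop predicates that cannot be converted
    .reduceOption(FilterApi.and)          // combine the rest into a single predicate
  pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
}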