Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object SchemaMergeUtils extends Logging {
sparkSession: SparkSession,
parameters: Map[String, String],
files: Seq[FileStatus],
schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType])
schemaReader: (Seq[FileStatus], Configuration, Boolean, Boolean) => Seq[StructType])
: Option[StructType] = {
val serializedConf = new SerializableConfiguration(
sparkSession.sessionState.newHadoopConfWithOptions(parameters))
Expand All @@ -62,8 +62,9 @@ object SchemaMergeUtils extends Logging {
val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
sparkSession.sparkContext.defaultParallelism)

val ignoreCorruptFiles =
new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreCorruptFiles
val fileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(parameters))
val ignoreCorruptFiles = fileSourceOptions.ignoreCorruptFiles
val ignoreMissingFiles = fileSourceOptions.ignoreMissingFiles
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis

// Issues a Spark job to read Parquet/ORC schema in parallel.
Expand All @@ -77,7 +78,8 @@ object SchemaMergeUtils extends Logging {
new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
}.toSeq

val schemas = schemaReader(fakeFileStatuses, serializedConf.value, ignoreCorruptFiles)
val schemas = schemaReader(
fakeFileStatuses, serializedConf.value, ignoreCorruptFiles, ignoreMissingFiles)

if (schemas.isEmpty) {
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.spark.sql.execution.datasources.orc

import java.io.FileNotFoundException
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.serde2.io.DateWritable
Expand Down Expand Up @@ -72,8 +74,8 @@ object OrcUtils extends Logging {
paths
}

def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean)
: Option[TypeDescription] = {
def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean,
ignoreMissingFiles: Boolean = false): Option[TypeDescription] = {
val fs = file.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
try {
Expand All @@ -86,6 +88,10 @@ object OrcUtils extends Logging {
Some(schema)
}
} catch {
case e: Exception if ignoreMissingFiles &&
ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) =>
logWarning(log"Skipped missing file: ${MDC(PATH, file)}", e)
None
case e: org.apache.orc.FileFormatException =>
if (ignoreCorruptFiles) {
logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, file)}", e)
Expand Down Expand Up @@ -159,9 +165,11 @@ object OrcUtils extends Logging {
* This is visible for testing.
*/
/**
 * Reads ORC file schemas in a multi-threaded manner (8 reader threads via
 * `ThreadUtils.parmap`) and converts each ORC `TypeDescription` to a Catalyst `StructType`.
 *
 * @param files              the ORC files whose schemas should be read
 * @param conf               Hadoop configuration used to open the files
 * @param ignoreCorruptFiles when true, files with unreadable footers are skipped
 * @param ignoreMissingFiles when true, files that have disappeared since listing are skipped
 * @return the schemas of all files that could be read; skipped files contribute nothing
 */
def readOrcSchemasInParallel(
    files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean,
    ignoreMissingFiles: Boolean): Seq[StructType] = {
  ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile =>
    // readSchema returns None for files skipped due to corruption/missing; flatten drops them.
    OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles, ignoreMissingFiles)
      .map(toCatalystSchema)
  }.flatten
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.Closeable
import java.io.{Closeable, FileNotFoundException}
import java.time.ZoneId
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Try}

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.FileSplit
Expand Down Expand Up @@ -502,11 +503,14 @@ object ParquetFileFormat extends Logging {
* Reads Parquet footers in multi-threaded manner.
* If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
* files when reading footers.
* If the config "spark.sql.files.ignoreMissingFiles" is set to true, we will ignore the missing
* files when reading footers.
*/
private[parquet] def readParquetFootersInParallel(
    conf: Configuration,
    partFiles: Seq[FileStatus],
    ignoreCorruptFiles: Boolean,
    ignoreMissingFiles: Boolean = false): Seq[Footer] = {
  // Reads footers with 8 threads; each worker returns Option so skipped files flatten away.
  ThreadUtils.parmap(partFiles, "readingParquetFooters", 8) { currentFile =>
    try {
      // Skips row group information since we only need the schema.
      Some(new Footer(currentFile.getPath,
        ParquetFooterReader.readFooter(
          HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS)))
    } catch {
      // A file listed earlier may have been deleted before we read it; when
      // ignoreMissingFiles is set, skip it. The FileNotFoundException may be wrapped
      // by the Parquet reader, so search the whole cause chain.
      case e: Exception if ignoreMissingFiles &&
          ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) =>
        logWarning(log"Skipped missing file: ${MDC(PATH, currentFile)}", e)
        None
      // ParquetFooterReader.readFooter throws RuntimeException, instead of IOException,
      // when it can't read the footer.
      case e: RuntimeException if ignoreCorruptFiles =>
        logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e)
        None
      case e: Exception =>
        throw QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e)
    }
  }.flatten
}
Expand Down Expand Up @@ -550,15 +557,16 @@ object ParquetFileFormat extends Logging {
val inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled
val nanosAsLong = sqlConf.legacyParquetNanosAsLong

val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean,
ignoreMissingFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
inferTimestampNTZ = inferTimestampNTZ,
nanosAsLong = nanosAsLong)

readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
readParquetFootersInParallel(conf, files, ignoreCorruptFiles, ignoreMissingFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.sql.Timestamp
import java.time.LocalDateTime

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Expand Down Expand Up @@ -633,6 +633,74 @@ abstract class OrcQueryTest extends OrcTest {
}
}

test("SPARK-55857: Enabling/disabling ignoreMissingFiles") {
  // Writes two one-row ORC datasets, builds a DataFrame over both, then deletes the
  // data files of the first dataset so the scan hits files that no longer exist.
  def readWithFirstPathDeleted(readOptions: Map[String, String]): Unit = {
    withTempDir { tempDir =>
      val root = tempDir.getCanonicalPath
      val fs = FileSystem.get(spark.sessionState.newHadoopConf())
      val firstDir = new Path(root, "first")
      val secondDir = new Path(root, "second")
      spark.range(1).toDF("a").write.orc(firstDir.toString)
      spark.range(1, 2).toDF("a").write.orc(secondDir.toString)
      // Build the DataFrame first so the query plan already references the files,
      // then remove them to mimic a race between listing and reading.
      val df = spark.read.options(readOptions).orc(firstDir.toString, secondDir.toString)
      for (status <- fs.listStatus(firstDir)
          if status.isFile && !status.getPath.getName.startsWith("_")) {
        fs.delete(status.getPath, false)
      }
      checkAnswer(df, Seq(Row(1)))
    }
  }

  // Same scenario, but with a user-supplied schema so no schema inference runs.
  def readWithFirstPathDeletedNoInference(readOptions: Map[String, String]): Unit = {
    withTempDir { tempDir =>
      val root = tempDir.getCanonicalPath
      val fs = FileSystem.get(spark.sessionState.newHadoopConf())
      val firstDir = new Path(root, "first")
      val secondDir = new Path(root, "second")
      spark.range(1).toDF("a").write.orc(firstDir.toString)
      spark.range(1, 2).toDF("a").write.orc(secondDir.toString)
      val df = spark.read.schema("a long").options(readOptions)
        .orc(firstDir.toString, secondDir.toString)
      for (status <- fs.listStatus(firstDir)
          if status.isFile && !status.getPath.getName.startsWith("_")) {
        fs.delete(status.getPath, false)
      }
      checkAnswer(df, Seq(Row(1)))
    }
  }

  // Missing files effectively ignored, via either the session conf or the read option.
  Seq("SQLConf", "FormatOption").foreach { source =>
    val (confValue, readOptions) = source match {
      case "SQLConf" => ("true", Map.empty[String, String])
      // Explicitly set SQLConf to false but still should ignore missing files
      case "FormatOption" => ("false", Map("ignoreMissingFiles" -> "true"))
    }
    withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> confValue) {
      readWithFirstPathDeleted(readOptions)
      readWithFirstPathDeletedNoInference(readOptions)
    }
  }

  // Missing files effectively NOT ignored: the read must fail.
  Seq("SQLConf", "FormatOption").foreach { source =>
    val (confValue, readOptions) = source match {
      case "SQLConf" => ("false", Map.empty[String, String])
      // Explicitly set SQLConf to true but still should not ignore missing files
      case "FormatOption" => ("true", Map("ignoreMissingFiles" -> "false"))
    }
    withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> confValue) {
      checkErrorMatchPVals(
        exception = intercept[SparkException] {
          readWithFirstPathDeleted(readOptions)
        },
        condition = "FAILED_READ_FILE.FILE_NOT_EXIST",
        parameters = Map("path" -> ".*")
      )
    }
  }
}

test("SPARK-27160 Predicate pushdown correctness on DecimalType for ORC") {
withTempPath { dir =>
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.orc

import java.io.File
import java.io.{File, FileNotFoundException}
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.{Date, Timestamp}
import java.time.{Duration, Period}
Expand Down Expand Up @@ -197,7 +197,7 @@ abstract class OrcSuite

protected def testMergeSchemasInParallel(
ignoreCorruptFiles: Boolean,
schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = {
schemaReader: (Seq[FileStatus], Configuration, Boolean, Boolean) => Seq[StructType]): Unit = {
withSQLConf(
SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString,
SQLConf.ORC_IMPLEMENTATION.key -> orcImp) {
Expand Down Expand Up @@ -228,7 +228,7 @@ abstract class OrcSuite
}

protected def testMergeSchemasInParallel(
schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = {
schemaReader: (Seq[FileStatus], Configuration, Boolean, Boolean) => Seq[StructType]): Unit = {
testMergeSchemasInParallel(true, schemaReader)
checkErrorMatchPVals(
exception = intercept[SparkException] {
Expand Down Expand Up @@ -461,6 +461,42 @@ abstract class OrcSuite
}
}

test("SPARK-55857: test schema merging with missing files") {
  withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImp) {
    withTempDir { tempDir =>
      val fs = FileSystem.get(spark.sessionState.newHadoopConf())
      val root = tempDir.getCanonicalPath
      val firstDir = new Path(root, "first")
      val secondDir = new Path(root, "second")
      spark.range(0, 10).toDF("a").coalesce(1).write.orc(firstDir.toString)
      spark.range(0, 10).toDF("b").coalesce(1).write.orc(secondDir.toString)

      // Snapshot the FileStatuses before any deletion, so that schema inference later
      // operates on a listing that references a file which has since vanished (the race
      // we want to simulate). Keep only real ORC data files: no directories, no _SUCCESS.
      val dataFiles = (fs.listStatus(firstDir) ++ fs.listStatus(secondDir))
        .filter(status => status.isFile && !status.getPath.getName.startsWith("_"))
        .toSeq
      fs.delete(dataFiles.head.getPath, false)

      val hadoopConf = spark.sessionState.newHadoopConf()

      // With ignoreMissingFiles on, the vanished file is skipped and the surviving
      // files still produce a schema.
      val mergedSchemas = OrcUtils.readOrcSchemasInParallel(
        dataFiles, hadoopConf, ignoreCorruptFiles = false, ignoreMissingFiles = true)
      assert(mergedSchemas.nonEmpty)

      // With ignoreMissingFiles off, the FileNotFoundException escapes uncaught;
      // ThreadUtils.parmap surfaces it wrapped in a SparkException, so check the cause.
      val thrown = intercept[SparkException] {
        OrcUtils.readOrcSchemasInParallel(
          dataFiles, hadoopConf, ignoreCorruptFiles = false, ignoreMissingFiles = false)
      }
      assert(thrown.getCause.isInstanceOf[FileNotFoundException])
    }
  }
}

test("SPARK-11412 test schema merging with corrupt files") {
withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempDir { dir =>
Expand Down
Loading