From 6161c127bbacfb6050129416bc03122e7ca89dce Mon Sep 17 00:00:00 2001 From: Yash Botadra Date: Fri, 6 Mar 2026 01:22:23 +0000 Subject: [PATCH 1/2] [SPARK-55857][SQL] Support ignoreMissingFiles during schema inference for Parquet and ORC ### What changes were proposed in this pull request? Schema inference via `mergeSchema` can fail when a file is deleted between the file listing step and the footer-reading step. This is a real race condition in cloud storage environments where file disappearance between listing and reading is common. The `spark.sql.files.ignoreMissingFiles` option already suppresses `FileNotFoundException` during data reads (`FileScanRDD`) but was silently ignored during schema inference. This PR propagates `ignoreMissingFiles` through the Parquet and ORC schema inference paths: - `SchemaMergeUtils.mergeSchemasInParallel`: extracts `ignoreMissingFiles` from `parameters` and passes it as a fourth argument to the `schemaReader` function (type updated accordingly). - `ParquetFileFormat.readParquetFootersInParallel`: catches exceptions with `FileNotFoundException` anywhere in the cause chain (using `ExceptionUtils.getThrowables`) and skips the file when `ignoreMissingFiles=true`. The cause-chain check is needed because Parquet wraps `IOException` in `RuntimeException`. - `OrcUtils.readSchema` / `readOrcSchemasInParallel`: catches `FileNotFoundException` directly before the existing `FileFormatException` handler. - `OrcFileOperator.getFileReader` / `readOrcSchemasInParallel`: same pattern for Hive ORC. ### Why are the changes needed? Without this fix, any user that sets `mergeSchema=true` on a path with concurrent deletes gets an unrecoverable exception even when they have opted into tolerating missing files via `spark.sql.files.ignoreMissingFiles`. ### Does this PR introduce _any_ user-facing change? 
Yes: when `spark.sql.files.ignoreMissingFiles=true`, files that disappear between listing and schema reading are now silently skipped (consistent with the existing behaviour during data reads) instead of causing an error. ### How was this patch tested? - Unit tests in `ParquetFileFormatSuite`: direct calls to `readParquetFootersInParallel` with a deleted file (local FS), with a `RuntimeException`-wrapped `FileNotFoundException` (via `WrappingFNFLocalFileSystem`), and end-to-end through `mergeSchemasInParallel`. - Unit tests in `OrcSourceSuite`: direct calls to `OrcUtils.readOrcSchemasInParallel` with a deleted file. --- .../datasources/SchemaMergeUtils.scala | 10 +- .../execution/datasources/orc/OrcUtils.scala | 18 ++- .../parquet/ParquetFileFormat.scala | 28 ++-- .../datasources/orc/OrcSourceSuite.scala | 43 +++++- .../parquet/ParquetFileFormatSuite.scala | 123 +++++++++++++++++- .../spark/sql/hive/orc/OrcFileOperator.scala | 17 ++- 6 files changed, 216 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala index cf0e67ecc30fa..a04bfbc67b580 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala @@ -37,7 +37,7 @@ object SchemaMergeUtils extends Logging { sparkSession: SparkSession, parameters: Map[String, String], files: Seq[FileStatus], - schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]) + schemaReader: (Seq[FileStatus], Configuration, Boolean, Boolean) => Seq[StructType]) : Option[StructType] = { val serializedConf = new SerializableConfiguration( sparkSession.sessionState.newHadoopConfWithOptions(parameters)) @@ -62,8 +62,9 @@ object SchemaMergeUtils extends Logging { val numParallelism = 
Math.min(Math.max(partialFileStatusInfo.size, 1), sparkSession.sparkContext.defaultParallelism) - val ignoreCorruptFiles = - new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreCorruptFiles + val fileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(parameters)) + val ignoreCorruptFiles = fileSourceOptions.ignoreCorruptFiles + val ignoreMissingFiles = fileSourceOptions.ignoreMissingFiles val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis // Issues a Spark job to read Parquet/ORC schema in parallel. @@ -77,7 +78,8 @@ object SchemaMergeUtils extends Logging { new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path)) }.toSeq - val schemas = schemaReader(fakeFileStatuses, serializedConf.value, ignoreCorruptFiles) + val schemas = schemaReader( + fakeFileStatuses, serializedConf.value, ignoreCorruptFiles, ignoreMissingFiles) if (schemas.isEmpty) { Iterator.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index aaf5d6f3b79e1..ba00e12ddda82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.orc +import java.io.FileNotFoundException import java.nio.charset.StandardCharsets.UTF_8 import java.util.Locale @@ -72,8 +73,8 @@ object OrcUtils extends Logging { paths } - def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean) - : Option[TypeDescription] = { + def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean = false): Option[TypeDescription] = { val fs = file.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) try { @@ -86,6 +87,13 @@ object OrcUtils extends Logging { 
Some(schema) } } catch { + case e: FileNotFoundException => + if (ignoreMissingFiles) { + logWarning(log"Skipped missing file: ${MDC(PATH, file)}", e) + None + } else { + throw QueryExecutionErrors.cannotReadFooterForFileError(file, e) + } case e: org.apache.orc.FileFormatException => if (ignoreCorruptFiles) { logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, file)}", e) @@ -159,9 +167,11 @@ object OrcUtils extends Logging { * This is visible for testing. */ def readOrcSchemasInParallel( - files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = { + files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean): Seq[StructType] = { ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile => - OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles).map(toCatalystSchema) + OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles, ignoreMissingFiles) + .map(toCatalystSchema) }.flatten } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 850bfb4a89857..88e82f673c2ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.Closeable +import java.io.{Closeable, FileNotFoundException} import java.time.ZoneId import java.util.concurrent.atomic.AtomicBoolean @@ -25,6 +25,7 @@ import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.util.{Failure, Try} +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import 
org.apache.hadoop.mapred.FileSplit @@ -502,11 +503,14 @@ object ParquetFileFormat extends Logging { * Reads Parquet footers in multi-threaded manner. * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted * files when reading footers. + * If the config "spark.sql.files.ignoreMissingFiles" is set to true, we will ignore the missing + * files when reading footers. */ private[parquet] def readParquetFootersInParallel( conf: Configuration, partFiles: Seq[FileStatus], - ignoreCorruptFiles: Boolean): Seq[Footer] = { + ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean = false): Seq[Footer] = { ThreadUtils.parmap(partFiles, "readingParquetFooters", 8) { currentFile => try { // Skips row group information since we only need the schema. @@ -515,13 +519,20 @@ object ParquetFileFormat extends Logging { Some(new Footer(currentFile.getPath, ParquetFooterReader.readFooter( HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS))) - } catch { case e: RuntimeException => - if (ignoreCorruptFiles) { + } catch { + case e: Exception + if ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) => + if (ignoreMissingFiles) { + logWarning(log"Skipped missing file: ${MDC(PATH, currentFile)}", e) + None + } else { + throw QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e) + } + case e: RuntimeException if ignoreCorruptFiles => logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e) None - } else { + case e: Exception => throw QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e) - } } }.flatten } @@ -550,7 +561,8 @@ object ParquetFileFormat extends Logging { val inferTimestampNTZ = sqlConf.parquetInferTimestampNTZEnabled val nanosAsLong = sqlConf.legacyParquetNanosAsLong - val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => { + val reader = (files: Seq[FileStatus], conf: Configuration, 
ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean) => { // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` val converter = new ParquetToSparkSchemaConverter( assumeBinaryIsString = assumeBinaryIsString, @@ -558,7 +570,7 @@ object ParquetFileFormat extends Logging { inferTimestampNTZ = inferTimestampNTZ, nanosAsLong = nanosAsLong) - readParquetFootersInParallel(conf, files, ignoreCorruptFiles) + readParquetFootersInParallel(conf, files, ignoreCorruptFiles, ignoreMissingFiles) .map(ParquetFileFormat.readSchemaFromFooter(_, converter)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index f36a81945860b..60cbfb36b18f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -197,7 +197,7 @@ abstract class OrcSuite protected def testMergeSchemasInParallel( ignoreCorruptFiles: Boolean, - schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { + schemaReader: (Seq[FileStatus], Configuration, Boolean, Boolean) => Seq[StructType]): Unit = { withSQLConf( SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString, SQLConf.ORC_IMPLEMENTATION.key -> orcImp) { @@ -228,7 +228,7 @@ abstract class OrcSuite } protected def testMergeSchemasInParallel( - schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType]): Unit = { + schemaReader: (Seq[FileStatus], Configuration, Boolean, Boolean) => Seq[StructType]): Unit = { testMergeSchemasInParallel(true, schemaReader) checkErrorMatchPVals( exception = intercept[SparkException] { @@ -461,6 +461,45 @@ abstract class OrcSuite } } + test("SPARK-55857: test schema merging with missing files") { + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImp) { + 
withTempDir { dir => + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val basePath = dir.getCanonicalPath + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + spark.range(0, 10).toDF("a").coalesce(1).write.orc(path1.toString) + spark.range(0, 10).toDF("b").coalesce(1).write.orc(path2.toString) + + // Collect FileStatuses before deleting, to simulate the race condition where a file + // is listed for schema inference but then disappears before it can be read. + // Filter to actual ORC data files (exclude _SUCCESS and directories). + val allStatuses = Seq(fs.listStatus(path1), fs.listStatus(path2)).flatten + val fileStatuses = allStatuses.filter(s => s.isFile && !s.getPath.getName.startsWith("_")) + val deletedFile = fileStatuses.head + fs.delete(deletedFile.getPath, false) + + val hadoopConf = spark.sessionState.newHadoopConf() + + // ignoreMissingFiles=true: skips the missing file, returns schema from remaining files + val schemas = OrcUtils.readOrcSchemasInParallel( + fileStatuses, hadoopConf, ignoreCorruptFiles = false, ignoreMissingFiles = true) + assert(schemas.nonEmpty) + + // ignoreMissingFiles=false: throws on the missing file. + // ThreadUtils.parmap wraps the exception one level deep, so unwrap via getCause. 
+ checkErrorMatchPVals( + exception = intercept[SparkException] { + OrcUtils.readOrcSchemasInParallel( + fileStatuses, hadoopConf, ignoreCorruptFiles = false, ignoreMissingFiles = false) + }.getCause.asInstanceOf[SparkException], + condition = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER", + parameters = Map("path" -> "file:.*") + ) + } + } + } + test("SPARK-11412 test schema merging with corrupt files") { withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") { withTempDir { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala index a2aa14fa84823..1fb857bb43b15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala @@ -17,9 +17,12 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.io.FileNotFoundException +import java.net.URI import java.time.{Duration, LocalTime, Period} -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{QueryTest, Row} @@ -39,6 +42,34 @@ abstract class ParquetFileFormatSuite override protected def dataSourceFormat = "parquet" + private def checkCannotReadFooterError(body: => Unit): Unit = { + checkErrorMatchPVals( + exception = intercept[SparkException] { body }.getCause.asInstanceOf[SparkException], + condition = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER", + parameters = Map("path" -> "file:.*") + ) + } + + // Traverses the cause chain to find a SparkException with the given condition. Used when the + // error is thrown inside a Spark job and gets wrapped in a job-level SparkException. 
+ private def findSparkExceptionWithCondition( + t: Throwable, condition: String): Option[SparkException] = { + if (t == null) None + else t match { + case se: SparkException if se.getCondition == condition => Some(se) + case _ => findSparkExceptionWithCondition(t.getCause, condition) + } + } + + private def listFileStatuses(basePath: String, hadoopConf: Configuration): Seq[FileStatus] = + HadoopFSUtils.listFiles( + new Path(basePath), + hadoopConf, + (path: Path) => path.getName != "_SUCCESS").flatMap(_._2) + + private def deleteFile(hadoopConf: Configuration, status: FileStatus): Unit = + status.getPath.getFileSystem(hadoopConf).delete(status.getPath, false) + test("read parquet footers in parallel") { def testReadFooters(ignoreCorruptFiles: Boolean): Unit = { withTempDir { dir => @@ -75,6 +106,81 @@ abstract class ParquetFileFormatSuite ) } + test("SPARK-55857: read parquet footers in parallel - ignoreMissingFiles") { + withTempDir { dir => + val basePath = dir.getCanonicalPath + val hadoopConf = spark.sessionState.newHadoopConf() + spark.range(0, 1).toDF("a").coalesce(1).write.parquet(new Path(basePath, "p1").toString) + spark.range(1, 2).toDF("a").coalesce(1).write.parquet(new Path(basePath, "p2").toString) + val fileStatuses = listFileStatuses(basePath, hadoopConf) + assert(fileStatuses.size == 2) + deleteFile(hadoopConf, fileStatuses.head) + + assert(ParquetFileFormat.readParquetFootersInParallel( + hadoopConf, fileStatuses, ignoreCorruptFiles = false, ignoreMissingFiles = true).size == 1) + + checkCannotReadFooterError { + ParquetFileFormat.readParquetFootersInParallel( + hadoopConf, fileStatuses, ignoreCorruptFiles = false, ignoreMissingFiles = false) + } + } + } + + test("SPARK-55857: read parquet footers in parallel - ignoreMissingFiles via RuntimeException") { + // Simulates cloud storage (e.g., S3) where a missing-file error surfaces as a RuntimeException + // with FileNotFoundException in the cause chain rather than as a direct FileNotFoundException. 
+ val conf = new Configuration() + conf.set(s"fs.${WrappingFNFLocalFileSystem.scheme}.impl", + classOf[WrappingFNFLocalFileSystem].getName) + conf.set(s"fs.${WrappingFNFLocalFileSystem.scheme}.impl.disable.cache", "true") + val fakeStatus = new FileStatus(0L, false, 0, 0L, 0L, + new Path(s"${WrappingFNFLocalFileSystem.scheme}://localhost/fake/file.parquet")) + + assert(ParquetFileFormat.readParquetFootersInParallel( + conf, Seq(fakeStatus), ignoreCorruptFiles = false, ignoreMissingFiles = true).isEmpty) + + checkErrorMatchPVals( + exception = intercept[SparkException] { + ParquetFileFormat.readParquetFootersInParallel( + conf, Seq(fakeStatus), ignoreCorruptFiles = false, ignoreMissingFiles = false) + }.getCause.asInstanceOf[SparkException], + condition = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER", + parameters = Map("path" -> s"${WrappingFNFLocalFileSystem.scheme}:.*") + ) + } + + test("SPARK-55857: mergeSchemasInParallel ignoreMissingFiles") { + withTempDir { dir => + val basePath = dir.getCanonicalPath + val hadoopConf = spark.sessionState.newHadoopConf() + spark.range(0, 1).toDF("a").coalesce(1).write.parquet(new Path(basePath, "p1").toString) + spark.range(1, 2).toDF("a").coalesce(1).write.parquet(new Path(basePath, "p2").toString) + spark.range(2, 3).toDF("a").coalesce(1).write.parquet(new Path(basePath, "p3").toString) + val fileStatuses = listFileStatuses(basePath, hadoopConf) + assert(fileStatuses.size == 3) + deleteFile(hadoopConf, fileStatuses.head) + + val schema = ParquetFileFormat.mergeSchemasInParallel( + Map("ignoreMissingFiles" -> "true"), fileStatuses, spark) + assert(schema.isDefined) + assert(schema.get.fieldNames.sameElements(Array("a"))) + + // mergeSchemasInParallel runs via a Spark job, so the error is wrapped in a job-level + // SparkException. Traverse the cause chain to find the specific footer error. 
+ val ex = intercept[SparkException] { + ParquetFileFormat.mergeSchemasInParallel( + Map("ignoreMissingFiles" -> "false"), fileStatuses, spark) + } + checkErrorMatchPVals( + exception = findSparkExceptionWithCondition( + ex, "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER") + .getOrElse(fail(s"Expected FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER in: $ex")), + condition = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER", + parameters = Map("path" -> "file:.*") + ) + } + } + test("SPARK-36825, SPARK-36854: year-month/day-time intervals written and read as INT32/INT64") { Seq(false, true).foreach { offHeapEnabled => withSQLConf(SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offHeapEnabled.toString) { @@ -168,3 +274,18 @@ class ParquetFileFormatV2Suite extends ParquetFileFormatSuite { .sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "") } + +/** + * A local filesystem that wraps FileNotFoundException in RuntimeException, simulating cloud + * storage (e.g., S3) where missing-file errors surface through the cause chain rather than as + * direct FileNotFoundExceptions. 
+ */ +class WrappingFNFLocalFileSystem extends RawLocalFileSystem { + override def getUri: URI = URI.create(s"${WrappingFNFLocalFileSystem.scheme}:///") + override def open(f: Path, bufferSize: Int): org.apache.hadoop.fs.FSDataInputStream = + throw new RuntimeException(new FileNotFoundException("Simulated missing file: " + f)) +} + +object WrappingFNFLocalFileSystem { + val scheme = "fnfwrap" +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index f0730b743fe67..6cac83b7d1302 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.orc -import java.io.IOException +import java.io.{FileNotFoundException, IOException} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -53,7 +53,8 @@ private[hive] object OrcFileOperator extends Logging { */ def getFileReader(basePath: String, config: Option[Configuration] = None, - ignoreCorruptFiles: Boolean = false) + ignoreCorruptFiles: Boolean = false, + ignoreMissingFiles: Boolean = false) : Option[Reader] = { def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = { reader.getObjectInspector match { @@ -76,6 +77,13 @@ private[hive] object OrcFileOperator extends Logging { val reader = try { Some(OrcFile.createReader(fs, path)) } catch { + case e: FileNotFoundException => + if (ignoreMissingFiles) { + logWarning(log"Skipped missing file: ${MDC(PATH, path)}", e) + None + } else { + throw QueryExecutionErrors.cannotReadFooterForFileError(path, e) + } case e: IOException => if (ignoreCorruptFiles) { logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, path)}", e) @@ -108,11 +116,12 @@ private[hive] object OrcFileOperator extends Logging { * This is visible for testing. 
*/ def readOrcSchemasInParallel( - partFiles: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) + partFiles: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean) : Seq[StructType] = { ThreadUtils.parmap(partFiles, "readingOrcSchemas", 8) { currentFile => val file = currentFile.getPath.toString - getFileReader(file, Some(conf), ignoreCorruptFiles).map(reader => { + getFileReader(file, Some(conf), ignoreCorruptFiles, ignoreMissingFiles).map(reader => { val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] val schema = readerInspector.getTypeName logDebug(s"Reading schema from file $file., got Hive schema string: $schema") From 5dfd1a12d7bedab73d8d02c9cf3a5f310fbceb4c Mon Sep 17 00:00:00 2001 From: Yash Botadra Date: Mon, 9 Mar 2026 20:29:25 +0000 Subject: [PATCH 2/2] Address review comments --- .../execution/datasources/orc/OrcUtils.scala | 12 ++-- .../parquet/ParquetFileFormat.scala | 12 ++-- .../datasources/orc/OrcQuerySuite.scala | 70 ++++++++++++++++++- .../datasources/orc/OrcSourceSuite.scala | 19 +++-- .../parquet/ParquetQuerySuite.scala | 68 ++++++++++++++++++ .../spark/sql/hive/orc/OrcFileOperator.scala | 12 ++-- 6 files changed, 159 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index ba00e12ddda82..72c7ce5232e86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -24,6 +24,7 @@ import java.util.Locale import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import 
org.apache.hadoop.hive.serde2.io.DateWritable @@ -87,13 +88,10 @@ object OrcUtils extends Logging { Some(schema) } } catch { - case e: FileNotFoundException => - if (ignoreMissingFiles) { - logWarning(log"Skipped missing file: ${MDC(PATH, file)}", e) - None - } else { - throw QueryExecutionErrors.cannotReadFooterForFileError(file, e) - } + case e: Exception if ignoreMissingFiles && + ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) => + logWarning(log"Skipped missing file: ${MDC(PATH, file)}", e) + None case e: org.apache.orc.FileFormatException => if (ignoreCorruptFiles) { logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, file)}", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 88e82f673c2ff..0aadd4a97ec24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -520,14 +520,10 @@ object ParquetFileFormat extends Logging { ParquetFooterReader.readFooter( HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS))) } catch { - case e: Exception - if ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) => - if (ignoreMissingFiles) { - logWarning(log"Skipped missing file: ${MDC(PATH, currentFile)}", e) - None - } else { - throw QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e) - } + case e: Exception if ignoreMissingFiles && + ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) => + logWarning(log"Skipped missing file: ${MDC(PATH, currentFile)}", e) + None case e: RuntimeException if ignoreCorruptFiles => logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e) None diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index e24185c94d019..a42c004e3aafd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -23,7 +23,7 @@ import java.sql.Timestamp import java.time.LocalDateTime import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -633,6 +633,74 @@ abstract class OrcQueryTest extends OrcTest { } } + test("SPARK-55857: Enabling/disabling ignoreMissingFiles") { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + spark.range(1).toDF("a").write.orc(path1.toString) + spark.range(1, 2).toDF("a").write.orc(path2.toString) + // Create DataFrame before deleting to capture file references in the query plan, + // then delete to simulate a race condition between listing and reading. 
+ val df = spark.read.options(options).orc(path1.toString, path2.toString) + fs.listStatus(path1) + .filter(f => f.isFile && !f.getPath.getName.startsWith("_")) + .foreach(f => fs.delete(f.getPath, false)) + checkAnswer(df, Seq(Row(1))) + } + } + + def testIgnoreMissingFilesWithoutSchemaInfer(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + spark.range(1).toDF("a").write.orc(path1.toString) + spark.range(1, 2).toDF("a").write.orc(path2.toString) + val df = spark.read.schema("a long").options(options) + .orc(path1.toString, path2.toString) + fs.listStatus(path1) + .filter(f => f.isFile && !f.getPath.getName.startsWith("_")) + .foreach(f => fs.delete(f.getPath, false)) + checkAnswer(df, Seq(Row(1))) + } + } + + // Test ignoreMissingFiles = true + Seq("SQLConf", "FormatOption").foreach { by => + val (sqlConf, options) = by match { + case "SQLConf" => ("true", Map.empty[String, String]) + // Explicitly set SQLConf to false but still should ignore missing files + case "FormatOption" => ("false", Map("ignoreMissingFiles" -> "true")) + } + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + testIgnoreMissingFiles(options) + testIgnoreMissingFilesWithoutSchemaInfer(options) + } + } + + // Test ignoreMissingFiles = false + Seq("SQLConf", "FormatOption").foreach { by => + val (sqlConf, options) = by match { + case "SQLConf" => ("false", Map.empty[String, String]) + // Explicitly set SQLConf to true but still should not ignore missing files + case "FormatOption" => ("true", Map("ignoreMissingFiles" -> "false")) + } + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + checkErrorMatchPVals( + exception = intercept[SparkException] { + testIgnoreMissingFiles(options) + }, + condition = "FAILED_READ_FILE.FILE_NOT_EXIST", + parameters = Map("path" -> ".*") + ) + } + 
} + } + test("SPARK-27160 Predicate pushdown correctness on DecimalType for ORC") { withTempPath { dir => withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 60cbfb36b18f2..ebfd594dd789f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.orc -import java.io.File +import java.io.{File, FileNotFoundException} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.{Date, Timestamp} import java.time.{Duration, Period} @@ -486,16 +486,13 @@ abstract class OrcSuite fileStatuses, hadoopConf, ignoreCorruptFiles = false, ignoreMissingFiles = true) assert(schemas.nonEmpty) - // ignoreMissingFiles=false: throws on the missing file. - // ThreadUtils.parmap wraps the exception one level deep, so unwrap via getCause. - checkErrorMatchPVals( - exception = intercept[SparkException] { - OrcUtils.readOrcSchemasInParallel( - fileStatuses, hadoopConf, ignoreCorruptFiles = false, ignoreMissingFiles = false) - }.getCause.asInstanceOf[SparkException], - condition = "FAILED_READ_FILE.CANNOT_READ_FILE_FOOTER", - parameters = Map("path" -> "file:.*") - ) + // ignoreMissingFiles=false: the FileNotFoundException propagates uncaught. + // ThreadUtils.parmap wraps it in a SparkException; unwrap to verify the cause. 
+ val ex = intercept[SparkException] { + OrcUtils.readOrcSchemasInParallel( + fileStatuses, hadoopConf, ignoreCorruptFiles = false, ignoreMissingFiles = false) + } + assert(ex.getCause.isInstanceOf[FileNotFoundException]) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index c530dc0d3dfaa..5da370d98bfc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -381,6 +381,74 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } + test("SPARK-55857: Enabling/disabling ignoreMissingFiles") { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + spark.range(1).toDF("a").write.parquet(path1.toString) + spark.range(1, 2).toDF("a").write.parquet(path2.toString) + // Create DataFrame before deleting to capture file references in the query plan, + // then delete to simulate a race condition between listing and reading. 
+ val df = spark.read.options(options).parquet(path1.toString, path2.toString) + fs.listStatus(path1) + .filter(f => f.isFile && !f.getPath.getName.startsWith("_")) + .foreach(f => fs.delete(f.getPath, false)) + checkAnswer(df, Seq(Row(1))) + } + } + + def testIgnoreMissingFilesWithoutSchemaInfer(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + val path1 = new Path(basePath, "first") + val path2 = new Path(basePath, "second") + spark.range(1).toDF("a").write.parquet(path1.toString) + spark.range(1, 2).toDF("a").write.parquet(path2.toString) + val df = spark.read.schema("a long").options(options) + .parquet(path1.toString, path2.toString) + fs.listStatus(path1) + .filter(f => f.isFile && !f.getPath.getName.startsWith("_")) + .foreach(f => fs.delete(f.getPath, false)) + checkAnswer(df, Seq(Row(1))) + } + } + + // Test ignoreMissingFiles = true + Seq("SQLConf", "FormatOption").foreach { by => + val (sqlConf, options) = by match { + case "SQLConf" => ("true", Map.empty[String, String]) + // SQLConf is explicitly false; the format option overrides it, so missing files are ignored + case "FormatOption" => ("false", Map("ignoreMissingFiles" -> "true")) + } + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + testIgnoreMissingFiles(options) + testIgnoreMissingFilesWithoutSchemaInfer(options) + } + } + + // Test ignoreMissingFiles = false + Seq("SQLConf", "FormatOption").foreach { by => + val (sqlConf, options) = by match { + case "SQLConf" => ("false", Map.empty[String, String]) + // SQLConf is explicitly true; the format option overrides it, so missing files must fail + case "FormatOption" => ("true", Map("ignoreMissingFiles" -> "false")) + } + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) { + checkErrorMatchPVals( + exception = intercept[SparkException] { + testIgnoreMissingFiles(options) + }, + condition = "FAILED_READ_FILE.FILE_NOT_EXIST", + parameters = Map("path" -> 
".*") + ) + } + } + } + /** * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop * to increase the chance of failure diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 6cac83b7d1302..f40e17be9e3f6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.orc import java.io.{FileNotFoundException, IOException} +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader} @@ -77,13 +78,10 @@ private[hive] object OrcFileOperator extends Logging { val reader = try { Some(OrcFile.createReader(fs, path)) } catch { - case e: FileNotFoundException => - if (ignoreMissingFiles) { - logWarning(log"Skipped missing file: ${MDC(PATH, path)}", e) - None - } else { - throw QueryExecutionErrors.cannotReadFooterForFileError(path, e) - } + case e: Exception if ignoreMissingFiles && + ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) => + logWarning(log"Skipped missing file: ${MDC(PATH, path)}", e) + None case e: IOException => if (ignoreCorruptFiles) { logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, path)}", e)