Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2148,6 +2148,239 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
}
}

test("SPARK-55864: directory removed while SHS is running") {
  // Verify the SHS keeps serving apps from the remaining configured log
  // directories when one of them disappears while the server is running.
  val dir2 = Utils.createTempDir(namePrefix = "logDir2")
  try {
    val conf = createTestConf().set(HISTORY_LOG_DIR,
      s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
    val provider = new FsHistoryProvider(conf)
    try {
      // One finished application in each of the two configured directories.
      val log1 = newLogFile("app1", None, inProgress = false)
      writeFile(log1, None,
        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
        SparkListenerApplicationEnd(5L))
      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
      val log2 = new File(new Path(logUri2).toUri.getPath)
      writeFile(log2, None,
        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
        SparkListenerApplicationEnd(6L))

      updateAndCheck(provider) { list =>
        list.size should be(2)
      }

      // Remove dir2 while the SHS is running.
      Utils.deleteRecursively(dir2)

      // Next scan should not throw and should still list app1 from testDir.
      updateAndCheck(provider) { list =>
        list.size should be(1)
        list.head.id should be("app1-id")
      }
    } finally {
      // Always release the provider's resources (disk store, scan threads),
      // even when an assertion above fails.
      provider.stop()
    }
  } finally {
    if (dir2.exists()) {
      Utils.deleteRecursively(dir2)
    }
  }
}

test("SPARK-55864: directory does not exist at startup but created later") {
  // Verify that a configured log directory that is missing at startup is
  // picked up by a later scan once it is created and populated.
  val dir2 = Utils.createTempDir(namePrefix = "logDir2")
  val dir2Path = dir2.getAbsolutePath
  // Delete it so the first scan sees a nonexistent directory.
  Utils.deleteRecursively(dir2)

  try {
    val conf = createTestConf().set(HISTORY_LOG_DIR,
      s"${testDir.getAbsolutePath},${dir2Path}")
    val provider = new FsHistoryProvider(conf)
    try {
      val log1 = newLogFile("app1", None, inProgress = false)
      writeFile(log1, None,
        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
        SparkListenerApplicationEnd(5L))

      // First scan: dir2 does not exist, but app1 from testDir should be listed.
      updateAndCheck(provider) { list =>
        list.size should be(1)
        list.head.id should be("app1-id")
      }

      // Create dir2 and add a log file.
      dir2.mkdirs()
      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
      val log2 = new File(new Path(logUri2).toUri.getPath)
      writeFile(log2, None,
        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
        SparkListenerApplicationEnd(6L))

      // Next scan should pick up app2.
      updateAndCheck(provider) { list =>
        list.size should be(2)
        list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
      }
    } finally {
      // Always release the provider's resources, even when an assertion fails.
      provider.stop()
    }
  } finally {
    // `dir2` still points at dir2Path, so no need to rebuild a File from the path.
    if (dir2.exists()) {
      Utils.deleteRecursively(dir2)
    }
  }
}

test("SPARK-55864: directory temporarily inaccessible then recovers") {
  // Verify that apps from a directory that becomes unreadable are dropped
  // from the listing and re-appear once the directory is restored.
  val dir2 = Utils.createTempDir(namePrefix = "logDir2")
  // Pre-create the backup location used to simulate inaccessibility so the
  // outer finally can clean it up even if the test fails mid-rename.
  val dir2Backup = Utils.createTempDir(namePrefix = "logDir2Backup")
  Utils.deleteRecursively(dir2Backup)
  try {
    val conf = createTestConf().set(HISTORY_LOG_DIR,
      s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
    val provider = new FsHistoryProvider(conf)
    try {
      val log1 = newLogFile("app1", None, inProgress = false)
      writeFile(log1, None,
        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
        SparkListenerApplicationEnd(5L))
      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
      val log2 = new File(new Path(logUri2).toUri.getPath)
      writeFile(log2, None,
        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
        SparkListenerApplicationEnd(6L))

      updateAndCheck(provider) { list =>
        list.size should be(2)
      }

      // Make dir2 inaccessible by renaming it away.
      assert(dir2.renameTo(dir2Backup))

      // Scan should still work for testDir.
      updateAndCheck(provider) { list =>
        list.size should be(1)
        list.head.id should be("app1-id")
      }

      // Restore dir2.
      assert(dir2Backup.renameTo(dir2))

      // Next scan should recover app2.
      updateAndCheck(provider) { list =>
        list.size should be(2)
        list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
      }
    } finally {
      // Always release the provider's resources, even when an assertion fails.
      provider.stop()
    }
  } finally {
    // If the test failed between the two renames, the logs live under
    // dir2Backup; clean up both locations so nothing leaks on disk.
    if (dir2Backup.exists()) {
      Utils.deleteRecursively(dir2Backup)
    }
    if (dir2.exists()) {
      Utils.deleteRecursively(dir2)
    }
  }
}

test("SPARK-55864: all directories inaccessible does not crash") {
  // Verify that a scan with every configured log directory missing neither
  // throws nor retains stale application listings.
  val dir2 = Utils.createTempDir(namePrefix = "logDir2")
  try {
    val conf = createTestConf().set(HISTORY_LOG_DIR,
      s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
    val provider = new FsHistoryProvider(conf)
    try {
      val log1 = newLogFile("app1", None, inProgress = false)
      writeFile(log1, None,
        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
        SparkListenerApplicationEnd(5L))

      updateAndCheck(provider) { list =>
        list.size should be(1)
      }

      // Remove both directories. testDir is moved aside (not deleted) so it
      // can be restored for afterEach / subsequent tests.
      val testDirBackup = Utils.createTempDir(namePrefix = "testDirBackup")
      Utils.deleteRecursively(testDirBackup)
      assert(testDir.renameTo(testDirBackup))
      Utils.deleteRecursively(dir2)

      try {
        // Should not throw.
        provider.checkForLogs()
        // After all dirs are gone, the listing should return no apps.
        provider.getListing().toSeq.size should be(0)
      } finally {
        // Always restore testDir; note a failed assert here would mask an
        // exception from the try body, but leaving testDir missing would
        // break every following test, which is worse.
        assert(testDirBackup.renameTo(testDir))
      }
    } finally {
      // Always release the provider's resources, even when an assertion fails.
      provider.stop()
    }
  } finally {
    if (dir2.exists()) {
      Utils.deleteRecursively(dir2)
    }
  }
}

test("SPARK-55864: config with empty entries between commas") {
  // Verify that empty entries in the comma-separated HISTORY_LOG_DIR value
  // ("dir1,,dir2") are ignored and both real directories are scanned.
  val dir2 = Utils.createTempDir(namePrefix = "logDir2")
  try {
    val conf = createTestConf().set(HISTORY_LOG_DIR,
      s"${testDir.getAbsolutePath},,${dir2.getAbsolutePath}")
    val provider = new FsHistoryProvider(conf)
    try {
      val log1 = newLogFile("app1", None, inProgress = false)
      writeFile(log1, None,
        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
        SparkListenerApplicationEnd(5L))
      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
      val log2 = new File(new Path(logUri2).toUri.getPath)
      writeFile(log2, None,
        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
        SparkListenerApplicationEnd(6L))

      updateAndCheck(provider) { list =>
        list.size should be(2)
        list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
      }
    } finally {
      // Always release the provider's resources, even when an assertion fails.
      provider.stop()
    }
  } finally {
    Utils.deleteRecursively(dir2)
  }
}

test("SPARK-55864: logDirectory.names count mismatch falls back to full paths") {
  // Verify that when HISTORY_LOG_DIR_NAMES has fewer entries than
  // HISTORY_LOG_DIR, the display names fall back to the full directory paths.
  val dir2 = Utils.createTempDir(namePrefix = "logDir2")
  try {
    val conf = createTestConf()
      .set(HISTORY_LOG_DIR,
        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
      .set(HISTORY_LOG_DIR_NAMES, "OnlyOneName")
    val provider = new FsHistoryProvider(conf)
    try {
      val log1 = newLogFile("app1", None, inProgress = false)
      writeFile(log1, None,
        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", None),
        SparkListenerApplicationEnd(5L))
      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", None, None)
      val log2 = new File(new Path(logUri2).toUri.getPath)
      writeFile(log2, None,
        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", None),
        SparkListenerApplicationEnd(6L))

      updateAndCheck(provider) { list =>
        list.size should be(2)
        // Names mismatch: should fall back to full paths.
        val app1 = list.find(_.id == "app1-id").get
        val app2 = list.find(_.id == "app2-id").get
        app1.attempts.head.logSourceName should be(Some(testDir.getAbsolutePath))
        app2.attempts.head.logSourceName should be(Some(dir2.getAbsolutePath))
      }
    } finally {
      // Always release the provider's resources, even when an assertion fails.
      provider.stop()
    }
  } finally {
    Utils.deleteRecursively(dir2)
  }
}

private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
extends FsHistoryProvider(conf, clock) {

Expand Down