Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,11 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
setClauses += "engine_state = ?"
params += metadata.engineState
}
metadata.engineError.foreach { error =>
// Update engineError when it's defined or when engineId is defined
// This ensures pending reasons are cleared when app transitions to success
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... when app transitions to success

The code does not reflect that condition. Is there a chance that the batch transitions to another negative state and the error message gets cleared?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K8S App:

Pending (appError = Some("reason")) -> FINISHED (appError = None), and yet the stale pending reason remains stored in the metadata store.

if (metadata.engineError.isDefined || Option(metadata.engineId).isDefined) {
setClauses += "engine_error = ?"
params += error
params += metadata.engineError.orNull
}
if (metadata.peerInstanceClosed) {
setClauses += "peer_instance_closed = ?"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,68 @@ class JDBCMetadataStoreSuite extends KyuubiFunSuite {
assert(jdbcMetadataStore.getLatestSchemaUrl(Seq(url1, url2, url3, url4, url5)).get === url5)
}

// Exercises how updateMetadata treats the stored engineError across three updates:
//   1. an update carrying an engineError stores it;
//   2. an update carrying an engineId but no engineError clears the stored error;
//   3. an update carrying neither leaves the stored error untouched.
test("update engineError conditionally based on engineError or engineId presence") {
  val batchId = UUID.randomUUID().toString
  val kyuubiInstance = "localhost:10099"
  val batchMetadata = Metadata(
    identifier = batchId,
    sessionType = SessionType.BATCH,
    realUser = "kyuubi",
    username = "kyuubi",
    ipAddress = "127.0.0.1",
    kyuubiInstance = kyuubiInstance,
    state = "PENDING",
    resource = "intern",
    className = "org.apache.kyuubi.SparkWC",
    requestName = "kyuubi_batch",
    requestConf = Map("spark.master" -> "local"),
    requestArgs = Seq("100"),
    createTime = System.currentTimeMillis(),
    engineType = "spark",
    clusterManager = Some("local"))

  jdbcMetadataStore.insertMetadata(batchMetadata)

  // Cleanup lives in `finally` so that a failed assertion does not leave this
  // batch's row behind in the store shared with the other tests of the suite.
  try {
    // Case 1: Update engineError when engineError is defined
    val pendingMetadata = batchMetadata.copy(
      state = "PENDING",
      engineError = Some("Pod pending: Insufficient CPU"))
    jdbcMetadataStore.updateMetadata(pendingMetadata)

    val afterPending = jdbcMetadataStore.getMetadata(batchId)
    assert(afterPending.engineError === Some("Pod pending: Insufficient CPU"))

    // Case 2: When app transitions to running with engineId, engineError should be cleared
    val runningMetadata = pendingMetadata.copy(
      state = "RUNNING",
      engineId = "app-123",
      engineError = None)
    jdbcMetadataStore.updateMetadata(runningMetadata)

    val afterRunning = jdbcMetadataStore.getMetadata(batchId)
    assert(afterRunning.engineError === None)
    assert(afterRunning.engineId === "app-123")

    // Case 3: Update without engineError and without engineId should not update engineError
    // First set an error again so there is a stored value that could be wrongly cleared
    val errorMetadata = runningMetadata.copy(engineError = Some("New error"))
    jdbcMetadataStore.updateMetadata(errorMetadata)

    val afterNewError = jdbcMetadataStore.getMetadata(batchId)
    assert(afterNewError.engineError === Some("New error"))

    // Now update state without engineError and without engineId - error should remain
    val stateOnlyUpdate = Metadata(
      identifier = batchId,
      state = "FINISHED")
    jdbcMetadataStore.updateMetadata(stateOnlyUpdate)

    val afterStateOnly = jdbcMetadataStore.getMetadata(batchId)
    assert(afterStateOnly.engineError === Some("New error")) // Should remain unchanged
  } finally {
    // Clean up
    jdbcMetadataStore.cleanupMetadataByIdentifier(batchId)
  }
}

test("kubernetes engine info") {
val tag = UUID.randomUUID().toString
val metadata = KubernetesEngineInfo(
Expand Down