diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 3e57dce297..9ec9e21b1b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -485,7 +485,6 @@ private void handleNotRunState( doPersistMetrics(application, false); break; case CANCELED: - case FINISHED: log.info( "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!", currentState.name()); @@ -504,6 +503,26 @@ private void handleNotRunState( doPersistMetrics(application, true); cleanOptioning(optionState, application.getId()); break; + case FINISHED: + log.info( + "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!", + currentState.name()); + cleanSavepoint(application); + application.setState(currentState.getValue()); + if (StopFromEnum.NONE.equals(stopFrom) || applicationInfoService.checkAlter(application)) { + if (StopFromEnum.NONE.equals(stopFrom)) { + log.info( + "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job finished is not form StreamPark,savepoint expired!"); + savepointService.expire(application.getId()); + } + stopCanceledJob(application.getId()); + // 不发送报警,因为FINISHED对于批作业是正常状态 + // doAlert(application, FlinkAppStateEnum.CANCELED); + } + STOP_FROM_MAP.remove(application.getId()); + doPersistMetrics(application, true); + cleanOptioning(optionState, application.getId()); + break; case FAILED: cleanSavepoint(application); STOP_FROM_MAP.remove(application.getId());