From 680341e38851a7111974dc6a6d22440672eb9ac4 Mon Sep 17 00:00:00 2001 From: LeiWang Date: Thu, 9 Oct 2025 16:03:59 +0800 Subject: [PATCH 1/2] fix job silent status can't convert to lost status bugs --- .../streampark/common/conf/K8sFlinkConfig.scala | 7 +++++++ .../flink/kubernetes/DefaultFlinkK8sWatcher.scala | 2 +- .../flink/kubernetes/FlinkK8sWatchController.scala | 10 +++++----- .../streampark/flink/kubernetes/TrackConfig.scala | 14 ++++++++++---- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala index f2ae17852e..79173ac219 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala @@ -28,6 +28,13 @@ object K8sFlinkConfig { classType = classOf[java.lang.Long], description = "run timeout seconds of single flink-k8s metrics tracking task") + @deprecated + val jobStatusTrackCacheTimeoutSec: InternalOption = InternalOption( + key = "streampark.flink-k8s.tracking.cache-timeout-sec.job-status", + defaultValue = 300, + classType = classOf[java.lang.Integer], + description = "status cache timeout seconds of single flink-k8s job status tracking task") + @deprecated val metricTrackTaskTimeoutSec: InternalOption = InternalOption( key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric", diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala index 45155f0a04..ae7ab5e835 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala @@ -33,7 +33,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo // cache pool for storage tracking result implicit val watchController: FlinkK8sWatchController = - new FlinkK8sWatchController() + new FlinkK8sWatchController(conf.jobStatusWatcherConf) // eventBus for change event implicit lazy val eventBus: ChangeEventBus = { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala index f96e9e5f6d..01e67def03 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala @@ -27,7 +27,7 @@ import java.util.Objects import java.util.concurrent.TimeUnit /** Tracking info cache pool on flink kubernetes mode. */ -class FlinkK8sWatchController extends Logger with AutoCloseable { +class FlinkK8sWatchController(conf: JobStatusWatcherConfig = JobStatusWatcherConfig.defaultConf) extends Logger with AutoCloseable { // cache for tracking identifiers lazy val trackIds: TrackIdCache = TrackIdCache.build() @@ -38,7 +38,7 @@ class FlinkK8sWatchController extends Logger with AutoCloseable { lazy val endpoints: EndpointCache = EndpointCache.build() // cache for tracking flink job status - lazy val jobStatuses: JobStatusCache = JobStatusCache.build() + lazy val jobStatuses: JobStatusCache = JobStatusCache.build(conf.jobStatusCacheTimeOutSec) // cache for tracking kubernetes events with Deployment kind lazy val k8sDeploymentEvents: K8sDeploymentEventCache = @@ -156,10 +156,10 @@ object TrackIdCache { } } -class JobStatusCache { +class JobStatusCache(timeout: Int) { private[this] lazy val cache: Cache[CacheKey, JobStatusCV] = - Caffeine.newBuilder.expireAfterWrite(20, TimeUnit.SECONDS).build() + Caffeine.newBuilder.expireAfterWrite(timeout, TimeUnit.SECONDS).build() def putAll(kvs: Map[TrackId, JobStatusCV]): Unit = cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2))) @@ -183,7 +183,7 @@ class JobStatusCache { object JobStatusCache { - def build(): JobStatusCache = new JobStatusCache() + def build(timeout: Int): JobStatusCache = new JobStatusCache(timeout) } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala index d0f0f388cf..2ca25b973b 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/TrackConfig.scala @@ -48,11 +48,14 @@ case class MetricWatcherConfig(requestTimeoutSec: Long, requestIntervalSec: Long * interval seconds between two single tracking task * @param silentStateJobKeepTrackingSec * retained tracking time for SILENT state flink tasks + * @param jobStatusCacheTimeOutSec + * job status cache time out of single tracking task, must bigger than silentStateJobKeepTrackingSec */ case class JobStatusWatcherConfig( requestTimeoutSec: Long, requestIntervalSec: Long, - silentStateJobKeepTrackingSec: Int) + silentStateJobKeepTrackingSec: Int, + jobStatusCacheTimeOutSec: Int) object FlinkTrackConfig { def defaultConf: FlinkTrackConfig = @@ -66,7 +69,8 @@ object FlinkTrackConfig { JobStatusWatcherConfig( InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackTaskTimeoutSec), InternalConfigHolder.get(K8sFlinkConfig.jobStatueTrackTaskIntervalSec), - InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec)), + InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec), + InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackCacheTimeoutSec)), MetricWatcherConfig( InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskTimeoutSec), InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskIntervalSec))) @@ -77,12 +81,14 @@ object JobStatusWatcherConfig { def defaultConf: JobStatusWatcherConfig = JobStatusWatcherConfig( requestTimeoutSec = 120, requestIntervalSec = 5, - silentStateJobKeepTrackingSec = 60) + silentStateJobKeepTrackingSec = 60, + jobStatusCacheTimeOutSec = 300) def debugConf: JobStatusWatcherConfig = JobStatusWatcherConfig( requestTimeoutSec = 120, requestIntervalSec = 2, - silentStateJobKeepTrackingSec = 5) + silentStateJobKeepTrackingSec = 5, + jobStatusCacheTimeOutSec = 30) } object MetricWatcherConfig { From 15d8452e93f3cb99602e7219c06cba34f610b499 Mon Sep 17 00:00:00 2001 From: LeiWang Date: Thu, 9 Oct 2025 19:12:38 +0800 Subject: [PATCH 2/2] fix job silent status can't convert to lost status bugs --- .../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index c3cfa9cb41..3f9c575b74 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -128,11 +128,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi case _ => touchSessionJob(trackId) match { case Some(state) => - if (FlinkJobState.isEndState(state.jobState)) { - // can't find that job in the k8s cluster. - watchController.unWatching(trackId) - } - eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state)) + updateState(trackId, state) case _ => } }