Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ object K8sFlinkConfig {
classType = classOf[java.lang.Long],
description = "run timeout seconds of single flink-k8s metrics tracking task")

@deprecated
val jobStatusTrackCacheTimeoutSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.cache-timeout-sec.job-status",
defaultValue = 300,
classType = classOf[java.lang.Integer],
description = "status cache timeout seconds of single flink-k8s job status tracking task")

@deprecated
val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo

// cache pool for storage tracking result
implicit val watchController: FlinkK8sWatchController =
new FlinkK8sWatchController()
new FlinkK8sWatchController(conf.jobStatusWatcherConf)

// eventBus for change event
implicit lazy val eventBus: ChangeEventBus = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import java.util.Objects
import java.util.concurrent.TimeUnit

/** Tracking info cache pool on flink kubernetes mode. */
class FlinkK8sWatchController extends Logger with AutoCloseable {
class FlinkK8sWatchController(conf: JobStatusWatcherConfig = JobStatusWatcherConfig.defaultConf) extends Logger with AutoCloseable {

// cache for tracking identifiers
lazy val trackIds: TrackIdCache = TrackIdCache.build()
Expand All @@ -38,7 +38,7 @@ class FlinkK8sWatchController extends Logger with AutoCloseable {
lazy val endpoints: EndpointCache = EndpointCache.build()

// cache for tracking flink job status
lazy val jobStatuses: JobStatusCache = JobStatusCache.build()
lazy val jobStatuses: JobStatusCache = JobStatusCache.build(conf.jobStatusCacheTimeOutSec)

// cache for tracking kubernetes events with Deployment kind
lazy val k8sDeploymentEvents: K8sDeploymentEventCache =
Expand Down Expand Up @@ -156,10 +156,10 @@ object TrackIdCache {
}
}

class JobStatusCache {
class JobStatusCache(timeout: Int) {

private[this] lazy val cache: Cache[CacheKey, JobStatusCV] =
Caffeine.newBuilder.expireAfterWrite(20, TimeUnit.SECONDS).build()
Caffeine.newBuilder.expireAfterWrite(timeout, TimeUnit.SECONDS).build()

def putAll(kvs: Map[TrackId, JobStatusCV]): Unit =
cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2)))
Expand All @@ -183,7 +183,7 @@ class JobStatusCache {

object JobStatusCache {

def build(): JobStatusCache = new JobStatusCache()
def build(timeout: Int): JobStatusCache = new JobStatusCache(timeout)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ case class MetricWatcherConfig(requestTimeoutSec: Long, requestIntervalSec: Long
* interval seconds between two single tracking task
* @param silentStateJobKeepTrackingSec
* retained tracking time for SILENT state flink tasks
* @param jobStatusCacheTimeOutSec
* job status cache time out of single tracking task, must bigger than silentStateJobKeepTrackingSec
*/
case class JobStatusWatcherConfig(
requestTimeoutSec: Long,
requestIntervalSec: Long,
silentStateJobKeepTrackingSec: Int)
silentStateJobKeepTrackingSec: Int,
jobStatusCacheTimeOutSec: Int)

object FlinkTrackConfig {
def defaultConf: FlinkTrackConfig =
Expand All @@ -66,7 +69,8 @@ object FlinkTrackConfig {
JobStatusWatcherConfig(
InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackTaskTimeoutSec),
InternalConfigHolder.get(K8sFlinkConfig.jobStatueTrackTaskIntervalSec),
InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec)),
InternalConfigHolder.get(K8sFlinkConfig.silentStateJobKeepTrackingSec),
InternalConfigHolder.get(K8sFlinkConfig.jobStatusTrackCacheTimeoutSec)),
MetricWatcherConfig(
InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskTimeoutSec),
InternalConfigHolder.get(K8sFlinkConfig.metricTrackTaskIntervalSec)))
Expand All @@ -77,12 +81,14 @@ object JobStatusWatcherConfig {
def defaultConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
requestTimeoutSec = 120,
requestIntervalSec = 5,
silentStateJobKeepTrackingSec = 60)
silentStateJobKeepTrackingSec = 60,
jobStatusCacheTimeOutSec = 300)

def debugConf: JobStatusWatcherConfig = JobStatusWatcherConfig(
requestTimeoutSec = 120,
requestIntervalSec = 2,
silentStateJobKeepTrackingSec = 5)
silentStateJobKeepTrackingSec = 5,
jobStatusCacheTimeOutSec = 30)
}

object MetricWatcherConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
case _ =>
touchSessionJob(trackId) match {
case Some(state) =>
if (FlinkJobState.isEndState(state.jobState)) {
// can't find that job in the k8s cluster.
watchController.unWatching(trackId)
}
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state))
updateState(trackId, state)
case _ =>
}
}
Expand Down
Loading