diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index d79911d4cf..40e2c07ed0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -776,6 +776,7 @@ private Map getProperties( } } else if (FlinkDeployMode.isKubernetesMode(application.getDeployModeEnum())) { properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always"); + properties.putAll(application.getHotParamsMap()); } if (FlinkDeployMode.isKubernetesApplicationMode(application.getDeployMode())) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java index ddbb27cf17..5396b29ac2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service.application.impl; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkDeployMode; @@ -267,7 +268,6 @@ public IPage page(FlinkApplication appParam, RestRequest reque } this.baseMapper.selectPage(page, appParam); List records = page.getRecords(); - long now = System.currentTimeMillis(); List appIds = records.stream().map(FlinkApplication::getId).collect(Collectors.toList()); Map pipeStates = appBuildPipeService.listAppIdPipelineStatusMap(appIds); @@ -280,10 +280,7 @@ record -> { // in time. if (record.isKubernetesModeJob()) { // set duration - String restUrl = k8SFlinkTrackMonitor - .getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record)); - record.setFlinkRestUrl(restUrl); - setAppDurationIfNeeded(record, now); + fillPropsForK8SModeJob(record); } if (pipeStates.containsKey(record.getId())) { record.setBuildStatus(pipeStates.get(record.getId()).getCode()); @@ -296,6 +293,17 @@ record -> { return page; } + private void fillPropsForK8SModeJob(FlinkApplication record) { + String restUrl = k8SFlinkTrackMonitor + .getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record)); + Object serviceAccount = record.getHotParamsMap().get(ConfigKeys.KEY_KERBEROS_SERVICE_ACCOUNT()); + if (serviceAccount != null) { + record.setServiceAccount(serviceAccount.toString()); + } + record.setFlinkRestUrl(restUrl); + setAppDurationIfNeeded(record, System.currentTimeMillis()); + } + private void setAppDurationIfNeeded(FlinkApplication record, long now) { if (record.getTracking() == 1 && record.getStartTime() != null @@ -760,12 +768,7 @@ public FlinkApplication getApp(Long id) { } // add flink web url info for k8s-mode if (application.isKubernetesModeJob()) { - String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(application)); - application.setFlinkRestUrl(restUrl); - - // set duration - long now = System.currentTimeMillis(); - setAppDurationIfNeeded(application, now); + fillPropsForK8SModeJob(application); } application.setYarnQueueByHotParams(); diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts index ab0bc56b82..7cdbe7b959 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts @@ -302,6 +302,7 @@ export default { yarnQueuePlaceholder: 'Please enter yarn queue label', descriptionPlaceholder: 'Please enter description for this application', kubernetesNamespacePlaceholder: 'Please enter kubernetes Namespace, e.g: default', + serviceAccountPlaceholder: 'Please enter kubernetes service account, default: default', kubernetesClusterIdPlaceholder: 'Please enter Kubernetes clusterId', kubernetesClusterIdRequire: "lower case alphanumeric characters, '-', and must start and end with an alphanumeric character,and no more than 45 characters", diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts index 11a8b417a7..5a3db12dc2 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts @@ -293,6 +293,7 @@ export default { yarnQueuePlaceholder: '请输入yarn队列标签名称', descriptionPlaceholder: '请输入此作业的描述', kubernetesNamespacePlaceholder: '请输入 Kubernetes 命名空间, 如: default', + serviceAccountPlaceholder: '请输入 Kubernetes 服务账号, 默认: default', kubernetesClusterIdPlaceholder: '请选择 Kubernetes ClusterId', kubernetesClusterIdRequire: '小写字母、数字、“-”,并且必须以字母数字字符开头和结尾,并且不超过45个字符', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue index 24aaf5c381..215615f3d9 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue @@ -103,6 +103,7 @@ k8sRestExposedType: app.k8sRestExposedType, flinkImage: app.flinkImage, k8sNamespace: app.k8sNamespace, + serviceAccount: app.serviceAccount, alertId: selectAlertId, projectName: app.projectName, module: app.module, diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue index 2cc118ceb6..aaa33d59db 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue @@ -123,6 +123,7 @@ }, flinkImage: app.flinkImage, k8sNamespace: app.k8sNamespace, + serviceAccount: app.serviceAccount, ...resetParams, }; switch (app.deployMode) { diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts index 62f20eab5f..8985f038dd 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts @@ -250,7 +250,7 @@ export const useCreateAndEditSchema = ( render: ({ model, field }) => renderInputDropdown(model, field, { placeholder: t('flink.app.addAppTips.serviceAccountPlaceholder'), - options: unref(historyRecord)?.k8sNamespace || [], + options: unref(historyRecord)?.serviceAccount || [], }), }, { diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java index d02c20cb6a..7fa4328721 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java @@ -24,7 +24,7 @@ @UtilityClass public class Constants { - public static final Integer DEFAULT_SLEEP_MILLISECONDS = 2000; + public static final Integer DEFAULT_SLEEP_MILLISECONDS = 3000; public static final Duration DEFAULT_WEBDRIVER_WAIT_DURATION = Duration.ofSeconds(10);