Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ private Map<String, Object> getProperties(
}
} else if (FlinkDeployMode.isKubernetesMode(application.getDeployModeEnum())) {
properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
properties.putAll(application.getHotParamsMap());
}

if (FlinkDeployMode.isKubernetesApplicationMode(application.getDeployMode())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.console.core.service.application.impl;

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDeployMode;
Expand Down Expand Up @@ -267,7 +268,6 @@ public IPage<FlinkApplication> page(FlinkApplication appParam, RestRequest reque
}
this.baseMapper.selectPage(page, appParam);
List<FlinkApplication> records = page.getRecords();
long now = System.currentTimeMillis();

List<Long> appIds = records.stream().map(FlinkApplication::getId).collect(Collectors.toList());
Map<Long, PipelineStatusEnum> pipeStates = appBuildPipeService.listAppIdPipelineStatusMap(appIds);
Expand All @@ -280,10 +280,7 @@ record -> {
// in time.
if (record.isKubernetesModeJob()) {
// set duration
String restUrl = k8SFlinkTrackMonitor
.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
record.setFlinkRestUrl(restUrl);
setAppDurationIfNeeded(record, now);
fillPropsForK8SModeJob(record);
}
if (pipeStates.containsKey(record.getId())) {
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
Expand All @@ -296,6 +293,17 @@ record -> {
return page;
}

private void fillPropsForK8SModeJob(FlinkApplication record) {
String restUrl = k8SFlinkTrackMonitor
.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(record));
Object serviceAccount = record.getHotParamsMap().get(ConfigKeys.KEY_KERBEROS_SERVICE_ACCOUNT());
if (serviceAccount != null) {
record.setServiceAccount(serviceAccount.toString());
}
record.setFlinkRestUrl(restUrl);
setAppDurationIfNeeded(record, System.currentTimeMillis());
}

private void setAppDurationIfNeeded(FlinkApplication record, long now) {
if (record.getTracking() == 1
&& record.getStartTime() != null
Expand Down Expand Up @@ -760,12 +768,7 @@ public FlinkApplication getApp(Long id) {
}
// add flink web url info for k8s-mode
if (application.isKubernetesModeJob()) {
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(application));
application.setFlinkRestUrl(restUrl);

// set duration
long now = System.currentTimeMillis();
setAppDurationIfNeeded(application, now);
fillPropsForK8SModeJob(application);
}

application.setYarnQueueByHotParams();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ export default {
yarnQueuePlaceholder: 'Please enter yarn queue label',
descriptionPlaceholder: 'Please enter description for this application',
kubernetesNamespacePlaceholder: 'Please enter kubernetes Namespace, e.g: default',
serviceAccountPlaceholder: 'Please enter kubernetes service account, default: default',
kubernetesClusterIdPlaceholder: 'Please enter Kubernetes clusterId',
kubernetesClusterIdRequire:
"lower case alphanumeric characters, '-', and must start and end with an alphanumeric character,and no more than 45 characters",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ export default {
yarnQueuePlaceholder: '请输入yarn队列标签名称',
descriptionPlaceholder: '请输入此作业的描述',
kubernetesNamespacePlaceholder: '请输入 Kubernetes 命名空间, 如: default',
serviceAccountPlaceholder: '请输入 Kubernetes 服务账号, 默认: default',
kubernetesClusterIdPlaceholder: '请选择 Kubernetes ClusterId',
kubernetesClusterIdRequire:
'小写字母、数字、“-”,并且必须以字母数字字符开头和结尾,并且不超过45个字符',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
k8sRestExposedType: app.k8sRestExposedType,
flinkImage: app.flinkImage,
k8sNamespace: app.k8sNamespace,
serviceAccount: app.serviceAccount,
alertId: selectAlertId,
projectName: app.projectName,
module: app.module,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
},
flinkImage: app.flinkImage,
k8sNamespace: app.k8sNamespace,
serviceAccount: app.serviceAccount,
...resetParams,
};
switch (app.deployMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ export const useCreateAndEditSchema = (
render: ({ model, field }) =>
renderInputDropdown(model, field, {
placeholder: t('flink.app.addAppTips.serviceAccountPlaceholder'),
options: unref(historyRecord)?.k8sNamespace || [],
options: unref(historyRecord)?.serviceAccount || [],
}),
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@UtilityClass
public class Constants {

public static final Integer DEFAULT_SLEEP_MILLISECONDS = 2000;
public static final Integer DEFAULT_SLEEP_MILLISECONDS = 3000;

public static final Duration DEFAULT_WEBDRIVER_WAIT_DURATION = Duration.ofSeconds(10);

Expand Down