diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java
new file mode 100644
index 0000000000..3cb371d3d6
--- /dev/null
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/MemorySize.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.GIGA_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.KILO_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.TERA_BYTES;
+import static org.apache.streampark.common.util.MemorySize.MemoryUnit.hasUnit;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * Parsing
+ *
+ * The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes.
+ */
+public class MemorySize implements java.io.Serializable, Comparable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final MemorySize ZERO = new MemorySize(0L);
+
+ public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);
+
+ private static final List ORDERED_UNITS =
+ Arrays.asList(BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES, TERA_BYTES);
+
+ // ------------------------------------------------------------------------
+
+ /** The memory size, in bytes. */
+ private final long bytes;
+
+ /** The memorized value returned by toString(). */
+ private transient String stringified;
+
+ /** The memorized value returned by toHumanReadableString(). */
+ private transient String humanReadableStr;
+
+ /**
+ * Constructs a new MemorySize.
+ *
+ * @param bytes The size, in bytes. Must be zero or larger.
+ */
+ public MemorySize(long bytes) {
+ if (bytes < 0) {
+ throw new IllegalArgumentException("bytes must be >= 0");
+ }
+ this.bytes = bytes;
+ }
+
+ public static MemorySize ofMebiBytes(long mebiBytes) {
+ return new MemorySize(mebiBytes << 20);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Gets the memory size in bytes. */
+ public long getBytes() {
+ return bytes;
+ }
+
+ /** Gets the memory size in Kibibytes (= 1024 bytes). */
+ public long getKibiBytes() {
+ return bytes >> 10;
+ }
+
+ /** Gets the memory size in Mebibytes (= 1024 Kibibytes). */
+ public int getMebiBytes() {
+ return (int) (bytes >> 20);
+ }
+
+ /** Gets the memory size in Gibibytes (= 1024 Mebibytes). */
+ public long getGibiBytes() {
+ return bytes >> 30;
+ }
+
+ /** Gets the memory size in Tebibytes (= 1024 Gibibytes). */
+ public long getTebiBytes() {
+ return bytes >> 40;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return (int) (bytes ^ (bytes >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this
+ || (obj != null
+ && obj.getClass() == this.getClass()
+ && ((MemorySize) obj).bytes == this.bytes);
+ }
+
+ @Override
+ public String toString() {
+ if (stringified == null) {
+ stringified = formatToString();
+ }
+
+ return stringified;
+ }
+
+ private String formatToString() {
+ MemoryUnit highestIntegerUnit =
+ IntStream.range(0, ORDERED_UNITS.size())
+ .sequential()
+ .filter(idx -> bytes % ORDERED_UNITS.get(idx).getMultiplier() != 0)
+ .boxed()
+ .findFirst()
+ .map(
+ idx -> {
+ if (idx == 0) {
+ return ORDERED_UNITS.get(0);
+ } else {
+ return ORDERED_UNITS.get(idx - 1);
+ }
+ })
+ .orElse(BYTES);
+
+ return String.format(
+ "%d %s", bytes / highestIntegerUnit.getMultiplier(), highestIntegerUnit.getUnits()[1]);
+ }
+
+ public String toHumanReadableString() {
+ if (humanReadableStr == null) {
+ humanReadableStr = formatToHumanReadableString();
+ }
+
+ return humanReadableStr;
+ }
+
+ private String formatToHumanReadableString() {
+ MemoryUnit highestUnit =
+ IntStream.range(0, ORDERED_UNITS.size())
+ .sequential()
+ .filter(idx -> bytes > ORDERED_UNITS.get(idx).getMultiplier())
+ .boxed()
+ .max(Comparator.naturalOrder())
+ .map(ORDERED_UNITS::get)
+ .orElse(BYTES);
+
+ if (highestUnit == BYTES) {
+ return String.format("%d %s", bytes, BYTES.getUnits()[1]);
+ } else {
+ double approximate = 1.0 * bytes / highestUnit.getMultiplier();
+ return String.format(
+ Locale.ROOT, "%.3f%s (%d bytes)", approximate, highestUnit.getUnits()[1], bytes);
+ }
+ }
+
+ @Override
+ public int compareTo(MemorySize that) {
+ return Long.compare(this.bytes, that.bytes);
+ }
+
+ // ------------------------------------------------------------------------
+ // Calculations
+ // ------------------------------------------------------------------------
+
+ public MemorySize add(MemorySize that) {
+ return new MemorySize(Math.addExact(this.bytes, that.bytes));
+ }
+
+ public MemorySize subtract(MemorySize that) {
+ return new MemorySize(Math.subtractExact(this.bytes, that.bytes));
+ }
+
+ public MemorySize multiply(double multiplier) {
+ if (multiplier < 0) {
+ throw new IllegalArgumentException("multiplier must be >= 0");
+ }
+
+ BigDecimal product = BigDecimal.valueOf(this.bytes).multiply(BigDecimal.valueOf(multiplier));
+ if (product.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) {
+ throw new ArithmeticException("long overflow");
+ }
+ return new MemorySize(product.longValue());
+ }
+
+ public MemorySize divide(long by) {
+ if (by < 0) {
+ throw new IllegalArgumentException("divisor must be != 0");
+ }
+ return new MemorySize(bytes / by);
+ }
+
+ // ------------------------------------------------------------------------
+ // Parsing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Parses the given string as as MemorySize.
+ *
+ * @param text The string to parse
+ * @return The parsed MemorySize
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static MemorySize parse(String text) throws IllegalArgumentException {
+ return new MemorySize(parseBytes(text));
+ }
+
+ /**
+ * Parses the given string with a default unit.
+ *
+ * @param text The string to parse.
+ * @param defaultUnit specify the default unit.
+ * @return The parsed MemorySize.
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static MemorySize parse(String text, MemoryUnit defaultUnit)
+ throws IllegalArgumentException {
+ if (!hasUnit(text)) {
+ return parse(text + defaultUnit.getUnits()[0]);
+ }
+
+ return parse(text);
+ }
+
+ /**
+ * Parses the given string as bytes. The supported expressions are listed under {@link
+ * MemorySize}.
+ *
+ * @param text The string to parse
+ * @return The parsed size, in bytes.
+ * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+ */
+ public static long parseBytes(String text) throws IllegalArgumentException {
+ Objects.requireNonNull(text, "text cannot be null");
+
+ final String trimmed = text.trim();
+ if (trimmed.isEmpty()) {
+ throw new IllegalArgumentException("argument is an empty- or whitespace-only string");
+ }
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String number = trimmed.substring(0, pos);
+ final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+ if (number.isEmpty()) {
+ throw new NumberFormatException("text does not start with a number");
+ }
+
+ final long value;
+ try {
+ value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "The value '"
+ + number
+ + "' cannot be re represented as 64bit number (numeric overflow).");
+ }
+
+ final long multiplier = parseUnit(unit).map(MemoryUnit::getMultiplier).orElse(1L);
+ final long result = value * multiplier;
+
+ // check for overflow
+ if (result / multiplier != value) {
+ throw new IllegalArgumentException(
+ "The value '"
+ + text
+ + "' cannot be re represented as 64bit number of bytes (numeric overflow).");
+ }
+
+ return result;
+ }
+
+ private static Optional parseUnit(String unit) {
+ if (matchesAny(unit, BYTES)) {
+ return Optional.of(BYTES);
+ } else if (matchesAny(unit, KILO_BYTES)) {
+ return Optional.of(KILO_BYTES);
+ } else if (matchesAny(unit, MEGA_BYTES)) {
+ return Optional.of(MEGA_BYTES);
+ } else if (matchesAny(unit, GIGA_BYTES)) {
+ return Optional.of(GIGA_BYTES);
+ } else if (matchesAny(unit, TERA_BYTES)) {
+ return Optional.of(TERA_BYTES);
+ } else if (!unit.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Memory size unit '"
+ + unit
+ + "' does not match any of the recognized units: "
+ + MemoryUnit.getAllUnits());
+ }
+
+ return Optional.empty();
+ }
+
+ private static boolean matchesAny(String str, MemoryUnit unit) {
+ for (String s : unit.getUnits()) {
+ if (s.equals(str)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Enum which defines memory unit, mostly used to parse value from configuration file.
+ *
+ * To make larger values more compact, the common size suffixes are supported:
+ *
+ *
+ * - 1b or 1bytes (bytes)
+ *
- 1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+ *
- 1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+ *
- 1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+ *
- 1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+ *
+ */
+ public enum MemoryUnit {
+ BYTES(new String[] {"b", "bytes"}, 1L),
+ KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L),
+ MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L),
+ GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L),
+ TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L);
+
+ private final String[] units;
+
+ private final long multiplier;
+
+ MemoryUnit(String[] units, long multiplier) {
+ this.units = units;
+ this.multiplier = multiplier;
+ }
+
+ public String[] getUnits() {
+ return units;
+ }
+
+ public long getMultiplier() {
+ return multiplier;
+ }
+
+ public static String getAllUnits() {
+ return concatenateUnits(
+ BYTES.getUnits(),
+ KILO_BYTES.getUnits(),
+ MEGA_BYTES.getUnits(),
+ GIGA_BYTES.getUnits(),
+ TERA_BYTES.getUnits());
+ }
+
+ public static boolean hasUnit(String text) {
+ Objects.requireNonNull(text, "text cannot be null");
+
+ final String trimmed = text.trim();
+ if (trimmed.isEmpty()) {
+ throw new IllegalArgumentException("argument is an empty- or whitespace-only string");
+ }
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+ return unit.length() > 0;
+ }
+
+ private static String concatenateUnits(final String[]... allUnits) {
+ final StringBuilder builder = new StringBuilder(128);
+
+ for (String[] units : allUnits) {
+ builder.append('(');
+
+ for (String unit : units) {
+ builder.append(unit);
+ builder.append(" | ");
+ }
+
+ builder.setLength(builder.length() - 3);
+ builder.append(") / ");
+ }
+
+ builder.setLength(builder.length() - 3);
+ return builder.toString();
+ }
+ }
+}
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java
new file mode 100644
index 0000000000..c1efee3a19
--- /dev/null
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/TimeUtils.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Collection of utilities about time intervals. */
+public class TimeUtils {
+
+ private static final Map LABEL_TO_UNIT_MAP =
+ Collections.unmodifiableMap(initMap());
+
+ /**
+ * Parse the given string to a java {@link Duration}. The string is in format "{length value}{time
+ * unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will be considered
+ * as milliseconds.
+ *
+ * Supported time unit labels are:
+ *
+ *
+ * - DAYS: "d", "day"
+ *
- HOURS: "h", "hour"
+ *
- MINUTES: "m", "min", "minute"
+ *
- SECONDS: "s", "sec", "second"
+ *
- MILLISECONDS: "ms", "milli", "millisecond"
+ *
- MICROSECONDS: "µs", "micro", "microsecond"
+ *
- NANOSECONDS: "ns", "nano", "nanosecond"
+ *
+ *
+ * @param text string to parse.
+ */
+ public static Duration parseDuration(String text) {
+ AssertUtils.notNull(text);
+
+ final String trimmed = text.trim();
+
+ if (trimmed.isEmpty()) {
+ throw new IllegalArgumentException("argument is an empty- or whitespace-only string.");
+ }
+
+ final int len = trimmed.length();
+ int pos = 0;
+
+ char current;
+ while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+ pos++;
+ }
+
+ final String number = trimmed.substring(0, pos);
+ final String unitLabel = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+ if (number.isEmpty()) {
+ throw new NumberFormatException("text does not start with a number");
+ }
+
+ final long value;
+ try {
+ value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "The value '"
+ + number
+ + "' cannot be re represented as 64bit number (numeric overflow).");
+ }
+
+ if (unitLabel.isEmpty()) {
+ return Duration.of(value, ChronoUnit.MILLIS);
+ }
+
+ ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
+ if (unit != null) {
+ return Duration.of(value, unit);
+ } else {
+ throw new IllegalArgumentException(
+ "Time interval unit label '"
+ + unitLabel
+ + "' does not match any of the recognized units: "
+ + TimeUnit.getAllUnits());
+ }
+ }
+
+ private static Map initMap() {
+ Map labelToUnit = new HashMap<>();
+ for (TimeUnit timeUnit : TimeUnit.values()) {
+ for (String label : timeUnit.getLabels()) {
+ labelToUnit.put(label, timeUnit.getUnit());
+ }
+ }
+ return labelToUnit;
+ }
+
+ /**
+ * @param duration to convert to string
+ * @return duration string in millis
+ */
+ public static String getStringInMillis(final Duration duration) {
+ return duration.toMillis() + TimeUnit.MILLISECONDS.labels.get(0);
+ }
+
+ /**
+ * Pretty prints the duration as a lowest granularity unit that does not lose precision.
+ *
+ * Examples:
+ *
+ *
{@code
+ * Duration.ofMilliseconds(60000) will be printed as 1 min
+ * Duration.ofHours(1).plusSeconds(1) will be printed as 3601 s
+ * }
+ *
+ * NOTE: It supports only durations that fit into long.
+ */
+ public static String formatWithHighestUnit(Duration duration) {
+ long nanos = duration.toNanos();
+
+ TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
+ return String.format(
+ "%d %s",
+ nanos / highestIntegerUnit.unit.getDuration().toNanos(),
+ highestIntegerUnit.getLabels().get(0));
+ }
+
+ private static TimeUnit getHighestIntegerUnit(long nanos) {
+ if (nanos == 0) {
+ return TimeUnit.MILLISECONDS;
+ }
+
+ final List orderedUnits =
+ Arrays.asList(
+ TimeUnit.NANOSECONDS,
+ TimeUnit.MICROSECONDS,
+ TimeUnit.MILLISECONDS,
+ TimeUnit.SECONDS,
+ TimeUnit.MINUTES,
+ TimeUnit.HOURS,
+ TimeUnit.DAYS);
+
+ TimeUnit highestIntegerUnit = null;
+ for (TimeUnit timeUnit : orderedUnits) {
+ if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
+ break;
+ }
+ highestIntegerUnit = timeUnit;
+ }
+
+ return AssertUtils.notNull(highestIntegerUnit, "Should find a highestIntegerUnit.");
+ }
+
+ /** Enum which defines time unit, mostly used to parse value from configuration file. */
+ private enum TimeUnit {
+ DAYS(ChronoUnit.DAYS, singular("d"), plural("day")),
+ HOURS(ChronoUnit.HOURS, singular("h"), plural("hour")),
+ MINUTES(ChronoUnit.MINUTES, singular("min"), singular("m"), plural("minute")),
+ SECONDS(ChronoUnit.SECONDS, singular("s"), plural("sec"), plural("second")),
+ MILLISECONDS(ChronoUnit.MILLIS, singular("ms"), plural("milli"), plural("millisecond")),
+ MICROSECONDS(ChronoUnit.MICROS, singular("µs"), plural("micro"), plural("microsecond")),
+ NANOSECONDS(ChronoUnit.NANOS, singular("ns"), plural("nano"), plural("nanosecond"));
+
+ private static final String PLURAL_SUFFIX = "s";
+
+ private final List labels;
+
+ private final ChronoUnit unit;
+
+ TimeUnit(ChronoUnit unit, String[]... labels) {
+ this.unit = unit;
+ this.labels =
+ Arrays.stream(labels).flatMap(ls -> Arrays.stream(ls)).collect(Collectors.toList());
+ }
+
+ /**
+ * @param label the original label
+ * @return the singular format of the original label
+ */
+ private static String[] singular(String label) {
+ return new String[] {label};
+ }
+
+ /**
+ * @param label the original label
+ * @return both the singular format and plural format of the original label
+ */
+ private static String[] plural(String label) {
+ return new String[] {label, label + PLURAL_SUFFIX};
+ }
+
+ public List getLabels() {
+ return labels;
+ }
+
+ public ChronoUnit getUnit() {
+ return unit;
+ }
+
+ public static String getAllUnits() {
+ return Arrays.stream(TimeUnit.values())
+ .map(TimeUnit::createTimeUnitString)
+ .collect(Collectors.joining(", "));
+ }
+
+ private static String createTimeUnitString(TimeUnit timeUnit) {
+ return timeUnit.name() + ": (" + String.join(" | ", timeUnit.getLabels()) + ")";
+ }
+ }
+
+ private static ChronoUnit toChronoUnit(java.util.concurrent.TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case NANOSECONDS:
+ return ChronoUnit.NANOS;
+ case MICROSECONDS:
+ return ChronoUnit.MICROS;
+ case MILLISECONDS:
+ return ChronoUnit.MILLIS;
+ case SECONDS:
+ return ChronoUnit.SECONDS;
+ case MINUTES:
+ return ChronoUnit.MINUTES;
+ case HOURS:
+ return ChronoUnit.HOURS;
+ case DAYS:
+ return ChronoUnit.DAYS;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported time unit %s.", timeUnit));
+ }
+ }
+}
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java
new file mode 100644
index 0000000000..850985f7f1
--- /dev/null
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/YamlParserUtils.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.snakeyaml.engine.v2.api.Dump;
+import org.snakeyaml.engine.v2.api.DumpSettings;
+import org.snakeyaml.engine.v2.api.Load;
+import org.snakeyaml.engine.v2.api.LoadSettings;
+import org.snakeyaml.engine.v2.common.FlowStyle;
+import org.snakeyaml.engine.v2.exceptions.Mark;
+import org.snakeyaml.engine.v2.exceptions.MarkedYamlEngineException;
+import org.snakeyaml.engine.v2.exceptions.YamlEngineException;
+import org.snakeyaml.engine.v2.nodes.Node;
+import org.snakeyaml.engine.v2.nodes.ScalarNode;
+import org.snakeyaml.engine.v2.nodes.Tag;
+import org.snakeyaml.engine.v2.representer.StandardRepresenter;
+import org.snakeyaml.engine.v2.schema.CoreSchema;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains utility methods to load standard yaml file and convert object to standard
+ * yaml syntax.
+ */
+public class YamlParserUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YamlParserUtils.class);
+
+ private static final DumpSettings blockerDumperSettings =
+ DumpSettings.builder()
+ .setDefaultFlowStyle(FlowStyle.BLOCK)
+ // Disable split long lines to avoid add unexpected line breaks
+ .setSplitLines(false)
+ .setSchema(new CoreSchema())
+ .build();
+
+ private static final DumpSettings flowDumperSettings =
+ DumpSettings.builder()
+ .setDefaultFlowStyle(FlowStyle.FLOW)
+ // Disable split long lines to avoid add unexpected line breaks
+ .setSplitLines(false)
+ .setSchema(new CoreSchema())
+ .build();
+
+ private static final Dump blockerDumper =
+ new Dump(blockerDumperSettings, new FlinkConfigRepresenter(blockerDumperSettings));
+
+ private static final Dump flowDumper =
+ new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings));
+
+ private static final Load loader =
+ new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
+
+ /**
+ * Loads the contents of the given YAML file into a map.
+ *
+ * @param file the YAML file to load.
+ * @return a non-null map representing the YAML content. If the file is empty or only contains
+ * comments, an empty map is returned.
+ * @throws FileNotFoundException if the YAML file is not found.
+ * @throws YamlEngineException if the file cannot be parsed.
+ * @throws IOException if an I/O error occurs while reading from the file stream.
+ */
+ public static synchronized @Nonnull Map loadYamlFile(File file) throws Exception {
+ try (FileInputStream inputStream = new FileInputStream((file))) {
+ Map yamlResult =
+ (Map) loader.loadFromInputStream(inputStream);
+
+ return yamlResult == null ? new HashMap<>() : yamlResult;
+ } catch (FileNotFoundException e) {
+ LOG.error("Failed to find YAML file", e);
+ throw e;
+ } catch (IOException | YamlEngineException e) {
+ if (e instanceof MarkedYamlEngineException) {
+ YamlEngineException exception =
+ wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+ LOG.error("Failed to parse YAML configuration", exception);
+ throw exception;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ public static synchronized @Nonnull Map loadYamlInput(InputStream inputStream)
+ throws Exception {
+ try {
+ Map yamlResult =
+ (Map) loader.loadFromInputStream(inputStream);
+ return yamlResult == null ? new HashMap<>() : yamlResult;
+ } catch (YamlEngineException e) {
+ if (e instanceof MarkedYamlEngineException) {
+ YamlEngineException exception =
+ wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+ LOG.error("Failed to parse YAML configuration", exception);
+ throw exception;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ public static synchronized @Nonnull Map loadYamlString(String text) {
+ try {
+ Map yamlResult = (Map) loader.loadFromString(text);
+
+ return yamlResult == null ? new HashMap<>() : yamlResult;
+ } catch (YamlEngineException e) {
+ if (e instanceof MarkedYamlEngineException) {
+ YamlEngineException exception =
+ wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
+ LOG.error("Failed to parse YAML configuration", exception);
+ throw exception;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Converts the given value to a string representation in the YAML syntax. This method uses a YAML
+ * parser to convert the object to YAML format.
+ *
+ * The resulting YAML string may have line breaks at the end of each line. This method removes
+ * the line break at the end of the string if it exists.
+ *
+ *
Note: This method may perform escaping on certain characters in the value to ensure proper
+ * YAML syntax.
+ *
+ * @param value The value to be converted.
+ * @return The string representation of the value in YAML syntax.
+ */
+ public static synchronized String toYAMLString(Object value) {
+ try {
+ String output = flowDumper.dumpToString(value);
+ // remove the line break
+ String linebreak = flowDumperSettings.getBestLineBreak();
+ if (output.endsWith(linebreak)) {
+ output = output.substring(0, output.length() - linebreak.length());
+ }
+ return output;
+ } catch (MarkedYamlEngineException exception) {
+ throw wrapExceptionToHiddenSensitiveData(exception);
+ }
+ }
+
+ /**
+ * Converts a flat map into a nested map structure and outputs the result as a list of
+ * YAML-formatted strings. Each item in the list represents a single line of the YAML data. The
+ * method is synchronized and thus thread-safe.
+ *
+ * @param flattenMap A map containing flattened keys (e.g., "parent.child.key") associated with
+ * their values.
+ * @return A list of strings that represents the YAML data, where each item corresponds to a line
+ * of the data.
+ */
+ @SuppressWarnings("unchecked")
+ public static synchronized List convertAndDumpYamlFromFlatMap(
+ Map flattenMap) {
+ try {
+ Map nestedMap = new LinkedHashMap<>();
+ for (Map.Entry entry : flattenMap.entrySet()) {
+ String[] keys = entry.getKey().split("\\.");
+ Map currentMap = nestedMap;
+ for (int i = 0; i < keys.length - 1; i++) {
+ currentMap =
+ (Map) currentMap.computeIfAbsent(keys[i], k -> new LinkedHashMap<>());
+ }
+ currentMap.put(keys[keys.length - 1], entry.getValue());
+ }
+ String data = blockerDumper.dumpToString(nestedMap);
+ String linebreak = blockerDumperSettings.getBestLineBreak();
+ return Arrays.asList(data.split(linebreak));
+ } catch (MarkedYamlEngineException exception) {
+ throw wrapExceptionToHiddenSensitiveData(exception);
+ }
+ }
+
+ public static synchronized T convertToObject(String value, Class type) {
+ try {
+ return type.cast(loader.loadFromString(value));
+ } catch (MarkedYamlEngineException exception) {
+ throw wrapExceptionToHiddenSensitiveData(exception);
+ }
+ }
+
+ /**
+ * This method wraps a MarkedYAMLException to hide sensitive data in its message. Before using
+ * this method, an exception message might include sensitive information like:
+ *
+ * {@code
+ * while constructing a mapping
+ * in 'reader', line 1, column 1:
+ * key1: secret1
+ * ^
+ * found duplicate key key1
+ * in 'reader', line 2, column 1:
+ * key1: secret2
+ * ^
+ * }
+ *
+ * After using this method, the message will be sanitized to hide the sensitive details:
+ *
+ *
{@code
+ * while constructing a mapping
+ * in 'reader', line 1, column 1
+ * found duplicate key key1
+ * in 'reader', line 2, column 1
+ * }
+ *
+ * @param exception The MarkedYamlEngineException containing potentially sensitive data.
+ * @return A YamlEngineException with a message that has sensitive data hidden.
+ */
+ private static YamlEngineException wrapExceptionToHiddenSensitiveData(
+ MarkedYamlEngineException exception) {
+ StringBuilder lines = new StringBuilder();
+ String context = exception.getContext();
+ Optional contextMark = exception.getContextMark();
+ Optional problemMark = exception.getProblemMark();
+ String problem = exception.getProblem();
+
+ if (context != null) {
+ lines.append(context);
+ lines.append("\n");
+ }
+
+ if (contextMark.isPresent()
+ && (problem == null
+ || !problemMark.isPresent()
+ || contextMark.get().getName().equals(problemMark.get().getName())
+ || contextMark.get().getLine() != problemMark.get().getLine()
+ || contextMark.get().getColumn() != problemMark.get().getColumn())) {
+ lines.append(hiddenSensitiveDataInMark(contextMark.get()));
+ lines.append("\n");
+ }
+
+ if (problem != null) {
+ lines.append(problem);
+ lines.append("\n");
+ }
+
+ if (problemMark.isPresent()) {
+ lines.append(hiddenSensitiveDataInMark(problemMark.get()));
+ lines.append("\n");
+ }
+
+ Throwable cause = exception.getCause();
+ if (cause instanceof MarkedYamlEngineException) {
+ cause = wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) cause);
+ }
+
+ YamlEngineException yamlException = new YamlEngineException(lines.toString(), cause);
+ yamlException.setStackTrace(exception.getStackTrace());
+ return yamlException;
+ }
+
+ /**
+ * This method is a mock implementation of the Mark#toString() method, specifically designed to
+ * exclude the Mark#get_snippet(), to prevent leaking any sensitive data.
+ */
+ private static String hiddenSensitiveDataInMark(Mark mark) {
+ return " in "
+ + mark.getName()
+ + ", line "
+ + (mark.getLine() + 1)
+ + ", column "
+ + (mark.getColumn() + 1);
+ }
+
+ private static class FlinkConfigRepresenter extends StandardRepresenter {
+ public FlinkConfigRepresenter(DumpSettings dumpSettings) {
+ super(dumpSettings);
+ representers.put(Duration.class, this::representDuration);
+ representers.put(MemorySize.class, this::representMemorySize);
+ parentClassRepresenters.put(Enum.class, this::representEnum);
+ }
+
+ private Node representDuration(Object data) {
+ Duration duration = (Duration) data;
+ String durationString = TimeUtils.formatWithHighestUnit(duration);
+ return new ScalarNode(Tag.STR, durationString, settings.getDefaultScalarStyle());
+ }
+
+ private Node representMemorySize(Object data) {
+ MemorySize memorySize = (MemorySize) data;
+ return new ScalarNode(Tag.STR, memorySize.toString(), settings.getDefaultScalarStyle());
+ }
+
+ private Node representEnum(Object data) {
+ return new ScalarNode(Tag.STR, data.toString(), settings.getDefaultScalarStyle());
+ }
+ }
+}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f839fa207f..f66b113f34 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -18,7 +18,6 @@ package org.apache.streampark.common.util
import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils
-import org.yaml.snakeyaml.Yaml
import javax.annotation.Nonnull
@@ -82,16 +81,8 @@ object PropertiesUtils extends Logger {
}
def fromYamlText(text: String): Map[String, String] = {
- try {
- new Yaml()
- .load(text)
- .asInstanceOf[java.util.Map[String, Map[String, Any]]]
- .flatMap(x => eachYamlItem(x._1, x._2))
- .toMap
- } catch {
- case e: IOException =>
- throw new IllegalArgumentException(s"Failed when loading conf error:", e)
- }
+ val map = YamlParserUtils.loadYamlString(text)
+ map.flatMap(x => eachYamlItem(x._1, x._2)).toMap
}
def fromHoconText(conf: String): Map[String, String] = {
@@ -146,11 +137,8 @@ object PropertiesUtils extends Logger {
inputStream != null,
s"[StreamPark] fromYamlFile: Properties inputStream must not be null")
try {
- new Yaml()
- .load(inputStream)
- .asInstanceOf[java.util.Map[String, Map[String, Any]]]
- .flatMap(x => eachYamlItem(x._1, x._2))
- .toMap
+ val map = YamlParserUtils.loadYamlInput(inputStream)
+ map.flatMap(x => eachYamlItem(x._1, x._2)).toMap
} catch {
case e: IOException =>
throw new IllegalArgumentException(s"Failed when loading yaml from inputStream", e)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 4cdcf07c19..86bc2bf8d0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,7 +26,6 @@ import org.apache.streampark.flink.util.FlinkUtils
import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.io.FileUtils
-import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings}
import javax.annotation.Nullable
@@ -35,7 +34,7 @@ import java.io.File
import java.util.{Map => JavaMap}
import scala.collection.JavaConversions._
-import scala.util.{Success, Try}
+import scala.util.Try
case class SubmitRequest(
flinkVersion: FlinkVersion,
@@ -99,32 +98,6 @@ case class SubmitRequest(
}
}
- lazy val flinkDefaultConfiguration: Configuration = {
- Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) match {
- case Success(value) =>
- executionMode match {
- case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION |
- ExecutionMode.REMOTE =>
- value
- case _ =>
- value
- .keySet()
- .foreach(
- k => {
- val v = value.getString(k, null)
- if (v != null) {
- val result = v
- .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName)
- .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
- value.setString(k, result)
- }
- })
- value
- }
- case _ => new Configuration()
- }
- }
-
def hasProp(key: String): Boolean = properties.containsKey(key)
def getProp(key: String): Any = properties.get(key)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index 2e642312da..0a40d9a7ed 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -87,6 +87,11 @@
json4s-jackson_${scala.binary.version}
+
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java
new file mode 100644
index 0000000000..715972d50f
--- /dev/null
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/java/org/apache/flink/configuration/FlinkGlobalConfiguration.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.streampark.common.util.AssertUtils;
+import org.apache.streampark.common.util.YamlParserUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Global configuration object for Flink. Similar to Java properties configuration objects it
+ * includes key-value pairs which represent the framework's configuration.
+ */
+public final class FlinkGlobalConfiguration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkGlobalConfiguration.class);
+
+ public static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
+
+ public static final String FLINK_CONF_FILENAME = "config.yaml";
+
+ // key separator character
+ private static final String KEY_SEPARATOR = ".";
+
+ // the keys whose values should be hidden
+ private static final String[] SENSITIVE_KEYS =
+ new String[] {
+ "password",
+ "secret",
+ "fs.azure.account.key",
+ "apikey",
+ "auth-params",
+ "service-key",
+ "token",
+ "basic-auth",
+ "jaas.config",
+ "http-headers"
+ };
+
+ // the hidden content to be displayed
+ public static final String HIDDEN_CONTENT = "******";
+
+ private static boolean standardYaml = true;
+
+ // --------------------------------------------------------------------------------------------
+
+ private FlinkGlobalConfiguration() {}
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Loads the global configuration from the environment. Fails if an error occurs during loading.
+ * Returns an empty configuration object if the environment variable is not set. In production
+ * this variable is set but tests and local execution/debugging don't have this environment
+ * variable set. That's why we should fail if it is not set.
+ *
+ * @return Returns the Configuration
+ */
+ public static Configuration loadConfiguration() {
+ return loadConfiguration(new Configuration());
+ }
+
+ /**
+ * Loads the global configuration and adds the given dynamic properties configuration.
+ *
+ * @param dynamicProperties The given dynamic properties
+ * @return Returns the loaded global configuration with dynamic properties
+ */
+ public static Configuration loadConfiguration(Configuration dynamicProperties) {
+ final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+ if (configDir == null) {
+ return new Configuration(dynamicProperties);
+ }
+
+ return loadConfiguration(configDir, dynamicProperties);
+ }
+
+ /**
+ * Loads the configuration files from the specified directory.
+ *
+ * YAML files are supported as configuration files.
+ *
+ * @param configDir the directory which contains the configuration files
+ */
+ public static Configuration loadConfiguration(final String configDir) {
+ return loadConfiguration(configDir, null);
+ }
+
+ /**
+ * Loads the configuration files from the specified directory. If the dynamic properties
+ * configuration is not null, then it is added to the loaded configuration.
+ *
+ * @param configDir directory to load the configuration from
+ * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
+ * @return The configuration loaded from the given configuration directory
+ */
+ public static Configuration loadConfiguration(
+ final String configDir, @Nullable final Configuration dynamicProperties) {
+
+ if (configDir == null) {
+ throw new IllegalArgumentException(
+ "Given configuration directory is null, cannot load configuration");
+ }
+
+ final File confDirFile = new File(configDir);
+ if (!(confDirFile.exists())) {
+ throw new IllegalConfigurationException(
+ "The given configuration directory name '"
+ + configDir
+ + "' ("
+ + confDirFile.getAbsolutePath()
+ + ") does not describe an existing directory.");
+ }
+
+ // get Flink yaml configuration file
+ File yamlConfigFile = new File(confDirFile, LEGACY_FLINK_CONF_FILENAME);
+ Configuration configuration;
+
+ if (!yamlConfigFile.exists()) {
+ yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
+ if (!yamlConfigFile.exists()) {
+ throw new IllegalConfigurationException(
+ "The Flink config file '"
+ + yamlConfigFile
+ + "' ("
+ + yamlConfigFile.getAbsolutePath()
+ + ") does not exist.");
+ } else {
+ standardYaml = true;
+ LOG.info(
+ "Using standard YAML parser to load flink configuration file from {}.",
+ yamlConfigFile.getAbsolutePath());
+ configuration = loadYAMLResource(yamlConfigFile);
+ }
+ } else {
+ standardYaml = false;
+ LOG.info(
+ "Using legacy YAML parser to load flink configuration file from {}.",
+ yamlConfigFile.getAbsolutePath());
+ configuration = loadLegacyYAMLResource(yamlConfigFile);
+ }
+
+ logConfiguration("Loading", configuration);
+
+ if (dynamicProperties != null) {
+ logConfiguration("Loading dynamic", dynamicProperties);
+ configuration.addAll(dynamicProperties);
+ }
+
+ return configuration;
+ }
+
+ private static void logConfiguration(String prefix, Configuration config) {
+ config.confData.forEach(
+ (key, value) ->
+ LOG.info(
+ "{} configuration property: {}, {}",
+ prefix,
+ key,
+ isSensitive(key) ? HIDDEN_CONTENT : value));
+ }
+
+ private static Configuration loadLegacyYAMLResource(File file) {
+ final Configuration config = new Configuration();
+
+ try (BufferedReader reader =
+ new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
+
+ String line;
+ int lineNo = 0;
+ while ((line = reader.readLine()) != null) {
+ lineNo++;
+ // 1. check for comments
+ String[] comments = line.split("#", 2);
+ String conf = comments[0].trim();
+
+ // 2. get key and value
+ if (conf.length() > 0) {
+ String[] kv = conf.split(": ", 2);
+
+ // skip line with no valid key-value pair
+ if (kv.length == 1) {
+ LOG.warn(
+ "Error while trying to split key and value in configuration file "
+ + file
+ + ":"
+ + lineNo
+ + ": Line is not a key-value pair (missing space after ':'?)");
+ continue;
+ }
+
+ String key = kv[0].trim();
+ String value = kv[1].trim();
+
+ // sanity check
+ if (key.length() == 0 || value.length() == 0) {
+ LOG.warn(
+ "Error after splitting key and value in configuration file "
+ + file
+ + ":"
+ + lineNo
+ + ": Key or value was empty");
+ continue;
+ }
+
+ config.setString(key, value);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error parsing YAML configuration.", e);
+ }
+
+ return config;
+ }
+
+ private static Map flatten(Map config, String keyPrefix) {
+ final Map flattenedMap = new HashMap<>();
+
+ config.forEach(
+ (key, value) -> {
+ String flattenedKey = keyPrefix + key;
+ if (value instanceof Map) {
+ Map e = (Map) value;
+ flattenedMap.putAll(flatten(e, flattenedKey + KEY_SEPARATOR));
+ } else {
+ if (value instanceof List) {
+ flattenedMap.put(flattenedKey, YamlParserUtils.toYAMLString(value));
+ } else {
+ flattenedMap.put(flattenedKey, value);
+ }
+ }
+ });
+
+ return flattenedMap;
+ }
+
+ private static Map flatten(Map config) {
+ // Since we start flattening from the root, keys should not be prefixed with anything.
+ return flatten(config, "");
+ }
+
+ private static Configuration loadYAMLResource(File file) {
+ final Configuration config = new Configuration();
+
+ try {
+ Map configDocument = flatten(YamlParserUtils.loadYamlFile(file));
+ configDocument.forEach((k, v) -> config.setValueInternal(k, v, false));
+
+ return config;
+ } catch (Exception e) {
+ throw new RuntimeException("Error parsing YAML configuration.", e);
+ }
+ }
+
+ /**
+ * Check whether the key is a hidden key.
+ *
+ * @param key the config key
+ */
+ public static boolean isSensitive(String key) {
+ AssertUtils.notNull(key, "key");
+ final String keyInLower = key.toLowerCase();
+ for (String hideKey : SENSITIVE_KEYS) {
+ if (keyInLower.length() >= hideKey.length() && keyInLower.contains(hideKey)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static String getFlinkConfFilename() {
+ if (isStandardYaml()) {
+ return FLINK_CONF_FILENAME;
+ } else {
+ return LEGACY_FLINK_CONF_FILENAME;
+ }
+ }
+
+ public static boolean isStandardYaml() {
+ return standardYaml;
+ }
+
+ public static void setStandardYaml(boolean standardYaml) {
+ FlinkGlobalConfiguration.standardYaml = standardYaml;
+ }
+}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 0437dbaab4..b652aa67cc 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -284,7 +284,8 @@ trait FlinkClientTrait extends Logger {
val configurationDirectory = s"$flinkHome/conf"
// 2. load the custom command lines
val flinkConfig =
- Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
+ Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf"))
+ .getOrElse(new Configuration())
loadCustomCommandLines(flinkConfig, configurationDirectory)
}
@@ -413,7 +414,8 @@ trait FlinkClientTrait extends Logger {
validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome), commandLine)
val flinkDefaultConfiguration =
- Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
+ Try(FlinkGlobalConfiguration.loadConfiguration(s"$flinkHome/conf"))
+ .getOrElse(new Configuration())
val configuration = new Configuration(flinkDefaultConfiguration)
configuration.addAll(activeCommandLine.toConfiguration(commandLine))
@@ -451,6 +453,39 @@ trait FlinkClientTrait extends Logger {
}
}
+ implicit private[client] class EnhanceConfiguration(request: SubmitRequest) {
+ lazy val flinkDefaultConfiguration: Configuration = {
+ Try(
+ FlinkGlobalConfiguration.loadConfiguration(
+ s"${request.flinkVersion.flinkHome}/conf")) match {
+ case Success(value) =>
+ request.executionMode match {
+ case ExecutionMode.YARN_SESSION | ExecutionMode.KUBERNETES_NATIVE_SESSION |
+ ExecutionMode.REMOTE =>
+ value
+ case _ =>
+ value
+ .keySet()
+ .foreach(
+ k => {
+ val v = value.getString(k, null)
+ if (v != null) {
+ val result = v
+ .replaceAll(
+ "\\$\\{job(Name|name)}|\\$job(Name|name)",
+ request.effectiveAppName)
+ .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", request.id.toString)
+ value.setString(k, result)
+ }
+ })
+ value
+ }
+ case _ => new Configuration()
+ }
+ }
+
+ }
+
private[client] def cancelJob(
cancelRequest: CancelRequest,
jobID: JobID,
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
index 6451d5908d..f51defcaf5 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
@@ -65,7 +65,7 @@ object YarnPerJobTestCase extends Logger {
lazy val flinkDefaultConfiguration: Configuration = {
require(FLINK_HOME != null)
// get flink config
- GlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf")
+ FlinkGlobalConfiguration.loadConfiguration(s"$FLINK_HOME/conf")
}
lazy val customCommandLines: util.List[CustomCommandLine] = {