diff --git a/fluss-flink/fluss-flink-tiering/src/README.md b/fluss-flink/fluss-flink-tiering/src/README.md
index 2de96debcf..e3880d7061 100644
--- a/fluss-flink/fluss-flink-tiering/src/README.md
+++ b/fluss-flink/fluss-flink-tiering/src/README.md
@@ -18,7 +18,9 @@
# Fluss Flink Tiering
-This module contains one class FlussLakeTiering.
+This module provides the infrastructure for tiering Fluss data to lake formats (e.g., Apache Paimon),
+consisting of FlussLakeTiering which encapsulates the core configuration and job graph logic,
+and FlussLakeTieringEntrypoint which serves as the official Flink job main class and entrypoint.
The reason for extracting it as a separate module is that: When executing the Flink jar job, a jar must be specified.
If a `fluss-flink.jar` is specified, it may cause various classloader issues, as there are also `fluss-flink.jar`
diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java
new file mode 100644
index 0000000000..20f3535e26
--- /dev/null
+++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Map;
+
+import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
+import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
+import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
+import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
+
+/**
+ * The entrypoint logic for building and launching a Fluss-to-Lake (e.g., Paimon) data tiering job.
+ *
+ *
This class is responsible for parsing configuration parameters, initializing the Flink
+ * execution environment, and coordinating the construction of the tiering pipeline.
+ *
+ *
Design Motivation: By decoupling the logic from {@link FlussLakeTieringEntrypoint} into this
+ * class, extensibility is significantly improved. Developers can now extend this class to customize
+ * configuration extraction (e.g., injecting internal security tokens) without duplicating the core
+ * entrypoint boilerplate.
+ */
+public class FlussLakeTiering {
+
+ private static final String FLUSS_CONF_PREFIX = "fluss.";
+ private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
+
+ protected final StreamExecutionEnvironment execEnv;
+ protected final String dataLake;
+ protected final Map flussConfigMap;
+ protected final Map lakeConfigMap;
+ protected final Map lakeTieringConfigMap;
+
+ public FlussLakeTiering(String[] args) {
+ // parse params
+ final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args);
+ Map paramsMap = params.toMap();
+
+ // extract fluss config
+ flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
+ // we need to get bootstrap.servers
+ String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
+ if (bootstrapServers == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The bootstrap server to fluss is not configured, please configure %s",
+ FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
+ }
+
+ dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key());
+ if (dataLake == null) {
+ throw new IllegalArgumentException(
+ ConfigOptions.DATALAKE_FORMAT.key() + " is not configured");
+ }
+
+ // extract lake config
+ lakeConfigMap =
+ extractAndRemovePrefix(
+ paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
+
+ // extract tiering service config
+ lakeTieringConfigMap = extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
+
+ // now, we must use full restart strategy if any task is failed,
+ // since committer is stateless, if tiering committer is failover, committer
+ // will lost the collected committable, and will never collect all committable to do commit
+ // todo: support region failover
+ org.apache.flink.configuration.Configuration flinkConfig =
+ new org.apache.flink.configuration.Configuration();
+ flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME);
+
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
+ }
+
+ protected void run() throws Exception {
+ // build and run lake tiering job
+ JobClient jobClient =
+ LakeTieringJobBuilder.newBuilder(
+ execEnv,
+ Configuration.fromMap(flussConfigMap),
+ Configuration.fromMap(lakeConfigMap),
+ Configuration.fromMap(lakeTieringConfigMap),
+ dataLake)
+ .build();
+
+ System.out.printf(
+ "Starting data tiering service from Fluss to %s, jobId is %s.....%n",
+ dataLake, jobClient.getJobID());
+ }
+}
diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
index a2ce6ce1af..04ce86af14 100644
--- a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
+++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
@@ -17,84 +17,10 @@
package org.apache.fluss.flink.tiering;
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter;
-
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.util.Map;
-
-import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
-import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
-import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
-import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
-
/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
public class FlussLakeTieringEntrypoint {
- private static final String FLUSS_CONF_PREFIX = "fluss.";
- private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
-
public static void main(String[] args) throws Exception {
-
- // parse params
- final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args);
- Map paramsMap = params.toMap();
-
- // extract fluss config
- Map flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
- // we need to get bootstrap.servers
- String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
- if (bootstrapServers == null) {
- throw new IllegalArgumentException(
- String.format(
- "The bootstrap server to fluss is not configured, please configure %s",
- FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
- }
- flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
-
- String dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key());
- if (dataLake == null) {
- throw new IllegalArgumentException(
- ConfigOptions.DATALAKE_FORMAT.key() + " is not configured");
- }
-
- // extract lake config
- Map lakeConfigMap =
- extractAndRemovePrefix(
- paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
-
- // extract tiering service config
- Map lakeTieringConfigMap =
- extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
-
- // now, we must use full restart strategy if any task is failed,
- // since committer is stateless, if tiering committer is failover, committer
- // will lost the collected committable, and will never collect all committable to do commit
- // todo: support region failover
- org.apache.flink.configuration.Configuration flinkConfig =
- new org.apache.flink.configuration.Configuration();
- flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME);
-
- // build tiering source
- final StreamExecutionEnvironment execEnv =
- StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
-
- // build lake tiering job
- JobClient jobClient =
- LakeTieringJobBuilder.newBuilder(
- execEnv,
- Configuration.fromMap(flussConfigMap),
- Configuration.fromMap(lakeConfigMap),
- Configuration.fromMap(lakeTieringConfigMap),
- dataLake)
- .build();
-
- System.out.printf(
- "Starting data tiering service from Fluss to %s, jobId is %s.....%n",
- dataLake, jobClient.getJobID());
+ new FlussLakeTiering(args).run();
}
}
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 16e942860e..2e7d79c504 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -465,6 +465,7 @@
org.apache.fluss.flink.tiering.LakeTieringJobBuilder
org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint
+ org.apache.fluss.flink.tiering.FlussLakeTiering
org.apache.flink.table.catalog.*