From efcc2f9090fc606b547a654255132ee3e1d8e229 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Tue, 13 Jan 2026 21:21:50 +0800 Subject: [PATCH] [lake] Make FlussLakeTiering pluggable to customize tiering job construct --- fluss-flink/fluss-flink-tiering/src/README.md | 4 +- .../fluss/flink/tiering/FlussLakeTiering.java | 114 ++++++++++++++++++ .../tiering/FlussLakeTieringEntrypoint.java | 76 +----------- fluss-test-coverage/pom.xml | 1 + 4 files changed, 119 insertions(+), 76 deletions(-) create mode 100644 fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java diff --git a/fluss-flink/fluss-flink-tiering/src/README.md b/fluss-flink/fluss-flink-tiering/src/README.md index 2de96debcf..e3880d7061 100644 --- a/fluss-flink/fluss-flink-tiering/src/README.md +++ b/fluss-flink/fluss-flink-tiering/src/README.md @@ -18,7 +18,9 @@ # Fluss Flink Tiering -This module contains one class FlussLakeTiering. +This module provides the infrastructure for tiering Fluss data to lake formats (e.g., Apache Paimon), +consisting of FlussLakeTiering which encapsulates the core configuration and job graph logic, +and FlussLakeTieringEntrypoint which serves as the official Flink job main class and entrypoint. The reason for extracting it as a separate module is that: When executing the Flink jar job, a jar must be specified. If a `fluss-flink.jar` is specified, it may cause various classloader issues, as there are also `fluss-flink.jar` diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java new file mode 100644 index 0000000000..20f3535e26 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter; + +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Map; + +import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME; +import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX; +import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix; +import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; + +/** + * The entrypoint logic for building and launching a Fluss-to-Lake (e.g., Paimon) data tiering job. + * + *

<p>This class is responsible for parsing configuration parameters, initializing the Flink + * execution environment, and coordinating the construction of the tiering pipeline. + * + *

<p>Design Motivation: By decoupling the logic from {@link FlussLakeTieringEntrypoint} into this + * class, extensibility is significantly improved. Developers can now extend this class to customize + * configuration extraction (e.g., injecting internal security tokens) without duplicating the core + * entrypoint boilerplate. + */ +public class FlussLakeTiering { + + private static final String FLUSS_CONF_PREFIX = "fluss."; + private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering."; + + protected final StreamExecutionEnvironment execEnv; + protected final String dataLake; + protected final Map flussConfigMap; + protected final Map lakeConfigMap; + protected final Map lakeTieringConfigMap; + + public FlussLakeTiering(String[] args) { + // parse params + final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args); + Map paramsMap = params.toMap(); + + // extract fluss config + flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX); + // we need to get bootstrap.servers + String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key()); + if (bootstrapServers == null) { + throw new IllegalArgumentException( + String.format( + "The bootstrap server to fluss is not configured, please configure %s", + FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key())); + } + + dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key()); + if (dataLake == null) { + throw new IllegalArgumentException( + ConfigOptions.DATALAKE_FORMAT.key() + " is not configured"); + } + + // extract lake config + lakeConfigMap = + extractAndRemovePrefix( + paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake)); + + // extract tiering service config + lakeTieringConfigMap = extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX); + + // now, we must use full restart strategy if any task fails, + // since the committer is stateless, if the tiering committer fails over, the committer + // will lose the collected 
committable, and will never collect all committable to do commit + // todo: support region failover + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration(); + flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME); + + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); + } + + protected void run() throws Exception { + // build and run lake tiering job + JobClient jobClient = + LakeTieringJobBuilder.newBuilder( + execEnv, + Configuration.fromMap(flussConfigMap), + Configuration.fromMap(lakeConfigMap), + Configuration.fromMap(lakeTieringConfigMap), + dataLake) + .build(); + + System.out.printf( + "Starting data tiering service from Fluss to %s, jobId is %s.....%n", + dataLake, jobClient.getJobID()); + } +} diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java index a2ce6ce1af..04ce86af14 100644 --- a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java +++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java @@ -17,84 +17,10 @@ package org.apache.fluss.flink.tiering; -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter; - -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import java.util.Map; - -import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME; -import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX; -import 
static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix; -import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; - /** The entrypoint for Flink to tier fluss data to lake format like paimon. */ public class FlussLakeTieringEntrypoint { - private static final String FLUSS_CONF_PREFIX = "fluss."; - private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering."; - public static void main(String[] args) throws Exception { - - // parse params - final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args); - Map paramsMap = params.toMap(); - - // extract fluss config - Map flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX); - // we need to get bootstrap.servers - String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key()); - if (bootstrapServers == null) { - throw new IllegalArgumentException( - String.format( - "The bootstrap server to fluss is not configured, please configure %s", - FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key())); - } - flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers); - - String dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key()); - if (dataLake == null) { - throw new IllegalArgumentException( - ConfigOptions.DATALAKE_FORMAT.key() + " is not configured"); - } - - // extract lake config - Map lakeConfigMap = - extractAndRemovePrefix( - paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake)); - - // extract tiering service config - Map lakeTieringConfigMap = - extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX); - - // now, we must use full restart strategy if any task is failed, - // since committer is stateless, if tiering committer is failover, committer - // will lost the collected committable, and will never collect all committable to do commit - // todo: support region failover - org.apache.flink.configuration.Configuration flinkConfig = - new 
org.apache.flink.configuration.Configuration(); - flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME); - - // build tiering source - final StreamExecutionEnvironment execEnv = - StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); - - // build lake tiering job - JobClient jobClient = - LakeTieringJobBuilder.newBuilder( - execEnv, - Configuration.fromMap(flussConfigMap), - Configuration.fromMap(lakeConfigMap), - Configuration.fromMap(lakeTieringConfigMap), - dataLake) - .build(); - - System.out.printf( - "Starting data tiering service from Fluss to %s, jobId is %s.....%n", - dataLake, jobClient.getJobID()); + new FlussLakeTiering(args).run(); } } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 16e942860e..2e7d79c504 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -465,6 +465,7 @@ org.apache.fluss.flink.tiering.LakeTieringJobBuilder org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint + org.apache.fluss.flink.tiering.FlussLakeTiering org.apache.flink.table.catalog.*