4 changes: 3 additions & 1 deletion fluss-flink/fluss-flink-tiering/src/README.md
@@ -18,7 +18,9 @@

# Fluss Flink Tiering

This module contains one class FlussLakeTiering.
This module provides the infrastructure for tiering Fluss data to lake formats (e.g., Apache Paimon),
consisting of FlussLakeTiering, which encapsulates the core configuration and job graph logic,
and FlussLakeTieringEntrypoint, which serves as the Flink job main class and entrypoint.

The reason for extracting it as a separate module is that when executing the Flink jar job, a jar must be specified.
If a `fluss-flink.jar` is specified, it may cause various classloader issues, as there are also `fluss-flink.jar`
…/org/apache/fluss/flink/tiering/FlussLakeTiering.java (new file)
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.tiering;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter;

import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;

/**
* The entrypoint logic for building and launching a Fluss-to-Lake (e.g., Paimon) data tiering job.
*
* <p>This class is responsible for parsing configuration parameters, initializing the Flink
* execution environment, and coordinating the construction of the tiering pipeline.
*
* <p>Design Motivation: By decoupling the logic from {@link FlussLakeTieringEntrypoint} into this
* class, extensibility is significantly improved. Developers can now extend this class to customize
* configuration extraction (e.g., injecting internal security tokens) without duplicating the core
* entrypoint boilerplate.
*/
public class FlussLakeTiering {

    private static final String FLUSS_CONF_PREFIX = "fluss.";
    private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";

    protected final StreamExecutionEnvironment execEnv;
    protected final String dataLake;
    protected final Map<String, String> flussConfigMap;
    protected final Map<String, String> lakeConfigMap;
    protected final Map<String, String> lakeTieringConfigMap;

    public FlussLakeTiering(String[] args) {
        // parse params
        final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args);
        Map<String, String> paramsMap = params.toMap();

        // extract fluss config
        flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
        // we need to get bootstrap.servers
        String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
        if (bootstrapServers == null) {
            throw new IllegalArgumentException(
                    String.format(
                            "The bootstrap server to fluss is not configured, please configure %s",
                            FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
        }

        dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key());
        if (dataLake == null) {
            throw new IllegalArgumentException(
                    ConfigOptions.DATALAKE_FORMAT.key() + " is not configured");
        }

        // extract lake config
        lakeConfigMap =
                extractAndRemovePrefix(
                        paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));

        // extract tiering service config
        lakeTieringConfigMap = extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);

        // for now, we must use the full restart strategy if any task fails: the committer is
        // stateless, so if the tiering committer fails over, it loses the collected committables
        // and can never collect all the committables needed to perform the commit
        // todo: support region failover
        org.apache.flink.configuration.Configuration flinkConfig =
                new org.apache.flink.configuration.Configuration();
        flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME);

        execEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
    }

    protected void run() throws Exception {
        // build and run lake tiering job
        JobClient jobClient =
                LakeTieringJobBuilder.newBuilder(
                                execEnv,
                                Configuration.fromMap(flussConfigMap),
                                Configuration.fromMap(lakeConfigMap),
                                Configuration.fromMap(lakeTieringConfigMap),
                                dataLake)
                        .build();

        System.out.printf(
                "Starting data tiering service from Fluss to %s, jobId is %s.....%n",
                dataLake, jobClient.getJobID());
    }
}
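
The Javadoc above names extensibility as the motivation for this class. As a minimal sketch of what that could look like (not part of this PR), a subclass can inject additional `fluss.`-prefixed arguments before the parent constructor parses them and then reuse the inherited `run()`. The option key `fluss.client.security.token`, the `FLUSS_SECURITY_TOKEN` environment variable, and the `--key value` argument style assumed for `MultipleParameterToolAdapter` are illustrative only.

```java
package org.apache.fluss.flink.tiering;

import java.util.Arrays;

/** Sketch only: reuses FlussLakeTiering but injects a (hypothetical) security token option. */
public class SecuredLakeTiering extends FlussLakeTiering {

    public SecuredLakeTiering(String[] args) {
        // Append a "fluss."-prefixed option so it is picked up by the
        // FLUSS_CONF_PREFIX extraction in the parent constructor.
        super(withSecurityToken(args));
    }

    private static String[] withSecurityToken(String[] args) {
        String[] extended = Arrays.copyOf(args, args.length + 2);
        extended[args.length] = "--fluss.client.security.token"; // hypothetical option key
        extended[args.length + 1] = System.getenv("FLUSS_SECURITY_TOKEN"); // hypothetical env var
        return extended;
    }

    public static void main(String[] args) throws Exception {
        // run() is protected, so the job construction and submission are reused as-is.
        new SecuredLakeTiering(args).run();
    }
}
```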
…/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
@@ -17,84 +17,10 @@

package org.apache.fluss.flink.tiering;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter;

import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;

/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
public class FlussLakeTieringEntrypoint {

private static final String FLUSS_CONF_PREFIX = "fluss.";
private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";

public static void main(String[] args) throws Exception {

// parse params
final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args);
Map<String, String> paramsMap = params.toMap();

// extract fluss config
Map<String, String> flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
// we need to get bootstrap.servers
String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
if (bootstrapServers == null) {
throw new IllegalArgumentException(
String.format(
"The bootstrap server to fluss is not configured, please configure %s",
FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
}
flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);

String dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key());
if (dataLake == null) {
throw new IllegalArgumentException(
ConfigOptions.DATALAKE_FORMAT.key() + " is not configured");
}

// extract lake config
Map<String, String> lakeConfigMap =
extractAndRemovePrefix(
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));

// extract tiering service config
Map<String, String> lakeTieringConfigMap =
extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);

// now, we must use full restart strategy if any task is failed,
// since committer is stateless, if tiering committer is failover, committer
// will lost the collected committable, and will never collect all committable to do commit
// todo: support region failover
org.apache.flink.configuration.Configuration flinkConfig =
new org.apache.flink.configuration.Configuration();
flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME);

// build tiering source
final StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

// build lake tiering job
JobClient jobClient =
LakeTieringJobBuilder.newBuilder(
execEnv,
Configuration.fromMap(flussConfigMap),
Configuration.fromMap(lakeConfigMap),
Configuration.fromMap(lakeTieringConfigMap),
dataLake)
.build();

System.out.printf(
"Starting data tiering service from Fluss to %s, jobId is %s.....%n",
dataLake, jobClient.getJobID());
new FlussLakeTiering(args).run();
}
}
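
For reference on how the prefixes above fit together, here is a hedged sketch of an argument list the entrypoint could receive. The `fluss.` and `lake.tiering.` prefixes come from the constants in FlussLakeTiering; the `datalake.` prefix and the specific option keys and values after each prefix are assumptions rather than anything defined in this PR.

```java
public class TieringArgsExample {

    public static void main(String[] args) throws Exception {
        String[] tieringArgs = {
            // required: checked in the FlussLakeTiering constructor (prefix "fluss." is stripped)
            "--fluss.bootstrap.servers", "localhost:9123",
            // selects the lake format and is used to build the lake config prefix
            "--datalake.format", "paimon",
            // assumed lake options: "datalake.paimon." is stripped, leaving "metastore"/"warehouse"
            "--datalake.paimon.metastore", "filesystem",
            "--datalake.paimon.warehouse", "/tmp/paimon-warehouse",
            // kept with its full key by extractPrefix
            "--lake.tiering.service.name", "tiering-service-1"
        };
        org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint.main(tieringArgs);
    }
}
```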
1 change: 1 addition & 0 deletions fluss-test-coverage/pom.xml
@@ -465,6 +465,7 @@
org.apache.fluss.flink.tiering.LakeTieringJobBuilder
</exclude>
<exclude>org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint</exclude>
<exclude>org.apache.fluss.flink.tiering.FlussLakeTiering</exclude>
<!-- end exclude for flink tiering service -->
<!-- exclude flink compatibility class for catalogs -->
<exclude>org.apache.flink.table.catalog.*</exclude>