From da023aff6b994e522e65219ffa5b6ce17a33a4ff Mon Sep 17 00:00:00 2001 From: zhaomin Date: Wed, 14 Jan 2026 01:07:53 +0800 Subject: [PATCH 1/5] [lake] Support alter table.datalake.freshness --- .../apache/fluss/config/FlussConfigUtils.java | 6 +- .../coordinator/LakeTableManagerITCase.java | 204 ++++++++++++++++++ 2 files changed, 208 insertions(+), 2 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index fa9c4274c9..af40444581 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -21,7 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import java.lang.reflect.Field; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,7 +42,9 @@ public class FlussConfigUtils { TABLE_OPTIONS = extractConfigOptions("table."); CLIENT_OPTIONS = extractConfigOptions("client."); ALTERABLE_TABLE_OPTIONS = - Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); + Arrays.asList( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), + ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()); } public static boolean isTableStorageConfig(String key) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java index fb653b7d65..074821f404 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java @@ -19,22 +19,33 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.gateway.AdminGateway; +import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; +import org.apache.fluss.rpc.messages.GetTableInfoResponse; +import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateDatabaseRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropDatabaseRequest; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newGetTableInfoRequest; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -106,4 +117,197 @@ void testCreateAndGetTable() throws Exception { .isInstanceOf(TableAlreadyExistException.class) .hasMessage("Table %s already exists.", lakeTablePath); } + + @Test + void testAlterTableDatalakeFreshness() throws Exception { + AdminReadOnlyGateway gateway = getAdminOnlyGateway(true); + AdminGateway adminGateway = getAdminGateway(); + + String db1 = "test_alter_freshness_db"; + String tb1 = "tb1"; + TablePath tablePath = TablePath.of(db1, tb1); + // first create a database + adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); + + // create a table with datalake enabled and initial freshness + Map initialProperties = new HashMap<>(); + initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "5min"); + TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties); + adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get(); + + // get the table and check initial freshness + GetTableInfoResponse response = + gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); + TableDescriptor gottenTable = TableDescriptor.fromJsonBytes(response.getTableJson()); + assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key())) + .isEqualTo("5min"); + + // alter table to change datalake freshness + Map setProperties = new HashMap<>(); + setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "3min"); + + adminGateway + .alterTable( + newAlterTableRequest( + tablePath, + alterTableProperties(setProperties, new ArrayList<>()), + Collections.emptyList(), + false)) + .get(); + + // get the table and check updated freshness + GetTableInfoResponse responseAfterAlter = + gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); + TableDescriptor gottenTableAfterAlter = + TableDescriptor.fromJsonBytes(responseAfterAlter.getTableJson()); + + String freshnessAfterAlter = + gottenTableAfterAlter + .getProperties() + .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()); + assertThat(freshnessAfterAlter).isEqualTo("3min"); + + // cleanup + adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get(); + adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get(); + } + + @Test + void testResetTableDatalakeProperties() throws Exception { + AdminReadOnlyGateway gateway = getAdminOnlyGateway(true); + AdminGateway adminGateway = getAdminGateway(); + + String db1 = "test_reset_datalake_db"; + String tb1 = "tb1"; + TablePath tablePath = TablePath.of(db1, tb1); + // first create a database + adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); + + // create a table with datalake enabled and custom freshness + Map initialProperties = new HashMap<>(); + initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "3min"); + TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties); + adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get(); + + // get the table and check initial state + GetTableInfoResponse response = + gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); + TableDescriptor gottenTable = TableDescriptor.fromJsonBytes(response.getTableJson()); + assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) + .isEqualTo("true"); + assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key())) + .isEqualTo("3min"); + + // reset datalake freshness property + List resetProperties = new ArrayList<>(); + resetProperties.add(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()); + + adminGateway + .alterTable( + newAlterTableRequest( + tablePath, + alterTableProperties(new HashMap<>(), resetProperties), + Collections.emptyList(), + false)) + .get(); + + // get the table and check freshness is removed + GetTableInfoResponse responseAfterReset = + gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); + TableDescriptor gottenTableAfterReset = + TableDescriptor.fromJsonBytes(responseAfterReset.getTableJson()); + + // freshness should be removed from properties + assertThat( + gottenTableAfterReset + .getProperties() + .containsKey(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key())) + .isFalse(); + // but datalake.enabled should still be there + assertThat( + gottenTableAfterReset + .getProperties() + .get(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) + .isEqualTo("true"); + + // reset datalake enabled property + List resetProperties2 = new ArrayList<>(); + resetProperties2.add(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); + + adminGateway + .alterTable( + newAlterTableRequest( + tablePath, + alterTableProperties(new HashMap<>(), resetProperties2), + Collections.emptyList(), + false)) + .get(); + + // get the table and check datalake enabled is removed + GetTableInfoResponse responseAfterReset2 = + gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); + TableDescriptor gottenTableAfterReset2 = + TableDescriptor.fromJsonBytes(responseAfterReset2.getTableJson()); + + // datalake.enabled should be removed from properties + assertThat( + gottenTableAfterReset2 + .getProperties() + .containsKey(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) + .isFalse(); + + // cleanup + adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get(); + adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get(); + } + + private AdminReadOnlyGateway getAdminOnlyGateway(boolean isCoordinatorServer) { + if (isCoordinatorServer) { + return getAdminGateway(); + } else { + return FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(0); + } + } + + private AdminGateway getAdminGateway() { + return FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + } + + private static TableDescriptor newPkTable() { + return TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a comment") + .column("b", DataTypes.STRING()) + .primaryKey("a") + .build()) + .comment("first table") + .distributedBy(3, "a") + .build(); + } + + private static List alterTableProperties( + Map setProperties, List resetProperties) { + List res = new ArrayList<>(); + + for (Map.Entry entry : setProperties.entrySet()) { + PbAlterConfig info = new PbAlterConfig(); + info.setConfigKey(entry.getKey()); + info.setConfigValue(entry.getValue()); + info.setOpType(AlterConfigOpType.SET.value()); + res.add(info); + } + + for (String resetProperty : resetProperties) { + PbAlterConfig info = new PbAlterConfig(); + info.setConfigKey(resetProperty); + info.setOpType(AlterConfigOpType.DELETE.value()); + res.add(info); + } + + return res; + } } From f517c54f54764cdede088d82fbc37dd6540fa16f Mon Sep 17 00:00:00 2001 From: zhaomin Date: Wed, 14 Jan 2026 22:36:19 +0800 Subject: [PATCH 2/5] fix --- .../coordinator/CoordinatorService.java | 5 + .../coordinator/LakeTableTieringManager.java | 42 +++++ .../server/coordinator/MetadataManager.java | 25 ++- .../coordinator/LakeTableManagerITCase.java | 150 +++++++++++------- website/docs/engine-flink/ddl.md | 5 + 5 files changed, 166 insertions(+), 61 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 70b359e4d6..94d56dd338 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -919,6 +919,11 @@ public DataLakeFormat getDataLakeFormat() { return lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat(); } + @VisibleForTesting + public LakeTableTieringManager getLakeTableTieringManager() { + return lakeTableTieringManager; + } + private void validateHeartbeatRequest( PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) { if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java index 165ae7b844..0a59cbe175 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java @@ -224,6 +224,48 @@ public void removeLakeTable(long tableId) { }); } + /** + * Update the lake freshness for a table. This method should be called when the table's datalake + * freshness property is changed via ALTER TABLE. + * + * @param tableId the table id + * @param newFreshnessMs the new freshness interval in milliseconds + */ + public void updateTableLakeFreshness(long tableId, long newFreshnessMs) { + inLock( + lock, + () -> { + Long currentFreshness = tableLakeFreshness.get(tableId); + if (currentFreshness == null) { + // the table is not a lake table or has been dropped, skip update + LOG.warn( + "Cannot update lake freshness for table {} as it's not tracked by lake tiering manager.", + tableId); + return; + } + + if (currentFreshness.equals(newFreshnessMs)) { + // no change, skip update + return; + } + + tableLakeFreshness.put(tableId, newFreshnessMs); + LOG.info( + "Updated lake freshness for table {} from {} ms to {} ms.", + tableId, + currentFreshness, + newFreshnessMs); + + // If the table is in Scheduled state, we need to reschedule it with the new + // freshness + TieringState currentState = tieringStates.get(tableId); + if (currentState == TieringState.Scheduled) { + // Reschedule the table tiering with the new freshness interval + scheduleTableTiering(tableId); + } + }); + } + @VisibleForTesting protected void checkTieringServiceTimeout() { inLock( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 05c5d51de2..f728e91da5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -60,12 +60,14 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; @@ -516,12 +518,9 @@ private void postAlterTableProperties( TableRegistration newTableRegistration, LakeTableTieringManager lakeTableTieringManager) { - boolean toEnableDataLake = - !isDataLakeEnabled(oldTableDescriptor) - && isDataLakeEnabled(newTableRegistration.properties); - boolean toDisableDataLake = - isDataLakeEnabled(oldTableDescriptor) - && !isDataLakeEnabled(newTableRegistration.properties); + boolean dataLakeEnabled = isDataLakeEnabled(newTableRegistration.properties); + boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && dataLakeEnabled; + boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && !dataLakeEnabled; if (toEnableDataLake) { TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo); @@ -529,6 +528,20 @@ private void postAlterTableProperties( lakeTableTieringManager.addNewLakeTable(newTableInfo); } else if (toDisableDataLake) { lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId); + } else if (dataLakeEnabled) { + // The table is still a lake table, check if freshness has changed + Duration oldFreshness = + Configuration.fromMap(oldTableDescriptor.getProperties()) + .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS); + Duration newFreshness = + Configuration.fromMap(newTableRegistration.properties) + .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS); + + // Check if freshness has changed + if (!Objects.equals(oldFreshness, newFreshness)) { + lakeTableTieringManager.updateTableLakeFreshness( + newTableRegistration.tableId, newFreshness.toMillis()); + } } // more post-alter actions can be added here } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java index 074821f404..2c2f9e10a5 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java @@ -29,12 +29,14 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.messages.GetTableInfoResponse; import org.apache.fluss.rpc.messages.PbAlterConfig; +import org.apache.fluss.server.entity.LakeTieringTableInfo; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -47,6 +49,7 @@ import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropDatabaseRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newGetTableInfoRequest; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -119,31 +122,33 @@ void testCreateAndGetTable() throws Exception { } @Test - void testAlterTableDatalakeFreshness() throws Exception { + void testAlterAndResetTableDatalakeProperties() throws Exception { AdminReadOnlyGateway gateway = getAdminOnlyGateway(true); AdminGateway adminGateway = getAdminGateway(); - String db1 = "test_alter_freshness_db"; + String db1 = "test_alter_reset_datalake_db"; String tb1 = "tb1"; TablePath tablePath = TablePath.of(db1, tb1); // first create a database adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); - // create a table with datalake enabled and initial freshness + // Step 1: create a table with datalake enabled and initial freshness (5min) Map initialProperties = new HashMap<>(); initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "5min"); TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties); adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get(); - // get the table and check initial freshness + // Step 2: verify initial properties GetTableInfoResponse response = gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); TableDescriptor gottenTable = TableDescriptor.fromJsonBytes(response.getTableJson()); + assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) + .isEqualTo("true"); assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key())) .isEqualTo("5min"); - // alter table to change datalake freshness + // Step 3: alter table to change datalake freshness (SET operation) Map setProperties = new HashMap<>(); setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "3min"); @@ -156,51 +161,23 @@ void testAlterTableDatalakeFreshness() throws Exception { false)) .get(); - // get the table and check updated freshness - GetTableInfoResponse responseAfterAlter = - gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); - TableDescriptor gottenTableAfterAlter = - TableDescriptor.fromJsonBytes(responseAfterAlter.getTableJson()); - - String freshnessAfterAlter = - gottenTableAfterAlter - .getProperties() - .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()); - assertThat(freshnessAfterAlter).isEqualTo("3min"); - - // cleanup - adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get(); - adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get(); - } - - @Test - void testResetTableDatalakeProperties() throws Exception { - AdminReadOnlyGateway gateway = getAdminOnlyGateway(true); - AdminGateway adminGateway = getAdminGateway(); - - String db1 = "test_reset_datalake_db"; - String tb1 = "tb1"; - TablePath tablePath = TablePath.of(db1, tb1); - // first create a database - adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); - - // create a table with datalake enabled and custom freshness - Map initialProperties = new HashMap<>(); - initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); - initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "3min"); - TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties); - adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get(); - - // get the table and check initial state - GetTableInfoResponse response = + // Step 4: verify freshness was updated to 3min + GetTableInfoResponse responseAfterSet = gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); - TableDescriptor gottenTable = TableDescriptor.fromJsonBytes(response.getTableJson()); - assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) - .isEqualTo("true"); - assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key())) + TableDescriptor gottenTableAfterSet = + TableDescriptor.fromJsonBytes(responseAfterSet.getTableJson()); + assertThat( + gottenTableAfterSet + .getProperties() + .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key())) .isEqualTo("3min"); + assertThat( + gottenTableAfterSet + .getProperties() + .get(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) + .isEqualTo("true"); - // reset datalake freshness property + // Step 5: reset datalake freshness property (RESET operation) List resetProperties = new ArrayList<>(); resetProperties.add(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()); @@ -213,26 +190,23 @@ void testResetTableDatalakeProperties() throws Exception { false)) .get(); - // get the table and check freshness is removed + // Step 6: verify freshness was removed but datalake.enabled remains GetTableInfoResponse responseAfterReset = gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); TableDescriptor gottenTableAfterReset = TableDescriptor.fromJsonBytes(responseAfterReset.getTableJson()); - - // freshness should be removed from properties assertThat( gottenTableAfterReset .getProperties() .containsKey(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key())) .isFalse(); - // but datalake.enabled should still be there assertThat( gottenTableAfterReset .getProperties() .get(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) .isEqualTo("true"); - // reset datalake enabled property + // Step 7: reset datalake enabled property List resetProperties2 = new ArrayList<>(); resetProperties2.add(ConfigOptions.TABLE_DATALAKE_ENABLED.key()); @@ -245,13 +219,11 @@ void testResetTableDatalakeProperties() throws Exception { false)) .get(); - // get the table and check datalake enabled is removed + // Step 8: verify datalake.enabled was also removed GetTableInfoResponse responseAfterReset2 = gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); TableDescriptor gottenTableAfterReset2 = TableDescriptor.fromJsonBytes(responseAfterReset2.getTableJson()); - - // datalake.enabled should be removed from properties assertThat( gottenTableAfterReset2 .getProperties() @@ -263,6 +235,74 @@ void testResetTableDatalakeProperties() throws Exception { adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get(); } + @Test + void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception { + AdminGateway adminGateway = getAdminGateway(); + + String db1 = "test_tiering_freshness_db"; + String tb1 = "tb1"; + TablePath tablePath = TablePath.of(db1, tb1); + adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); + + // Step 1: Create a table with a large datalake freshness (10 minutes) + Map initialProperties = new HashMap<>(); + initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "10min"); + TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties); + adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get(); + + // Get the table id for later verification + GetTableInfoResponse response = + adminGateway.getTableInfo(newGetTableInfoRequest(tablePath)).get(); + long tableId = response.getTableId(); + + LakeTableTieringManager tieringManager = + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorService() + .getLakeTableTieringManager(); + + // Wait a bit for the table to be registered in tiering manager + Thread.sleep(1000); + + // Step 2: Try to request the table for tiering within 3 seconds, should NOT get it + retry( + Duration.ofSeconds(3), + () -> { + LakeTieringTableInfo table = tieringManager.requestTable(); + assertThat(table == null || table.tableId() != tableId) + .as("Should not get table for tiering with large freshness (10min)") + .isTrue(); + }); + + // Step 3: Change freshness to a very small value (100ms) + Map setProperties = new HashMap<>(); + setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "100ms"); + + adminGateway + .alterTable( + newAlterTableRequest( + tablePath, + alterTableProperties(setProperties, new ArrayList<>()), + Collections.emptyList(), + false)) + .get(); + + // Step 4: Now retry requesting the table, should get it within 3 seconds + retry( + Duration.ofSeconds(3), + () -> { + LakeTieringTableInfo table = tieringManager.requestTable(); + assertThat(table).isNotNull(); + assertThat(table.tableId()).isEqualTo(tableId); + assertThat(table.tablePath()).isEqualTo(tablePath); + }); + + // cleanup + adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get(); + adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get(); + } + private AdminReadOnlyGateway getAdminOnlyGateway(boolean isCoordinatorServer) { if (isCoordinatorServer) { return getAdminGateway(); diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md index b333a14001..b4baf1a75e 100644 --- a/website/docs/engine-flink/ddl.md +++ b/website/docs/engine-flink/ddl.md @@ -261,9 +261,14 @@ When using SET to modify [Storage Options](engine-flink/options.md#storage-optio - All [Read Options](engine-flink/options.md#read-options), [Write Options](engine-flink/options.md#write-options), [Lookup Options](engine-flink/options.md#lookup-options) and [Other Options](engine-flink/options.md#other-options) except `bootstrap.servers`. - The following [Storage Options](engine-flink/options.md#storage-options): - `table.datalake.enabled`: Enable or disable lakehouse storage for the table. + - `table.datalake.freshness`: Set the freshness time for lakehouse storage. ```sql title="Flink SQL" +-- Enable lakehouse storage for the table ALTER TABLE my_table SET ('table.datalake.enabled' = 'true'); + +-- Set the freshness to 5 minutes for lakehouse storage +ALTER TABLE my_table SET ('table.datalake.freshness' = '5min'); ``` **Limits** From c0bebf7b72a7375ce80466d0ac2268b9beaa295a Mon Sep 17 00:00:00 2001 From: zhaomin1423 Date: Thu, 15 Jan 2026 17:13:10 +0800 Subject: [PATCH 3/5] Update website/docs/engine-flink/ddl.md Co-authored-by: yuxia Luo --- website/docs/engine-flink/ddl.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md index b4baf1a75e..1482e9118a 100644 --- a/website/docs/engine-flink/ddl.md +++ b/website/docs/engine-flink/ddl.md @@ -261,7 +261,7 @@ When using SET to modify [Storage Options](engine-flink/options.md#storage-optio - All [Read Options](engine-flink/options.md#read-options), [Write Options](engine-flink/options.md#write-options), [Lookup Options](engine-flink/options.md#lookup-options) and [Other Options](engine-flink/options.md#other-options) except `bootstrap.servers`. - The following [Storage Options](engine-flink/options.md#storage-options): - `table.datalake.enabled`: Enable or disable lakehouse storage for the table. - - `table.datalake.freshness`: Set the freshness time for lakehouse storage. + - `table.datalake.freshness`: Set the data freshness for lakehouse storage. ```sql title="Flink SQL" -- Enable lakehouse storage for the table From ab784255aeac009b9bd20b65ccf7f35510544c73 Mon Sep 17 00:00:00 2001 From: zhaomin1423 Date: Thu, 15 Jan 2026 17:13:20 +0800 Subject: [PATCH 4/5] Update fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java Co-authored-by: yuxia Luo --- .../apache/fluss/server/coordinator/LakeTableManagerITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java index 2c2f9e10a5..f6991f740d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java @@ -266,7 +266,7 @@ void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception { Thread.sleep(1000); // Step 2: Try to request the table for tiering within 3 seconds, should NOT get it - retry( + assertThat(tieringManager.requestTable()).isNull(); Duration.ofSeconds(3), () -> { LakeTieringTableInfo table = tieringManager.requestTable(); From 6d42a3fdbe02406048883b271fae27dfedce7ffc Mon Sep 17 00:00:00 2001 From: zhaomin Date: Sun, 18 Jan 2026 10:38:40 +0800 Subject: [PATCH 5/5] fix --- .../coordinator/LakeTableTieringManager.java | 27 ++++++++++++++++--- .../coordinator/LakeTableManagerITCase.java | 7 ++--- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java index 0a59cbe175..c25c779691 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java @@ -130,6 +130,9 @@ public class LakeTableTieringManager implements AutoCloseable { // from table_id -> last heartbeat time by the tiering service private final Map liveTieringTableIds; + // table_id -> delayed tiering task + private final Map delayedTieringByTableId; + private final Lock lock = new ReentrantLock(); public LakeTableTieringManager() { @@ -159,6 +162,7 @@ protected LakeTableTieringManager( this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS); this.tableTierEpoch = new HashMap<>(); this.tableLastTieredTime = new HashMap<>(); + this.delayedTieringByTableId = new HashMap<>(); } public void initWithLakeTables(List> tableInfoWithTieredTime) { @@ -205,10 +209,17 @@ private void scheduleTableTiering(long tableId) { // the table has been dropped, return directly return; } + // Before reschedule, remove the existing DelayedTiering if present + DelayedTiering existingDelayedTiering = delayedTieringByTableId.remove(tableId); + if (existingDelayedTiering != null) { + existingDelayedTiering.cancel(); + } long delayMs = freshnessInterval - (clock.milliseconds() - lastTieredTime); // if the delayMs is < 0, the DelayedTiering will be triggered at once without // adding into timing wheel. - lakeTieringScheduleTimer.add(new DelayedTiering(tableId, delayMs)); + DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs); + delayedTieringByTableId.put(tableId, delayedTiering); + lakeTieringScheduleTimer.add(delayedTiering); } public void removeLakeTable(long tableId) { @@ -221,6 +232,11 @@ public void removeLakeTable(long tableId) { tieringStates.remove(tableId); liveTieringTableIds.remove(tableId); tableTierEpoch.remove(tableId); + // Remove and cancel the delayed tiering task if present + DelayedTiering delayedTiering = delayedTieringByTableId.remove(tableId); + if (delayedTiering != null) { + delayedTiering.cancel(); + } }); } @@ -500,9 +516,12 @@ public DelayedTiering(long tableId, long delayMs) { public void run() { inLock( lock, - () -> - // to pending state - doHandleStateChange(tableId, TieringState.Pending)); + () -> { + // to pending state + doHandleStateChange(tableId, TieringState.Pending); + // Remove from map after execution + delayedTieringByTableId.remove(tableId); + }); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java index f6991f740d..2472d5041f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java @@ -266,13 +266,10 @@ void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception { Thread.sleep(1000); // Step 2: Try to request the table for tiering within 3 seconds, should NOT get it - assertThat(tieringManager.requestTable()).isNull(); + retry( Duration.ofSeconds(3), () -> { - LakeTieringTableInfo table = tieringManager.requestTable(); - assertThat(table == null || table.tableId() != tableId) - .as("Should not get table for tiering with large freshness (10min)") - .isTrue(); + assertThat(tieringManager.requestTable()).isNull(); }); // Step 3: Change freshness to a very small value (100ms)