diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index fa9c4274c9..af40444581 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -21,7 +21,7 @@
 import org.apache.fluss.annotation.VisibleForTesting;
 
 import java.lang.reflect.Field;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +42,9 @@ public class FlussConfigUtils {
         TABLE_OPTIONS = extractConfigOptions("table.");
         CLIENT_OPTIONS = extractConfigOptions("client.");
         ALTERABLE_TABLE_OPTIONS =
-                Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+                Arrays.asList(
+                        ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
+                        ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
     }
 
     public static boolean isTableStorageConfig(String key) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 70b359e4d6..94d56dd338 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -919,6 +919,11 @@ public DataLakeFormat getDataLakeFormat() {
         return lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat();
     }
 
+    @VisibleForTesting
+    public LakeTableTieringManager getLakeTableTieringManager() {
+        return lakeTableTieringManager;
+    }
+
     private void validateHeartbeatRequest(
             PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
         if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index 165ae7b844..c25c779691 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -130,6 +130,9 @@ public class LakeTableTieringManager implements AutoCloseable {
     // from table_id -> last heartbeat time by the tiering service
     private final Map<Long, Long> liveTieringTableIds;
 
+    // table_id -> delayed tiering task
+    private final Map<Long, DelayedTiering> delayedTieringByTableId;
+
     private final Lock lock = new ReentrantLock();
 
     public LakeTableTieringManager() {
@@ -159,6 +162,7 @@ protected LakeTableTieringManager(
                 this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS);
         this.tableTierEpoch = new HashMap<>();
         this.tableLastTieredTime = new HashMap<>();
+        this.delayedTieringByTableId = new HashMap<>();
     }
 
     public void initWithLakeTables(List<Tuple2<TableInfo, Long>> tableInfoWithTieredTime) {
@@ -205,10 +209,17 @@ private void scheduleTableTiering(long tableId) {
             // the table has been dropped, return directly
             return;
         }
+        // Before reschedule, remove the existing DelayedTiering if present
+        DelayedTiering existingDelayedTiering = delayedTieringByTableId.remove(tableId);
+        if (existingDelayedTiering != null) {
+            existingDelayedTiering.cancel();
+        }
         long delayMs = freshnessInterval - (clock.milliseconds() - lastTieredTime);
         // if the delayMs is < 0, the DelayedTiering will be triggered at once without
         // adding into timing wheel.
-        lakeTieringScheduleTimer.add(new DelayedTiering(tableId, delayMs));
+        DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs);
+        delayedTieringByTableId.put(tableId, delayedTiering);
+        lakeTieringScheduleTimer.add(delayedTiering);
     }
 
     public void removeLakeTable(long tableId) {
@@ -221,6 +232,53 @@ public void removeLakeTable(long tableId) {
                     tieringStates.remove(tableId);
                     liveTieringTableIds.remove(tableId);
                     tableTierEpoch.remove(tableId);
+                    // Remove and cancel the delayed tiering task if present
+                    DelayedTiering delayedTiering = delayedTieringByTableId.remove(tableId);
+                    if (delayedTiering != null) {
+                        delayedTiering.cancel();
+                    }
                 });
     }
+
+    /**
+     * Update the lake freshness for a table. This method should be called when the table's datalake
+     * freshness property is changed via ALTER TABLE.
+     *
+     * @param tableId the table id
+     * @param newFreshnessMs the new freshness interval in milliseconds
+     */
+    public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
+        inLock(
+                lock,
+                () -> {
+                    Long currentFreshness = tableLakeFreshness.get(tableId);
+                    if (currentFreshness == null) {
+                        // the table is not a lake table or has been dropped, skip update
+                        LOG.warn(
+                                "Cannot update lake freshness for table {} as it's not tracked by lake tiering manager.",
+                                tableId);
+                        return;
+                    }
+
+                    if (currentFreshness.equals(newFreshnessMs)) {
+                        // no change, skip update
+                        return;
+                    }
+
+                    tableLakeFreshness.put(tableId, newFreshnessMs);
+                    LOG.info(
+                            "Updated lake freshness for table {} from {} ms to {} ms.",
+                            tableId,
+                            currentFreshness,
+                            newFreshnessMs);
+
+                    // If the table is in Scheduled state, we need to reschedule it with the new
+                    // freshness
+                    TieringState currentState = tieringStates.get(tableId);
+                    if (currentState == TieringState.Scheduled) {
+                        // Reschedule the table tiering with the new freshness interval
+                        scheduleTableTiering(tableId);
+                    }
+                });
+    }
 
@@ -458,9 +516,12 @@ public DelayedTiering(long tableId, long delayMs) {
         public void run() {
             inLock(
                     lock,
-                    () ->
-                            // to pending state
-                            doHandleStateChange(tableId, TieringState.Pending));
+                    () -> {
+                        // to pending state
+                        doHandleStateChange(tableId, TieringState.Pending);
+                        // Remove from map after execution
+                        delayedTieringByTableId.remove(tableId);
+                    });
         }
     }
 
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 05c5d51de2..f728e91da5 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -60,12 +60,14 @@
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -516,12 +518,9 @@ private void postAlterTableProperties(
             TableRegistration newTableRegistration,
             LakeTableTieringManager lakeTableTieringManager) {
-        boolean toEnableDataLake =
-                !isDataLakeEnabled(oldTableDescriptor)
-                        && isDataLakeEnabled(newTableRegistration.properties);
-        boolean toDisableDataLake =
-                isDataLakeEnabled(oldTableDescriptor)
-                        && !isDataLakeEnabled(newTableRegistration.properties);
+        boolean dataLakeEnabled = isDataLakeEnabled(newTableRegistration.properties);
+        boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && dataLakeEnabled;
+        boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && !dataLakeEnabled;
 
         if (toEnableDataLake) {
             TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo);
@@ -529,6 +528,20 @@ private void postAlterTableProperties(
             lakeTableTieringManager.addNewLakeTable(newTableInfo);
         } else if (toDisableDataLake) {
             lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
+        } else if (dataLakeEnabled) {
+            // The table is still a lake table, check if freshness has changed
+            Duration oldFreshness =
+                    Configuration.fromMap(oldTableDescriptor.getProperties())
+                            .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
+            Duration newFreshness =
+                    Configuration.fromMap(newTableRegistration.properties)
+                            .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
+
+            // Check if freshness has changed
+            if (!Objects.equals(oldFreshness, newFreshness)) {
+                lakeTableTieringManager.updateTableLakeFreshness(
+                        newTableRegistration.tableId, newFreshness.toMillis());
+            }
         }
         // more post-alter actions can be added here
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index fb653b7d65..2472d5041f 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -19,23 +19,37 @@
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.gateway.AdminGateway;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
+import org.apache.fluss.rpc.messages.GetTableInfoResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfig;
+import org.apache.fluss.server.entity.LakeTieringTableInfo;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateDatabaseRequest;
 import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropDatabaseRequest;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest;
 import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newGetTableInfoRequest;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -106,4 +120,231 @@ void testCreateAndGetTable() throws Exception {
                 .isInstanceOf(TableAlreadyExistException.class)
                 .hasMessage("Table %s already exists.", lakeTablePath);
     }
+
+    @Test
+    void testAlterAndResetTableDatalakeProperties() throws Exception {
+        AdminReadOnlyGateway gateway = getAdminOnlyGateway(true);
+        AdminGateway adminGateway = getAdminGateway();
+
+        String db1 = "test_alter_reset_datalake_db";
+        String tb1 = "tb1";
+        TablePath tablePath = TablePath.of(db1, tb1);
+        // first create a database
+        adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
+
+        // Step 1: create a table with datalake enabled and initial freshness (5min)
+        Map<String, String> initialProperties = new HashMap<>();
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "5min");
+        TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties);
+        adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
+
+        // Step 2: verify initial properties
+        GetTableInfoResponse response =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTable = TableDescriptor.fromJsonBytes(response.getTableJson());
+        assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+        assertThat(gottenTable.getProperties().get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isEqualTo("5min");
+
+        // Step 3: alter table to change datalake freshness (SET operation)
+        Map<String, String> setProperties = new HashMap<>();
+        setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "3min");
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(setProperties, new ArrayList<>()),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 4: verify freshness was updated to 3min
+        GetTableInfoResponse responseAfterSet =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterSet =
+                TableDescriptor.fromJsonBytes(responseAfterSet.getTableJson());
+        assertThat(
+                        gottenTableAfterSet
+                                .getProperties()
+                                .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isEqualTo("3min");
+        assertThat(
+                        gottenTableAfterSet
+                                .getProperties()
+                                .get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+
+        // Step 5: reset datalake freshness property (RESET operation)
+        List<String> resetProperties = new ArrayList<>();
+        resetProperties.add(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(new HashMap<>(), resetProperties),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 6: verify freshness was removed but datalake.enabled remains
+        GetTableInfoResponse responseAfterReset =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterReset =
+                TableDescriptor.fromJsonBytes(responseAfterReset.getTableJson());
+        assertThat(
+                        gottenTableAfterReset
+                                .getProperties()
+                                .containsKey(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()))
+                .isFalse();
+        assertThat(
+                        gottenTableAfterReset
+                                .getProperties()
+                                .get(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isEqualTo("true");
+
+        // Step 7: reset datalake enabled property
+        List<String> resetProperties2 = new ArrayList<>();
+        resetProperties2.add(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(new HashMap<>(), resetProperties2),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 8: verify datalake.enabled was also removed
+        GetTableInfoResponse responseAfterReset2 =
+                gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        TableDescriptor gottenTableAfterReset2 =
+                TableDescriptor.fromJsonBytes(responseAfterReset2.getTableJson());
+        assertThat(
+                        gottenTableAfterReset2
+                                .getProperties()
+                                .containsKey(ConfigOptions.TABLE_DATALAKE_ENABLED.key()))
+                .isFalse();
+
+        // cleanup
+        adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+        adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get();
+    }
+
+    @Test
+    void testAlterTableDatalakeFreshnessAffectsTiering() throws Exception {
+        AdminGateway adminGateway = getAdminGateway();
+
+        String db1 = "test_tiering_freshness_db";
+        String tb1 = "tb1";
+        TablePath tablePath = TablePath.of(db1, tb1);
+        adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
+
+        // Step 1: Create a table with a large datalake freshness (10 minutes)
+        Map<String, String> initialProperties = new HashMap<>();
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        initialProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "10min");
+        TableDescriptor tableDescriptor = newPkTable().withProperties(initialProperties);
+        adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
+
+        // Get the table id for later verification
+        GetTableInfoResponse response =
+                adminGateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+        long tableId = response.getTableId();
+
+        LakeTableTieringManager tieringManager =
+                FLUSS_CLUSTER_EXTENSION
+                        .getCoordinatorServer()
+                        .getCoordinatorService()
+                        .getLakeTableTieringManager();
+
+        // Wait a bit for the table to be registered in tiering manager
+        Thread.sleep(1000);
+
+        // Step 2: Try to request the table for tiering within 3 seconds, should NOT get it
+        retry(
+                Duration.ofSeconds(3),
+                () -> {
+                    assertThat(tieringManager.requestTable()).isNull();
+                });
+
+        // Step 3: Change freshness to a very small value (100ms)
+        Map<String, String> setProperties = new HashMap<>();
+        setProperties.put(ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "100ms");
+
+        adminGateway
+                .alterTable(
+                        newAlterTableRequest(
+                                tablePath,
+                                alterTableProperties(setProperties, new ArrayList<>()),
+                                Collections.emptyList(),
+                                false))
+                .get();
+
+        // Step 4: Now retry requesting the table, should get it within 3 seconds
+        retry(
+                Duration.ofSeconds(3),
+                () -> {
+                    LakeTieringTableInfo table = tieringManager.requestTable();
+                    assertThat(table).isNotNull();
+                    assertThat(table.tableId()).isEqualTo(tableId);
+                    assertThat(table.tablePath()).isEqualTo(tablePath);
+                });
+
+        // cleanup
+        adminGateway.dropTable(newDropTableRequest(db1, tb1, false)).get();
+        adminGateway.dropDatabase(newDropDatabaseRequest(db1, false, true)).get();
+    }
+
+    private AdminReadOnlyGateway getAdminOnlyGateway(boolean isCoordinatorServer) {
+        if (isCoordinatorServer) {
+            return getAdminGateway();
+        } else {
+            return FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(0);
+        }
+    }
+
+    private AdminGateway getAdminGateway() {
+        return FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+    }
+
+    private static TableDescriptor newPkTable() {
+        return TableDescriptor.builder()
+                .schema(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT())
+                                .withComment("a comment")
+                                .column("b", DataTypes.STRING())
+                                .primaryKey("a")
+                                .build())
+                .comment("first table")
+                .distributedBy(3, "a")
+                .build();
+    }
+
+    private static List<PbAlterConfig> alterTableProperties(
+            Map<String, String> setProperties, List<String> resetProperties) {
+        List<PbAlterConfig> res = new ArrayList<>();
+
+        for (Map.Entry<String, String> entry : setProperties.entrySet()) {
+            PbAlterConfig info = new PbAlterConfig();
+            info.setConfigKey(entry.getKey());
+            info.setConfigValue(entry.getValue());
+            info.setOpType(AlterConfigOpType.SET.value());
+            res.add(info);
+        }
+
+        for (String resetProperty : resetProperties) {
+            PbAlterConfig info = new PbAlterConfig();
+            info.setConfigKey(resetProperty);
+            info.setOpType(AlterConfigOpType.DELETE.value());
+            res.add(info);
+        }
+
+        return res;
+    }
 }
diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md
index b333a14001..1482e9118a 100644
--- a/website/docs/engine-flink/ddl.md
+++ b/website/docs/engine-flink/ddl.md
@@ -261,9 +261,14 @@ When using SET to modify [Storage Options](engine-flink/options.md#storage-optio
 - All [Read Options](engine-flink/options.md#read-options), [Write Options](engine-flink/options.md#write-options), [Lookup Options](engine-flink/options.md#lookup-options) and [Other Options](engine-flink/options.md#other-options) except `bootstrap.servers`.
 - The following [Storage Options](engine-flink/options.md#storage-options):
   - `table.datalake.enabled`: Enable or disable lakehouse storage for the table.
+  - `table.datalake.freshness`: Set the data freshness for lakehouse storage.
 
 ```sql title="Flink SQL"
+-- Enable lakehouse storage for the table
 ALTER TABLE my_table SET ('table.datalake.enabled' = 'true');
+
+-- Set the freshness to 5 minutes for lakehouse storage
+ALTER TABLE my_table SET ('table.datalake.freshness' = '5min');
 ```
 
 **Limits**