Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.fluss.annotation.VisibleForTesting;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,7 +42,9 @@ public class FlussConfigUtils {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
ALTERABLE_TABLE_OPTIONS =
Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
Arrays.asList(
ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key());
}

public static boolean isTableStorageConfig(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,11 @@ public DataLakeFormat getDataLakeFormat() {
return lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat();
}

@VisibleForTesting
public LakeTableTieringManager getLakeTableTieringManager() {
return lakeTableTieringManager;
}

private void validateHeartbeatRequest(
PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ public class LakeTableTieringManager implements AutoCloseable {
// from table_id -> last heartbeat time by the tiering service
private final Map<Long, Long> liveTieringTableIds;

// table_id -> delayed tiering task
private final Map<Long, DelayedTiering> delayedTieringByTableId;

private final Lock lock = new ReentrantLock();

public LakeTableTieringManager() {
Expand Down Expand Up @@ -159,6 +162,7 @@ protected LakeTableTieringManager(
this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS);
this.tableTierEpoch = new HashMap<>();
this.tableLastTieredTime = new HashMap<>();
this.delayedTieringByTableId = new HashMap<>();
}

public void initWithLakeTables(List<Tuple2<TableInfo, Long>> tableInfoWithTieredTime) {
Expand Down Expand Up @@ -205,10 +209,17 @@ private void scheduleTableTiering(long tableId) {
// the table has been dropped, return directly
return;
}
// Before reschedule, remove the existing DelayedTiering if present
DelayedTiering existingDelayedTiering = delayedTieringByTableId.remove(tableId);
if (existingDelayedTiering != null) {
existingDelayedTiering.cancel();
}
long delayMs = freshnessInterval - (clock.milliseconds() - lastTieredTime);
// if the delayMs is < 0, the DelayedTiering will be triggered at once without
// adding into timing wheel.
lakeTieringScheduleTimer.add(new DelayedTiering(tableId, delayMs));
DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs);
delayedTieringByTableId.put(tableId, delayedTiering);
lakeTieringScheduleTimer.add(delayedTiering);
}

public void removeLakeTable(long tableId) {
Expand All @@ -221,6 +232,53 @@ public void removeLakeTable(long tableId) {
tieringStates.remove(tableId);
liveTieringTableIds.remove(tableId);
tableTierEpoch.remove(tableId);
// Remove and cancel the delayed tiering task if present
DelayedTiering delayedTiering = delayedTieringByTableId.remove(tableId);
if (delayedTiering != null) {
delayedTiering.cancel();
}
});
}

/**
* Update the lake freshness for a table. This method should be called when the table's datalake
* freshness property is changed via ALTER TABLE.
*
* @param tableId the table id
* @param newFreshnessMs the new freshness interval in milliseconds
*/
public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
inLock(
lock,
() -> {
Long currentFreshness = tableLakeFreshness.get(tableId);
if (currentFreshness == null) {
// the table is not a lake table or has been dropped, skip update
LOG.warn(
"Cannot update lake freshness for table {} as it's not tracked by lake tiering manager.",
tableId);
return;
}

if (currentFreshness.equals(newFreshnessMs)) {
// no change, skip update
return;
}

tableLakeFreshness.put(tableId, newFreshnessMs);
LOG.info(
"Updated lake freshness for table {} from {} ms to {} ms.",
tableId,
currentFreshness,
newFreshnessMs);

// If the table is in Scheduled state, we need to reschedule it with the new
// freshness
TieringState currentState = tieringStates.get(tableId);
if (currentState == TieringState.Scheduled) {
// Reschedule the table tiering with the new freshness interval
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before reschedule, we need to remove the existing DelayedTiering. So, we need to
maintain
private final Map<Long, DelayedTiering> delayedTieringByTableId = new HashMap<>();

  • put into delayedTieringByTableId in method scheduleTableTiering
  • remove it in method removeLakeTable,
  • remove it in here, and cancel the delayedTiering
  • remove it in DelayedTiering#run

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated

scheduleTableTiering(tableId);
}
});
}

Expand Down Expand Up @@ -458,9 +516,12 @@ public DelayedTiering(long tableId, long delayMs) {
public void run() {
inLock(
lock,
() ->
// to pending state
doHandleStateChange(tableId, TieringState.Pending));
() -> {
// to pending state
doHandleStateChange(tableId, TieringState.Pending);
// Remove from map after execution
delayedTieringByTableId.remove(tableId);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -516,19 +518,30 @@ private void postAlterTableProperties(
TableRegistration newTableRegistration,
LakeTableTieringManager lakeTableTieringManager) {

boolean toEnableDataLake =
!isDataLakeEnabled(oldTableDescriptor)
&& isDataLakeEnabled(newTableRegistration.properties);
boolean toDisableDataLake =
isDataLakeEnabled(oldTableDescriptor)
&& !isDataLakeEnabled(newTableRegistration.properties);
boolean dataLakeEnabled = isDataLakeEnabled(newTableRegistration.properties);
boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && dataLakeEnabled;
boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && !dataLakeEnabled;

if (toEnableDataLake) {
TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo);
// if the table is lake table, we need to add it to lake table tiering manager
lakeTableTieringManager.addNewLakeTable(newTableInfo);
} else if (toDisableDataLake) {
lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
} else if (dataLakeEnabled) {
// The table is still a lake table, check if freshness has changed
Duration oldFreshness =
Configuration.fromMap(oldTableDescriptor.getProperties())
.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
Duration newFreshness =
Configuration.fromMap(newTableRegistration.properties)
.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);

// Check if freshness has changed
if (!Objects.equals(oldFreshness, newFreshness)) {
lakeTableTieringManager.updateTableLakeFreshness(
newTableRegistration.tableId, newFreshness.toMillis());
}
}
// more post-alter actions can be added here
}
Expand Down
Loading