@@ -340,23 +340,6 @@ public static TableInfo of(
modifiedTime);
}

/** Replace a TableInfo with a new SchemaInfo. */
public TableInfo withNewSchema(SchemaInfo schemaInfo) {
return new TableInfo(
tablePath,
tableId,
schemaInfo.getSchemaId(),
schemaInfo.getSchema(),
bucketKeys,
partitionKeys,
numBuckets,
properties,
customProperties,
comment,
createdTime,
modifiedTime);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
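Note on the deletion above: withNewSchema built a copy of a TableInfo around a fresh SchemaInfo. After this PR the coordinator derives the refreshed TableInfo from the persisted TableRegistration instead (see processTableRegistrationChange later in this diff), which appears to leave the helper without callers. A minimal sketch of the replacement path, using only calls that appear in this diff; the surrounding wiring is assumed:

    // old: tableInfo = tableInfo.withNewSchema(schemaInfo);
    // new: rebuild the TableInfo from the registration persisted in ZooKeeper,
    // reusing the schema info already cached in the coordinator context
    TableInfo refreshed =
            tableRegistration.toTableInfo(tablePath, oldTableInfo.getSchemaInfo());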
@@ -27,9 +27,11 @@
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;

@@ -59,6 +61,7 @@
import javax.annotation.Nullable;

import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +75,7 @@
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -584,20 +588,27 @@ void testAlterLakeEnabledLogTable() throws Exception {
.build();
TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
admin.createTable(logTablePath, logTable, false).get();
long tableId = admin.getTableInfo(logTablePath).get().getTableId();

assertThatThrownBy(
() ->
paimonCatalog.getTable(
Identifier.create(DATABASE, logTablePath.getTableName())))
.isInstanceOf(Catalog.TableNotExistException.class);

// verify LogTablet datalake status is initially disabled
verifyLogTabletDataLakeEnabled(tableId, false);

// enable lake
TableChange.SetOption enableLake =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
List<TableChange> changes = Collections.singletonList(enableLake);

admin.alterTable(logTablePath, changes, false).get();

// verify LogTablet datalake status is enabled
verifyLogTabletDataLakeEnabled(tableId, true);

Identifier paimonTablePath = Identifier.create(DATABASE, logTablePath.getTableName());
Table enabledPaimonLogTable = paimonCatalog.getTable(paimonTablePath);

@@ -635,11 +646,17 @@ void testAlterLakeEnabledLogTable() throws Exception {
// paimon table should still exist although lake is disabled
paimonCatalog.getTable(paimonTablePath);

// verify LogTablet datalake status is disabled
verifyLogTabletDataLakeEnabled(tableId, false);

// try to enable the lake table again
enableLake = TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
changes = Collections.singletonList(enableLake);
admin.alterTable(logTablePath, changes, false).get();

// verify LogTablet datalake status is enabled again
verifyLogTabletDataLakeEnabled(tableId, true);

// write some data to the lake table
writeData(paimonCatalog.getTable(paimonTablePath));
Optional<Snapshot> snapshot = paimonCatalog.getTable(paimonTablePath).latestSnapshot();
@@ -649,10 +666,16 @@
changes = Collections.singletonList(disableLake);
admin.alterTable(logTablePath, changes, false).get();

// verify LogTablet datalake status is disabled again
verifyLogTabletDataLakeEnabled(tableId, false);

// try to enable the lake table again; the snapshot should not change
changes = Collections.singletonList(enableLake);
admin.alterTable(logTablePath, changes, false).get();
assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot);

// verify LogTablet datalake status is enabled
verifyLogTabletDataLakeEnabled(tableId, true);
}

@Test
@@ -1021,6 +1044,19 @@ private void verifyPaimonTable(
assertThat(paimonTable.comment()).isEqualTo(flussTable.getComment());
}

private void verifyLogTabletDataLakeEnabled(long tableId, boolean isDataLakeEnabled) {
for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
TableBucket tb = new TableBucket(tableId, bucket);
retry(
Duration.ofMinutes(1),
() -> {
Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
assertThat(replica.getLogTablet().isDataLakeEnabled())
.isEqualTo(isDataLakeEnabled);
});
}
}

private TableDescriptor createTableDescriptor(
int columnNum,
int bucketNum,
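The new verifyLogTabletDataLakeEnabled helper polls every bucket's leader replica because the coordinator propagates the datalake flag to tablet servers asynchronously, so a single immediate assertion could race the metadata update. A self-contained sketch of the retry-until-the-assertion-passes pattern it relies on; the real helper is CommonTestUtils.retry, whose exact signature and backoff are assumptions here:

    import java.time.Duration;

    final class RetryAssert {
        // Re-run the assertion until it passes or the timeout expires.
        static void retry(Duration timeout, Runnable assertion) throws InterruptedException {
            long deadlineNanos = System.nanoTime() + timeout.toNanos();
            while (true) {
                try {
                    assertion.run(); // throws AssertionError until the state converges
                    return;
                } catch (AssertionError e) {
                    if (System.nanoTime() >= deadlineNanos) {
                        throw e; // timed out: surface the last failure
                    }
                    Thread.sleep(100); // brief pause before polling again
                }
            }
        }
    }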
@@ -85,6 +85,7 @@
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
import org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
@@ -560,6 +561,8 @@ public void process(CoordinatorEvent event) {
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
processSchemaChange(schemaChangeEvent);
} else if (event instanceof TableRegistrationChangeEvent) {
processTableRegistrationChange((TableRegistrationChangeEvent) event);
} else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
processNotifyLeaderAndIsrResponseReceivedEvent(
(NotifyLeaderAndIsrResponseReceivedEvent) event);
@@ -720,6 +723,47 @@ private void processSchemaChange(SchemaChangeEvent schemaChangeEvent) {
null);
}

private void processTableRegistrationChange(TableRegistrationChangeEvent event) {
TablePath tablePath = event.getTablePath();
Long tableId = coordinatorContext.getTableIdByPath(tablePath);

// Skip if the table is not yet registered in the coordinator context.
// This should not happen in normal cases.
if (tableId == null) {
return;
}

TableInfo oldTableInfo = coordinatorContext.getTableInfoById(tableId);

TableInfo newTableInfo =
event.getTableRegistration().toTableInfo(tablePath, oldTableInfo.getSchemaInfo());
coordinatorContext.putTableInfo(newTableInfo);
postAlterTableProperties(oldTableInfo, newTableInfo);

// Notify tablet servers about the metadata change
updateTabletServerMetadataCache(
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
tableId,
null,
null);
}

private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTableInfo) {
boolean toEnableDataLake =
!oldTableInfo.getTableConfig().isDataLakeEnabled()
&& newTableInfo.getTableConfig().isDataLakeEnabled();
boolean toDisableDataLake =
oldTableInfo.getTableConfig().isDataLakeEnabled()
&& !newTableInfo.getTableConfig().isDataLakeEnabled();

if (toEnableDataLake) {
// if the table is a lake table, we need to add it to the lake table tiering manager
lakeTableTieringManager.addNewLakeTable(newTableInfo);
} else if (toDisableDataLake) {
lakeTableTieringManager.removeLakeTable(newTableInfo.getTableId());
}
}

private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
long partitionId = createPartitionEvent.getPartitionId();
// skip the partition if it already exists
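The new event type is what lets a table-registration update in ZooKeeper (written by MetadataManager later in this diff) flow back into the coordinator's in-memory state and the tiering manager. A hypothetical shape for TableRegistrationChangeEvent, inferred solely from the two getters called in processTableRegistrationChange; the real class lives in org.apache.fluss.server.coordinator.event and may carry more state:

    // Inferred sketch, not the actual class from the PR.
    public final class TableRegistrationChangeEvent implements CoordinatorEvent {
        private final TablePath tablePath;
        private final TableRegistration tableRegistration;

        public TableRegistrationChangeEvent(
                TablePath tablePath, TableRegistration tableRegistration) {
            this.tablePath = tablePath;
            this.tableRegistration = tableRegistration;
        }

        public TablePath getTablePath() {
            return tablePath;
        }

        public TableRegistration getTableRegistration() {
            return tableRegistration;
        }
    }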
@@ -378,8 +378,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest request
+ "table properties or table schema.");
}

LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
lakeCatalogDynamicLoader.getLakeCatalogContainer();
LakeCatalog.Context lakeCatalogContext =
new DefaultLakeCatalogContext(false, currentSession().getPrincipal());

@@ -388,7 +386,6 @@
tablePath,
alterSchemaChanges,
request.isIgnoreIfNotExists(),
lakeCatalogContainer.getLakeCatalog(),
lakeCatalogContext);
}

@@ -398,8 +395,6 @@
alterTableConfigChanges,
tablePropertyChanges,
request.isIgnoreIfNotExists(),
lakeCatalogContainer.getLakeCatalog(),
lakeTableTieringManager,
lakeCatalogContext);
}

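Design note on the two parameter removals above: rather than resolving the lake catalog in the RPC layer and threading it (plus the tiering manager) through every alter-table call, MetadataManager now looks the catalog up at the point of use, as the next file shows. The pattern, with the null check the diff performs (the exception message here is illustrative):

    // Resolve the possibly-absent lake catalog only when a datalake-enabled
    // table actually needs it; it is null when no lake storage is configured.
    LakeCatalog lakeCatalog =
            lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
    if (lakeCatalog == null) {
        throw new InvalidAlterTableException(
                "Cannot alter a datalake-enabled table: no lake storage is configured.");
    }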
@@ -325,7 +325,6 @@ public void alterTableSchema(
TablePath tablePath,
List<TableChange> schemaChanges,
boolean ignoreIfNotExists,
@Nullable LakeCatalog lakeCatalog,
LakeCatalog.Context lakeCatalogContext)
throws TableNotExistException, TableNotPartitionedException {
try {
@@ -337,8 +336,7 @@
Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges);

// Lake First: sync to Lake before updating Fluss schema
syncSchemaChangesToLake(
tablePath, table, schemaChanges, lakeCatalog, lakeCatalogContext);
syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext);

// Update Fluss schema (ZK) after Lake sync succeeds
if (!newSchema.equals(table.getSchema())) {
@@ -368,12 +366,13 @@ private void syncSchemaChangesToLake(
TablePath tablePath,
TableInfo tableInfo,
List<TableChange> schemaChanges,
@Nullable LakeCatalog lakeCatalog,
LakeCatalog.Context lakeCatalogContext) {
if (!isDataLakeEnabled(tableInfo.toTableDescriptor())) {
return;
}

LakeCatalog lakeCatalog =
lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
if (lakeCatalog == null) {
throw new InvalidAlterTableException(
"Cannot alter schema for datalake enabled table "
@@ -397,8 +396,6 @@ public void alterTableProperties(
List<TableChange> tableChanges,
TablePropertyChanges tablePropertyChanges,
boolean ignoreIfNotExists,
@Nullable LakeCatalog lakeCatalog,
LakeTableTieringManager lakeTableTieringManager,
LakeCatalog.Context lakeCatalogContext) {
try {
// it throws TableNotExistException if the table or database not exists
@@ -429,22 +426,12 @@
tableDescriptor,
newDescriptor,
tableChanges,
lakeCatalog,
lakeCatalogContext);
// update the table to zk
TableRegistration updatedTableRegistration =
tableReg.newProperties(
newDescriptor.getProperties(), newDescriptor.getCustomProperties());
zookeeperClient.updateTable(tablePath, updatedTableRegistration);

// post alter table properties, e.g. add the table to lake table tiering manager if
// it's to enable datalake for the table
postAlterTableProperties(
tablePath,
schemaInfo,
tableDescriptor,
updatedTableRegistration,
lakeTableTieringManager);
} else {
LOG.info(
"No properties changed when alter table {}, skip update table.", tablePath);
@@ -469,8 +456,10 @@ private void preAlterTableProperties(
TableDescriptor tableDescriptor,
TableDescriptor newDescriptor,
List<TableChange> tableChanges,
LakeCatalog lakeCatalog,
LakeCatalog.Context lakeCatalogContext) {
LakeCatalog lakeCatalog =
lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();

if (isDataLakeEnabled(newDescriptor)) {
if (lakeCatalog == null) {
throw new InvalidAlterTableException(
Expand Down Expand Up @@ -509,30 +498,6 @@ private void preAlterTableProperties(
}
}

private void postAlterTableProperties(
TablePath tablePath,
SchemaInfo schemaInfo,
TableDescriptor oldTableDescriptor,
TableRegistration newTableRegistration,
LakeTableTieringManager lakeTableTieringManager) {

boolean toEnableDataLake =
!isDataLakeEnabled(oldTableDescriptor)
&& isDataLakeEnabled(newTableRegistration.properties);
boolean toDisableDataLake =
isDataLakeEnabled(oldTableDescriptor)
&& !isDataLakeEnabled(newTableRegistration.properties);

if (toEnableDataLake) {
TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo);
// if the table is lake table, we need to add it to lake table tiering manager
lakeTableTieringManager.addNewLakeTable(newTableInfo);
} else if (toDisableDataLake) {
lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
}
// more post-alter actions can be added here
}

/**
* Get a new TableDescriptor with updated properties.
*
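The "Lake First" comment in alterTableSchema encodes the consistency rule this refactor preserves: push the change to the external lake catalog before touching Fluss metadata in ZooKeeper, so a lake-side failure aborts the whole alter and leaves Fluss unchanged. As a minimal sketch (helper name hypothetical):

    // Apply a metadata change lake-first: if step 1 throws, step 2 never runs
    // and the Fluss (ZooKeeper) metadata stays at its previous version.
    static void alterLakeFirst(Runnable syncToLake, Runnable updateFlussMetadata) {
        syncToLake.run();          // 1) external lake catalog; may fail loudly
        updateFlussMetadata.run(); // 2) only reached once the lake accepted it
    }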