Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
Expand Down Expand Up @@ -84,5 +85,13 @@ interface Context {

/** Get the fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();

/**
* Get the current schema of fluss.
*
* @return the current schema of fluss.
* @since 0.10
*/
Schema getFlussSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,22 @@

package org.apache.fluss.lake.lakestorage;

import org.apache.fluss.metadata.Schema;
import org.apache.fluss.security.acl.FlussPrincipal;

/** A testing implementation of {@link LakeCatalog.Context}. */
public class TestingLakeCatalogContext implements LakeCatalog.Context {

private final Schema schema;

public TestingLakeCatalogContext(Schema schema) {
this.schema = schema;
}

public TestingLakeCatalogContext() {
this(null);
}

@Override
public boolean isCreatingFlussTable() {
return false;
Expand All @@ -31,4 +42,9 @@ public boolean isCreatingFlussTable() {
public FlussPrincipal getFlussPrincipal() {
return null;
}

@Override
public Schema getFlussSchema() {
return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
Expand Down Expand Up @@ -113,9 +115,10 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
throws TableNotExistException {
try {
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);

// Compare current Paimon table schema with expected target schema before altering
if (shouldAlterTable(tablePath, tableChanges)) {
org.apache.fluss.metadata.Schema flussSchema = context.getFlussSchema();
boolean needsApplyChanges =
needsApplyTableChanges(tablePath, tableChanges, flussSchema);
if (needsApplyChanges) {
alterTable(tablePath, paimonSchemaChanges);
} else {
// If schemas already match, treat as idempotent success
Expand All @@ -133,72 +136,149 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
}
}

private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
/**
* Check if table changes need to be applied to reconcile Paimon and Fluss schemas.
*
* <p>This method determines whether the provided table changes need to be applied based on the
* current state of Paimon and Fluss schemas.
*
* @param tablePath the table path
* @param tableChanges the proposed table changes
* @param flussSchema the current Fluss schema
* @return true if changes need to be applied (schemas are consistent), false if changes can be
* skipped (schemas already match after changes, idempotent)
* @throws TableNotExistException if the table does not exist
* @throws InvalidAlterTableException if schemas are inconsistent and cannot be reconciled
*/
private boolean needsApplyTableChanges(
TablePath tablePath,
List<TableChange> tableChanges,
org.apache.fluss.metadata.Schema flussSchema)
throws TableNotExistException {
if (tableChanges.isEmpty()) {
return false;
}

try {
// Get current Paimon table schema
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Schema currentSchema = fileStoreTable.schema().toSchema();

for (TableChange change : tableChanges) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
if (!isColumnAlreadyExists(currentSchema, addColumn)) {
return true;
}
} else {
return true;
}
List<DataField> paimonFields =
fileStoreTable.schema().toSchema().fields().stream()
.filter(field -> !SYSTEM_COLUMNS.containsKey(field.name()))
.collect(Collectors.toList());
List<org.apache.fluss.metadata.Schema.Column> flussColumns = flussSchema.getColumns();

if (paimonFields.size() < flussColumns.size()) {
throw new InvalidAlterTableException(
String.format(
"Paimon table has less columns (%d) than Fluss schema (%d)",
paimonFields.size(), flussColumns.size()));
}

// Validate schema compatibility
validateExistingColumns(paimonFields, flussColumns);

// If schemas are same, can apply all table changes
if (paimonFields.size() == flussColumns.size()) {
return true;
}

// If Paimon has more columns, check if table changes would reconcile schemas
if (canTableChangesReconcileSchemas(flussColumns, paimonFields, tableChanges)) {
// Schemas will match after applying changes, skip duplicate operations
return false;
} else {
throw new InvalidAlterTableException(
String.format(
"Paimon table has more columns (%d) than Fluss schema (%d); "
+ "therefore you need to add the diff columns all at once, "
+ "rather than applying other table changes: %s.",
paimonFields.size(), flussColumns.size(), tableChanges));
}

return false;
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException("Table " + tablePath + " does not exist.");
}
}

private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
private void validateExistingColumns(
List<DataField> paimonFields,
List<org.apache.fluss.metadata.Schema.Column> flussColumns) {
for (int i = 0; i < flussColumns.size(); i++) {
if (!paimonFields.get(i).name().equals(flussColumns.get(i).getName())) {
throw new InvalidAlterTableException(
String.format(
"Column mismatch at position %d. Paimon: '%s', Fluss: '%s'",
i, paimonFields.get(i).name(), flussColumns.get(i).getName()));
}
}
}

private boolean isColumnAlreadyExists(
org.apache.paimon.types.DataField field, TableChange.AddColumn addColumn) {
String columnName = addColumn.getName();

for (org.apache.paimon.types.DataField field : currentSchema.fields()) {
if (field.name().equals(columnName)) {
org.apache.paimon.types.DataType expectedType =
addColumn
.getDataType()
.accept(
org.apache.fluss.lake.paimon.utils
.FlussDataTypeToPaimonDataType.INSTANCE);

if (!field.type().equals(expectedType)) {
throw new InvalidAlterTableException(
String.format(
"Column '%s' already exists but with different type. "
+ "Existing: %s, Expected: %s",
columnName, field.type(), expectedType));
}
String existingComment = field.description();
String expectedComment = addColumn.getComment();

boolean commentsMatch =
(existingComment == null && expectedComment == null)
|| (existingComment != null
&& existingComment.equals(expectedComment));

if (!commentsMatch) {
throw new InvalidAlterTableException(
String.format(
"Column %s already exists but with different comment. "
+ "Existing: %s, Expected: %s",
columnName, existingComment, expectedComment));
}
if (field.name().equals(columnName)) {
org.apache.paimon.types.DataType expectedType =
addColumn
.getDataType()
.accept(
org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType
.INSTANCE);

return true;
if (!field.type().equals(expectedType)) {
throw new InvalidAlterTableException(
String.format(
"Column '%s' already exists but with different type. "
+ "Existing: %s, Expected: %s",
columnName, field.type(), expectedType));
}
String existingComment = field.description();
String expectedComment = addColumn.getComment();

boolean commentsMatch =
(existingComment == null && expectedComment == null)
|| (existingComment != null && existingComment.equals(expectedComment));

if (!commentsMatch) {
throw new InvalidAlterTableException(
String.format(
"Column %s already exists but with different comment. "
+ "Existing: %s, Expected: %s",
columnName, existingComment, expectedComment));
}
}

return true;
}
return false;
}

/** Check if applying the table changes would reconcile Paimon and Fluss schemas. */
private boolean canTableChangesReconcileSchemas(
List<org.apache.fluss.metadata.Schema.Column> flussColumns,
List<DataField> paimonFields,
List<TableChange> tableChanges) {
if (flussColumns.size() + tableChanges.size() != paimonFields.size()) {
return false;
}

for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) {
DataField paimonDataField = paimonFields.get(i + flussColumns.size());
TableChange tableChange = tableChanges.get(i);
if (!(tableChange instanceof TableChange.AddColumn
&& ((TableChange.AddColumn) tableChange).getPosition()
== TableChange.ColumnPosition.last()
&& isColumnAlreadyExists(
paimonDataField, (TableChange.AddColumn) tableChange))) {
return false;
}
}

// All validations passed: applying these changes would reconcile the schemas
return true;
}

private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable)
throws Catalog.DatabaseNotExistException {
Identifier paimonPath = toPaimon(tablePath);
Expand Down
Loading