From 486652f410421dbb16997debdcad23a355908789 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 18 Mar 2026 14:20:39 +0800 Subject: [PATCH 01/10] Data: Add TCK tests for Metadata Columns in BaseFormatModelTests --- .../iceberg/data/BaseFormatModelTests.java | 422 +++++++++++++++++- .../flink/data/TestFlinkFormatModel.java | 25 ++ .../flink/data/TestFlinkFormatModel.java | 25 ++ .../flink/data/TestFlinkFormatModel.java | 25 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ .../spark/data/TestSparkFormatModel.java | 28 ++ 8 files changed, 607 insertions(+), 2 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index e295b5fbc1bb..b758ac3a8b40 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -35,6 +35,8 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.TableProperties; @@ -56,6 +58,7 @@ import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -73,6 +76,11 @@ public abstract class BaseFormatModelTests { protected abstract void assertEquals(Schema schema, List expected, List actual); + protected abstract Object convertConstantToEngine(Types.NestedField field, Object value); + + protected abstract List convertToPartitionIdentity( + List actual, int index, Class clazz); + protected boolean supportsBatchReads() { return false; } @@ -92,13 +100,14 @@ protected boolean supportsBatchReads() { static final String FEATURE_CASE_SENSITIVE = "caseSensitive"; static final String FEATURE_SPLIT = "split"; static final String FEATURE_REUSE_CONTAINERS = "reuseContainers"; + static final String FEATURE_META_ROW_LINEAGE = "metaRowLineage"; private static final Map MISSING_FEATURES = Map.of( FileFormat.AVRO, new String[] {FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_SPLIT}, FileFormat.ORC, - new String[] {FEATURE_REUSE_CONTAINERS}); + new String[] {FEATURE_REUSE_CONTAINERS, FEATURE_META_ROW_LINEAGE}); private InMemoryFileIO fileIO; private EncryptedOutputFile encryptedFile; @@ -204,7 +213,8 @@ void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator d readRecords = ImmutableList.copyOf(reader); } - assertEquals(schema, convertToEngineRecords(genericRecords, schema), readRecords); + List list = convertToEngineRecords(genericRecords, schema); + assertEquals(schema, list, readRecords); } /** Write with engine type T, read with Generic Record */ @@ -628,6 +638,390 @@ void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws .isInstanceOf(UnsupportedOperationException.class); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnsFilePathAndSpecId(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + String filePath = "test-data-file.parquet"; + int specId = 0; + Schema projectionSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.SPEC_ID); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.FILE_PATH.fieldId(), filePath, + MetadataColumns.SPEC_ID.fieldId(), specId); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.FILE_PATH.name(), filePath, + MetadataColumns.SPEC_ID.name(), specId)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowPosition(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + Schema projectionSchema = new Schema(MetadataColumns.ROW_POSITION); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.ROW_POSITION.name(), (long) i)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnIsDeleted(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + Schema projectionSchema = new Schema(MetadataColumns.IS_DELETED); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.IS_DELETED.name(), false)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowLinage(FileFormat fileFormat) throws IOException { + assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE); + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + long baseRowId = 100L; + long fileSeqNumber = 5L; + Schema projectionSchema = + new Schema(MetadataColumns.ROW_ID, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.ROW_ID.fieldId(), baseRowId, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + baseRowId + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + fileSeqNumber)) + .toList(); + + assertThat(readRecords).hasSize(genericRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + PartitionSpec spec = PartitionSpec.builderFor(dataGenerator.schema()).identity("col_a").build(); + + Types.StructType partitionType = spec.partitionType(); + PartitionData partitionData = new PartitionData(partitionType); + partitionData.set(0, "test_col_a"); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataGenerator.schema()) + .spec(spec) + .partition(partitionData) + .build(); + + List records = dataGenerator.generateRecords(); + try (writer) { + records.forEach(writer::write); + } + + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + partitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) + .allMatch(s -> s.equals("test_col_a")); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionBucketTransform(FileFormat fileFormat) throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + PartitionSpec spec = PartitionSpec.builderFor(dataSchema).bucket("col_a", 4).build(); + + Types.StructType partitionType = spec.partitionType(); + PartitionData partitionData = new PartitionData(partitionType); + // bucket(4, 1) = 1 + partitionData.set(0, 1); + + List records = dataGenerator.generateRecords(); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(spec) + .partition(partitionData) + .build(); + + try (writer) { + records.forEach(writer::write); + } + + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + partitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, Integer.class)).allMatch(s -> s == 1); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionEvolutionAddColumn(FileFormat fileFormat) throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + // Old spec: partition by col_a only (spec id = 0) + PartitionSpec oldSpec = PartitionSpec.builderFor(dataSchema).identity("col_a").build(); + + // New spec: partition by col_a + col_b (spec id = 1, simulates partition evolution) + PartitionSpec newSpec = + PartitionSpec.builderFor(dataSchema) + .withSpecId(1) + .identity("col_a") + .identity("col_b") + .build(); + + // Partition data for the old file (only col_a is set, col_b is absent) + PartitionData oldPartitionData = new PartitionData(oldSpec.partitionType()); + oldPartitionData.set(0, "test_data"); + + // Write data using the old spec + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(oldSpec) + .partition(oldPartitionData) + .build(); + + List records = dataGenerator.generateRecords(); + + try (writer) { + records.forEach(writer::write); + } + + Types.StructType unifiedPartitionType = newSpec.partitionType(); + + // Build projection schema with PARTITION_COLUMN using the unified partition type + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + unifiedPartitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, oldPartitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) + .allMatch(s -> s.equals("test_data")); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) + throws IOException { + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + PartitionSpec oldSpec = + PartitionSpec.builderFor(dataSchema).identity("col_a").identity("col_b").build(); + + PartitionSpec newSpec = + PartitionSpec.builderFor(dataSchema).withSpecId(1).identity("col_a").build(); + + // Partition data for the old file (both col_a and col_b are set) + PartitionData oldPartitionData = new PartitionData(oldSpec.partitionType()); + oldPartitionData.set(0, "test_col_a"); + oldPartitionData.set(1, 1); + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(dataSchema) + .spec(oldSpec) + .partition(oldPartitionData) + .build(); + + List records = dataGenerator.generateRecords(); + + try (writer) { + records.forEach(writer::write); + } + + // Use the new spec's partition type for projection (only col_a remains after evolution) + // This simulates reading an old file from the perspective of the new spec + Types.StructType newPartitionType = newSpec.partitionType(); + Types.NestedField partitionField = + Types.NestedField.optional( + MetadataColumns.PARTITION_COLUMN_ID, + MetadataColumns.PARTITION_COLUMN_NAME, + newPartitionType, + MetadataColumns.PARTITION_COLUMN_DOC); + Schema projectionSchema = new Schema(partitionField); + + Map idToConstant = + ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, oldPartitionData); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(records.size()); + assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) + .allMatch(s -> s.equals("test_col_a")); + } + private void readAndAssertGenericRecords( FileFormat fileFormat, Schema schema, List expected) throws IOException { InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); @@ -638,6 +1032,7 @@ private void readAndAssertGenericRecords( .build()) { readRecords = ImmutableList.copyOf(reader); } + DataTestHelpers.assertEquals(schema.asStruct(), expected, readRecords); } @@ -719,4 +1114,27 @@ private static String splitSizeProperty(FileFormat fileFormat) { "No split size property defined for format: " + fileFormat); }; } + + private Map convertConstantsToEngine( + Schema projectionSchema, Map idToConstant) { + return idToConstant.entrySet().stream() + .collect( + ImmutableMap.toImmutableMap( + Map.Entry::getKey, + entry -> + convertConstantToEngine( + projectionSchema.findField(entry.getKey()), entry.getValue()))); + } + + private Record partitionDataToRecord( + Types.StructType partitionType, PartitionData partitionData) { + Record record = GenericRecord.create(partitionType); + List fields = partitionType.fields(); + for (int i = 0; i < fields.size(); i++) { + Types.NestedField field = fields.get(i); + record.setField(field.name(), partitionData.get(i, field.type().typeId().javaClass())); + } + + return record; + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..b53768169f6e 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,25 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Types.NestedField field, Object value) { + return RowDataUtil.convertConstant(field.type(), value); + } + + @Override + protected List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (RowData row : actual) { + Object object = ((GenericRowData) row).getField(0); + if (object instanceof PartitionData partition) { + partitionIdentity.add(partition.get(index, clazz)); + } else { + throw new IllegalArgumentException("Not a partition data"); + } + } + + return partitionIdentity; + } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..b53768169f6e 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,25 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Types.NestedField field, Object value) { + return RowDataUtil.convertConstant(field.type(), value); + } + + @Override + protected List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (RowData row : actual) { + Object object = ((GenericRowData) row).getField(0); + if (object instanceof PartitionData partition) { + partitionIdentity.add(partition.get(index, clazz)); + } else { + throw new IllegalArgumentException("Not a partition data"); + } + } + + return partitionIdentity; + } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index 8c99fdf52110..b53768169f6e 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -19,13 +19,17 @@ package org.apache.iceberg.flink.data; import java.util.List; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -48,4 +52,25 @@ protected RowData convertToEngine(Record record, Schema schema) { protected void assertEquals(Schema schema, List expected, List actual) { TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema)); } + + @Override + protected Object convertConstantToEngine(Types.NestedField field, Object value) { + return RowDataUtil.convertConstant(field.type(), value); + } + + @Override + protected List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (RowData row : actual) { + Object object = ((GenericRowData) row).getField(0); + if (object instanceof PartitionData partition) { + partitionIdentity.add(partition.get(index, clazz)); + } else { + throw new IllegalArgumentException("Not a partition data"); + } + } + + return partitionIdentity; + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index c18e4c053f50..bb47e8de5f9e 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,8 +24,13 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -51,4 +56,27 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( + List actual, int index, Class clazz) { + List partitionIdentity = Lists.newArrayList(); + for (InternalRow row : actual) { + GenericInternalRow genericInternalRow = + (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); + Object value = genericInternalRow.genericGet(index); + if (clazz == String.class && value instanceof UTF8String) { + partitionIdentity.add(clazz.cast(value.toString())); + } else { + partitionIdentity.add(clazz.cast(value)); + } + } + + return partitionIdentity; + } } From 455a30d3ab9a8246e0d8b2d68886a9c7dc985964 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Tue, 24 Mar 2026 19:33:43 +0800 Subject: [PATCH 02/10] Address Comment --- .../iceberg/data/BaseFormatModelTests.java | 107 +++++++++++++++--- 1 file changed, 93 insertions(+), 14 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index b758ac3a8b40..ed1e14bca1f0 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; +import org.apache.hadoop.util.Lists; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -794,6 +795,97 @@ void testReadMetadataColumnRowLinage(FileFormat fileFormat) throws IOException { assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnRowLinageExistValue(FileFormat fileFormat) throws IOException { + assumeSupports(fileFormat, FEATURE_META_ROW_LINEAGE); + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema dataSchema = dataGenerator.schema(); + + Schema writeSchema = MetadataColumns.schemaWithRowLineage(dataSchema); + + List baseRecords = dataGenerator.generateRecords(); + List writeRecords = Lists.newArrayListWithExpectedSize(baseRecords.size()); + for (int i = 0; i < baseRecords.size(); i++) { + Record base = baseRecords.get(i); + Record rec = GenericRecord.create(writeSchema); + for (Types.NestedField col : dataSchema.columns()) { + rec.setField(col.name(), base.getField(col.name())); + } + + if (i % 2 == 0) { + rec.setField(MetadataColumns.ROW_ID.name(), 555L + i); + rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 7L); + } else { + rec.setField(MetadataColumns.ROW_ID.name(), null); + rec.setField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), null); + } + + writeRecords.add(rec); + } + + DataWriter writer = + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) + .schema(writeSchema) + .spec(PartitionSpec.unpartitioned()) + .build(); + + try (writer) { + writeRecords.forEach(writer::write); + } + + long baseRowId = 100L; + long fileSeqNumber = 5L; + Schema projectionSchema = + new Schema(MetadataColumns.ROW_ID, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.ROW_ID.fieldId(), baseRowId, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema) + .engineProjection(engineSchema(projectionSchema)) + .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + // Expected results: + // - Even rows (explicit values): _row_id = 555+i, _last_updated_sequence_number = 7 + // - Odd rows (null values): _row_id = baseRowId+pos, _last_updated_sequence_number = + // fileSeqNumber + List expected = + IntStream.range(0, baseRecords.size()) + .mapToObj( + i -> { + if (i % 2 == 0) { + return GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + 555L + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + 7L); + } else { + return GenericRecord.create(projectionSchema) + .copy( + MetadataColumns.ROW_ID.name(), + baseRowId + i, + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), + fileSeqNumber); + } + }) + .toList(); + + assertThat(readRecords).hasSize(baseRecords.size()); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + } + @ParameterizedTest @FieldSource("FILE_FORMATS") void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOException { @@ -808,8 +900,7 @@ void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOExc DataWriter writer = FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) .schema(dataGenerator.schema()) - .spec(spec) - .partition(partitionData) + .spec(PartitionSpec.unpartitioned()) .build(); List records = dataGenerator.generateRecords(); @@ -1125,16 +1216,4 @@ private Map convertConstantsToEngine( convertConstantToEngine( projectionSchema.findField(entry.getKey()), entry.getValue()))); } - - private Record partitionDataToRecord( - Types.StructType partitionType, PartitionData partitionData) { - Record record = GenericRecord.create(partitionType); - List fields = partitionType.fields(); - for (int i = 0; i < fields.size(); i++) { - Types.NestedField field = fields.get(i); - record.setField(field.name(), partitionData.get(i, field.type().typeId().javaClass())); - } - - return record; - } } From 2fa854a539d036ddbabd493d945575e61dbd859a Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Tue, 24 Mar 2026 19:59:12 +0800 Subject: [PATCH 03/10] fix ci --- .../test/java/org/apache/iceberg/data/BaseFormatModelTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index ed1e14bca1f0..aa073866a7e5 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; -import org.apache.hadoop.util.Lists; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -60,6 +59,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; From d0d35a05e6335457fb86afa8d8aa414ef93d547c Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 25 Mar 2026 19:53:13 +0800 Subject: [PATCH 04/10] Address Comment --- .../iceberg/data/BaseFormatModelTests.java | 116 ++++++++---------- .../flink/data/TestFlinkFormatModel.java | 33 ++--- .../spark/data/TestSparkFormatModel.java | 27 +--- 3 files changed, 70 insertions(+), 106 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index aa073866a7e5..fcdeaa870522 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -39,6 +39,7 @@ import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.TableProperties; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; @@ -60,6 +61,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -77,10 +79,7 @@ public abstract class BaseFormatModelTests { protected abstract void assertEquals(Schema schema, List expected, List actual); - protected abstract Object convertConstantToEngine(Types.NestedField field, Object value); - - protected abstract List convertToPartitionIdentity( - List actual, int index, Class clazz); + protected abstract Object convertConstantToEngine(Type type, Object value); protected boolean supportsBatchReads() { return false; @@ -930,61 +929,17 @@ void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOExc readRecords = ImmutableList.copyOf(reader); } - assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) - .allMatch(s -> s.equals("test_col_a")); - } - - @ParameterizedTest - @FieldSource("FILE_FORMATS") - void testReadMetadataColumnPartitionBucketTransform(FileFormat fileFormat) throws IOException { - DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); - Schema dataSchema = dataGenerator.schema(); - - PartitionSpec spec = PartitionSpec.builderFor(dataSchema).bucket("col_a", 4).build(); - - Types.StructType partitionType = spec.partitionType(); - PartitionData partitionData = new PartitionData(partitionType); - // bucket(4, 1) = 1 - partitionData.set(0, 1); - - List records = dataGenerator.generateRecords(); - - DataWriter writer = - FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) - .schema(dataSchema) - .spec(spec) - .partition(partitionData) - .build(); - - try (writer) { - records.forEach(writer::write); - } - - Types.NestedField partitionField = - Types.NestedField.optional( - MetadataColumns.PARTITION_COLUMN_ID, - MetadataColumns.PARTITION_COLUMN_NAME, - partitionType, - MetadataColumns.PARTITION_COLUMN_DOC); - Schema projectionSchema = new Schema(partitionField); - - Map idToConstant = - ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData); - - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) - .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } + Record partitionRecord = structLikeToRecord(partitionData, partitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, Integer.class)).allMatch(s -> s == 1); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } @ParameterizedTest @@ -1012,8 +967,7 @@ void testReadMetadataColumnPartitionEvolutionAddColumn(FileFormat fileFormat) th DataWriter writer = FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) .schema(dataSchema) - .spec(oldSpec) - .partition(oldPartitionData) + .spec(PartitionSpec.unpartitioned()) .build(); List records = dataGenerator.generateRecords(); @@ -1047,9 +1001,17 @@ void testReadMetadataColumnPartitionEvolutionAddColumn(FileFormat fileFormat) th readRecords = ImmutableList.copyOf(reader); } + Record partitionRecord = structLikeToRecord(oldPartitionData, unifiedPartitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) - .allMatch(s -> s.equals("test_data")); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } @ParameterizedTest @@ -1073,8 +1035,7 @@ void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) DataWriter writer = FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile) .schema(dataSchema) - .spec(oldSpec) - .partition(oldPartitionData) + .spec(PartitionSpec.unpartitioned()) .build(); List records = dataGenerator.generateRecords(); @@ -1108,9 +1069,17 @@ void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) readRecords = ImmutableList.copyOf(reader); } + Record partitionRecord = structLikeToRecord(oldPartitionData, newPartitionType); + List expected = + IntStream.range(0, records.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) + .toList(); + assertThat(readRecords).hasSize(records.size()); - assertThat(convertToPartitionIdentity(readRecords, 0, String.class)) - .allMatch(s -> s.equals("test_col_a")); + assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } private void readAndAssertGenericRecords( @@ -1214,6 +1183,21 @@ private Map convertConstantsToEngine( Map.Entry::getKey, entry -> convertConstantToEngine( - projectionSchema.findField(entry.getKey()), entry.getValue()))); + projectionSchema.findType(entry.getKey()), entry.getValue()))); + } + + private static Record structLikeToRecord(StructLike structLike, Types.StructType structType) { + Record record = GenericRecord.create(structType); + int sourceSize = structLike.size(); + for (int i = 0; i < structType.fields().size(); i++) { + if (i < sourceSize) { + record.set(i, structLike.get(i, Object.class)); + } else { + Types.NestedField field = structType.fields().get(i); + record.set(i, field.initialDefault()); + } + } + + return record; } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index b53768169f6e..1f0fe70ac53b 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -28,7 +28,7 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -54,23 +54,24 @@ protected void assertEquals(Schema schema, List expected, List } @Override - protected Object convertConstantToEngine(Types.NestedField field, Object value) { - return RowDataUtil.convertConstant(field.type(), value); - } - - @Override - protected List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (RowData row : actual) { - Object object = ((GenericRowData) row).getField(0); - if (object instanceof PartitionData partition) { - partitionIdentity.add(partition.get(index, clazz)); - } else { - throw new IllegalArgumentException("Not a partition data"); + protected Object convertConstantToEngine(Type type, Object value) { + if (value instanceof PartitionData partitionData) { + Types.StructType structType = type.asStructType(); + List fields = structType.fields(); + GenericRowData rowData = new GenericRowData(fields.size()); + int sourceSize = partitionData.size(); + for (int i = 0; i < fields.size(); i++) { + if (i < sourceSize) { + Object fieldValue = partitionData.get(i, Object.class); + rowData.setField(i, convertConstantToEngine(fields.get(i).type(), fieldValue)); + } else { + rowData.setField(i, null); + } } + + return rowData; } - return partitionIdentity; + return RowDataUtil.convertConstant(type, value); } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..291bb2bca4f5 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,10 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Type; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +55,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object convertConstantToEngine(Type type, Object value) { + return SparkUtil.internalToSpark(type, value); } } From 8ad4e9cdbf350ef438f3861aaf600cdf1af65338 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 25 Mar 2026 19:59:22 +0800 Subject: [PATCH 05/10] fix other version --- .../flink/data/TestFlinkFormatModel.java | 33 ++++++++++--------- .../flink/data/TestFlinkFormatModel.java | 33 ++++++++++--------- .../spark/data/TestSparkFormatModel.java | 27 ++------------- .../spark/data/TestSparkFormatModel.java | 27 ++------------- .../spark/data/TestSparkFormatModel.java | 27 ++------------- 5 files changed, 43 insertions(+), 104 deletions(-) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index b53768169f6e..1f0fe70ac53b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -28,7 +28,7 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -54,23 +54,24 @@ protected void assertEquals(Schema schema, List expected, List } @Override - protected Object convertConstantToEngine(Types.NestedField field, Object value) { - return RowDataUtil.convertConstant(field.type(), value); - } - - @Override - protected List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (RowData row : actual) { - Object object = ((GenericRowData) row).getField(0); - if (object instanceof PartitionData partition) { - partitionIdentity.add(partition.get(index, clazz)); - } else { - throw new IllegalArgumentException("Not a partition data"); + protected Object convertConstantToEngine(Type type, Object value) { + if (value instanceof PartitionData partitionData) { + Types.StructType structType = type.asStructType(); + List fields = structType.fields(); + GenericRowData rowData = new GenericRowData(fields.size()); + int sourceSize = partitionData.size(); + for (int i = 0; i < fields.size(); i++) { + if (i < sourceSize) { + Object fieldValue = partitionData.get(i, Object.class); + rowData.setField(i, convertConstantToEngine(fields.get(i).type(), fieldValue)); + } else { + rowData.setField(i, null); + } } + + return rowData; } - return partitionIdentity; + return RowDataUtil.convertConstant(type, value); } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java index b53768169f6e..1f0fe70ac53b 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java @@ -28,7 +28,7 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; public class TestFlinkFormatModel extends BaseFormatModelTests { @@ -54,23 +54,24 @@ protected void assertEquals(Schema schema, List expected, List } @Override - protected Object convertConstantToEngine(Types.NestedField field, Object value) { - return RowDataUtil.convertConstant(field.type(), value); - } - - @Override - protected List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (RowData row : actual) { - Object object = ((GenericRowData) row).getField(0); - if (object instanceof PartitionData partition) { - partitionIdentity.add(partition.get(index, clazz)); - } else { - throw new IllegalArgumentException("Not a partition data"); + protected Object convertConstantToEngine(Type type, Object value) { + if (value instanceof PartitionData partitionData) { + Types.StructType structType = type.asStructType(); + List fields = structType.fields(); + GenericRowData rowData = new GenericRowData(fields.size()); + int sourceSize = partitionData.size(); + for (int i = 0; i < fields.size(); i++) { + if (i < sourceSize) { + Object fieldValue = partitionData.get(i, Object.class); + rowData.setField(i, convertConstantToEngine(fields.get(i).type(), fieldValue)); + } else { + rowData.setField(i, null); + } } + + return rowData; } - return partitionIdentity; + return RowDataUtil.convertConstant(type, value); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..291bb2bca4f5 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,10 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Type; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +55,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object convertConstantToEngine(Type type, Object value) { + return SparkUtil.internalToSpark(type, value); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..291bb2bca4f5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,10 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Type; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +55,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object convertConstantToEngine(Type type, Object value) { + return SparkUtil.internalToSpark(type, value); } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java index bb47e8de5f9e..291bb2bca4f5 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java @@ -24,13 +24,10 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.data.BaseFormatModelTests; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Type; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.unsafe.types.UTF8String; public class TestSparkFormatModel extends BaseFormatModelTests { @@ -58,25 +55,7 @@ protected void assertEquals(Schema schema, List expected, List List convertToPartitionIdentity( - List actual, int index, Class clazz) { - List partitionIdentity = Lists.newArrayList(); - for (InternalRow row : actual) { - GenericInternalRow genericInternalRow = - (GenericInternalRow) ((GenericInternalRow) row).genericGet(0); - Object value = genericInternalRow.genericGet(index); - if (clazz == String.class && value instanceof UTF8String) { - partitionIdentity.add(clazz.cast(value.toString())); - } else { - partitionIdentity.add(clazz.cast(value)); - } - } - - return partitionIdentity; + protected Object convertConstantToEngine(Type type, Object value) { + return SparkUtil.internalToSpark(type, value); } } From ac01dbdb619328374f35b891c0138adf355a66df Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 25 Mar 2026 22:18:17 +0800 Subject: [PATCH 06/10] add testReaderSchemaEvolutionNewColumnWithDefault --- .../iceberg/data/BaseFormatModelTests.java | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index fcdeaa870522..2c856e5d0c87 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -51,6 +51,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.formats.FileWriterBuilder; import org.apache.iceberg.formats.FormatModelRegistry; import org.apache.iceberg.inmemory.InMemoryFileIO; @@ -99,6 +100,7 @@ protected boolean supportsBatchReads() { static final String FEATURE_FILTER = "filter"; static final String FEATURE_CASE_SENSITIVE = "caseSensitive"; static final String FEATURE_SPLIT = "split"; + static final String FEATURE_READER_DEFAULT = "readerDefault"; static final String FEATURE_REUSE_CONTAINERS = "reuseContainers"; static final String FEATURE_META_ROW_LINEAGE = "metaRowLineage"; @@ -107,7 +109,9 @@ protected boolean supportsBatchReads() { FileFormat.AVRO, new String[] {FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_SPLIT}, FileFormat.ORC, - new String[] {FEATURE_REUSE_CONTAINERS, FEATURE_META_ROW_LINEAGE}); + new String[] { + FEATURE_REUSE_CONTAINERS, FEATURE_META_ROW_LINEAGE, FEATURE_READER_DEFAULT + }); private InMemoryFileIO fileIO; private EncryptedOutputFile encryptedFile; @@ -619,6 +623,69 @@ void testReaderBuilderReuseContainers(FileFormat fileFormat) throws IOException reuseRecords.forEach(r -> assertThat(r).isSameAs(reuseRecords.get(0))); } + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReaderSchemaEvolutionNewColumnWithDefault(FileFormat fileFormat) throws IOException { + + assumeSupports(fileFormat, FEATURE_READER_DEFAULT); + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema writeSchema = dataGenerator.schema(); + + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, writeSchema, genericRecords); + + String defaultStringValue = "default_value"; + int defaultIntValue = 42; + + Schema evolvedSchema = + new Schema( + Types.NestedField.required(1, "col_a", Types.StringType.get()), + Types.NestedField.required(2, "col_b", Types.IntegerType.get()), + Types.NestedField.required(3, "col_c", Types.LongType.get()), + Types.NestedField.required(4, "col_d", Types.FloatType.get()), + Types.NestedField.required(5, "col_e", Types.DoubleType.get()), + Types.NestedField.required("col_f") + .withId(6) + .ofType(Types.StringType.get()) + .withInitialDefault(Literal.of(defaultStringValue)) + .build(), + Types.NestedField.optional("col_g") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(Literal.of(defaultIntValue)) + .build()); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(evolvedSchema) + .engineProjection(engineSchema(evolvedSchema)) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(genericRecords.size()); + + List expectedGenericRecords = + genericRecords.stream() + .map( + record -> { + Record expected = GenericRecord.create(evolvedSchema); + for (Types.NestedField col : writeSchema.columns()) { + expected.setField(col.name(), record.getField(col.name())); + } + + expected.setField("col_f", defaultStringValue); + expected.setField("col_g", defaultIntValue); + return expected; + }) + .toList(); + + List expectedEngineRecords = convertToEngineRecords(expectedGenericRecords, evolvedSchema); + assertEquals(evolvedSchema, expectedEngineRecords, readRecords); + } + @ParameterizedTest @FieldSource("FILE_FORMATS") void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws IOException { From 14d33d0f96616daefa9d58416f80339b748a40ab Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Wed, 25 Mar 2026 22:20:31 +0800 Subject: [PATCH 07/10] remove useless --- .../java/org/apache/iceberg/data/BaseFormatModelTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 2c856e5d0c87..923578975d18 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -217,8 +217,7 @@ void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator d readRecords = ImmutableList.copyOf(reader); } - List list = convertToEngineRecords(genericRecords, schema); - assertEquals(schema, list, readRecords); + assertEquals(schema, convertToEngineRecords(genericRecords, schema), readRecords); } /** Write with engine type T, read with Generic Record */ From 4e63bbc117d914756e0bf160bfb9149a74fc6d93 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Thu, 16 Apr 2026 11:03:42 +0800 Subject: [PATCH 08/10] Address Comments --- .../iceberg/data/BaseFormatModelTests.java | 203 +++++++----------- 1 file changed, 83 insertions(+), 120 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 923578975d18..021b5b7ccdfc 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -636,35 +636,24 @@ void testReaderSchemaEvolutionNewColumnWithDefault(FileFormat fileFormat) throws String defaultStringValue = "default_value"; int defaultIntValue = 42; - Schema evolvedSchema = - new Schema( - Types.NestedField.required(1, "col_a", Types.StringType.get()), - Types.NestedField.required(2, "col_b", Types.IntegerType.get()), - Types.NestedField.required(3, "col_c", Types.LongType.get()), - Types.NestedField.required(4, "col_d", Types.FloatType.get()), - Types.NestedField.required(5, "col_e", Types.DoubleType.get()), - Types.NestedField.required("col_f") - .withId(6) - .ofType(Types.StringType.get()) - .withInitialDefault(Literal.of(defaultStringValue)) - .build(), - Types.NestedField.optional("col_g") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(Literal.of(defaultIntValue)) - .build()); - - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(evolvedSchema) - .engineProjection(engineSchema(evolvedSchema)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } - - assertThat(readRecords).hasSize(genericRecords.size()); + int maxFieldId = + writeSchema.columns().stream().mapToInt(Types.NestedField::fieldId).max().orElse(0); + + List evolvedColumns = Lists.newArrayList(writeSchema.columns()); + evolvedColumns.add( + Types.NestedField.required("col_f") + .withId(maxFieldId + 1) + .ofType(Types.StringType.get()) + .withInitialDefault(Literal.of(defaultStringValue)) + .build()); + evolvedColumns.add( + Types.NestedField.optional("col_g") + .withId(maxFieldId + 2) + .ofType(Types.IntegerType.get()) + .withInitialDefault(Literal.of(defaultIntValue)) + .build()); + + Schema evolvedSchema = new Schema(evolvedColumns); List expectedGenericRecords = genericRecords.stream() @@ -681,8 +670,7 @@ record -> { }) .toList(); - List expectedEngineRecords = convertToEngineRecords(expectedGenericRecords, evolvedSchema); - assertEquals(evolvedSchema, expectedEngineRecords, readRecords); + readAndAssertGenericRecords(fileFormat, evolvedSchema, expectedGenericRecords); } @ParameterizedTest @@ -706,7 +694,7 @@ void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws @ParameterizedTest @FieldSource("FILE_FORMATS") - void testReadMetadataColumnsFilePathAndSpecId(FileFormat fileFormat) throws IOException { + void testReadMetadataColumnFilePath(FileFormat fileFormat) throws IOException { DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); Schema schema = dataGenerator.schema(); @@ -714,37 +702,45 @@ void testReadMetadataColumnsFilePathAndSpecId(FileFormat fileFormat) throws IOEx writeGenericRecords(fileFormat, schema, genericRecords); String filePath = "test-data-file.parquet"; - int specId = 0; - Schema projectionSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.SPEC_ID); + Schema projectionSchema = new Schema(MetadataColumns.FILE_PATH); Map idToConstant = - ImmutableMap.of( - MetadataColumns.FILE_PATH.fieldId(), filePath, - MetadataColumns.SPEC_ID.fieldId(), specId); + ImmutableMap.of(MetadataColumns.FILE_PATH.fieldId(), filePath); - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) - .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } + List expected = + IntStream.range(0, genericRecords.size()) + .mapToObj( + i -> + GenericRecord.create(projectionSchema) + .copy(MetadataColumns.FILE_PATH.name(), filePath)) + .toList(); + + readAndAssertMetadataColumn(fileFormat, projectionSchema, idToConstant, expected); + } + + @ParameterizedTest + @FieldSource("FILE_FORMATS") + void testReadMetadataColumnSpecId(FileFormat fileFormat) throws IOException { + + DataGenerator dataGenerator = new DataGenerators.DefaultSchema(); + Schema schema = dataGenerator.schema(); + List genericRecords = dataGenerator.generateRecords(); + writeGenericRecords(fileFormat, schema, genericRecords); + + int specId = 0; + Schema projectionSchema = new Schema(MetadataColumns.SPEC_ID); + + Map idToConstant = ImmutableMap.of(MetadataColumns.SPEC_ID.fieldId(), specId); List expected = IntStream.range(0, genericRecords.size()) .mapToObj( i -> GenericRecord.create(projectionSchema) - .copy( - MetadataColumns.FILE_PATH.name(), filePath, - MetadataColumns.SPEC_ID.name(), specId)) + .copy(MetadataColumns.SPEC_ID.name(), specId)) .toList(); - assertThat(readRecords).hasSize(genericRecords.size()); - assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + readAndAssertMetadataColumn(fileFormat, projectionSchema, idToConstant, expected); } @ParameterizedTest @@ -758,16 +754,6 @@ void testReadMetadataColumnRowPosition(FileFormat fileFormat) throws IOException Schema projectionSchema = new Schema(MetadataColumns.ROW_POSITION); - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } - List expected = IntStream.range(0, genericRecords.size()) .mapToObj( @@ -776,8 +762,7 @@ void testReadMetadataColumnRowPosition(FileFormat fileFormat) throws IOException .copy(MetadataColumns.ROW_POSITION.name(), (long) i)) .toList(); - assertThat(readRecords).hasSize(genericRecords.size()); - assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + readAndAssertMetadataColumn(fileFormat, projectionSchema, null, expected); } @ParameterizedTest @@ -791,16 +776,6 @@ void testReadMetadataColumnIsDeleted(FileFormat fileFormat) throws IOException { Schema projectionSchema = new Schema(MetadataColumns.IS_DELETED); - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } - List expected = IntStream.range(0, genericRecords.size()) .mapToObj( @@ -809,8 +784,7 @@ void testReadMetadataColumnIsDeleted(FileFormat fileFormat) throws IOException { .copy(MetadataColumns.IS_DELETED.name(), false)) .toList(); - assertThat(readRecords).hasSize(genericRecords.size()); - assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + readAndAssertMetadataColumn(fileFormat, projectionSchema, null, expected); } @ParameterizedTest @@ -833,17 +807,6 @@ void testReadMetadataColumnRowLinage(FileFormat fileFormat) throws IOException { MetadataColumns.ROW_ID.fieldId(), baseRowId, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) - .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } - List expected = IntStream.range(0, genericRecords.size()) .mapToObj( @@ -856,8 +819,7 @@ void testReadMetadataColumnRowLinage(FileFormat fileFormat) throws IOException { fileSeqNumber)) .toList(); - assertThat(readRecords).hasSize(genericRecords.size()); - assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + readAndAssertMetadataColumn(fileFormat, projectionSchema, idToConstant, expected); } @ParameterizedTest @@ -910,17 +872,6 @@ void testReadMetadataColumnRowLinageExistValue(FileFormat fileFormat) throws IOE MetadataColumns.ROW_ID.fieldId(), baseRowId, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), fileSeqNumber); - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) - .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } - // Expected results: // - Even rows (explicit values): _row_id = 555+i, _last_updated_sequence_number = 7 // - Odd rows (null values): _row_id = baseRowId+pos, _last_updated_sequence_number = @@ -947,8 +898,7 @@ void testReadMetadataColumnRowLinageExistValue(FileFormat fileFormat) throws IOE }) .toList(); - assertThat(readRecords).hasSize(baseRecords.size()); - assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + readAndAssertMetadataColumn(fileFormat, projectionSchema, idToConstant, expected); } @ParameterizedTest @@ -984,17 +934,6 @@ void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOExc Map idToConstant = ImmutableMap.of(MetadataColumns.PARTITION_COLUMN_ID, partitionData); - InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); - List readRecords; - try (CloseableIterable reader = - FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) - .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) - .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) - .build()) { - readRecords = ImmutableList.copyOf(reader); - } - Record partitionRecord = structLikeToRecord(partitionData, partitionType); List expected = IntStream.range(0, records.size()) @@ -1004,8 +943,7 @@ void testReadMetadataColumnPartitionIdentity(FileFormat fileFormat) throws IOExc .copy(MetadataColumns.PARTITION_COLUMN_NAME, partitionRecord)) .toList(); - assertThat(readRecords).hasSize(records.size()); - assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); + readAndAssertMetadataColumn(fileFormat, projectionSchema, idToConstant, expected); } @ParameterizedTest @@ -1061,7 +999,6 @@ void testReadMetadataColumnPartitionEvolutionAddColumn(FileFormat fileFormat) th try (CloseableIterable reader = FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) .build()) { readRecords = ImmutableList.copyOf(reader); @@ -1129,7 +1066,6 @@ void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) try (CloseableIterable reader = FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) .project(projectionSchema) - .engineProjection(engineSchema(projectionSchema)) .idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)) .build()) { readRecords = ImmutableList.copyOf(reader); @@ -1198,7 +1134,7 @@ private List convertToEngineRecords(List records, Schema schema) { return records.stream().map(r -> convertToEngine(r, schema)).toList(); } - private static void assumeSupports(FileFormat fileFormat, String feature) { + private void assumeSupports(FileFormat fileFormat, String feature) { assumeThat(MISSING_FEATURES.getOrDefault(fileFormat, new String[] {})).doesNotContain(feature); } @@ -1231,7 +1167,7 @@ private DataFile writeRecordsForSplit(FileFormat fileFormat, Schema schema, List return dataFile; } - private static String splitSizeProperty(FileFormat fileFormat) { + private String splitSizeProperty(FileFormat fileFormat) { return switch (fileFormat) { case PARQUET -> TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; case ORC -> TableProperties.ORC_STRIPE_SIZE_BYTES; @@ -1252,7 +1188,7 @@ private Map convertConstantsToEngine( projectionSchema.findType(entry.getKey()), entry.getValue()))); } - private static Record structLikeToRecord(StructLike structLike, Types.StructType structType) { + private Record structLikeToRecord(StructLike structLike, Types.StructType structType) { Record record = GenericRecord.create(structType); int sourceSize = structLike.size(); for (int i = 0; i < structType.fields().size(); i++) { @@ -1266,4 +1202,31 @@ private static Record structLikeToRecord(StructLike structLike, Types.StructType return record; } + + private void readAndAssertMetadataColumn( + FileFormat fileFormat, + Schema projectionSchema, + Map idToConstant, + List expectedRecords) + throws IOException { + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + + var readerBuilder = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(projectionSchema); + + if (idToConstant != null) { + readerBuilder.idToConstant(convertConstantsToEngine(projectionSchema, idToConstant)); + } + + try (CloseableIterable reader = readerBuilder.build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertThat(readRecords).hasSize(expectedRecords.size()); + assertEquals( + projectionSchema, convertToEngineRecords(expectedRecords, projectionSchema), readRecords); + } } From 5e3e62b976afebf505417c3f25a188e973120048 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Thu, 16 Apr 2026 22:24:28 +0800 Subject: [PATCH 09/10] Address Comments --- .../iceberg/data/BaseFormatModelTests.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 021b5b7ccdfc..78c1a14647ab 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -652,6 +652,21 @@ void testReaderSchemaEvolutionNewColumnWithDefault(FileFormat fileFormat) throws .ofType(Types.IntegerType.get()) .withInitialDefault(Literal.of(defaultIntValue)) .build()); + evolvedColumns.add( + Types.NestedField.optional("col_h") + .withId(maxFieldId + 3) + .ofType( + Types.MapType.ofOptional( + maxFieldId + 4, + maxFieldId + 5, + Types.StringType.get(), + Types.IntegerType.get())) + .build()); + evolvedColumns.add( + Types.NestedField.optional("col_i") + .withId(maxFieldId + 6) + .ofType(Types.ListType.ofOptional(maxFieldId + 7, Types.StringType.get())) + .build()); Schema evolvedSchema = new Schema(evolvedColumns); @@ -666,6 +681,8 @@ record -> { expected.setField("col_f", defaultStringValue); expected.setField("col_g", defaultIntValue); + expected.setField("col_h", null); + expected.setField("col_i", null); return expected; }) .toList(); @@ -1116,7 +1133,7 @@ private void writeGenericRecords(FileFormat fileFormat, Schema schema, List projectRecords(List records, Schema projectedSchema) { + private static List projectRecords(List records, Schema projectedSchema) { return records.stream() .map( record -> { @@ -1134,7 +1151,7 @@ private List convertToEngineRecords(List records, Schema schema) { return records.stream().map(r -> convertToEngine(r, schema)).toList(); } - private void assumeSupports(FileFormat fileFormat, String feature) { + private static void assumeSupports(FileFormat fileFormat, String feature) { assumeThat(MISSING_FEATURES.getOrDefault(fileFormat, new String[] {})).doesNotContain(feature); } @@ -1167,7 +1184,7 @@ private DataFile writeRecordsForSplit(FileFormat fileFormat, Schema schema, List return dataFile; } - private String splitSizeProperty(FileFormat fileFormat) { + private static String splitSizeProperty(FileFormat fileFormat) { return switch (fileFormat) { case PARQUET -> TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; case ORC -> TableProperties.ORC_STRIPE_SIZE_BYTES; @@ -1188,7 +1205,7 @@ private Map convertConstantsToEngine( projectionSchema.findType(entry.getKey()), entry.getValue()))); } - private Record structLikeToRecord(StructLike structLike, Types.StructType structType) { + private static Record structLikeToRecord(StructLike structLike, Types.StructType structType) { Record record = GenericRecord.create(structType); int sourceSize = structLike.size(); for (int i = 0; i < structType.fields().size(); i++) { From e47622ed7e29195f617ad9c2084c53504eb4dbd6 Mon Sep 17 00:00:00 2001 From: Guosmilesmile <511955993@qq.com> Date: Fri, 17 Apr 2026 22:04:39 +0800 Subject: [PATCH 10/10] Address Comments --- .../iceberg/data/BaseFormatModelTests.java | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 78c1a14647ab..17b648c850e4 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Function; import java.util.stream.IntStream; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -652,42 +653,22 @@ void testReaderSchemaEvolutionNewColumnWithDefault(FileFormat fileFormat) throws .ofType(Types.IntegerType.get()) .withInitialDefault(Literal.of(defaultIntValue)) .build()); - evolvedColumns.add( - Types.NestedField.optional("col_h") - .withId(maxFieldId + 3) - .ofType( - Types.MapType.ofOptional( - maxFieldId + 4, - maxFieldId + 5, - Types.StringType.get(), - Types.IntegerType.get())) - .build()); - evolvedColumns.add( - Types.NestedField.optional("col_i") - .withId(maxFieldId + 6) - .ofType(Types.ListType.ofOptional(maxFieldId + 7, Types.StringType.get())) - .build()); Schema evolvedSchema = new Schema(evolvedColumns); - - List expectedGenericRecords = - genericRecords.stream() - .map( - record -> { - Record expected = GenericRecord.create(evolvedSchema); - for (Types.NestedField col : writeSchema.columns()) { - expected.setField(col.name(), record.getField(col.name())); - } - - expected.setField("col_f", defaultStringValue); - expected.setField("col_g", defaultIntValue); - expected.setField("col_h", null); - expected.setField("col_i", null); - return expected; - }) - .toList(); - - readAndAssertGenericRecords(fileFormat, evolvedSchema, expectedGenericRecords); + readAndAssertGenericRecords( + fileFormat, + evolvedSchema, + genericRecords, + record -> { + Record expected = GenericRecord.create(evolvedSchema); + for (Types.NestedField col : writeSchema.columns()) { + expected.setField(col.name(), record.getField(col.name())); + } + + expected.setField("col_f", defaultStringValue); + expected.setField("col_g", defaultIntValue); + return expected; + }); } @ParameterizedTest @@ -1101,6 +1082,15 @@ void testReadMetadataColumnPartitionEvolutionRemoveColumn(FileFormat fileFormat) assertEquals(projectionSchema, convertToEngineRecords(expected, projectionSchema), readRecords); } + private void readAndAssertGenericRecords( + FileFormat fileFormat, + Schema schema, + List sourceRecords, + Function transform) + throws IOException { + readAndAssertGenericRecords(fileFormat, schema, sourceRecords.stream().map(transform).toList()); + } + private void readAndAssertGenericRecords( FileFormat fileFormat, Schema schema, List expected) throws IOException { InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();