[flink] changelog read support for pk table without pushdown optimizations #2347
Great! Thank you @MehulBatra, I will review it.
wuchong
left a comment
Thanks @MehulBatra for the contribution. I left some comments.
website/versioned_sidebars
website/versions.json
website/pnpm-lock.yaml
/.claude/
Should this be .claude/ (without the leading slash)?
nit: We could also move it next to the .vscode/ entry.
assertThat(schemaRows.get(0)).contains("_change_type");
assertThat(schemaRows.get(0)).contains("STRING");
// Flink DESCRIBE shows nullability as 'false' for NOT NULL columns
assertThat(schemaRows.get(0)).contains("false");

assertThat(schemaRows.get(1)).contains("_log_offset");
assertThat(schemaRows.get(1)).contains("BIGINT");
assertThat(schemaRows.get(1)).contains("false");
The DESCRIBE result is deterministic, so we should assert the entire output string rather than using contains.
Using contains makes the test intent unclear to readers and risks missing regressions in other parts of the output that should also be validated.
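To illustrate the difference, here is a minimal plain-Java sketch (no AssertJ or Flink on the classpath). The row layout "+I[name, type, null, key, extras, watermark]" and the helper names containsAll and matchesExactly are assumptions made for illustration, not the actual test code or the exact Flink output. It shows a regressed output that still passes the fragment checks but fails a whole-output comparison.

```java
import java.util.Arrays;
import java.util.List;

public class DescribeAssertionSketch {

    /** Loose check: true if the row contains every fragment, wherever it appears. */
    public static boolean containsAll(String row, String... fragments) {
        return Arrays.stream(fragments).allMatch(row::contains);
    }

    /** Strict check: the full output must match the expected rows, in order. */
    public static boolean matchesExactly(List<String> actual, List<String> expected) {
        return actual.equals(expected);
    }

    public static void main(String[] args) {
        // Assumed DESCRIBE rendering; the real column layout may differ.
        List<String> expected = Arrays.asList(
                "+I[_change_type, STRING, false, null, null, null]",
                "+I[_log_offset, BIGINT, false, null, null, null]");

        // Hypothetical regression: the first column is unexpectedly reported as a key.
        List<String> regressed = Arrays.asList(
                "+I[_change_type, STRING, false, PRI(_change_type), null, null]",
                "+I[_log_offset, BIGINT, false, null, null, null]");

        // The fragment check does not notice the regression.
        System.out.println(containsAll(regressed.get(0), "_change_type", "STRING", "false")); // true
        // The whole-output comparison does.
        System.out.println(matchesExactly(regressed, expected)); // false
    }
}
```

In the real test, the same idea would presumably be a single equality assertion on the collected rows.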
assertThat(updateResults.get(1)).contains("+U");
assertThat(updateResults.get(1)).contains("Item-1-Updated");
assertThat(updateResults.get(1)).contains("150");
Simplify to assertThat(updateResults.get(0)).contains("-U", "Item-1", "100");
Update others like this.
// Collect delete record
List<String> deleteResult = collectRowsWithTimeout(rowIter, 1, true);
assertThat(deleteResult).hasSize(1);
// Verify the delete record contains the row data (the change type may be -D or -U)
If this is a delete, the change type must be -D, so we should assert the change type here.
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Integration test for $changelog virtual table functionality. */
abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
This is a thorough test. However, we have limited testing resources (the Flink test run is approaching the 1-hour limit), and IT cases are the most time-consuming. I think many tests in this IT case duplicate each other, and we can improve this:
- testChangelogVirtualTableWithPrimaryKeyTable and testBasicChangelogScanWithMetadataValidation do the same thing, but testBasicChangelogScanWithMetadataValidation verifies more (the metadata). So I think we can remove testChangelogVirtualTableWithPrimaryKeyTable or merge it into testBasicChangelogScanWithMetadataValidation.
- testAllChangeTypes() and testBasicChangelogScanWithMetadataValidation both verify all change types. So I think we can change testAllChangeTypes to testProjectionOnChangelogTable to test projections. We can add one more column to the source table and select _change_type plus 2 other columns.
- testChangelogVirtualTableConcurrentChanges can be removed, as it doesn't test concurrent updates.
- testChangelogVirtualTableWithComplexSchema can be merged into testChangelogVirtualTableSchemaIntrospection.
Also, please add tests to verify that $changelog works when the scan.startup.mode option is set, and that it works for partitioned tables. Take org.apache.fluss.flink.source.FlinkTableSourceITCase#testReadLogTableWithDifferentScanStartupMode as an example.
import java.io.Serializable;

/** Interface for converting Fluss {@link LogRecord} to Flink {@link RowData}. */
public interface RecordToFlinkRowConverter extends Serializable {
Since we will remove PlainRowConverter, this interface has only one implementation, so the abstraction is no longer useful. I think we can remove the interface (and make ChangelogRowConverter implement Serializable directly if we decide to remove RecordToFlinkRowConverter).
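A rough sketch of what that could look like: the concrete class implements Serializable itself, with no interface in between. SketchRowConverter and roundTrip are hypothetical stand-ins written in plain Java, not the actual ChangelogRowConverter; the round trip only imitates how a framework would ship the converter to workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConverterSerializationSketch {

    /** Hypothetical stand-in for the single concrete converter. */
    public static final class SketchRowConverter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String[] fieldNames;

        public SketchRowConverter(String... fieldNames) {
            this.fieldNames = fieldNames.clone();
        }

        public int fieldCount() {
            return fieldNames.length;
        }
    }

    /** Serializes and deserializes the converter using standard Java serialization. */
    public static SketchRowConverter roundTrip(SketchRowConverter converter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(converter);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchRowConverter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Converter is not serializable", e);
        }
    }

    public static void main(String[] args) {
        SketchRowConverter copy =
                roundTrip(new SketchRowConverter("_change_type", "_log_offset"));
        System.out.println(copy.fieldCount()); // 2
    }
}
```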
BUCKET_COLUMN_NAME,
CHANGELOG_CHANGE_TYPE_COLUMN,
CHANGELOG_LOG_OFFSET_COLUMN,
CHANGELOG_COMMIT_TIMESTAMP_COLUMN)));
Please add tests for these columns in org.apache.fluss.client.admin.FlussAdminITCase#testSystemsColumns.
package org.apache.fluss.flink.source;

/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.20. */
public class Flink120ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {}
Should we create ITCases for all supported Flink versions to test compatibility?
// Parse the row to validate structure
String[] parts = result.substring(3, result.length() - 1).split(", ", 6);

// Validate change type column
assertThat(parts[0]).isEqualTo("+I");
We can set a ManualClock on the FlussClusterExtension to control the commit timestamp, advancing the current time with CLOCK.advanceTime(xxxxx, TimeUnit.MILLISECONDS); see the usage in FlinkTableSourceITCase.
This way, we can assert the changelog result by comparing the whole row string (set the bucket count to 1 to get consistent log_offset numbers).
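As a rough illustration of why a manual clock makes whole-row assertions possible, here is a self-contained sketch. ManualClockSketch is a hypothetical stand-in and not the Fluss ManualClock; only the advanceTime(duration, TimeUnit) call shape is taken from the comment above. The changelogRow helper and its row layout are likewise assumptions.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ManualClockSketch {

    private final AtomicLong nowMillis;

    public ManualClockSketch(long startMillis) {
        this.nowMillis = new AtomicLong(startMillis);
    }

    /** Returns the current time, which only moves when the test advances it. */
    public long milliseconds() {
        return nowMillis.get();
    }

    /** Moves the clock forward by the given duration. */
    public void advanceTime(long duration, TimeUnit unit) {
        nowMillis.addAndGet(unit.toMillis(duration));
    }

    /** Hypothetical row rendering: change type, log offset, commit timestamp, then the data. */
    public static String changelogRow(String changeType, long offset, long commitMillis, String data) {
        return "+I[" + changeType + ", " + offset + ", " + commitMillis + ", " + data + "]";
    }

    public static void main(String[] args) {
        ManualClockSketch clock = new ManualClockSketch(1000L);

        // With a single bucket, offsets are assumed to be assigned sequentially from 0.
        String first = changelogRow("+I", 0, clock.milliseconds(), "1, Item-1, 100");
        clock.advanceTime(500, TimeUnit.MILLISECONDS);
        String second = changelogRow("-U", 1, clock.milliseconds(), "1, Item-1, 100");

        // Every field is now known in advance, so the full strings can be compared.
        System.out.println(first);  // +I[+I, 0, 1000, 1, Item-1, 100]
        System.out.println(second); // +I[-U, 1, 1500, 1, Item-1, 100]
    }
}
```

Because the timestamp and the offset are both fixed by the test setup, there is no need to split the row string and validate the parts one by one.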
/**
 * Creates a virtual $changelog table by modifying the base table's to include metadata columns.
 */
private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath)
Please add tests in FlinkCatalogITCase to assert the CatalogTable returned for a $changelog table.
Purpose
Linked issue: close #2333
Brief change log
File: ChangelogFlinkTableSource.java
Purpose: Flink table source for $changelog virtual tables
────────────────────────────────────────
File: ChangelogDeserializationSchema.java
Purpose: Deserializes LogRecord → RowData with metadata
────────────────────────────────────────
File: ChangelogRowConverter.java
Purpose: Converts records, adds _change_type, _log_offset, commit_timestamp
────────────────────────────────────────
File: RecordToFlinkRowConverter.java
Purpose: Interface for record converters
────────────────────────────────────────
File: FlinkTableFactory.java
Change: Routes to ChangelogFlinkTableSource for changelog tables
────────────────────────────────────────
File: TableDescriptor.java
Change: Added CHANGELOG* column name constants
────────────────────────────────────────
File: TableDescriptorValidation.java
Change: Validates reserved column names
Test Coverage
Test: testChangelogVirtualTableSchema
Type: Schema
What it verifies: DESCRIBE shows correct columns
────────────────────────────────────────
File: FlinkCatalog.java
Change: Detects $changelog suffix, builds virtual table schema
Tests
File: ChangelogVirtualTableITCase.java
Purpose: Integration tests (7 tests)
────────────────────────────────────────
File: ChangelogRowConverterTest.java
Purpose: Unit tests for converter
API and Format
Documentation