[Feature] Support writing CDC data in sink table's column order for StarRock#4543
Open
zhuxt2015 wants to merge 4 commits intoDataLinkDC:devfrom
Open
[Feature] Support writing CDC data in sink table's column order for StarRock#4543zhuxt2015 wants to merge 4 commits intoDataLinkDC:devfrom
zhuxt2015 wants to merge 4 commits intoDataLinkDC:devfrom
Conversation
- StarRocksTypeConvertTest: 验证 datetime→TIMESTAMP 和 date→DATE 类型映射 - PostgresCDCBuilderTest: 覆盖 composeJdbcProperties 的边界场景(null/空/单参数/多参数) - CreateCDCSourceOperationTest: 通过反射测试 setSinkTable()(匹配、不匹配、带schema前缀、driver加载列、首个匹配优先) - TableTest: 覆盖新增 sinkTable 字段(默认null、存取、隔离性、重置)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose of the pull request
When using CDCSOURCE to sync data from a source database (e.g. PostgreSQL) to
StarRocks, the generated Flink DDL was built from the source table's schema.
If the column order in the StarRocks sink table differs from the source, data
would be written into the wrong columns.
This PR fetches the sink table's actual column metadata at job submission time
and uses it as the basis for DDL generation and INSERT statements, ensuring
data is written in the correct column order as defined in the destination table.
Brief change log
Core Logic (
CreateCDCSourceOperation)setSinkTable(): matches each source table to its corresponding sinktable by name (supports prefix/suffix/schema-prefix rules) and attaches it
as
table.sinkTable.getSinkTables(): lists all tables from the sink schema using thesink driver.
getDriver(): extracted connector-specific URL resolution(
starrocks→jdbc-url,doris→fenodes,jdbc→url) into asingle reusable method, removing duplication with
checkAndCreateSinkSchema.Sink DDL Generation (
SQLSinkBuilder)addSinkInsert()now resolves the effective table object astable.getSinkTable()when available, falling back to the source table.This makes both
getFlinkDDLandcreateInsertOperationsuse the sinktable's column definitions.
Model (
Table)sinkTablefield to carry the resolved sink-side table metadataalongside the source table object.
Metadata Integration (
PostgresCDCBuilder)parseMetaDataConfig()with a helpercomposeJdbcProperties()to correctly append JDBC connection parameters (
?key=value&...) to thePostgreSQL metadata URL.
Verify this pull request
StarRocksTypeConvertTestdatetime→TIMESTAMP,date→DATEmappings; regression for other types andconvertToDBPostgresCDCBuilderTestcomposeJdbcPropertiesedge cases: null / empty / single / multiple params, trailing&guardCreateCDCSourceOperationTestsetSinkTablevia reflection: plain match, schema-prefixed name, no match, driver column loading, first-match-winsTableTestsinkTablefield: default null, set/get, source field isolation, reset to nullthe source table.
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows: